# Streaming

**Deadline:** ...

In deze oefening wordt een streaming applicatie gemaakt door middel van een python applicatie dat data verstuurd over een socket.
Deze opgave bestaat uit de volgende delen:
* Maak een producer dat data verstuurd
* Maak een consumer die de data uitleest en verwerkt

Indien je data moet/wil bewaren op het hdfs, doe dit dan in de **Oefeningen/Streaming** folder en maak deze folder ook leeg voor je begint.

In [None]:
# aanmaken/leegmaken van de data op het hdfs

## Producer

Download een zelfgekozen boek in txt formaat, bijvoorbeeld aan de hand van [deze link](https://www.gutenberg.org/).
Schrijf nu een python producer applicatie in de code cell hieronder dat lijn per lijn dit boek verstuurd over een socket.
Zorg ervoor dat de lijn als json verstuurd wordt en voeg ook een timestamp toe aan dit json-object.
Zorg ervoor dat er tussen 0.5 seconden en 1.5 seconden (willekeurig en verschild van lijn tot lijn) gewacht wordt tussen het versturen van opeenvolgende lijnen. 
(Bij het testen kan je de code vroegtuidig stoppen door de kernel te onderbreken).
Door de file tag bovenaan wordt deze applicatie weggeschreven naar een python file.
Hierdoor kan de python file ook gestart worden via terminal, zodat je de python kernel in deze notebook niet moet herstarten.

**Let op:** De socket start maar als er ook naar geluisterd wordt. Om de applicatie te testen is er onderaan deze sectie een code-block om een python script te schrijven om naar de socket te luisteren en de producer te testen. Schrijf hierin de code om de ontvangen tekst uit te printen.

In [14]:
%%file producer.py
import socket
import time
import json
from datetime import datetime

def send_book(file_path, port):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind(('localhost', port))
        server_socket.listen(1)
        print("Server listening on port", port)
        conn, addr = server_socket.accept()
        print("Connected to client:", addr)
        
        for line in lines:
            data = {
                "timestamp": datetime.now().isoformat(),
                "text": line.strip()
            }  # Create JSON object with timestamp
            json_data = json.dumps(data) + "\n"  # Convert to JSON string
            print("Sending:", json_data.strip())
            conn.sendall(json_data.encode())
            time.sleep(0.5)  # Add a fixed delay between each line

        conn.close()

if __name__ == "__main__":
    file_path = "boek.txt"  # Change this to your book's file path
    port_number = 10000
    send_book(file_path, port_number)


Overwriting producer.py


Schrijf hieronder het nodige commando om de python applicatie te starten in de file kafka_producer.py
Om te testen op runtime error kan je het hier uitvoeren, echter is het beter om dit in een aparte terminal uit te voeren.

In [None]:
# commando om de producer te starten
!python3 producer.py

In onderstaande codecell kan je een python script schrijven om bovenstaande producer te testen

In [3]:
# test python producer
import socket

def receive_book(host, port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((host, port))
        print("Connected to server at {}:{}".format(host, port))

        while True:
            data = client_socket.recv(1024).decode()
            if not data:
                break
            print("Received:", data)

server_host = "localhost"  # Change this to your server's host
server_port = 10000
receive_book(server_host, server_port)

Connected to server at localhost:10000
Received: ﻿

Received: 

Received: *******************************************************************

Received: THIS EBOOK WAS ONE OF PROJECT GUTENBERG'S EARLY FILES PRODUCED AT A

Received: TIME WHEN PROOFING METHODS AND TOOLS WERE NOT WELL DEVELOPED. THERE

Received: IS AN IMPROVED EDITION OF THIS TITLE WHICH MAY BE VIEWED AS EBOOK

Received: (#1513) at https://www.gutenberg.org/ebooks/1513

Received: *******************************************************************

Received: 

Received: 

Received: This Etext file is presented by Project Gutenberg, in

Received: cooperation with World Library, Inc., from their Library of the

Received: Future and Shakespeare CDROMS.  Project Gutenberg often releases

Received: Etexts that are NOT placed in the Public Domain!!

Received: 

Received: *This Etext has certain copyright implications you should read!*

Received: 

Received: <<THIS ELECTRONIC VERSION OF THE COMPLETE WORKS OF WILLIAM

Received: SH

KeyboardInterrupt: 

# Consumer

Nadat je erin geslaagd bent om lijnen uit de boek naar een socket te schrijven, kan je verdergaan naar de volgende stap.
Hierbij ga je een pyspark-applicatie schrijven die data binnenkrijgt via de socket en deze verwerkt door middel van het structured streaming concept in pyspark.
Meer informatie over structured streaming kan je [hier](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) vinden.
Deze applicatie voert de volgende stapen uit:
* Print het schema van de dataframes ontvangen met het structured streaming concept
* Zorg ervoor dat dit dataframe na het verwerken van elke batch uitgeprint wordt in een console.
* Zorg ervoor dat je een window gebruikt van 200 seconden dat elke 5 seconden opschuift. (Dit bevat dus een twee-hondertal lijnen van een boek)
* Voeg de volgende kolommen toe aan de dataframes:
  * Aantal karakters in een window
  * Aantal woorden in een window
  * Aantal keer dat elke letter voorkomt (enkel letters) en tel per letter zowel kleine als hoofdletters mee (dit geeft dus 26 kolommen) per window. Tip: bereken eerst welke letters voorkomen in een zin in een rij, zet in een tweede stap deze rij om naar meerdere kolommen.
  * Het meest voorkomende woord dat begint met een hooofdletter.
 

Opmerkingen:
* Maak voor de laatste twee types kolommen die je moet toevoegen gebruik van userdefined functions. Het aantal karakters en aantal woorden in een window bereken je zonder gebruik te maken van userdefined functions
* Vergeet niet de vragen onder de code te beantwoorden!
* Zorg ervoor dat je de volgende kolommen hebt als eindresultaat: window|total_characters|total_words|most_common_uppercase_word_across_window|a  |f  |b  |c  |d  |e  |f  |g  |h  |i  |j  |k  |l  |m  |n  |o  |p  |q  |r  |s  |t  |u  |v  |w  |x  |y  |z

In [1]:
%%file consumer.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("JSONSocketStructuredStreaming") \
    .getOrCreate()

# Define schema for JSON data
schema = StructType([
    StructField("text", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

@udf(returnType=MapType(StringType(), IntegerType()))
def count_letters(text):
    letter_counts = {}
    for char in text:
        if char.isalpha():
            if char in letter_counts:
                letter_counts[char] += 1
            else:
                letter_counts[char] = 1
    return letter_counts

@udf(returnType=StringType())
def most_common_uppercase_word_across_window(words_list):
    words = [word for sublist in words_list if sublist for word in sublist]
    uppercase_words = {}
    for w in words:
        if len(w) > 0 and w[0].isupper():
            if w in uppercase_words:
                uppercase_words[w.lower()] += 1
            else:
                uppercase_words[w.lower()] = 1
                
    max_value = 0
    max_word = ""

    for w, v in uppercase_words.items():
        if v > max_value:
            max_value = v
            max_word = w
    return max_word

# Read streaming data from the socket as JSON objects
lines_df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 10000) \
    .load()

# Parse JSON objects
parsed_df = lines_df.select(from_json(lines_df.value, schema).alias("data")) \
                    .select("data.*")

# Perform further processing as needed
parsed_df.printSchema()

# Add columns
processed_df = parsed_df \
    .withColumn("num_characters", length(col("text"))) \
    .withColumn("num_words", array_size(split(col("text"), " "))) \
    .withColumn("letter_counts", count_letters(col("text"))) \
    .withColumn("all_words", split(col("text"), " "))

aggregations = []
for letter in "abcdefghijklmnopqrstuvwxyz":
    processed_df = processed_df.withColumn(letter, processed_df["letter_counts"].getItem(letter))
    aggregations.append(
         sum(letter).alias(letter)
    )

windowed_df = processed_df \
    .groupBy(window("timestamp", "200 seconds", "5 seconds")) \
    .agg(sum("num_characters").alias("total_characters"),
         sum("num_words").alias("total_words"),
         collect_list("all_words").alias("all_words"),
        *aggregations)

# Apply UDF to find the most common uppercase word across the window
windowed_df = windowed_df.withColumn("most_common_uppercase_word_across_window", most_common_uppercase_word_across_window(col("all_words")))
windowed_df = windowed_df.select("window", "total_characters", "total_words",  "most_common_uppercase_word_across_window", *"afbcdefghijklmnopqrstuvwxyz")

# Start the streaming query
query = windowed_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()


Overwriting consumer.py


Schrijf hieronder het nodige commando om de consumer te starten. 
Let op dat je eerst de producer start in een terminal voor je dit commando uitvoert.

Maak een screenshot van een deel van de output (na initialisatie) en sla het op onder de naam **Streaming_output_1.png**.

In [None]:
# start consumer (start eerst de producer)
!python3 consumer.py

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/07 16:48:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/07 16:48:45 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
root
 |-- text: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

24/03/07 16:48:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-12a73966-c634-4fe9-acb1-b483f8e17a36. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/07 16:48:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrame

**Vragen:**
* Welke uitvoer modes zijn er allemaal bij Spark Streaming. Wat is het effect van deze outputmodes? Kan je in de producer hierboven ook de append output mode gebruiken? Waarom (niet)?
* Kan de uitvoer van dit resultaat gebruikt worden voor een pipeline met een estimator?

**Antwoorden:**
* Vraag 1:
* Vraag 2: