## PySpark Streaming

Voorheen hebben we gemerkt met een bestaande, volledige dataset die reeds aanwezig was in de cluster.
Echter is er vaak een nood aan data binnen te halen van verscheidene databronnen, deze om te zetten naar een bruikbaar formaat en te bewaren in een datawarehouse.
Dit is exact wat er nodig is voor het ETL principe of Extract-Transform-Load.
Een belangrijk onderdeel hiervan is de Pyspark Streaming module.
De documentatie van deze module kan [hier](https://spark.apache.org/docs/latest/streaming-programming-guide.html) gevonden worden.
Het wordcount example kan ook hierin geschreven worden, namelijk:

In [1]:
%%file networkwordcount.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# streaming context aanmaken (gelijkaardig aan de sparksession)
sc = SparkContext("local[2]", "network_word_count")
sc.setLogLevel("ERROR") # only show errors (anders te veel overhead bij elke iteratie warnings)
ssc = StreamingContext(sc, 5) # elke 5 seconden

# data inlezen van poort 9999
lines = ssc.socketTextStream("localhost", 9999)
# lines gaat alle lijnen data bevatten die toekomen binnen 5 seconden intervallen
# flatmap -> alle lijnen omzetten naar 1 lijn
words = lines.flatMap(lambda line: line.split(" "))
# emit tuples (word, 1)
pairs = words.map(lambda word: (word, 1))
# som alle 1-tjes van elk woord/key
wordCount = pairs.reduceByKey(lambda x, y: x + y)

# tel hoeveel keer een woord met een bepaalde lengte toekomt
# input: hello world test
# 5 -> 2, 4 -> 1

# genereer output
wordCount.pprint()

# start applicatie
ssc.start()
ssc.awaitTermination() # zorg dat het blijft draaien


Writing networkwordcount.py


Om nu de werking te starten, open twee nieuwe terminals en voer in 1 ervan het volgende commando uit.
```console
    nc -lk 9999
```

Het wordcount programma kan nu gestart worden door het volgende commando uit te voeren in de tweede terminal (in de correcte directory):
```python
    python networkwordcount.py
```

## StreamingContext in plaats van SparkContext

Net zoals de context moet er een streaming context aangemaakt worden voor je de streaming api kan gebruiken.
Een aantal belangrijke punten om te onthouden zijn:
* Eens de context gestart is kan er geen nieuwe code toegevoegd worden
* Eens de context gestopt is kan de context niet opnieuw gestart worden. 
* Er kan maar 1 context tegelijkertijd actief zijn
* De spark context kan hergebruikt worden zolang de streaming context gestopt is voor er een nieuwe streamingcontext aangemaakt wordt.

## DStreams of Discretized Streams

De basis abstractielaag gebruikt en voorzien door Spark Streaming.
Dit stelt een continue datastroom voor die afkomstig is van de inputbron of de verwerkte datastroom van de transform stap.
Deze stream stelt een continue tijdreeks voor van RDD's.
Elk van deze RDD's stelt de ontvangen data tijdens een interval voor.

Deze steams kunnen van verscheidene bronnen komen. 
Indien je een niet-standaard geincludeerde bron wil gebruiken moet je een eigen **receiver** schrijven om de data van de bron op te halen.
Meer informatie over deze procedure vind je [hier](https://spark.apache.org/docs/latest/streaming-custom-receivers.html)

Belangrijk om te onthouden dat het aantal beschikbare cores groter moet zijn dan het aantal receivers/gebruikte databronnen.
Anders beschikt spark/de cluster niet over voldoende rekencapaciteiten om alles parallel uit te voeren.

**Beschikbare transformaties**

De meeste zaken die op een DataFrame/RDD uitgevoerd kunnen worden, kunnen ook op DStreams uitgevoerd worden. 
Een aantal operaties die extra aandacht vereisen zijn
* updateStateByKey()
* transform()
* Window operations
* Join operations

**UpdateStateByKey()**

Deze functie maakt het mogelijk om een algemene state bij te houden en up te daten bij het ontvangen van nieuwe informatie.
Voor het bovenstaande wordcount example kan dit bijvoorbeeld de wordcount van de volledige stream zijn ipv per lijn.
Pas nu het wordcount-example aan door deze twee zaken bij te houden.
Meer informatie kan je [hier](https://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation) vinden.

**Tip:** Het is nodig om checkpointing te configureren voor deze functie kan gebruikt worden (zodat het ergens kan bijgehouden worden). Dit gebeurt door de volgende lijn na het aanmaken van de streaming context te plaatsen:
    
    ssc.checkpoint("checkpoint")

In [8]:
%%file networkwordcount.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from datetime import datetime

# streaming context aanmaken (gelijkaardig aan de sparksession)
sc = SparkContext("local[2]", "network_word_count")
sc.setLogLevel("ERROR") # only show errors (anders te veel overhead bij elke iteratie warnings)
ssc = StreamingContext(sc, 5) # elke 5 seconden
ssc.checkpoint("checkpoint")

# data inlezen van poort 9999
lines = ssc.socketTextStream("localhost", 9999)
# lines gaat alle lijnen data bevatten die toekomen binnen 5 seconden intervallen
# flatmap -> alle lijnen omzetten naar 1 lijn
words = lines.flatMap(lambda line: line.split(" "))
# emit tuples (word, 1)
pairs = words.map(lambda word: (word, 1))

##### DIT IS VERANDERD
# hou een globale state bij
#def updateFunctie(newValues, runningCount):
#    if runningCount is None:
#        runningCount = 0
#    return sum(newValues, runningCount)

# een andere manier is
def updateFunctie(newValues, runningCount):
    # (runningCount or 0) doet hetzelfde als die if hierboven
    return sum(numValues) + (runningCount or 0)

def updateFunctie(newValues, runningState):
    return datetime.now()

# een geschiedenis bijhouden kan ook
def updateFunctie(newValues, timestamps):
    if timestamps is None:
        timestamps = []
    timestamps.append(datetime.now())
    return timestamps

state = pairs.updateStateByKey(updateFunctie)

state.pprint()

# start applicatie
ssc.start()
ssc.awaitTermination() # zorg dat het blijft draaien


Overwriting networkwordcount.py


**Transform operation**

De transform operations laat je toe om een RDD-to-RDD functie toe te passenop een DStream.
Dit laat je toe om alle RDD operaties toe te passen die niet zouden aangeboden worden door de Stream API
Het is hierbij belangrijk om op te merken dat deze functie elke batch opgeroepen wordt en dus dat het mogelijk is om parameters te wijzigen tussen de verschillende batches (aantal partities, broadcasted variabelen, ...)

    cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))

**Window operations**

Een belangrijke eigenschap van streams is ook dat alle informatie binnen een bepaald tijdsvenster belangrijk kan zijn.
Binnen spark streams zijn er een verscheidene WindowOperations om data binnen een bepaald window te aggregeren en te verwerken.
Hieronder staat een voorbeeld om een reduce toe te passen om de 10 seconden op data dat in de laatste 30 seconden is binnengekomen.

    windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

**Join operations**

Twee streams kunnen gecombineerd worden door middel van de .join() functie.
Dit doet standaard een inner join maar andere mogelijkheden kunnen ook gekozen worden.

## Sending data to external systems

De foreachRDD functie is een krachtige functie dat toegepast wordt op elke RDD dat aangemaakt wordt door een DStream. 
Dit maakt het mogelijk om de data uit te sturen naar externe systemen en zorgt dus voor de Load-stap binnen ETL.
Een simplistische oplossing is als volgt:

    def sendRecord(rdd):
        connection = createNewConnection()  # executed at the driver
        rdd.foreach(lambda record: connection.send(record))
        connection.close()

    dstream.foreachRDD(sendRecord)
    
Bovenstaande gaat niet werken omdat de connectie door de driver aangemaakt wordt.
Deze connectie gaat geserializeerd worden en doorgestuurd naar de nodes maar dit gaat zelden correct lukken.
Connectieproblemen bij het verzenden van data komen bijna steeds voort uit het correct aanmaken op de juiste nodes van de connectie.
Een oplossing hiervoor is het volgende:

    def sendRecord(record):
        connection = createNewConnection()
        connection.send(record)
        connection.close()

    dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
    
Dit gaat correct werken maar is echter suboptimaal omdat in deze code, een nieuwe connectie aangemaakt wordt voor elke rij in de stream wat voor heel veel overhead zorgt.
Een andere mogelijkheid is als volgt

    def sendPartition(iter):
        connection = createNewConnection()
        for record in iter:
            connection.send(record)
        connection.close()

    dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
    
Dit is reeds beter omdat de connectie reeds gedeeltelijk hergebruikt wordt maar wordt nog steeds herhaadelijk geopend en gesloten.
Dit zorgt nog steeds voor onnodige overhead.
De beste oplossing is door gebruik te maken van een connectionPool() waaruit connectie kunnen hergebruikt worden.
Deze pool maakt automatisch connecties uit en sluit de bestaande connecties enkel indien ze voldoende lang ongebruikt worden.

    def sendPartition(iter):
        # ConnectionPool is a static, lazily initialized pool of connections
        connection = ConnectionPool.getConnection()
        for record in iter:
            connection.send(record)
        # return to the pool for future reuse
        ConnectionPool.returnConnection(connection)

    dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

## Checkpoints

Checkpoints is een manier om informatie op te slaan op de cluster om spark fout-tolerant te worden voor crashes van zowel de driver als individuele node.
Dit gebeurt door de nodige informatie op te slaan in een directory.

Checkpointing moet ge-enabled worden wanneer je
* een state wilt bijhouden
* een fout-tolerante applicatie wil

Let wel op dat het geen garantie is dat alle data behouden blijft maar het merendeel zou correct moeten opgevangen worden.

Checkpointing toevoegen aan je applicatie gebeurt door een directory mee te gevan aan de sparkContext waar de checkpoints in een fout-tolerant gedistribueerd opslagsysteem kunnen bijgehouden worden.
Dit gebeurd als volgt

    ssc.checkpoint("checkpoints") 

**Shared variables with checkpoints**

Accumulators en broadcasted variabelen worden niet opgeslagen door het checkpointing systeem in Spark. 
Dit kan opgelost worden door singleton instances te maken zodat ze kunnen geherinstantieerd worden nadat de driver restart na een failure.
Onderstaande code is een voorbeeld van hoe dit uit te voeren in een wordcount example.
Dit voorbeeld maakt gebruik van globals() wat de globale variabelen bijhoudt.
In dit voorbeeld wordt er gebruik gemaakt van een broadcasted array om een lijst mee te geven met beginletters van woorden die genegeerd worden.
Daarnaast wordt een accumulator gebruikt om het totaal aantal genegeerde woorden te tellen.

In [11]:
%%file networkwordcount.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from datetime import datetime

###### maak shared variabelen aan
def getWordExcludeList(sparkContext):
    if "worldExcludeList" not in globals():
        globals()["worldExcludeList"] = sparkContext.broadcast(["Hello"])
    return globals()["worldExcludeList"]

def getDroppedWordCounters(sparkContext):
    if "droppedWordCounters" not in globals():
        globals()["droppedWordCounters"] = sparkContext.accumulator(0)
    return globals()["droppedWordCounters"]

# streaming context aanmaken (gelijkaardig aan de sparksession)
sc = SparkContext("local[2]", "network_word_count")
sc.setLogLevel("ERROR") # only show errors (anders te veel overhead bij elke iteratie warnings)
ssc = StreamingContext(sc, 5) # elke 5 seconden
ssc.checkpoint("checkpoint")

# data inlezen van poort 9999
lines = ssc.socketTextStream("localhost", 9999)
# lines gaat alle lijnen data bevatten die toekomen binnen 5 seconden intervallen
# flatmap -> alle lijnen omzetten naar 1 lijn
words = lines.flatMap(lambda line: line.split(" "))
# emit tuples (word, 1)
pairs = words.map(lambda word: (word, 1))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x+y)

def skipWords(time, rdd):
    # haal de shared variabelen op
    excludeList = getWordExcludeList(rdd.context)
    droppedWordsCounter = getDroppedWordCounters(rdd.context)
    
    def func(row):
        word = row[0]
        if word in excludeList.value:
            droppedWordsCounter.add(row[1])
            return False
        else:
            return True
    
    f = rdd.filter(func)
    
    print("# Genegeerde woorden:", droppedWordsCounter.value)
    print("Gefilterede rdd:", f.collect())
    
    pass

wordCounts.foreachRDD(skipWords)

# start applicatie
ssc.start()
ssc.awaitTermination() # zorg dat het blijft draaien

Overwriting networkwordcount.py


### Oefening

Schrijf een streaming applicatie dat de volgende kenmerken heeft
* Bereken het aantal woorden dat toekomt met een bepaalde lengte: bvb: "Hello world over there" => 1 woorden met 4 karakters, 3 woorden met 5 karakters
* Zorg voor checkpoints in een temp_state directory
* Zorg ervoor dat dit berekend worden in sliding windows van 5 seconden dat elke 2 seconden opschuift
* Hou een algemene state bij met het totaal aantal gelezen woorden, gebruik je hiervoor een accumulator of een update state by key?

### Oplossing

In [12]:
%%file oefening.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# streaming context aanmaken (gelijkaardig aan de sparksession)
sc = SparkContext("local[2]", "network_word_count")
sc.setLogLevel("ERROR") # only show errors (anders te veel overhead bij elke iteratie warnings)
ssc = StreamingContext(sc, 1) # elke 1 second (aangezien het om de twee seconden moet opschijven en je window 5 seconden is)
# als het meer dan 1 second is moet je een batch in twee knippen
ssc.checkpoint("temp_state")

# data inlezen van poort 9999
lines = ssc.socketTextStream("localhost", 9999)
windowLines = lines.window(5, 2)
# lines gaat alle lijnen data bevatten die toekomen binnen 5 seconden intervallen
# flatmap -> alle lijnen omzetten naar 1 lijn
words = windowLines.flatMap(lambda line: line.split(" "))
# emit tuples (word, 1)
pairs = words.map(lambda word: (len(word), 1))
# som alle 1-tjes van elk woord/key
wordCount = pairs.reduceByKey(lambda x, y: x + y)
# genereer output
wordCount.pprint()

# totaal aantal woorden
def updateFunctie(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

total_WC = pairs.updateStateByKey(updateFunctie)
# total_WC -> (word, total_count)
# total_counts optellen
# eerst eruit halen met de map
# reduce om ze op te tellen
total_WC.map(lambda kv1: kv1[1]).reduce(lambda x,y: x+y).pprint()

# start applicatie
ssc.start()
ssc.awaitTermination() # zorg dat het blijft draaien


## we geven input op tijdstip 0
# tijdstip 0 wordt in gelezen 0
# op tijdstip 1 wordt er niets gedaan (sliding interval 2)
# tijdstip 2 lees opnieuw de data in (vanaf tijdstip -3 want window heeft lengte 5)
# op tijdstip 3 wordt er niets gedaan (sliding interval 2)
# tijdstip 4 lees opnieuw de data in (vanaf tijdstip -1 want window heeft lengte 5)
# op tijdstip 5 wordt er niets gedaan (sliding interval 2)
# tijdstip 6 lees je geen data in (vanaf tijdstip 1 want window heeft lengte 5)
# op tijdstip 7 wordt er niets gedaan (sliding interval 2)


Writing oefening.py


Zijn de resultaten wat je verwacht had?
Indien niet, hou zou je het kunnen oplossen?

In [None]:
# antwoord

## Structured streaming

Naast het originele streaming systeem van spark gebruikmakende van DStreams, is er ook een [Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) variant.
Het grootste verschil is dat terwijl DStreams gebaseerd zijn op RDD's objecten maakt structured streaming gebruik van DataFrames.
Het networkcount dat we eerst aangehaald hadden hierboven ziet er als volgt uit met structured streaming:

In [13]:
%%file structuredNetworkCount.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.master("local").appName("Structured Streaming les").getOrCreate()

# extract
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# transform
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()

# load
app = wordCounts.writeStream.outputMode("complete").format("console").start()
app.awaitTermination()

Writing structuredNetworkCount.py


Merk op dat we hier de outputMode complete hebben gebruikt.
Het resulterende gedrag is gelijkaardig aan de state van bij DStreams.
Als we echter ook de wordcount per batch willen weten kunnen we gebruik maken van de andere modes.
Onderstaande voorbeeld toont hoe het networkCount voorbeeld na te bootsen (namelijk via de update mode).

In [14]:
%%file structuredNetworkCount.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.master("local").appName("Structured Streaming les").getOrCreate()

# extract
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# transform
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()

# load #### dit is aangepast
app = wordCounts.writeStream.outputMode("update").format("console").start()
app.awaitTermination()

Overwriting structuredNetworkCount.py


### Oefening

Gebruik de informatie uit [deze link](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) en hermaak de vorige oefening van DStreams.
Aangezien structured streaming een globaal dataframe bijhoudt moet er een timestamp toegevoegd worden om het te kunnen verdelen in windows.
Dit moet gedaan worden in de source dus kunnen we hier met deze data geen windows bestuderen.

In [None]:
%%file oefeningStructured.py