# Introduzione a Spark Streaming
Spark Streaming è un'estensione delle API di Spark che consente l'elaborazione di dati in streaming scalabile, ad alta velocità e resistente agli errori. Spark Streaming è in grado di prendere i dati da numerose fonti differenti : Kafka, Flume, Kinesis, S3, Socket TPC etc. I dati vengono elaborati in batch e inseriti all'interno di uno oggetto chiamato DStream.
<br><br>
In questo notebook introdurremo Spark Streaming in maniera pratica con alcuni esempi di base.

## Inizializziamo Spark
Inizializziamo due contesti differenti, il contesto base di Spark (SparkContext) e il contesto per lo Streaming (StreamingContext). Per inizializzare lo StreamingContext dobbiamo usare due parametri, il primo è l'instanza della classe SparkContext ed il secondo è il tempo di campionamento di ogni batch.

In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 5)

Fatto questo creiamo un DStream che conterrà i dati in streaming da una connessione TCP, specificando l'hostname e la porta.
**NOTA BENE** Dato che lavoriamo in locale non dobbiamo proccuparci di un eventuale firewall, basta scegliere una porta che sia libera, solitamente le porte con un numero alto sono sempre libere.

In [3]:
lines = ssc.socketTextStream("localhost", 9999)
type(lines)

pyspark.streaming.dstream.DStream

## Echo Streaming
Come primo esempio non faremo altro che prendere il messaggio che viene trasmesso al DStream, dividerlo per parole e stamparlo su schermo. 

In [4]:
words = lines.flatMap(lambda text: text.split())
words.pprint()

Utilizziamo il programma nc da terminale per avviare un webserver e inviare dei messaggi via socket, è sufficente digitare il seguente comando da terminale:
<br><br>
**nc -lk 9999**
<br><br>
Ora avviamo lo StreamingContext usando il metodo *.start()* e lasciamolo in ascolto usando il metodo *.awaitTermination()*.

In [5]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2019-07-11 10:54:20
-------------------------------------------

-------------------------------------------
Time: 2019-07-11 10:54:25
-------------------------------------------
ciao

-------------------------------------------
Time: 2019-07-11 10:54:30
-------------------------------------------
come
stai
?

-------------------------------------------
Time: 2019-07-11 10:54:35
-------------------------------------------
molto
bene
grazie



KeyboardInterrupt: 

posizionati sul terminale dove c'è nc in esecuzione ed invia dei messaggi, se hai fatto tutto correttamente le parole del tuo messaggio verranno stampate qui sopra. Appena abbiamo finito di giocare stoppiamo il contesto, altrimenti non potremmo proseguire con gli altri esempi, per sicurezza killiamo anche un'eventuale applicazione in ascolto sulla porta 9999.



In [6]:
ssc.stop()
!sudo kill $(sudo lsof -t -i:9999)

## Keywords monitoring
Ora facciamo un'esempio lievemente più complesso, monitoriamo due keyword all'interno dei messaggi, verificando se sono presenti ed eventualmente contandone le occorrenze. Per poter proseguire dobbiamo reinizializzare il contesto e ricreare il DStream

In [7]:
sc = SparkContext()
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream("localhost", 9999)

Ora, sul nostro DStream, dividiamo i messaggi in base agli spazi, in modo da ottenere le parole, esattamente come fatto in precedenza, poi filtriamo solo i messaggi che contengono le keyword che stiamo monitorando, infine eseguiamo un map e reduceByKey per sommare il numero totale di volte che tale keyword è presente (se è presente).

In [8]:
keywords = ["professionai","fastai"]

lines.flatMap(lambda text: text.split()) \
.filter(lambda word: (word.lower() in keywords)) \
.map(lambda word: (word.lower(), 1)) \
.reduceByKey(lambda a,b: a+b) \
.pprint()

Di nuovo da terminale avviamo nc (**nc -lk 9999**) poi avviamo lo StreamingContext e mettiamoci in ascolto

In [9]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2019-07-11 10:55:20
-------------------------------------------

-------------------------------------------
Time: 2019-07-11 10:55:25
-------------------------------------------

-------------------------------------------
Time: 2019-07-11 10:55:30
-------------------------------------------
('professionai', 1)

-------------------------------------------
Time: 2019-07-11 10:55:35
-------------------------------------------
('professionai', 1)
('fastai', 1)

-------------------------------------------
Time: 2019-07-11 10:55:40
-------------------------------------------

-------------------------------------------
Time: 2019-07-11 10:55:45
-------------------------------------------
('fastai', 1)

-------------------------------------------
Time: 2019-07-11 10:55:50
-------------------------------------------

-------------------------------------------
Time: 2019-07-11 10:55:55
-------------------------------------------

------------

KeyboardInterrupt: 

inviamo dei messaggi da nc, inserendo alcune volte le nostre keywords. Se hai fatto tutto correttamente, ogni volta che invii un messaggio contenente la keyword dovresti ottenere in output una tupla con la keyword e il numero di occorrenze. Fai attenzione che il numero di occorrenze è riferito soltanto al batch corrente, quindi non abbiamo una somma totale, ma soltato il numero di volte che le keyword sono presenti nell'ultimo batch di messaggi. 

In [10]:
ssc.stop()
!sudo kill $(sudo lsof -t -i:9999)

## Keyword counter
Vediamo come possiamo bypassare il limite presente e contare il numero totale di volte che le keyword appaiono in tutti i messaggi che inviamo, in tempo reale. Reinizializziamo i contesti e in DStream.

In [11]:
sc = SparkContext()
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream("localhost", 9999)

Definiamo una semplice funzione per aggregare i countatori

In [12]:
def aggregate_count(new_values, total_sum):
    add_1 = sum(new_values)
    add_2 = (total_sum or 0) # se ci troiamo al primo batch total_sum sarà null, quindi torniamo 0
    return add_1+add_2

Sostituiamo il metodo *.updateByKey(func)* con *.updateStateByKey(func)*, che ci permette di creare uno stato permanente tra i diversi batch, passiamo a questo metodo la funzione che abbiamo appena definito per eseguire la somma dei contatori.

In [13]:
lines.flatMap(lambda text: text.split()) \
    .filter(lambda word: (word.lower() in keywords)) \
    .map(lambda word: (word.lower(), 1)) \
    .updateStateByKey(aggregate_count) \
    .pprint()

Per poter utilizzare uno stato dobbiamo creare una directory di checkpoint su disco, che permetterà a Spark di recuperare i dati in caso di errori, per farlo ci basta utilizzare il metodo *.checkpoint(path)* della classe StreamingContext, passando come parametro il path alla directory dove vogliamo creare il checkpoint.

In [14]:
ssc.checkpoint("/home/ubuntu")

Avviamo nc, poi avviamo lo StreamingContext e mettiamoci in ascolto

In [15]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2019-07-11 10:56:50
-------------------------------------------

-------------------------------------------
Time: 2019-07-11 10:56:55
-------------------------------------------

-------------------------------------------
Time: 2019-07-11 10:57:00
-------------------------------------------
('professionai', 1)

-------------------------------------------
Time: 2019-07-11 10:57:05
-------------------------------------------
('professionai', 2)
('fastai', 1)

-------------------------------------------
Time: 2019-07-11 10:57:10
-------------------------------------------
('professionai', 2)
('fastai', 1)

-------------------------------------------
Time: 2019-07-11 10:57:15
-------------------------------------------
('professionai', 4)
('fastai', 1)



KeyboardInterrupt: 

-------------------------------------------
Time: 2019-07-11 10:57:20
-------------------------------------------
('professionai', 4)
('fastai', 1)



Fai qualche esperimento sempre usando nc, vedrai che ora il contatore è diventato permanente !

In [31]:
ssc.stop()
!sudo kill $(sudo lsof -t -i:9999)

## Registrare su una tabella SQL temporanea
Certe volte può essere utili salvare i batch del DStream in una tabella SQL temporanea (cioè una view) per poi poter eseguire le nostre analisi. Vediamo come farlo. Creiamo ancora una volta i contesti e il DStream.

In [32]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream("localhost", 9999)

Definiamo una funzione che ci permetterà di accedere globalmente al contesto SQL, questo è importante per poter inserire i dati di ogni batch dentro alla view.

In [33]:
from pyspark.sql import SQLContext

def get_sql_context(spark_context):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']

Creiamo una funzione che prende in input un timestamp ed una RDD, l'RDD conterrà una parte del nostro DStream. Utilizziamo l'RDD per creare un Dataframe ed il Dataframe per creare una view.

In [35]:
from pyspark.sql import Row

def process_rdd(time, rdd):
    if(rdd.count()>0):
        sql_context = get_sql_context(rdd.context)
        row_rdd = rdd.map(lambda w: Row(name=w[0], count=w[1])) # usiamo l'RDD per creare una lista di righe
        df = sql_context.createDataFrame(row_rdd) # Creiamo il Dataframe
        df = df.createOrReplaceTempView("Popularity") # Creiamo o aggiorniamo la View

Ora piuttosto che stampare il DStream, utilizziamo il metodo *.foreachRDD(func)* per dividere il DStream in diversi RDD e passarli alla funzione che abbiamo definito sopra.

In [36]:
keywords = ["professionai","fastai","coursera","edx","udacity"]

lines.flatMap(lambda text: text.split()) \
  .filter(lambda word: (word.lower() in keywords)) \
  .map(lambda word: (word.lower(), 1)) \
  .updateStateByKey(aggregate_count) \
  .foreachRDD(process_rdd)

Avviamo di nuovo nc, fatto questo, impostiamo il checkpoint e avviamo lo Streaming in maniera non bloccante, cioè senza chiamare il metodo *.awaitTermination()*

In [37]:
ssc.checkpoint("/home/ubuntu")
ssc.start()

Giochiamo un po' con nc da terminale per popolare la nostra tabella, quando siamo soddisfatti possiamo vederne il contenuto lanciando una query SQL.

In [41]:
get_sql_context(sc).sql("SELECT * FROM Popularity ORDER BY count DESC").show()

+-----+------------+
|count|        name|
+-----+------------+
|    5|professionai|
|    3|    coursera|
|    2|      fastai|
|    2|     udacity|
+-----+------------+



Non dimenticarti che lo streaming sta continunado a girare in background, quindi quando hai finito terminalo.

In [42]:
ssc.stop()
!sudo kill $(sudo lsof -t -i:9999)