# Introduzione a Spark.
In questo notebook verranno illustrate la basi di Spark tramite semplici esempi. Per la documentazione fare riferimento a [questa pagina](https://spark.apache.org/docs/latest/api/python/user_guide/index.html).


Spark può essere utilizzato in Colab installando, oltre pyspark, findspark una libreria che consente di importare pyspark come una normale libreria.

In [1]:
# Installa le due librerie
!pip install pyspark
!pip install findspark



## Configurazione

Vediamo le caratteristiche che ci mette a disposizione Colab

In [14]:
import multiprocessing
import psutil


cores = multiprocessing.cpu_count()

print(f"Memoria disponibile (dovrebbero essere 12GB): {(psutil.virtual_memory().total/2**30):.2f}")
print(f"Core disponibili (dovrebbero essere 2): {cores}")

Memoria disponibile (dovrebbero essere 12GB): 12.67
Core disponibili (dovrebbero essere 2): 2


Sulla base di questo definiamo la configurazione di Spark.

Impostiamo due executor, diamo 2 GB di memoria al driver e 5 GB di memoria ad ogni executor.

Di default Spark imposta 1 GB di memoria per ogni executor.

Impostiamo il parallelismo a 6.

In [16]:
import pyspark

conf = pyspark.SparkConf()\
    .setAppName("clusterSpark")\
    .set("spark.executor.memory", "5g")\
    .set("spark.driver.memory", "2g")\
    .set("spark.executor.instances", "2")\
    .set("spark.default.parallelism", "6")

Dopodiché, si può inizializzare findspark e avviare Spark creando lo SparkContext.

Visualizzando sc si dovrebbe avere accesso ad alcune informazioni, come la versione di Spark

In [17]:
import findspark
findspark.init()

sc = pyspark.SparkContext(conf=conf)

In [18]:
sc

Possiamo vedere anche quale sia il parallelismo impostato di default

In [19]:
sc.defaultParallelism

6

Creiamo ora un array di dati random

In [20]:
import random
import time
random.seed(1234)

data = [random.randrange(1, 30, 1) for _ in range(1000)] #Crea un array con elementi random

<b>data</b> è un array che contiene valori interi. Si può usare il metodo <i>parallelize</i> dello SparkContext per distribuire i dati agli executor

In [21]:
dataRDD = sc.parallelize(data)

Possiamo usare il metodo <i>cache</i> per rendere persistente l'RDD.
Dopodiché, possiamo contare il numero di elementi contenuti all'interno dell'RDD con il metodo <i>count</i>.
Questa è un'azione che forzerà la creazione dell'RDD che verrà quindi poi salvato nella cache.

In [22]:
dataRDD.persist()
start = time.time()
dataRDD.count()
end = time.time()
print("Exec time "+str((end-start)))

Exec time 5.546101093292236


Si può vedere come la seconda volta che si esegue l'operazione, questa sarà molto più veloce, poiché l'RDD è stato salvato nella cache.

In [23]:
start = time.time()
dataRDD.count()
end = time.time()
print("Exec time "+str((end-start)))

Exec time 1.3816101551055908


Possiamo visualizzare alcuni elementi dell'RDD con il metodo *take*




In [24]:
dataRDD.take(10)

[25, 15, 4, 1, 3, 26, 19, 2, 22, 23]

Possiamo prendere i 10 elementi più grandi con <i>takeOrdered</i>. Di default takeOrdered prende gli elementi in ordine crescente, bisogna specificare manualmente l'ordine, in questo caso negando l'elemento su cui verrà poi applicato l'ordine crescente.

In [25]:
dataRDD.takeOrdered(10, lambda x: -x)

[29, 29, 29, 29, 29, 29, 29, 29, 29, 29]

Possiamo usare *sum* per ottenere la somma dei valori. Anche questa è un'azione.

In [26]:
dataRDD.sum()

15154

La stessa operazione può essere fatta con *reduce*



In [27]:
dataRDD.reduce(lambda x, y: x+y)

15154

Possiamo usare *distinct()* in combinazione con *count()* per contare il numero di elementi distinti.

In [28]:
dataRDD.distinct().count()

29

<h1>Word Count</h1><br />
Vediamo ora un classico esempio di utilizzo di Spark e del calcolo distribuito: contare la frequenza dei termini in un documento.
Innanzi tutto carichiamo alcuni dati, in particolare il documento contiene i Sonetti di Shakespeare

In [29]:
import urllib.request as urllib2

lines = []
stream = urllib2.urlopen('https://dbgroup.ing.unimore.it/spark/shakespeare.txt')
for line in stream:
    lines.append(line.decode('utf-8'))

Nuovamente, possiamo parallelizzare *lines* con parallelize.



In [30]:
linesRDD = sc.parallelize(lines)
linesRDD.cache()
linesRDD.take(10)

['1609\n',
 '\n',
 'THE SONNETS\n',
 '\n',
 'by William Shakespeare\n',
 '\n',
 '\n',
 '\n',
 '                     1\n',
 '  From fairest creatures we desire increase,\n']

Come è possibile notare molte righe contengono caratteri speciali ('\n') e spazi bianchi, devono essere rimossi.
Possiamo scrivere una funzione di Python e applicarla agli elementi dell'RDD.

In [31]:
"""
  Data una stringa rimuove il carattere di fine linea
  e gli spazi bianchi a inizio/fine.
"""
def clean(line):
  return line.replace("\n", "").strip()

In [32]:
linesClean = linesRDD.map(clean)
linesClean.take(10)

['1609',
 '',
 'THE SONNETS',
 '',
 'by William Shakespeare',
 '',
 '',
 '',
 '1',
 'From fairest creatures we desire increase,']

Come si può notare vi sono linee vuote. Le possiamo rimuovere utilizzando *filter* e la funzione *len* di Python

In [33]:
linesNoBlanks = linesClean.filter(lambda x: len(x) > 0)

Adesso che i dati sono pronti, sarà necessario tokenizzare le linee di testo.

Possiamo usare la funzione <b>tokenize</b> per ottenere i token. <br />
<b>Attenzione:</b> splitFunction ritorna un array di token, quindi se si vuole ottenere un RDD di token sarà necessario utilizzare il metodo <b>flatMap</b>.

In [34]:
import re
"""
Data una stringa ritorna i token che la compongono, suddividendola per punteggiatura o spazi bianchi
"""
def tokenize(str):
  return filter(lambda token: len(token) > 0, map(lambda token: token.lower(), re.split('\W+', str)))

Possiamo vedere che se usiamo *map* non otterremo un RDD di token, ma un RDD di array di token

In [35]:
linesNoBlanks.map(tokenize).map(list).take(5)

[['1609'],
 ['the', 'sonnets'],
 ['by', 'william', 'shakespeare'],
 ['1'],
 ['from', 'fairest', 'creatures', 'we', 'desire', 'increase']]

Dobbiamo usare *flatMap*

In [36]:
tokens = linesNoBlanks.flatMap(tokenize)
tokens.cache()
tokens.count()
tokens.take(5)

['1609', 'the', 'sonnets', 'by', 'william']

Se ora vogliamo contare i token dobbiamo creare delle coppie (token, conteggio).

Inizialmente, ogni token ha valore 1, quindi bisogna creare delle coppie (token, 1)

In [37]:
tokens1 = tokens.map(lambda t: (t, 1))
tokens1.take(5)

[('1609', 1), ('the', 1), ('sonnets', 1), ('by', 1), ('william', 1)]

Ora possiamo raggruppare i valori e sommare gli 1.
Si potrebbe fare con *groupByKey()* seguito da un *map*.
Questo però **non è efficiente**, perché esegue lo shuffle di tutti gli 1, non eseguendo alcuna aggregazione locale.

In [38]:
gruppi = tokens1.groupByKey()
v = gruppi.take(1)[0]
print(v[0], ",", list(v[1]))

print(v[0], ",", sum(list(v[1])))

by , [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1

Risulta più efficace eseguire un *reduceByKey* implementando la somma. In questo modo se nella stessa partizione ci sono coppie con la stessa chiave, vengono aggregate localmente prima di fare lo shuffle.

Ad esempio, se abbiamo una partizione che contiene [(by, 1), (by, 1), (by, 1), (by, 1)] invece che trasmettere tutte queste coppie, trasmette solo una coppia (by, 4)

In [39]:
frequencies = tokens1.reduceByKey(lambda x, y: x+y)

Possiamo vedere quali sono i token più frequenti

In [40]:
frequencies.takeOrdered(10, lambda x: -x[1])

[('the', 27378),
 ('and', 26084),
 ('i', 22538),
 ('to', 19771),
 ('of', 17481),
 ('a', 14725),
 ('you', 13826),
 ('my', 12490),
 ('that', 11318),
 ('in', 11112)]

Come si può notare sono tutte stopwords, ossia termini di uso comune (articoli, congiunzioni, etc).
Possiamo sfruttare l'elenco <b>english_stopwords</b> per rimuovere tutti i token che sono considerati stopwords.

In [41]:
english_stopwords = ["i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "s", "t", "can", "will", "just", "don", "should", "now", "d", "o"]

Essendo <b>english_stopwords</b> una struttura dati, per poterla utilizzare per filtrare l'RDD <b>frequencies</b>, innanzi tutto è necessario distribuirla ai worker utilizzando la funzione <i>broadcast</i>.

In [42]:
stopwordsBroadcast = sc.broadcast(english_stopwords)

Adesso possiamo <b>english_stopwords</b> per rimuovere da frequencies tutti i token che sono stopwords.
Per verificare se un elemento è presente in una collezione di Python si può usare la sintassi: *elemento* **in** *collezione*; oppure per verificare che non sia presente *elemento* **not in** *collezione*

Ricordarsi che per accedere al contenuto di una variabile in broadcast bisogna usare *variabileBroadcast.value*

In [43]:
frequenciesNoStopwords = frequencies.filter(lambda x: x[0] not in stopwordsBroadcast.value)

Possiamo ora ripetere ora l'operazione per visualizzare le prime 10 parole più usate da Shakespeare nei suoi Sonetti.

In [44]:
frequenciesNoStopwords.takeOrdered(10, lambda x: -x[1])

[('thou', 5549),
 ('thy', 4034),
 ('shall', 3600),
 ('thee', 3181),
 ('lord', 3094),
 ('king', 3041),
 ('good', 2834),
 ('sir', 2764),
 ('come', 2519),
 ('ll', 2409)]