<a href="https://colab.research.google.com/github/3394424051/COVID-19/blob/master/Corso_CONSOB_2223_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark
La libreria PySpark mette a disposizione su Python l'ambiente per lo sviluppo e l'esecuzione di codice Spark in Python. La libreria pyspark non è disponibile per default, per cui dobbiamo innanzitutto scaricarla ed installarla.

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m8.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=627c7d5e5831e8466731683ff7477347213092600ba17b47c194843b24e57064
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

Una volta scaricato ed installato pyspark, utilizziamo le seguenti istruzioni per farlo partire.
Nota importante: il codice scritto per pyspark può funzionare in tre diverse modalità:


*   modalità locale, sfruttando eventualmente tutti i processori presenti sul nostro computer
*   modalità cluster, sfruttando un sistema distribuito basato su Spark
*   modalità yarn, sfruttando un sistema distribuito basato su Hadoop

Durante la lezione, utilizzeremo la sola modalità locale (local). Tuttavia, il codice che andremo a scrivere funzionerà identicamente su un sistema distribuito vero e proprio. La modalità di funzionamento è quella indicata alla voce **master** nelle istruzioni che seguono. Nello specificare la modalità local, possiamo anche indicare quanti processori andranno utilizzati. Indichiamo il valore * se vogliamo usarli tutti. 




In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Colab")\
        .getOrCreate()


sc = spark.sparkContext

Una volta eseguito il codice soprastante, Spark è stato lanciato in esecuzione ed è pronto per essere utilizzato.
Spark mette a disposizione diverse librerie e diversi approcci per il calcolo distribuito. Noi approfondiremo il caso delle RDD.

## RDD
Le RDD sono delle strutture dati distribuite disponibili con Spark che consentono di mantenere una collezione di valori, oppure una collezione di coppie (chiave, valore), di taglia estremamente grande utilizzando un approccio distribuito. A grandi linee, sono come dei vettori. Avremo l'impressione di creare e manipolare RDD all'interno della nostra applicazione, in realtà le RDD si trovano altrove, si trovano su Spark. Quello che noi abbiamo sotto gli occhi, in realtà, è soltanto uno *stub*, una sorta di fantoccio che prende ordini da noi per poi girarli a Spark. 

Vediamo intanto come si crea una RDD. Due le opzioni principali: la creo a partire da una collezione preesistente oppure la creo a partire da un file.

Nel primo caso, posso utilizzare il metodo **parallelize** di SparkContext per trasformare una qualsiasi collezione in una RDD.

In [None]:
valori = [4,3,7,2,33,44,11]
qualcosa = sc.parallelize(valori)

In [None]:
qualcosa

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

Spark offre due strumenti per lavorare con le RDD:
* Azioni: processano il contenuto di una RDD e restituiscono all'applicazione driver uno o più valori
* Trasformazioni: processano il contenuto di una RDD e restituiscono altre RDD

In [None]:
# Esempi di azioni

# la taglia della RDD
qualcosa.count()
# l'elemento di valore minimo
qualcosa.min()
# l'elemento di valore massimo
qualcosa.max()

44

Se proviamo ad accedere fisicamente e direttamente al contenuto di una RDD non otteniamo nulla, poiché la RDD (sulla nostra macchina) non contiene alcun dato, i dati sono altrove.

In [None]:
qualcosa[0]

Nel caso ci fosse bisogno di riavere alcuni o tutti i dati presenti in una RDD, posso riottenerli attraverso le seguenti azioni

In [None]:
# take mi restituisce i primi n elementi
qualcosa.take(3)
# collect mi restituisce tutti gli elementi
qualcosa.collect()

[4, 3, 7, 2, 33, 44, 11]

Nella realtà, se usiamo Spark e se usiamo le RDD è perché i nostri dataset sono giganteschi, per cui probabilmente nella mia applicazioe non ci sarà abbastanza spazio per contenerli tutti. Per questo motivo, usare il metodo collect non è una buona idea.

Ci soffermiamo piuttosto sulle trasformazioni, sono quelle più interessanti, poiché ci consentono di operare in distribuito su big data sfruttando il parallelismo reso possibile dalla presenza di numerosi computer/processori.

Per approfondire questo secondo caso, introduciamo innanzitutto l'utilizzo di file esterni. Spark mette a disposizione il metodo **textFile** attraverso il quale posso caricare il contenuto di un file di testo in una RDD in cui ciascun elemento è una stringa corrispondente ad una delle righe del file letto. Nell'esempio sottostante carichiamo il contenuto del file frutta.txt, per distinguere le variabili RDD dalle altre, faccio precedere il loro dal carattere 'd'.

In [None]:
dFrutta = sc.textFile('frutta.txt')

In [None]:
dFrutta.count()
dFrutta.take(3)

Problema 1: determinare la frequenza con cui ricorre il nome di ciascun frutto.
Lo risolviamo in due passi. Nel primo, accostiamo ad ogni nome di frutto il valore 1 (la sua frequenza).
Nel secondo, aggreghiamo coppie che hanno la stessa chiave, sommando le rispettive frequenze.

In [None]:
dFrutta.take(3)

Implementiamo il primo passo attraverso una trasformazione map, questa processerà ciascun elemento di dFrutta accostando ad esso il valore 1. La logica della funzione, per ogni elemento x in input, è restituire in output la coppia (x,1). E' possibile codificare questa operazione utilizzando le funzioni *lambda* di Python.

In [None]:
dFrutta1 = dFrutta.map(lambda x: (x,1))

In [None]:
dFrutta1.count()
dFrutta1.take(6)

[('Arance', 1),
 ('Susine', 1),
 ('Carote', 1),
 ('Zucchine', 1),
 ('Susine', 1),
 ('Susine', 1)]

Nel secondo passo dell'algoritmo, aggreghiamo tutte le coppie che presentano la stessa chiave, sommando le rispettive frequenze. Questa trasformazione si implementa in Spark attraverso una funzione *reduce*. In particolare, dovendo aggregare le coppie che condividono la stessa chiave, utilizziamo **reduceByKey**.

In [None]:
dFrutta2 = dFrutta1.reduceByKey(lambda x, y:x+y)

In [None]:
dFrutta2.count()
dFrutta2.take(8)

[('Arance', 3),
 ('Carote', 3),
 ('Zucchine', 1),
 ('Mele', 2),
 ('Kiwi', 2),
 ('Susine', 4),
 ('Banane', 1),
 ('Polpelmi', 1)]

**Problema 2:** implementare l'algoritmo di word counting su un file di testo generico.

In [None]:
dTesto = sc.textFile('testo.txt')

In [None]:
dTesto.count()
dTesto.take(4)

["Naso d'Argento",
 '(di Italo Calvino)',
 '',
 "C'era una lavandaia che era rimasta vedova con tre figliole. S'ingegnavano tutte e quattro a lavar roba piu' che potevano, ma pativano la fame lo stesso. Un giorno la figlia maggiore disse alla madre: "]

Da un primo esame, notiamo che in questo caso ciascuna riga contiene numerose parole. Utilizziamo quindi una trasformazione map per estrapolare da questa riga le parole in esse contenute. A differenza di prima, questa map potenzialmente restituisce per ogni valore in input più valori in output. Per gestire questo caso si utilizza una variante di **map** chiamata **flatMap**. Inoltre, la trasformazione che realizzeremo dovrà contenere al suo interno la logica per estrarre da una riga di testo le parole presenti in essa.

In [None]:
dParole1 = dTesto.flatMap(lambda x: x.split())

In [None]:
dParole1.count()
dParole1.take(20)

Osserviamo che siamo riusciti ad estrapolare le singole parole correttamente, tuttavia ci sono diverse parole da ripulire, perlomeno rimuovendo i segni di punteggiatura e portando tutto in minuscolo. Dal momento che queste trasformazioni sono da applicare ad ogni singola parola e restituiscono, per ciascuna parola un'altra parola, utilizzo una semplice **map** per trasformare tutto in minuscolo ed una semplice **map** per rimuovere i caratteri spuri.

In [None]:
dParole2 = dParole1.map(lambda x: x.lower())

In [None]:
dParole2.take(20)

In [None]:
dParole3 = dParole2.map(lambda x: x.strip('(),.;:-!?'))
dParole3= dParole3.filter(lambda x: len(x)>0)

In [None]:
dParole3.take(20)

In [None]:
dParole4 = dParole3.map(lambda x: (x,1))

In [None]:
dParole5 = dParole4.reduceByKey(lambda x,y: x+y)

In [None]:
dParole5.take(5)

[('naso', 4), ("d'argento", 4), ("c'era", 2), ('lavandaia', 1), ('vedova', 1)]

Avremmo risolto il problema, tuttavia la domanda che potrebbe sorgere è: qual è la parola più frequente? Possiamo dare una risposta aiutandoci con l'azione **max**, restituisce l'elemento massimo in una RDD. Tuttavia, questa azione restituisce in una RDD di tuple, la tupla la cui chiave è quella più grande. Essendo le chiavi delle stringhe, max restituisce il valore lessicograficamente più alto.

In [None]:
dParole5.max()

('voglio', 1)

L'azione **max**, come molte altre di Spark, può essere riprogrammata in modo da ragionare diversamente dallo standard. In questo caso, avrei bisogno di valutare l'elemento massimo non rispetto alla chiave ma rispetto al valore. Nell'esempio che segue, per ogni tupla x, valuto l'elemento massimo facendo perno su x[1], ossia, la sua frequenza.

In [None]:
dParole5.max(key=(lambda x: x[1]))

('che', 6)

# Esercizio 10
## Punto 1

In [None]:
# Contare le parole che contengono la sottostringa 'lo'

sottostringa = 'ci'

# Soluzione 1
dTesto = sc.textFile('testo1.txt')
dParole1 = dTesto.flatMap(lambda x: x.split())
dParole2 = dParole1.map(lambda x: x.lower())
dParole2bis = dParole2.filter(lambda x: sottostringa in x)
dParole2bis.count()
#dParole3 = dParole2bis.map(lambda x: x.strip('(),.;:-!?'))
#dParole3= dParole3.filter(lambda x: len(x)>0)
#dParole4 = dParole3.map(lambda x: (x,1))
#dParole5 = dParole4.reduceByKey(lambda x,y: x+y)


39

In [None]:
# Soluzione 2
dTesto = sc.textFile('testo1.txt')
dParole1 = dTesto.flatMap(lambda x: x.split())
dParole2 = dParole1.map(lambda x: x.lower())
dParole3 = dParole2.map(lambda x: x.strip('(),.;:-!?'))
dParole3= dParole3.filter(lambda x: len(x)>0)
dParole4 = dParole3.map(lambda x: (x,1))
dParole5 = dParole4.reduceByKey(lambda x,y: x+y)

dParole6 = dParole5.filter(lambda x: sottostringa in x[0])
dFrequenze = dParole6.values()
conteggio = dFrequenze.reduce(lambda x,y: x+y)


In [None]:
conteggio

39

## punto 2

In [None]:
dTestoA = sc.textFile('testo1.txt')
dTestoB = sc.textFile('testo2.txt')

dParoleA1 = dTestoA.flatMap(lambda x: x.split())
dParoleA2 = dParoleA1.map(lambda x: x.lower())
dParoleA3 = dParoleA2.map(lambda x: x.strip('(),.;:-!?'))
dParoleA3= dParoleA3.filter(lambda x: len(x)>0)
print(dParoleA3.count())

dParoleB1 = dTestoB.flatMap(lambda x: x.split())
dParoleB2 = dParoleB1.map(lambda x: x.lower())
dParoleB3 = dParoleB2.map(lambda x: x.strip('(),.;:-!?'))
dParoleB3= dParoleB3.filter(lambda x: len(x)>0)
print(dParoleB3.count())

dParoleA4 = dParoleA3.map(lambda x: (x, len(x)))
max_word_len = dParoleA4.max(key=lambda x: x[1])[1]
print(dParoleA4.filter(lambda x: x[1]==max_word_len).collect())

dParoleB4 = dParoleB3.map(lambda x: (x, len(x)))
max_word_len = dParoleB4.max(key=lambda x: x[1])[1]
print(dParoleB4.filter(lambda x: x[1]==max_word_len).collect())

dTestoAB = sc.union([dTestoA, dTestoB])
print(dTestoAB.count())

# Parole presenti in entrambi i file
# elenco delle parole per file senza ripetizioni

#dParoleAB = dParoleA3.join(dParoleB3)
dParoleA5 = dParoleA3.distinct().map(lambda x: (x,1))
dParoleB5 = dParoleB3.distinct().map(lambda x: (x,1))
dParoleAB = dParoleA5.join(dParoleB5)
dParoleAB.take(20)



In [None]:
dParoleAB.saveAsTextFile('dParoleAB.txt')