# il Resilient Distributed Dataset (RDD)
Il Resilient Distributed Dataset (RDD) è l'astrazione principale di Spark, una collezione di elementi partizionati tra i nodi del cluster che possono essere operati in parallelo. In questo notebook vederemo le operazioni principali che possiamo eseguire su un RDD.

## Inizializzazione di Spark

Per inizializzazione un'applicazione dobbiamo creare un'oggetto *SparkContext*, che indicherà a spark come accedere al cluster, l'oggetto ha bisogno di una configurazione, che possiamo creare con la classe *SparkConf*. All'interno della configurazione dovremo specificare almeno:
* **nome dell'applicazione**: tramite il metodo *setAppName(string)*
* **indirizzo del cluster**: tramite il metodo *setMaster(string)*, nel caso in cui usiamo la nostra macchina locale, possiamo specificare 'local'.

In [2]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("basic").setMaster("local")
sc = SparkContext(conf=conf)

## Creazione di un RDD
Possiamo creare un RDD passando una lista al metodo *.parallelize(list)* dell'istanza della classe *SparkContext*.

In [3]:
data = [0,1,2,3,4,5,6,7,8,9]
dataDist = sc.parallelize(data)
type(dataDist)

pyspark.rdd.RDD

## Azioni sul RDD
Possiamo raccogliere i dati distribuiti dal RDD in una lista utilizzando il metodo *.collect()*.

In [4]:
dataList = dataDist.collect()
print(type(dataList))
print(dataList)

<class 'list'>
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


Se invece volessimo ottenere soltanto n elementi, possiamo utilizzare il metodo *.take(n)*, ad esempio selezioniamo soltato 3 elementi.

In [65]:
dataList = dataDist.take(3)
print(type(dataList))
print(dataList)

<class 'list'>
[0, 1, 2]


Per contare il numero di elementi di un RDD possiamo usare il metodo *.count()*.

In [5]:
dataDist.count()

10

Se invece volessimo contare il numero di elementi unici possiamo usare il metodo .countByValue(), il risultato sarà un oggetto *defaultdict* che mappa ogni elemento del RDD al numero delle volte che questo elemento viene trovato all'interno del RDD.

In [11]:
dataDist.countByValue()

defaultdict(int, {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1})

Possiamo ottenere gli n valori maggiori all'interno del RDD usando il metodo *top(n)*

In [16]:
dataDist.top(5)

[9, 8, 7, 6, 5]

## Map e Reduce
Le applicazioni principali del RDD, come per qualsiasi altro tipo di oggetto distribuito, sono **Map** e **Reduce**.
<br><br>
**Map** ci permette di applicare un'operazione ad ogni elemento del RDD, passando al suo interno la funzione da applicare, facciamo un'esempio con una funzione che calcola il quadrato di ogni valore all'interno del RDD.

In [17]:
def compute_pow(d):
    return d*d

powDist = dataDist.map(compute_pow)
powDist.collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Quando la funzione da applicare non contiene più di un'istruzione, possiamo anche definirla tramite una **funzione lambda**.

In [18]:
powDist = dataDist.map(lambda d: d*d)
powDist.collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Spark mette a disposizione anche un metodo *.flatMap(func)*, che ritorna gli elementi del RDD all'interno di un'unica lista. Facciamo un'esempio ! Creiamo un nuovo RDD con 3 brevi frasi come elementi, ora usiamo il metodo *map* per dividere le parole all'interno di una frase.

In [36]:
s = ["Questo corso spacca !", "Ho messo mi piace alla pagina di ProfessionAI","Seguo Giuseppe Gullo su Youtube"]
sDist = sc.parallelize(s)

lensDist = sDist.map(lambda w: w.split())
lensDist.collect()

[['Questo', 'corso', 'spacca', '!'],
 ['Ho', 'messo', 'mi', 'piace', 'alla', 'pagina', 'di', 'ProfessionAI'],
 ['Seguo', 'Giuseppe', 'Gullo', 'su', 'Youtube']]

Il risulato è una lista con 3 elementi, quindi la stessa dimensione della lista iniziale, in cui ogni elemento della lista è a sua volta una lista con le parole che compongono la frase. Proviamo a fare la stessa cosa usando il metodo *flatMap*.

In [37]:
sDist = sc.parallelize(s)

lensDist = wordsDist.flatMap(lambda w: w.split())
lensDist.collect()

['Questo',
 'corso',
 'spacca',
 '!',
 'Ho',
 'messo',
 'mi',
 'piace',
 'alla',
 'pagina',
 'di',
 'ProfessionAI',
 'Seguo',
 'Giuseppe',
 'Gullo',
 'su',
 'Youtube']

Il risultato questa volta è una lista con tutte le parole di tutte le frasi al suo interno, in sostanza flatMap esegue l'appiattimento **(flattening)** dell'ouput.

Passiamo a **Reduce**, che ci permette di aggregare gli elementi all'interno del RDD in base ad una funzione definita da noi, ad esempio utilizziamo per sommare tutti gli elementi all'interno del nostro RDD iniziale.

In [32]:
def add(a,b):
    return a+b

dataSum = dataDist.reduce(add)
print(dataSum)

45


Anche in questo caso possiamo utilizzare una lambda function.

In [33]:
dataSum = dataDist.reduce(lambda a,b: a+b)
print(dataSum)

45


Oppure la funzione *add(a,b)* del modulo *operator* di Python, il risultato sarà sempre lo stesso,

In [35]:
from operator import add

dataSum = dataDist.reduce(add)
print(dataSum)

45


## Trasformazioni sul RDD
Vediamo alcuni metodi utili che ci permettono di eseguire trasformazioni su di un RDD.

### Filter
Il metodo filter ci permette di filtrare gli elementi del RDD in base ad una funzione definita da noi, ad esempio creiamo un nuovo RDD con 10 parole e filtriamo quelle che hanno una lunghezza superiore a 15 caratteri.

In [42]:
words = ["Artificial Intelligence","Machine Learning", "Reinforcement Learning"
         "Deep Learning","Computer Vision", "Natural Language Processing",
        "Augmented Reality", "Blockchain", "Robotic", "Cyber Security"]

wordsDist = sc.parallelize(words)

filterDist = wordsDist.filter(lambda w: len(w)>15)
filterDist.collect()

['Artificial Intelligence',
 'Machine Learning',
 'Reinforcement LearningDeep Learning',
 'Natural Language Processing',
 'Augmented Reality']

Oppure filtriamo solo quelle che cominciamo per una vocale

In [44]:
filterDist = wordsDist.filter(lambda w: (w[0].lower() in "aeiou"))
filterDist.collect()

['Artificial Intelligence', 'Augmented Reality']

### Distinct

Il metodo *.dinstrinct()* ci permette di ridurre il contenuto del RDD ad elementi unici, rimuovendo eventuali doppi.

In [45]:
namesDist = sc.parallelize(["Giuseppe","Francesco","Antonio","Antonio","Giuseppe"])

uniqueDist = namesDist.distinct()
uniqueDist.collect()

['Giuseppe', 'Francesco', 'Antonio']

### Sample
Il metodo *.sample(withReplacement, fraction)* ci permette di selezionare casualmente dal RDD degli elementi, questo metodo ha bisogno di due parametri:
* **withReplacement**: va settato a True se un elemento può essere selezionato più di una volta, a False altrimenti.
* **fraction**: probabilità che un elemento ha di essere selezionato, una probabilità di 0 ci ritornerà un rdd vuoto, una probabilità di 0.5 indica che ogni elemento ha il 50% di possibilità di essere selezionato, una probabilità di 1 ritornerà l'RDD originale.

In [53]:
wordsDist.sample(withReplacement=False, fraction=0.5).collect()

['Computer Vision',
 'Natural Language Processing',
 'Blockchain',
 'Cyber Security']

## Operazioni su RDD
Vediamo altre operazioni che possiamo eseguire sugli RDD. Definiamo due nuovi RDD.

In [54]:
dist1 = sc.parallelize([1,2,3,4,5])
dist2 = sc.parallelize([5,6,7,8,9])

### Union
Ci permette di unire due RDD in un unico RDD.

In [56]:
dist3 = dist1.union(dist2)
dist3.collect()

[1, 2, 3, 4, 5, 5, 6, 7, 8, 9]

### Intersection
Ci permette di creare un nuovo RDD contenente solo gli elementi presenti in entrambi gli RDD.

In [59]:
dist3 = dist1.intersection(dist2)
dist3.collect()

[5]

### Subtract
Ci permette di creare un nuovo RDD con gli elementi del primo RDD non presenti anche nel secondo RDD.

In [62]:
dist3 = dist1.subtract(dist2)
dist3.collect()

[2, 4, 1, 3]

### Cartesian
Il risultato è un nuovo RDD composto da tutte le combinazioni di 2 coppie di elementi presi dai due RDD.

In [61]:
dist3 = dist1.cartesian(dist2)
dist3.collect()

[(1, 5),
 (1, 6),
 (1, 7),
 (1, 8),
 (1, 9),
 (2, 5),
 (2, 6),
 (2, 7),
 (2, 8),
 (2, 9),
 (3, 5),
 (3, 6),
 (3, 7),
 (3, 8),
 (3, 9),
 (4, 5),
 (4, 6),
 (4, 7),
 (4, 8),
 (4, 9),
 (5, 5),
 (5, 6),
 (5, 7),
 (5, 8),
 (5, 9)]

## Approfondimenti e link utili
* [Documentazione della classe RDD di PySpark](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)
* [Per approfondire la differenza tra map e flatMap](https://github.com/vaquarkhan/Apache-Kafka-poc-and-notes/wiki/Difference-between-flatMap()-and-map()-on-an-RDD)
* [Le funzioni Lambda di Python](https://www.meccanismocomplesso.org/lessons-lezioni-di-python-le-funzioni-lambda-functions/)