# il Resilient Distributed Dataset (RDD)
Il Resilient Distributed Dataset (RDD) è l'astrazione principale di Spark, una collezione di elementi partizionati tra i nodi del cluster che possono essere operati in parallelo. In questo notebook vederemo le operazioni principali che possiamo eseguire su un RDD.

## Inizializzazione di Spark

Per inizializzazione un'applicazione dobbiamo creare un'oggetto *SparkContext*, che indicherà a spark come accedere al cluster che abbiamo creato attraverso il dockercompose

In [2]:
%pyspark
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

## Creazione di un RDD
Possiamo creare un RDD passando una lista al metodo *.parallelize(list)* dell'istanza della classe *SparkContext*.

In [4]:
%pyspark
data = [0,1,2,3,4,5,6,7,8,9]
dataDist = sc.parallelize(data)
type(dataDist)

## Azioni principali sul RDD
Possiamo raccogliere i dati distribuiti dal RDD in una lista utilizzando il metodo *.collect()*.

In [6]:
%pyspark
dataList = dataDist.collect()
print(type(dataList))
print(dataList)

Se invece volessimo ottenere soltanto n elementi, possiamo utilizzare il metodo *.take(n)*, ad esempio selezioniamo soltato 3 elementi.

In [8]:
%pyspark
dataList = dataDist.take(3)
print(type(dataList))
print(dataList)

Per contare il numero di elementi di un RDD possiamo usare il metodo *.count()*.

In [10]:
%pyspark
dataDist.count()

Se invece volessimo contare il numero di elementi unici possiamo usare il metodo .countByValue(), il risultato sarà un oggetto *defaultdict* che mappa ogni elemento del RDD al numero delle volte che questo elemento viene trovato all'interno del RDD.

In [12]:
%pyspark
dataDist.countByValue()

Possiamo ottenere gli n valori maggiori all'interno del RDD usando il metodo *top(n)*

In [14]:
%pyspark
dataDist.top(5)

## Altre azioni sul RDD
Vediamo altre azioni che possiamo eseguire sugli RDD. Definiamo due nuovi RDD.

In [16]:
%pyspark
dist1 = sc.parallelize([1,2,3,4,5])
dist2 = sc.parallelize([5,6,7,8,9])

### Union
Ci permette di unire due RDD in un unico RDD.

In [18]:
%pyspark
dist3 = dist1.union(dist2)
dist3.collect()

### Intersection
Ci permette di creare un nuovo RDD contenente solo gli elementi presenti in entrambi gli RDD.

In [20]:
%pyspark
dist3 = dist1.intersection(dist2)
dist3.collect()

### Subtract
Ci permette di creare un nuovo RDD con gli elementi del primo RDD non presenti anche nel secondo RDD.

In [22]:
%pyspark
dist3 = dist1.subtract(dist2)
dist3.collect()

### Cartesian
Il risultato è un nuovo RDD composto da tutte le combinazioni di 2 coppie di elementi presi dai due RDD.

In [24]:
%pyspark
dist3 = dist1.cartesian(dist2)
dist3.collect()

## Map e Reduce
Le applicazioni principali del RDD, come per qualsiasi altro tipo di oggetto distribuito, sono **Map** e **Reduce**.
<br><br>
**Map** ci permette di applicare un'operazione ad ogni elemento del RDD, passando al suo interno la funzione da applicare, facciamo un'esempio con una funzione che calcola il quadrato di ogni valore all'interno del RDD.

In [26]:
%pyspark
def compute_pow(d):
    return d*d

powDist = dataDist.map(compute_pow)
powDist.collect()

Quando la funzione da applicare non contiene più di un'istruzione, possiamo anche definirla tramite una **funzione lambda**.

In [28]:
%pyspark
powDist = dataDist.map(lambda d: d*d)
powDist.collect()

Spark mette a disposizione anche un metodo *.flatMap(func)*, che ritorna gli elementi del RDD all'interno di un'unica lista. Facciamo un'esempio ! Creiamo un nuovo RDD con 3 brevi frasi come elementi, ora usiamo il metodo *map* per dividere le parole all'interno di una frase.

In [30]:
%pyspark
s = ["Questo corso spacca !", "Ho messo mi piace alla pagina di ProfessionAI","Diventerò un grande datascientist"]
sDist = sc.parallelize(s)

lensDist = sDist.map(lambda w: w.split())
lensDist.collect()

Il risulato è una lista con 3 elementi, quindi la stessa dimensione della lista iniziale, in cui ogni elemento della lista è a sua volta una lista con le parole che compongono la frase. Proviamo a fare la stessa cosa usando il metodo *flatMap*.

In [32]:
%pyspark
sDist = sc.parallelize(s)

wordsDist = sDist.flatMap(lambda w: w.split())
wordsDist.collect()

Il risultato questa volta è una lista con tutte le parole di tutte le frasi al suo interno, in sostanza flatMap esegue l'appiattimento **(flattening)** dell'ouput.

Passiamo a **Reduce**, che ci permette di aggregare gli elementi all'interno del RDD in base ad una funzione definita da noi, ad esempio utilizziamo per sommare tutti gli elementi all'interno del nostro RDD iniziale.

In [35]:
%pyspark
def add(a,b):
    return a+b

dataSum = dataDist.reduce(add)
print(dataSum)

Anche in questo caso possiamo utilizzare una lambda function.

In [37]:
%pyspark
dataSum = dataDist.reduce(lambda a,b: a+b)
print(dataSum)

Oppure la funzione *add(a,b)* del modulo *operator* di Python, il risultato sarà sempre lo stesso,

In [39]:
%pyspark
from operator import add

dataSum = dataDist.reduce(add)
print(dataSum)

## Trasformazioni sul RDD
Vediamo alcuni metodi utili che ci permettono di eseguire trasformazioni su di un RDD.

### Filter
Il metodo filter ci permette di filtrare gli elementi del RDD in base ad una funzione definita da noi, ad esempio creiamo un nuovo RDD con 10 parole e filtriamo quelle che hanno una lunghezza superiore a 15 caratteri.

In [42]:
%pyspark
words = ["Artificial Intelligence","Machine Learning", "Reinforcement Learning"
         "Deep Learning","Computer Vision", "Natural Language Processing",
        "Augmented Reality", "Blockchain", "Robotic", "Cyber Security"]

wordsDist = sc.parallelize(words)

filterDist = wordsDist.filter(lambda w: len(w)>15)
filterDist.collect()

Oppure filtriamo solo quelle che cominciamo per una vocale

In [44]:
%pyspark
filterDist = wordsDist.filter(lambda w: (w[0].lower() in "aeiou"))
filterDist.collect()

### Distinct
Il metodo *.dinstrinct()* ci permette di ridurre il contenuto del RDD ad elementi unici, rimuovendo eventuali doppi.

In [46]:
%pyspark
namesDist = sc.parallelize(["Giuseppe","Francesco","Antonio","Antonio","Giuseppe"])

uniqueDist = namesDist.distinct()
uniqueDist.collect()

### Sample
Il metodo *.sample(withReplacement, fraction)* ci permette di selezionare casualmente dal RDD degli elementi, questo metodo ha bisogno di due parametri:
* **withReplacement**: va settato a True se un elemento può essere selezionato più di una volta, a False altrimenti.
* **fraction**: probabilità che un elemento ha di essere selezionato, una probabilità di 0 ci ritornerà un rdd vuoto, una probabilità di 0.5 indica che ogni elemento ha il 50% di possibilità di essere selezionato, una probabilità di 1 ritornerà l'RDD originale.

In [48]:
%pyspark
wordsDist.sample(withReplacement=False, fraction=0.5).collect()

## Approfondimenti e link utili
* [Documentazione della classe RDD di PySpark](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)
* [Per approfondire la differenza tra map e flatMap](https://github.com/vaquarkhan/Apache-Kafka-poc-and-notes/wiki/Difference-between-flatMap()-and-map()-on-an-RDD)
* [Le funzioni Lambda di Python](https://www.meccanismocomplesso.org/lessons-lezioni-di-python-le-funzioni-lambda-functions/)