# Laboratorio di Big Data - a.a. 2022/2023

Materiale a cura di Roberto Grasso. 

roberto.grasso@phd.unict.it

# Lecture 1

In questa lezione studieremo le basi di Spark.

## Da MapReduce a Spark
Perché passare da MapReduce a Spark?

1. Prestazioni migliori: Spark è noto per avere prestazioni migliori rispetto a MapReduce grazie alla sua architettura in-memory e alla sua capacità di sfruttare meglio le risorse hardware.
2. Maggiore disponibilità di API: Spark offre un maggior numero di API rispetto a MapReduce. Questo rende possibile scrivere codice in diversi linguaggi di programmazione, tra cui Java, Scala, Python e R.
3. Ampia gamma di librerie: Spark offre un'ampia gamma di librerie integrate, tra cui Spark SQL per l'elaborazione di dati strutturati, MLlib per il machine learning e GraphX per l'elaborazione di grafi.
4. Facilità di utilizzo: Spark ha un'interfaccia utente più semplice rispetto a MapReduce, che lo rende più facile da utilizzare.
5. Supporto della comunità: Spark ha una grande e attiva comunità di sviluppatori che fornisce supporto e risorse per gli utenti.


## Il costrutto chiave: Resilient Distributed Dataset (RDD)

Il __Resilient Distributed Dataset__ (RDD) è una struttura di dati fondamentale in Spark.
Un RDD rappresenta una collezione *immutabile* e *distribuita* di oggetti che possono essere processati in parallelo attraverso un cluster.

Con il termine *immutabile* si indende che, una volta creato, l'RDD non può essere modificato ma solo trasformato in un nuovo RDD tramite delle operazioni che vedremo a breve. Questo significa che ogni operazione su un RDD produce un nuovo RDD senza influenzare l'originale. L'immutabilità consente una migliore gestione della concorrenza e una maggiore efficienza in alcune operazioni (poiché le parti di un RDD possono essere elaborate in parallelo senza dover gestire la sincronizzazione tra i thread).

Ci sono due modi principali per creare un RDD: leggere dati da una fonte esterna come Hadoop Distributed File System (HDFS) o da un database, o trasformare un RDD esistente attraverso le operazioni map, filter, reduce, aggregate, ecc...

### Operazioni base sugli RDD

Le operazioni sugli RDD si possono suddividere in trasformazioni e azioni.
* Trasformazioni: sono operazioni che consentono di costruire un RDD attraverso delle operazioni deterministiche su altri RDD (`map`, `filter`, `join`, `union`, `intersection`, `distinct`, ecc.). Queste operazioni sono _lazy_: non si calcola nulla fino a quando non è esplicitamente richiesto da un'azione.
* Azioni: sono operazioni che servono per restituire valori o esportare dati (`count`, `collect`, `reduce`, `save`, ecc.).  Queste azioni forzano le operazioni di trasformazione e restituiscono un nuovo RDD.

Cominciamo creando una sessione Spark e caricando un semplice file di testo.


In [1]:
# Librerie
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.functions import hash
from pyspark import StorageLevel
import random
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, FloatType
from pyspark.sql.functions import udf
from pyspark.sql.functions import pandas_udf, PandasUDFType
import os
import shutil

In [2]:
# Sessione Spark
spark = SparkSession.builder.appName("LabBigData23").getOrCreate()
sc = spark.sparkContext

# Caricamento dati da un file di testo
rdd = sc.textFile("../data/file1.txt")

/usr/local/lib/python3.9/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/02 09:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


La funzione `collect()` ritorna gli elementi di un RDD come un array.
Va usata con cautela perché nelle applicazioni reali solitamente si lavora con dataset molto grandi. Di conseguenza questa operazione può essere molto onerosa.

In [3]:
print(rdd.collect())

['1,John,Doe,25,1000,IT', '2,Jane,Smith,32,1500,IT', '3,Bob,Johnson,45,2500,Sales', '4,Alice,Jones,28,1200,Sales', '5,Mark,Williams,38,2000,Sales']


La funzione `map()` applica un'operazione ad ogni elemento di un RDD e ne restituisce uno nuovo che contiene il risultato.

In [4]:
# Upper
rdd.map(lambda x: x.upper()).collect()


['1,JOHN,DOE,25,1000,IT',
 '2,JANE,SMITH,32,1500,IT',
 '3,BOB,JOHNSON,45,2500,SALES',
 '4,ALICE,JONES,28,1200,SALES',
 '5,MARK,WILLIAMS,38,2000,SALES']

La funzione `filter()` consente di selezionare un sottoinsieme degli elementi di un RDD in base ad una condizione booleana.

In [5]:
# Righe che contengono "J"
rdd.filter(lambda x: "J" in x).collect()


['1,John,Doe,25,1000,IT',
 '2,Jane,Smith,32,1500,IT',
 '3,Bob,Johnson,45,2500,Sales',
 '4,Alice,Jones,28,1200,Sales']

La funzione `flatMap()` applica una funzione ad ogni elemento dell'RDD, fa il _flattening_ del risultato e restituisce un nuovo RDD.

In [6]:
# Separiamo con la virgola e compattiamo in un unico array
rdd.flatMap(lambda x: x.split(',')).collect()

['1',
 'John',
 'Doe',
 '25',
 '1000',
 'IT',
 '2',
 'Jane',
 'Smith',
 '32',
 '1500',
 'IT',
 '3',
 'Bob',
 'Johnson',
 '45',
 '2500',
 'Sales',
 '4',
 'Alice',
 'Jones',
 '28',
 '1200',
 'Sales',
 '5',
 'Mark',
 'Williams',
 '38',
 '2000',
 'Sales']

La funzione `mapValues()` funziona solo su RDD organizzati in coppie `(key, value)`. Applica un'operazione solo al valore di ciascuna coppia `(key, value)` dell'RDD e restituisce il risultato delll'operazione 

In [7]:
# Per prima cosa creiamo un RDD con coppie (key, value).
mv_rdd = rdd.map(lambda x: x.split(',')).map(lambda x: (x[5], x[2]))

# Vediamo il contenuto del nuovo RDD.
print(mv_rdd.collect())

# Applichiamo la funzione mapValues per trasformare i valori.
print(mv_rdd.mapValues(lambda x: x.upper()).collect())

[('IT', 'Doe'), ('IT', 'Smith'), ('Sales', 'Johnson'), ('Sales', 'Jones'), ('Sales', 'Williams')]
[('IT', 'DOE'), ('IT', 'SMITH'), ('Sales', 'JOHNSON'), ('Sales', 'JONES'), ('Sales', 'WILLIAMS')]


Prima di procedere con le altre funzioni, organizziamo i nostri dati in modo da avere delle tuple e non delle semplici stringhe. Possiamo farlo usando la funzione `map()`.

In [8]:
# Splittiamo le stringhe con la virgola
new_rdd = rdd.map(lambda x: tuple(x.split(',')))

# Vediamo il contenuto del nuovo RDD
new_rdd.collect()

[('1', 'John', 'Doe', '25', '1000', 'IT'),
 ('2', 'Jane', 'Smith', '32', '1500', 'IT'),
 ('3', 'Bob', 'Johnson', '45', '2500', 'Sales'),
 ('4', 'Alice', 'Jones', '28', '1200', 'Sales'),
 ('5', 'Mark', 'Williams', '38', '2000', 'Sales')]

La funzione `filter()` viene utilizzata per filtrare gli elementi di un RDD di PySpark in base a una condizione. Restituisce un nuovo RDD con gli elementi che soddisfano la condizione.

In [9]:
# Filtriamo gli impiegati in base allo stipendio

# Questa riga di codice restituisce un errore perché di default tutti gli elementi sono stringhe
# new_rdd.filter(lambda x: x[4] > 1800).collect()

# Quindi, prima di filtrare, dobbiamo convertire lo stipendio in un numero
new_rdd.filter(lambda x: float(x[4]) > 1800).collect()

# Notate che la conversione in float è servita solo per il filtraggio (le modifiche non sono state apportate all'RDD)


[('3', 'Bob', 'Johnson', '45', '2500', 'Sales'),
 ('5', 'Mark', 'Williams', '38', '2000', 'Sales')]

La funzione `groupByKey()`, come la funzione `mapValues()`, funziona su coppie `(key, value)`. Come si può intuire dal nome, si limita a raggruppare gli elementi in base alla chiave.

In [10]:
# Creiamo un nuovo RDD organizzato in coppie (key, value)
kv_rdd = new_rdd.map(lambda x: (x[5], (x[2], float(x[4]))))

# Abbiamo creato un nuovo RDD in cui la chiave è il nome del reparto e il valore è una tupla contenente il cognome dell'impiegato e lo stipendio
print(kv_rdd.collect())

# Raggruppiamo i dati per reparto
print(kv_rdd.groupByKey().map(lambda x: (x[0], list(x[1]))).collect())

[('IT', ('Doe', 1000.0)), ('IT', ('Smith', 1500.0)), ('Sales', ('Johnson', 2500.0)), ('Sales', ('Jones', 1200.0)), ('Sales', ('Williams', 2000.0))]
[('Sales', [('Johnson', 2500.0), ('Jones', 1200.0), ('Williams', 2000.0)]), ('IT', [('Doe', 1000.0), ('Smith', 1500.0)])]


La funzione `reduceByKey()` restituisce un nuovo RDD con i valori ridotti per ogni chiave. Prende come argomento una funzione di riduzione che accetta due valori e ne restituisce uno solo. 

In [12]:
# Sommiamo gli stipendi per reparto
kv_rdd.reduceByKey(lambda x, y: ("", x[1] + y[1])).collect()

# La funzione di riduzione, in questo caso, prende in input due tuple e restituisce una tupla con il primo elemento vuoto e il secondo elemento dato dalla somma degli stipendi

[('Sales', ('', 5700.0)), ('IT', ('', 2500.0))]

La funzione `reduce()` è più generica. Infatti, non raggrupperà gli elementi dell'RDD in base alla chiave, ma applicherà la funzione di riduzione a tutti gli elementi in maniera indiscriminata.

In [13]:
# Sommiamo gli stipendi di tutti i dipendenti, indipendentemente dal reparto
kv_rdd.reduce(lambda x, y: ("", ("", x[1][1] + y[1][1])))

('', ('', 8200.0))

La join è un'operazione che consente di unire due RDD in base alle chiavi in comune. Ci sono diverse tiplogie di join.
* Inner join: restituisce solo le righe in cui il valore della chiave è presente in entrambi gli RDD.
* Left Outer Join: restituisce tutte le righe del primo RDD (quello di sinistra) e le righe corrispondenti del secondo RDD (quello di destra). Se una riga del primo RDD non ha una corrispondenza nel secondo RDD, viene restituito un valore `NULL` per le colonne del secondo RDD.
* Right Outer Join: restituisce tutte le righe del secondo RDD (quello di destra) e le righe corrispondenti del primo RDD (quello di sinistra). Se una riga del secondo RDD non ha una corrispondenza nel primo RDD, viene restituito un valore `NULL` per le colonne del primo RDD.
* Full Outer Join: restituisce tutte le righe dei due RDD. Se una riga del primo o del secondo RDD non ha una corrispondenza nell'RDD opposto, viene restituito un valore `NULL` per le colonne mancanti.

In [14]:
# Vediamo un esempio di inner join

# Creiamo un nuovo RDD in cui la chiave è la sigla del reparto e il valore è il nome del reparto
data =  [('IT', ('Information Technology Department')),
        ('Sales', ('Sales Department')),
        ('HR', ('Human Resources Department'))]

kv_rdd2 = sc.parallelize(data)

# Associamo a ciascun dipendente, il nome del reparto
kv_rdd.join(kv_rdd2).collect()



[('Sales', (('Johnson', 2500.0), 'Sales Department')),
 ('Sales', (('Jones', 1200.0), 'Sales Department')),
 ('Sales', (('Williams', 2000.0), 'Sales Department')),
 ('IT', (('Doe', 1000.0), 'Information Technology Department')),
 ('IT', (('Smith', 1500.0), 'Information Technology Department'))]

In [15]:
# Vediamo un esempio di right outer join
kv_rdd.rightOuterJoin(kv_rdd2).collect()

[('Sales', (('Johnson', 2500.0), 'Sales Department')),
 ('Sales', (('Jones', 1200.0), 'Sales Department')),
 ('Sales', (('Williams', 2000.0), 'Sales Department')),
 ('IT', (('Doe', 1000.0), 'Information Technology Department')),
 ('IT', (('Smith', 1500.0), 'Information Technology Department')),
 ('HR', (None, 'Human Resources Department'))]

La funzione `union()` viene utilizzata per combinare due RDD al fine di crearne uno nuovo che contiene tutti gli elementi di entrambi gli RDD. 

In [16]:
# Creiamo un nuovo RDD
data =  [('IT', ('Springsteen', 2000)), 
        ('IT', ('Dylan', 1800))]

kv_rdd3 = sc.parallelize(data)

# Union
kv_rdd.union(kv_rdd3).sortByKey().collect()


[('IT', ('Doe', 1000.0)),
 ('IT', ('Smith', 1500.0)),
 ('IT', ('Springsteen', 2000)),
 ('IT', ('Dylan', 1800)),
 ('Sales', ('Johnson', 2500.0)),
 ('Sales', ('Jones', 1200.0)),
 ('Sales', ('Williams', 2000.0))]

Per concludere, vediamo un esempio in cui contiamo il numero di parole contenute all'interno di un  file di testo.

In [17]:
# Leggiamo il file di testo
lorem_ipsum_rdd = sc.textFile("../data/file2.txt")

# Ciascuna riga viene vista come una stringa. Per contare il numero di parole, dobbiamo prima dividere le stringhe in parole
lorem_ipsum_rdd.flatMap(lambda x: x.split(' ')).count()

11388

Con una sola riga di codice (escludendo la lettura da file) siamo riusciti a contare il numero di parole.

Lo stesso possiamo fare per il conteggio del numero di caratteri.

In [18]:
# Contiamo il numero di caratteri (compresi gli spazi)
lorem_ipsum_rdd.map(lambda x: len(x)).reduce(lambda x, y: x + y)

75803

## Persistenza e Caching

La __persistenza__ è una tecnica di ottimizzazione usata da Spark per migliorare le performance di accesso ai dati. 
Si può scegliere di memorizzare i dati in memoria, su disco o di utilizzare un approccio _ibrido_.

Le possibili opzioni sono:
* `MEMORY_ONLY`: mantiene l'RDD in memoria (come un oggetto Java deserialized).
* `MEMORY_ONLY_SER`: mantiene l'RDD in memoria (come un oggetto Java serialized).
* `MEMORY_AND_DISK`: mantiene l'RDD in memoria (come un oggetto Java deserialized) e salva l'eccesso su disco.
* `MEMORY_AND_DISK_SER`: mantiene l'RDD in memoria (come un oggetto Java serialized) e salva l'eccesso su disco.
* `DISK_ONLY`: memorizza l'RDD su disco.

Per cambiare la persistenza di un RDD, basta chiamare la funzione `persist()`. Per rilasciare la memoria occupata da un RDD con persistenza, basta chiamare la funzione `unpersist()`.

In [19]:
# Creiamo un RDD e impostiamo il livello di persistenza
rdd = sc.parallelize(range(1000))
rdd.persist(StorageLevel.DISK_ONLY)

# Verifichiamo il livello di persistenza
print(rdd.getStorageLevel())

# Liberiamo la memoria
rdd.unpersist()

Disk Serialized 1x Replicated


PythonRDD[58] at RDD at PythonRDD.scala:53

La funzione `cache()` equivale alla `persistence(MEMORY_ONLY)`.

In [20]:
# Impostiamo il livello di persistenza a MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY)

# Verifichiamo il livello di persistenza
print(rdd.getStorageLevel())

# Liberiamo la memoria
rdd.unpersist()

# Chiamiamo la funzione cache()
rdd.cache()

# Verifichiamo il livello di persistenza
print(rdd.getStorageLevel())

# Liberiamo la memoria
rdd.unpersist()

Memory Serialized 1x Replicated
Memory Serialized 1x Replicated


PythonRDD[58] at RDD at PythonRDD.scala:53

# DataFrame

I __DataFrame__ in Spark sono delle strutture dati immutabili e distribuite. Utilizzano un approccio tabellare in cui ogni colonna ha un nome e un tipo di dato, e tutti i dati all'interno di una colonna devono essere dello stesso tipo. Sono concettualmente equivalenti alle tabelle di un database relazionale e ai dataframe di R/Python.

I DataFrame possono essere creati a partire da diverse fonti di dati, come file CSV, file di testo, database esterni, RDD, ecc.

I DataFrame possono essere manipolati utilizzando una varietà di operazioni. Supportano sia operazioni SQL-like che operazioni come map, filter, groupBy, ecc. Tutte le operazioni di trasformazione producono un nuovo DataFrame, mantenendo l'originale immutato.

La creazione di un dataframe avviene con la funzione `createDataframe()`.  Si posono costruire a partire da: liste di liste, dizionari, `pyspark.sql.Row`, ecc...

Si può decidere di specificare uno _schema_ in cui sono definiti i tipi delle colonne del dataframe (se omesso viene inferito in automatico).

In [43]:
# Creiamo un DF senza specificare lo scherma
df = spark.createDataFrame([
    Row(id=1, first_name="John", last_name="Doe", age=25, salary=1000.0, department="IT"),
    Row(id=2, first_name="Jane", last_name="Smith", age=32, salary=1500.0, department="IT"),
    Row(id=3, first_name="Bob", last_name="Johnson", age=45, salary=2500.0, department="Sales"),
    Row(id=4, first_name="Alice", last_name="Jones", age=28, salary=1200.0, department="Sales"),
    Row(id=5, first_name="Mike", last_name="Williams", age=35, salary=1800.0, department="Sales"),
])

df.printSchema()

# Creiamo lo stesso DF specificando lo schema
df = spark.createDataFrame([
    (1, "John", "Doe", 25, 1000.0, "IT"),
    (2, "Jane", "Smith", 32, 1500.0, "IT"),
    (3, "Bob", "Johnson", 45, 2500.0, "Sales"),
    (4, "Alice", "Jones", 28, 1200.0, "Sales"),
    (5, "Mark", "Williams", 38, 2000.0, "Sales")
], schema='id int, first_name string, last_name string, age int, salary float, department string')

df.printSchema()

# Un altro modo per definire uno schema 
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True),
    StructField("department", StringType(), True)
])

# crea il DataFrame
data = [
    (1, "John", "Doe", 25, 1000.0, "IT"),
    (2, "Jane", "Smith", 32, 1500.0, "IT"),
    (3, "Bob", "Johnson", 45, 2500.0, "Sales"),
    (4, "Alice", "Jones", 28, 1200.0, "Sales"),
    (5, "Mark", "Williams", 38, 2000.0, "Sales")
]
df = spark.createDataFrame(data, schema)

df.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- salary: double (nullable = true)
 |-- department: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: float (nullable = true)
 |-- department: string (nullable = true)

root
 |-- id: integer (nullable = false)
 |-- first_name: string (nullable = false)
 |-- last_name: string (nullable = false)
 |-- age: integer (nullable = true)
 |-- salary: double (nullable = true)
 |-- department: string (nullable = true)



Vediamo alcune funzioni utili.

Le prime `k` righe di un DataFrame si possono visualizzare con la funzione `show(k)`. È inoltre possibile visualizzarle in verticale qualora dovessero essere troppo lunghe da visualizzare in orizzontale.

In [44]:
# Visualizziamo le prime due righe
df.show(2)

# Visualizziamo le prime due righe in verticale
df.show(2, vertical=True)

+---+----------+---------+---+------+----------+
| id|first_name|last_name|age|salary|department|
+---+----------+---------+---+------+----------+
|  1|      John|      Doe| 25|1000.0|        IT|
|  2|      Jane|    Smith| 32|1500.0|        IT|
+---+----------+---------+---+------+----------+
only showing top 2 rows

-RECORD 0------------
 id         | 1      
 first_name | John   
 last_name  | Doe    
 age        | 25     
 salary     | 1000.0 
 department | IT     
-RECORD 1------------
 id         | 2      
 first_name | Jane   
 last_name  | Smith  
 age        | 32     
 salary     | 1500.0 
 department | IT     
only showing top 2 rows



È possibile ottenere i nomi delle colonne di un dataFrame tramite `colunms`.

In [45]:
df.columns

['id', 'first_name', 'last_name', 'age', 'salary', 'department']

La funzione `select()` consente di selezionare alcune colonne del DataFrame.

In [46]:

# Selezioniamo le colonne "first_name" e "last_name"
df.select("first_name", "last_name").show()

# Un altro modo per selezionare le stesse colonne
df.select(df.first_name, df.last_name).show()



+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Jane|    Smith|
|       Bob|  Johnson|
|     Alice|    Jones|
|      Mark| Williams|
+----------+---------+

+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Jane|    Smith|
|       Bob|  Johnson|
|     Alice|    Jones|
|      Mark| Williams|
+----------+---------+



Tramite la funzione `describe()` possiamo ottenere alcune informazioni sul DataFrame (utili soprattutto per le colonne numeriche).

In [47]:
df.select("age", "salary").describe().show()

+-------+-----------------+-----------------+
|summary|              age|           salary|
+-------+-----------------+-----------------+
|  count|                5|                5|
|   mean|             33.6|           1640.0|
| stddev|8.018728078691781|610.7372593840988|
|    min|               25|           1000.0|
|    max|               45|           2500.0|
+-------+-----------------+-----------------+



Come per gli RDD, anche nei DataFrame è presente la funzione `collect()`. È tuttavia preferibile usare funzioni come `tail()` o `take()`.

In [48]:
# Collezioniamo tutte le righe del dataframe in una lista e stampiamole
print(df.collect())

# Collezioniamo la prima riga del dataframe e stampiamola
print(df.take(1))

# Collezioniamo l'ultima riga del dataframe e stampiamola
print(df.tail(1))

[Row(id=1, first_name='John', last_name='Doe', age=25, salary=1000.0, department='IT'), Row(id=2, first_name='Jane', last_name='Smith', age=32, salary=1500.0, department='IT'), Row(id=3, first_name='Bob', last_name='Johnson', age=45, salary=2500.0, department='Sales'), Row(id=4, first_name='Alice', last_name='Jones', age=28, salary=1200.0, department='Sales'), Row(id=5, first_name='Mark', last_name='Williams', age=38, salary=2000.0, department='Sales')]
[Row(id=1, first_name='John', last_name='Doe', age=25, salary=1000.0, department='IT')]
[Row(id=5, first_name='Mark', last_name='Williams', age=38, salary=2000.0, department='Sales')]


Un'altra funzione da usare con cautela è `toPandas()` che, come si capisce dal nome, colleziona tutti gli elementi di un DataFrame Spark e li riversa su un DataFrame di Pandas (questo può causare problemi di memoria in caso di DataFrame molto grandi).

In [26]:
df.toPandas()

Unnamed: 0,id,first_name,last_name,age,salary,department
0,1,John,Doe,25,1000.0,IT
1,2,Jane,Smith,32,1500.0,IT
2,3,Bob,Johnson,45,2500.0,Sales
3,4,Alice,Jones,28,1200.0,Sales
4,5,Mark,Williams,38,2000.0,Sales


Come abbiamo già visto negli esempi precedenti, la funzione `select()` prende in input uno o più oggetti di tipo `Column` e restituisce un nuovo DataFrame. Infatti, per selezionare una colonna specifica di un DataFrame, non è sufficiente usare un oggetto di tipo `Column`.

In [27]:
print(df.salary)

Column<'salary'>


Tuttavia, gli oggetti di tipo `Column` vengono usati da moltissime funzioni. Ad esempio, per assegnare una nuova colonna al DataFrame usando la funzione `withColumn()`.

In [49]:
# Creiamo un nuovo DataFrame aggiungendo una nuova colonna (semplicemente moltiplicando per 12 la colonna "salary")
df.withColumn("annual_salary", df.salary * 12).show()

+---+----------+---------+---+------+----------+-------------+
| id|first_name|last_name|age|salary|department|annual_salary|
+---+----------+---------+---+------+----------+-------------+
|  1|      John|      Doe| 25|1000.0|        IT|      12000.0|
|  2|      Jane|    Smith| 32|1500.0|        IT|      18000.0|
|  3|       Bob|  Johnson| 45|2500.0|     Sales|      30000.0|
|  4|     Alice|    Jones| 28|1200.0|     Sales|      14400.0|
|  5|      Mark| Williams| 38|2000.0|     Sales|      24000.0|
+---+----------+---------+---+------+----------+-------------+



Come per gli RDD, anche con i DataFrame si possono usare le funzioni `filter()`, `groupBy()`, ecc...

In [50]:
# Raggruppiamo per dipartimento e calcoliamo lo stipendio medio
df.groupBy(df.department).agg({"salary": "avg"}).show()

# Selezioniamo gli impiegati che guadagnano più di 1800
df.filter(df.salary > 1800).show()


+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|        IT|     1250.0|
|     Sales|     1900.0|
+----------+-----------+

+---+----------+---------+---+------+----------+
| id|first_name|last_name|age|salary|department|
+---+----------+---------+---+------+----------+
|  3|       Bob|  Johnson| 45|2500.0|     Sales|
|  5|      Mark| Williams| 38|2000.0|     Sales|
+---+----------+---------+---+------+----------+



È possibile applicare delle `UDF` (User Defined Functions) alle colonne dei DataFrame.

In [51]:
# Esempio di UDF
def salary_level(salary):
    if salary <= 1200.0:
        return "LOW"
    elif salary > 1200.0 and salary < 1800.0:
        return "MEDIUM"
    else:
        return "HIGH"

    
salary_level_udf = udf(salary_level, StringType())

# Aggiungiamo una colonna al DataFrame
df.withColumn("salary_level", salary_level_udf(df.salary)).show()

+---+----------+---------+---+------+----------+------------+
| id|first_name|last_name|age|salary|department|salary_level|
+---+----------+---------+---+------+----------+------------+
|  1|      John|      Doe| 25|1000.0|        IT|         LOW|
|  2|      Jane|    Smith| 32|1500.0|        IT|      MEDIUM|
|  3|       Bob|  Johnson| 45|2500.0|     Sales|        HIGH|
|  4|     Alice|    Jones| 28|1200.0|     Sales|         LOW|
|  5|      Mark| Williams| 38|2000.0|     Sales|        HIGH|
+---+----------+---------+---+------+----------+------------+



Si possono anche utilizzare le `Pandas UDF`, che consentono di lavorare direttamente sulle `Series` e sui `DataFrame`.

In [30]:
# Definimao la Padas UDF
@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def annual_avg_salary(s):
    return s.mean() * 12

# Calcoliamo lo stipendio medio annuo per reparto
df.groupby('department').agg(annual_avg_salary(df['salary']).alias('annual_avg_salary')).show()



+----------+-----------------+
|department|annual_avg_salary|
+----------+-----------------+
|        IT|          15000.0|
|     Sales|          22800.0|
+----------+-----------------+



Ovviamente i DataFrame possono anche essere letti da file (csv, ORC, Parquet, ecc...). Vediamo un esempio.

In [31]:
# Leggiamo un DataFrame da CSV
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True),
    StructField("department", StringType(), True)
])

csv_df = spark.read.csv("../data/file3.csv", header=False, schema=schema)

# Visualizziamo il contenuto
csv_df.show()

+---+----------+---------+---+------+----------+
| id|first_name|last_name|age|salary|department|
+---+----------+---------+---+------+----------+
|  1|      John|      Doe| 25|1000.0|        IT|
|  2|      Jane|    Smith| 32|1500.0|        IT|
|  3|       Bob|  Johnson| 45|2500.0|     Sales|
|  4|     Alice|    Jones| 28|1200.0|     Sales|
|  5|      Mark| Williams| 38|2000.0|     Sales|
+---+----------+---------+---+------+----------+



Modifichiamolo e salviamolo su un altro file csv.

In [32]:
# Aggiungiamo il salario annuo
csv_df = csv_df.withColumn("annual_salary", csv_df.salary * 12)

# Visualizziamo il contenuto
csv_df.show()

# Puliamo la directory di output
out = "../data/file3_mod.csv"

def clean_output(out):
    if os.path.exists(out):
        if os.path.isdir(out):
            shutil.rmtree(out)
        else:
            os.remove(out)

clean_output(out)

# Salviamolo su un altro file
csv_df.write.csv(out)

+---+----------+---------+---+------+----------+-------------+
| id|first_name|last_name|age|salary|department|annual_salary|
+---+----------+---------+---+------+----------+-------------+
|  1|      John|      Doe| 25|1000.0|        IT|      12000.0|
|  2|      Jane|    Smith| 32|1500.0|        IT|      18000.0|
|  3|       Bob|  Johnson| 45|2500.0|     Sales|      30000.0|
|  4|     Alice|    Jones| 28|1200.0|     Sales|      14400.0|
|  5|      Mark| Williams| 38|2000.0|     Sales|      24000.0|
+---+----------+---------+---+------+----------+-------------+



Notiamo che sono stati salvati diversi file. Approfondiremo meglio l'argomento quando tratteremo i metodi di partizionamento. Prima però vediamo un altro modo per manipolare i DataFrame.

In Spark è possibile manipolare i DataFrame usando la sintassi SQL. Vediamo qualche esempio.


In [33]:
# Registriamo il DataFrame come una tabella
csv_df.createOrReplaceTempView("table_A")

# Visualizziamo nome, cognome ed età dei dipendenti che hanno un salario annuo maggiore di 200000
spark.sql("SELECT first_name, last_name, age FROM table_A WHERE annual_salary > 20000.0").show()

+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|       Bob|  Johnson| 45|
|      Mark| Williams| 38|
+----------+---------+---+



Inoltre, si possono definire ed applicare delle `UDF`.

In [34]:
# Definimao la Padas UDF
@pandas_udf('integer')
def add_one(s):
    return s + 1

# Registriamo la UDF
spark.udf.register("add_one", add_one)

# Sommiamo 1 all'età di ciascun dipendente
spark.sql("SELECT first_name, last_name, add_one(age) as age_plus_one FROM table_A").show()

+----------+---------+------------+
|first_name|last_name|age_plus_one|
+----------+---------+------------+
|      John|      Doe|          26|
|      Jane|    Smith|          33|
|       Bob|  Johnson|          46|
|     Alice|    Jones|          29|
|      Mark| Williams|          39|
+----------+---------+------------+



# Metodi di partizionamento
Esistono diversi metodi di partizionamento che possono essere utilizati a seconda delle esigenze. Non esiste un metodo migliore degli altri. Sarà necessario valutare il metodo da utilizzare caso per caso.



In [35]:
# Definiamo una funzione per stampare le partizioni.
def print_partitions(df):
    print(f"Number of partitions: {df.rdd.getNumPartitions()}")
    print("Content of each partition:")
    for i, part in enumerate(df.rdd.glom().collect()):
        print(f"Partition {i}: {part}")

# Creiamo un dataframe di esempio.
departments = ["Math", "Physics", "Medicine"]
ages = [20, 21, 22, 23]
data = []
for i in range(8):
    data.append({"ID": i, "Department": departments[i % 3],  "Student": random.randint(100000, 999999), "Age": ages[i%len(ages)]})

df = spark.createDataFrame(data)

# Visualizziamo i dati.
df.show()

+---+----------+---+-------+
|Age|Department| ID|Student|
+---+----------+---+-------+
| 20|      Math|  0| 465341|
| 21|   Physics|  1| 445955|
| 22|  Medicine|  2| 571367|
| 23|      Math|  3| 145105|
| 20|   Physics|  4| 292369|
| 21|  Medicine|  5| 803052|
| 22|      Math|  6| 599024|
| 23|   Physics|  7| 742335|
+---+----------+---+-------+



1. Hash Partitioning. I dati sono partizionati in base all'hash della chiave. In questo modo, gli elementi con la stessa chiave finiscono nella stessa partizione.


In [37]:
# Supponiamo di voler creare un dataframe con 3 partizioni e vogliamo che le partizioni siano create in base al campo Department.

# Definiamo il numero di partizioni:
numPartitions = 3

# Per comodità, aggiungiamo una colonna con un hash del campo Department
hash_df = df.withColumn("Hash", hash(df["Department"]))

# Visualizziamo il dataframe
hash_df.show()

# Ci aspettiamo che i dati con lo stesso valore di Department siano nella stessa partizione
hash_df = hash_df.repartition(numPartitions, "Department")

# Visualizziamo le partizioni
print_partitions(hash_df)

# Puliamo la directory di output
out = "../data/file4.csv"

clean_output(out)

# Salviamo il DataFrame
hash_df.write.csv(out)

# Notiamo come anche il file è stato opportunamente partizionato.

+---+----------+---+-------+----------+
|Age|Department| ID|Student|      Hash|
+---+----------+---+-------+----------+
| 20|      Math|  0| 465341|1445171467|
| 21|   Physics|  1| 445955|1130882753|
| 22|  Medicine|  2| 571367|2024429527|
| 23|      Math|  3| 145105|1445171467|
| 20|   Physics|  4| 292369|1130882753|
| 21|  Medicine|  5| 803052|2024429527|
| 22|      Math|  6| 599024|1445171467|
| 23|   Physics|  7| 742335|1130882753|
+---+----------+---+-------+----------+

Number of partitions: 3
Content of each partition:
Partition 0: []
Partition 1: [Row(Age=20, Department='Math', ID=0, Student=465341, Hash=1445171467), Row(Age=22, Department='Medicine', ID=2, Student=571367, Hash=2024429527), Row(Age=23, Department='Math', ID=3, Student=145105, Hash=1445171467), Row(Age=21, Department='Medicine', ID=5, Student=803052, Hash=2024429527), Row(Age=22, Department='Math', ID=6, Student=599024, Hash=1445171467)]
Partition 2: [Row(Age=21, Department='Physics', ID=1, Student=445955, Hash=

In [38]:
# Effettivamente, non ci sono due partizioni distinte contenenti studenti che frequentano lo stesso dipartimento.
# All'interno della stessa partizione, troviamo però studenti che frequentano dipartimenti diversi.
# Questo perché gli oggetti vengono distribuiti nelle partizioni in base a: hash(key) % numPartitions.
# Aggiungiamo un'altra colonna al dataframe.
hash_df = hash_df.withColumn("Hash % numPartitions", hash(hash_df["Department"])%numPartitions)

# Visualizziamo il dataframe.
hash_df.show()

# Visualizziamo le partizioni.
print_partitions(hash_df)

# E se volessimo partizioni contenenti esclusivamente studenti dello stesso dipartimento?
# Per farlo, dobbiamo utilizzare un altro metodo di partizionamento: il Custom Partitioning.

+---+----------+---+-------+----------+--------------------+
|Age|Department| ID|Student|      Hash|Hash % numPartitions|
+---+----------+---+-------+----------+--------------------+
| 20|      Math|  0| 465341|1445171467|                   1|
| 22|  Medicine|  2| 571367|2024429527|                   1|
| 23|      Math|  3| 145105|1445171467|                   1|
| 21|  Medicine|  5| 803052|2024429527|                   1|
| 22|      Math|  6| 599024|1445171467|                   1|
| 21|   Physics|  1| 445955|1130882753|                   2|
| 20|   Physics|  4| 292369|1130882753|                   2|
| 23|   Physics|  7| 742335|1130882753|                   2|
+---+----------+---+-------+----------+--------------------+

Number of partitions: 3
Content of each partition:
Partition 0: []
Partition 1: [Row(Age=20, Department='Math', ID=0, Student=465341, Hash=1445171467, Hash % numPartitions=1), Row(Age=22, Department='Medicine', ID=2, Student=571367, Hash=2024429527, Hash % numPartiti

2.  Custom partitioning. È possibile specificare una funzione di partizionamento personalizzata per soddisfare le esigenze specifiche dell'applicazione.

In [40]:
# Per il Custom Partitioning dobbiamo usare la funzione partitionBy.
# Per poter usare questa funzione, i dati devono essere nel formato (key, value). 
custom_partitioning = df.rdd \
    .map(lambda el: (el["Department"], el)) \
    .toDF()

# Visualizziamo il dataframe.
custom_partitioning.show()

# Il metodo partitionBy consente all'utente di usare delle funzioni "custom".
# Questa funzione prende in input la key e restituisce un intero che indica la partizione in cui inserire l'elemento.
# Nel nostro caso, per semplicità, la funzione restituirà l'indice del dipartimento all'interno della lista departments.
def department_partitioning(k):
    return departments.index(k)
    

# Applichiamo il partizionamento.
custom_partitioning = custom_partitioning.rdd \
    .partitionBy(numPartitions, department_partitioning) \
    .toDF()

# Visualizziamo le partizioni.
print_partitions(custom_partitioning)

+--------+--------------------+
|      _1|                  _2|
+--------+--------------------+
|    Math|{20, Math, 0, 465...|
| Physics|{21, Physics, 1, ...|
|Medicine|{22, Medicine, 2,...|
|    Math|{23, Math, 3, 145...|
| Physics|{20, Physics, 4, ...|
|Medicine|{21, Medicine, 5,...|
|    Math|{22, Math, 6, 599...|
| Physics|{23, Physics, 7, ...|
+--------+--------------------+

Number of partitions: 3
Content of each partition:
Partition 0: [Row(_1='Math', _2=Row(Age=20, Department='Math', ID=0, Student=465341)), Row(_1='Math', _2=Row(Age=23, Department='Math', ID=3, Student=145105)), Row(_1='Math', _2=Row(Age=22, Department='Math', ID=6, Student=599024))]
Partition 1: [Row(_1='Physics', _2=Row(Age=21, Department='Physics', ID=1, Student=445955)), Row(_1='Physics', _2=Row(Age=20, Department='Physics', ID=4, Student=292369)), Row(_1='Physics', _2=Row(Age=23, Department='Physics', ID=7, Student=742335))]
Partition 2: [Row(_1='Medicine', _2=Row(Age=22, Department='Medicine', ID=2, Stu

3. Range partitioning. I dati sono ordinati in base al valore della chiave e poi divisi in partizioni di uguale intervallo.

In [41]:
# Esempio di range partitioning
range_partitioned = df.repartitionByRange(2, "Age")

print_partitions(range_partitioned)

Number of partitions: 2
Content of each partition:
Partition 0: [Row(Age=20, Department='Math', ID=0, Student=465341), Row(Age=21, Department='Physics', ID=1, Student=445955), Row(Age=20, Department='Physics', ID=4, Student=292369), Row(Age=21, Department='Medicine', ID=5, Student=803052)]
Partition 1: [Row(Age=22, Department='Medicine', ID=2, Student=571367), Row(Age=23, Department='Math', ID=3, Student=145105), Row(Age=22, Department='Math', ID=6, Student=599024), Row(Age=23, Department='Physics', ID=7, Student=742335)]
