# Obiettivo del progetto
Dopo aver analizzato e compreso i dati, si vuole studiare meglio la correlazione riscontrata tra distanza e prezzo e si è individuato l'obiettivo del progetto. L'obiettivo è quello di verificare se c’è una stagionalità, nella quale i prezzi per alcuni mesi sono molto più elevati rispetto ad altri o se ci sono grandi variazioni di prezzo tra i diversi mesi rispetto alle diverse distanze.

## Descrizione del job proposto
Avendo a disposizione un solo file *.csv* si è pensato di usare un pattern di tipo *self-join*:

-	**Prima aggregazione**: aggregare per ogni combinazione di aeroporto di partenza e destinazione (*startingAeroport* e *destinationAeroport*) per ottenere la distanza media di viaggio (*totalTravelDistance*). A partire dalla distanza media generare una nuova colonna che indichi la fascia di distanza del volo (breve distanza, media distanza, lunga distanza);

-	**Join**: unire il dataset originale con il risultato ottenuto;

-	**Seconda aggregazione**: aggregare per fascia di distanza e mese (*flightDate*, da cui si ricava il mese) per ottenere per ciascuna combinazione il prezzo medio.

### Caricamento libreria Spark

Per prima cosa, si deve importare la libreria spark per avviare una `spark-shell`; in seguito verrà mostrato il link tramite il quale è possibile accedere all'interfaccia utente di Spark.

In [1]:
import org.apache.spark

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.5:4040
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1744192500706)
SparkSession available as 'spark'


import org.apache.spark


In [None]:
// DO NOT EXECUTE - this is needed just to avoid showing errors in the following cells
val sc = spark.SparkContext.getOrCreate()

### Parser del file .csv

Nella cella sottostante è implementata una `case class Flight` avente come parametri tutte le colonne(*) presenti nel file .csv descritti nel notebook [data-exploration.ipynb](./data-exploration.ipynb) e un `FlightParser` che consentente l'estrazione delle informazioni necessarie per popolare l'oggetto RDD di Spark.

(*) nonostante vengano estratte tutte le colonne, per risolvere il `job` proposto verranno utilizzate solo alcune di esse.

In [2]:
import java.text.SimpleDateFormat
import java.util.Calendar

/**
 * Flight case class.
 * @param legId The unique identifier for the flight.
 * @param searchMonth The month in which the search was conducted.
 * @param flightMonth The month in which the flight is scheduled.
 * @param startingAirport The airport code IATA for the departure location.
 * @param destinationAirport The airport code IATA for the arrival location.
 * @param fareBasisCode The fare basis code for the flight.
 * @param travelDuration The total duration of the flight.
 * @param elapsedDays The number of days elapsed.
 * @param isBasicEconomy Whether the flight is a basic economy flight.
 * @param isRefundable Whether the flight is refundable.
 * @param isNonStop Whether the flight is non-stop.
 * @param baseFare The base fare for the flight.
 * @param totalFare The total fare for the flight, including taxes and other fees.
 * @param seatsRemaining The number of seats remaining on the flight.
 * @param totalTravelDistance The total distance of the flight.
 * @param segmentsDepartureTimeEpochSeconds The departure time of each segment expressed as epoch seconds.
 * @param segmentsDepartureTimeRaw The departure time of each segment in raw format.
 * @param segmentsArrivalTimeEpochSeconds The arrival time of each segment expressed as epoch seconds.
 * @param segmentsArrivalTimeRaw The arrival time of each segment in raw format.
 * @param segmentsArrivalAirportCode The airport code IATA for the arrival location of each segment.
 * @param segmentsDepartureAirportCode The airport code IATA for the departure location of each segment.
 * @param segmentsAirlineName The name of the airline for each segment.
 * @param segmentsAirlineCode The code of the airline for each segment.
 * @param segmentsEquipmentDescription The description of the equipment for each segment.
 * @param segmentsDurationInSeconds The duration of each segment in seconds.
 * @param segmentsDistance The distance of each segment.
 * @param segmentsCabinCode The cabin code for each segment.
 */
case class Flight(
    legId: String,
    searchMonth: Int,
    flightMonth: Int,
    startingAirport: String,
    destinationAirport: String,
    fareBasisCode: String,
    travelDuration: String,
    elapsedDays: Int,
    isBasicEconomy: Boolean,
    isRefundable: Boolean,
    isNonStop: Boolean,
    baseFare: Double,
    totalFare: Double,
    seatsRemaining: Int,
    totalTravelDistance: Double,
    segmentsDepartureTimeEpochSeconds: String,
    segmentsDepartureTimeRaw: String,
    segmentsArrivalTimeEpochSeconds: String,
    segmentsArrivalTimeRaw: String,
    segmentsArrivalAirportCode: String,
    segmentsDepartureAirportCode: String,
    segmentsAirlineName: String,
    segmentsAirlineCode: String,
    segmentsEquipmentDescription: String,
    segmentsDurationInSeconds: String,
    segmentsDistance: String,
    segmentsCabinCode: String
) extends Serializable

/**
 * Flight parser.
 */
object FlightParser extends Serializable {

  val comma = ","

  /**
   * Convert from date (String) to month (Int).
   * @param dateString the date
   * @return the month
   */
  def monthFromDate(dateString: String): Int = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    val date = sdf.parse(dateString.trim)
    val cal = Calendar.getInstance()
    cal.setTime(date)
    cal.get(Calendar.MONTH) + 1
  }

  /**
   * Function to parse flights records.
   * @param line that has to be parsed
   * @return Flight object, None in case of input errors
   */
  def parseFlightLine(line: String): Option[Flight] = {
    try {
      val columns = line.split(comma)
      Some(
        Flight(
          legId = columns(0).trim,
          searchMonth = monthFromDate(columns(1)),
          flightMonth = monthFromDate(columns(2)),
          startingAirport = columns(3).trim,
          destinationAirport = columns(4).trim,
          fareBasisCode = columns(5).trim,
          travelDuration = columns(6).trim,
          elapsedDays = columns(7).trim.toInt,
          isBasicEconomy = columns(8).trim.toBoolean,
          isRefundable = columns(9).trim.toBoolean,
          isNonStop = columns(10).trim.toBoolean,
          baseFare = columns(11).trim.toDouble,
          totalFare = columns(12).trim.toDouble,
          seatsRemaining = columns(13).trim.toInt,
          totalTravelDistance = columns(14).trim.toDouble,
          segmentsDepartureTimeEpochSeconds = columns(15).trim,
          segmentsDepartureTimeRaw = columns(16).trim,
          segmentsArrivalTimeEpochSeconds = columns(17).trim,
          segmentsArrivalTimeRaw = columns(18).trim,
          segmentsArrivalAirportCode = columns(19).trim,
          segmentsDepartureAirportCode = columns(20).trim,
          segmentsAirlineName = columns(21).trim,
          segmentsAirlineCode = columns(22).trim,
          segmentsEquipmentDescription = columns(23).trim,
          segmentsDurationInSeconds = columns(24).trim,
          segmentsDistance = columns(25).trim,
          segmentsCabinCode = columns(26).trim
        )
      )
    } catch {
      case e: Exception =>
//        println(s"Error during parsing of the line '$line': ${e.getMessage}")
        None
    }
  }
}

import java.text.SimpleDateFormat
import java.util.Calendar
defined class Flight
defined object FlightParser


### Caricamento dei dati

Con la seguente cella si effettua il caricamento del file *itineraries-sample\<N\>.csv*, dove con N si intende la percentuale di dati campionati dal file originale di 31,09 GB.

I file disponibili sono scaricabili dalla cartella [Datasets](https://liveunibo-my.sharepoint.com/:f:/g/personal/giulia_nardicchia_studio_unibo_it/Ei2686kRO3JFrY-4LnImGpwBtge9FRErDnIgvT2h2QB-Pg?e=VrufWl) su OneDrive e hanno percentuale: `02`, `16` e `33`.

In [3]:
val datasetsPath = "../../../datasets/"
val fileName = "itineraries-sample02.csv"

val rawData = sc.textFile(datasetsPath + fileName)

datasetsPath: String = ../../../datasets/
fileName: String = itineraries-sample02.csv
rawData: org.apache.spark.rdd.RDD[String] = ../../../datasets/itineraries-sample02.csv MapPartitionsRDD[1] at textFile at <console>:30


Trasformazione di un RDD composto da dati grezzi (*rawData*) in un RDD di oggetti `Flight`. La funzione `FlightParser.parseFlightLine` analizza ogni riga. `flatMap` appiattisce i risultati, scartando automaticamente le righe non valide.

In [4]:
val rddFlights = rawData.flatMap(FlightParser.parseFlightLine)

rddFlights: org.apache.spark.rdd.RDD[Flight] = MapPartitionsRDD[2] at flatMap at <console>:28


Per verificare che non ci siano stati problemi di *parsing*, con la cella seguente si vuole eseguire un'azione. La funzione `count()` calcola il numero di righe valide.

In [5]:
rddFlights.count()

res0: Long = 1622775


### Prima aggregazione

Innanzitutto si utilizza `map` per ridurre i dati da manipolare, quindi vengono scartate tutte le colonne che non servono a svolgere il job proposto e per trasformare i dati di tipo (chiave, valore). Si vuole aggregare per ogni combinazione di aeroporto di partenza e destinazione (*startingAeroport* e *destinationAeroport*) per ottenere la distanza media di viaggio (*totalTravelDistance*). Per fare questo si utilizza la funzione `aggregateByKey()` passando come argomenti: (0.0, 0), i due valori iniziali, una funzione *combining* (*map*) per indicare come i singoli valori sono combinati con l'accumulatore e una funzione di *merging* (*reduce*) per indicare come gli accumulatori per le differenti partizioni devono essere uniti. Come funzioni di *combining* sono state passate due funzioni **associative** e **commutative** quali la somma e il conteggio, una volta ottenuti questi risultati si effettua il rapporto per calcolare la media delle distanze.

In [6]:
val avgDistances = rddFlights
  .map(flight => ((flight.startingAirport, flight.destinationAirport), flight.totalTravelDistance))
  .aggregateByKey((0.0, 0))(
    (acc, travelDistance) => (acc._1 + travelDistance, acc._2 + 1),
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )
  .mapValues { case (sumDistance, count) => sumDistance / count }

avgDistances: org.apache.spark.rdd.RDD[((String, String), Double)] = MapPartitionsRDD[5] at mapValues at <console>:33


In [9]:
avgDistances.take(10)

res3: Array[((String, String), Double)] = Array(((IAD,LGA),999.9239723535832), ((MIA,OAK),3314.2077417173764), ((EWR,LAX),2576.270231126667), ((DFW,CLT),1309.5679179044018), ((BOS,ATL),1040.5552817985833), ((EWR,PHL),1207.7278366502971), ((ORD,IAD),905.9191603394372), ((OAK,BOS),3169.411759118541), ((MIA,CLT),1221.4948882998865), ((ORD,BOS),931.8038178038178))


A partire dalla distanza media, si intende generare una nuova colonna che specifichi la fascia di distanza del volo, distinguendo tra breve distanza, media distanza e lunga distanza.

Poiché usare valori numerici *hard coded* rappresenta una *bad practice*, si è deciso di utilizzare il minimo, il massimo e il numero di classi per calcolare dinamicamente l'intervallo delle fasce di distanza. Tuttavia, si è consapevoli che, nel caso in cui il codice venisse riutilizzato su un diverso dataset, gli intervalli calcolati potrebbero perdere di significato. Le distanze, infatti, potrebbero variare significativamente, facendo sì che uno stesso percorso, classificato come breve in un dataset, risulti invece di media o lunga distanza in un altro. Rendendo così il confronto tra i due dataset non più fattibile.

Per ottenere il minimo e il massimo su `avgDistances` appena calcolato, si utilizza la funzione `aggregate()` specificando come valore iniziale per il minimo `Double.MaxValue` mentre per il massimo `Double.MinValue`. Per confrontare, invece, i valori tra di loro si utilizza il package `scala.math` con la funzione `math.min()` e `math.max()`.

In [10]:
val (minDistance, maxDistance) = avgDistances
    .aggregate((Double.MaxValue, Double.MinValue))(
        (acc, value) => (math.min(acc._1, value._2), math.max(acc._2, value._2)),
        (acc1, acc2) => (math.min(acc1._1, acc2._1), math.max(acc1._2, acc2._2))
    )

minDistance: Double = 393.6039441248973
maxDistance: Double = 3442.6743515850144


Le distanze sono state suddivise in tre fasce: breve, media e lunga (*numClasses = 3*).

Per calcolare il *range* in maniera equidistante si è calcolato la differenza tra la distanza massima e minima e il rapporto tra il valore calcolato e il numero di classi.

In [11]:
val numClasses = 3
val range = (maxDistance - minDistance) / numClasses

numClasses: Int = 3
range: Double = 1016.3568024867058


Per calcolare l'intervallo sono stati adottati i seguenti limiti:
- **Breve**: se la distanza media è inferiore a *minimo + intervallo*;
- **Media**: se la distanza media è compresa tra *minimo + intervallo; minimo + 2 * intervallo*;
- **Lunga**: se la distanza media è superiore a *minimo + (numero classi - 1) * intervallo*.

In [12]:
val classifiedDistances = avgDistances.mapValues {
  case d if d < minDistance + range => "short"
  case d if d < minDistance + 2 * range => "medium"
  case _ => "long"
}

classifiedDistances: org.apache.spark.rdd.RDD[((String, String), String)] = MapPartitionsRDD[6] at mapValues at <console>:29


In [13]:
classifiedDistances.take(10)

res4: Array[((String, String), String)] = Array(((IAD,LGA),short), ((MIA,OAK),long), ((EWR,LAX),long), ((DFW,CLT),short), ((BOS,ATL),short), ((EWR,PHL),short), ((ORD,IAD),short), ((OAK,BOS),long), ((MIA,CLT),short), ((ORD,BOS),short))


### Join + Seconda aggregazione

Si unisce il dataset originale con il risultato ottenuto, aggregando poi per fascia di distanza e mese (*flightDate*, da cui si ricava il mese) si ottiene per ciascuna combinazione il prezzo medio.

Per fare ciò, si utilizza la funzione `join` per unire i due dataset, successivamente si effettua un `map` per ottenere una nuova coppia chiave-valore, in cui la chiave è composta dal mese e dalla classificazione della distanza, mentre il valore è composto dal prezzo totale e il conteggio. Si effettua un `reduceByKey` per aggregare i valori in base alla chiave e calcolare la somma totale e il conteggio. Alla `reduceByKey` è stata aggiunta una funzione di arrotondamento per evitare errori dovuti alla precisione dei numeri in virgola mobile. Infine, si effettua un `map` per ottenere il risultato finale.

In [14]:
val resultJob = rddFlights
  .map(flight => ((flight.startingAirport, flight.destinationAirport), (flight.flightMonth, flight.totalFare)))
  .join(classifiedDistances)
  .map {
    case (_, ((flightMonth, totalFare), classification)) => ((flightMonth, classification), (totalFare, 1))
  }
  .reduceByKey((acc, totalFare) =>
    (BigDecimal(acc._1 + totalFare._1).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble,
      acc._2 + totalFare._2))
  .map {
    case ((flightMonth, classification), (sumTotalFare, count)) => (flightMonth, classification, sumTotalFare / count)
  }

resultJob: org.apache.spark.rdd.RDD[(Int, String, Double)] = MapPartitionsRDD[13] at map at <console>:37


In [15]:
resultJob.collect()

res5: Array[(Int, String, Double)] = Array((9,medium,318.3760535316619), (10,short,272.90626195986323), (8,short,285.3570154260343), (5,short,294.7269176582328), (11,medium,278.41263953314376), (6,medium,432.32829361007856), (6,long,602.9717814924518), (4,long,477.23622320768663), (8,medium,347.8695456922884), (7,short,314.0343191116306), (10,long,419.78446130428574), (4,short,310.6395352791159), (4,medium,347.80798695523276), (10,medium,319.77589381861407), (5,long,539.7802670253963), (8,long,467.07980860504125), (9,long,409.2186325749457), (11,short,232.2905439102314), (7,medium,411.2507502278304), (5,medium,387.795496742671), (9,short,268.8361475656684), (7,long,557.3582296212151), (6,short,315.64908727581553), (11,long,392.619358541526))


### Salvataggio dei risultati su file

Innanzitutto si importa la modalità di salvataggio `org.apache.spark.sql.SaveMode`, poi si creano le cartelle dentro al quale mettere i risultati.

In [16]:
import org.apache.spark.sql.SaveMode

val job = "../../../output/job"

import org.apache.spark.sql.SaveMode
job: String = ../../../output/job


A partire dall'*RDD* si applica la funzione `coalesce(1)` per ridurre il partizionamento a una sola partizione in modo da ottenere un singolo file. Per salvare il file, bisogna usare delle API che appartengono all'oggetto *DataFrame*, ecco perché viene effettuato un *mapping* con la funzione `toDF()`. E' poi possibile richiamare l'interfaccia `write` per salvare il contenuto del *DataFrame* nel formato *.csv*, con il metodo di salvataggio `SaveMode.Overwrite` (se il file esiste già al momento del salvataggio, viene sovrascritto), all'interno della cartella precedentemente specificata.

In [17]:
resultJob
  .coalesce(1)
  .toDF().write.format("csv").mode(SaveMode.Overwrite).save(job)

## Job Not Optimized
L'ordine con cui sono state eseguite alcune operazioni, è migliorabile e quindi a partire dal codice scritto nelle precedenti celle, si è proceduto a *"rifattorizzare"* l'implementazione. Di seguito il *job* proposto non ottimizzato, in cui il nome delle variabili usate è il medesimo con l'aggiunta del suffisso NO (*Not Optimized*).

Differenze:
   - Innanzitutto, è stato migliorato il *mapping* iniziale, così da evitare di eseguire un ulteriore *mapping* in seguito per effettuare il *join*. In pratica vengono estratte le colonne a cui si è interessati subito (*totalTravelDistance, flightMonth, totalFare*) e viene usato quello come dataset di partenza.

   - Il *join*, adesso viene eseguito sul risultato ottenuto con la classificazione, al contrario della modalità precedente.

In [18]:
val numClassesNO = 3
val jobNotOptimized = "../../../output/jobNotOptimized"

val rddFlightsNO = rawData.flatMap(FlightParser.parseFlightLine)
    // (k,v) => (startingAirport, destinationAirport), (totalTravelDistance, flightDate, totalFare))
    .map(flight => ((flight.startingAirport, flight.destinationAirport),
                    (flight.totalTravelDistance, flight.flightMonth, flight.totalFare)))

val avgDistancesNO = rddFlightsNO
    .aggregateByKey((0.0, 0))(
        (acc, travelDistance) => (acc._1 + travelDistance._1, acc._2 + 1),
        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )
    // (k,v) => ((startingAirport, destinationAirport), avgDistance)
    .mapValues { case (sumDistance, count) => sumDistance / count }

val (minDistanceNO, maxDistanceNO) = avgDistancesNO
    .aggregate((Double.MaxValue, Double.MinValue))(
        (acc, avgDistance) => (math.min(acc._1, avgDistance._2), math.max(acc._2, avgDistance._2)),
        (acc1, acc2) => (math.min(acc1._1, acc2._1), math.max(acc1._2, acc2._2))
    )

val rangeNO = (maxDistanceNO - minDistanceNO) / numClassesNO

val resultJobNotOptimized = avgDistancesNO
    .mapValues {
      case d if d < minDistanceNO + rangeNO => "short"
      case d if d < minDistanceNO + 2 * rangeNO => "medium"
      case _ => "long"
    } // (k,v) => ((startingAirport, destinationAirport), classification)
    .join(rddFlightsNO)
    .map { case (_, (classification, (_, month, totalFare))) => ((month, classification), (totalFare, 1)) }
    .reduceByKey((acc, totalFare) =>
      (BigDecimal(acc._1 + totalFare._1).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble,
        acc._2 + totalFare._2))
    .map { case ((month, classification), (sumTotalFare : Double, count: Int)) => (month, classification, sumTotalFare / count) }
    .coalesce(1)
    .toDF().write.format("csv").mode(SaveMode.Overwrite).save(jobNotOptimized)

numClassesNO: Int = 3
jobNotOptimized: String = ../../../output/jobNotOptimized
rddFlightsNO: org.apache.spark.rdd.RDD[((String, String), (Double, Int, Double))] = MapPartitionsRDD[19] at map at <console>:34
avgDistancesNO: org.apache.spark.rdd.RDD[((String, String), Double)] = MapPartitionsRDD[21] at mapValues at <console>:43
minDistanceNO: Double = 393.6039441248973
maxDistanceNO: Double = 3442.6743515850144
rangeNO: Double = 1016.3568024867058
resultJobNotOptimized: Unit = ()


## Considerazioni sulle ottimizzazioni

Si vogliono fare alcune considerazioni riguardo le ottimizzazioni che sono disponibili in Spark. Durante il corso sono state analizzate diverse tecniche per migliorare le prestazioni. Ogni tecnica presenta vantaggi e svantaggi che devono essere valutati in base al contesto specifico.

### Cache / Persist

Innanzitutto, sono state considerate le tecniche per mantenere in memoria i dati tramite `cache` o `persist`. Le operazioni di caching o persistenza sono utili quando i dati vengono riutilizzati più volte. L'utilizzo di questo meccanismo riduce il costo computazionale, evitando il ricalcolo delle stesse operazioni multiple volte e migliorando le prestazioni in caso di ripetuti accessi agli stessi dati. Tuttavia, bisogna evitare di utilizzare `cache` o `persist` su dataset di grandi dimensioni se non si prevede che vengano riutilizzati più volte, in quanto ciò potrebbe causare un uso eccessivo della memoria, rallentando l'intero sistema o addirittura facendo fallire il job.

Differenze tra i vari metodi:
- **cache**: di default usa il livello di memorizzazione `MEMORY_ONLY`. I dati vengono memorizzati completamente in memoria e non vengono scritti su disco. Questo è utile quando l'*RDD* è piccolo e può essere completamente caricato in memoria. Se i dati non possono essere memorizzati in memoria si avrà un errore di `OutOfMemoryError`.
- **persist**: consente di specificare il livello di memorizzazione desiderato, tra cui: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, `MEMORY_AND_DISK_SER`, `DISK_ONLY`. Tra questi il metodo più interessante è `persist(MEMORY_AND_DISK_SER)` che usa la memoria se disponibile, altrimenti scrive su disco in formato serializzato. I dati vengono memorizzati in modo più compatto grazie alla serializzazione, ma introduce una latenza aggiuntiva per effettuare la deserializzazione. `MEMORY_AND_DISK_SER` è utile quando i dati sono troppo grandi per rimanere completamente in memoria, o se l'elaborazione dei dati è costosa e il riutilizzo dei dati è frequente.

### PartitionBy / Repartition / Coalesce
Le operazioni di partizionamento sono utili quando si desidera ottimizzare la distribuzione dei dati su più partizioni. L'ottimizzazione delle partizioni può ridurre significativamente il costo delle operazioni di shuffle, che sono spesso le più costose in termini di tempo di esecuzione. Una distribuzione non uniforme dei dati su partizioni può causare squilibri nel carico di lavoro tra i nodi, portando a una scarsa performance.

Differenze tra i vari metodi:
- **partitionBy**: questo metodo partiziona i dati in base alla chiave, ed è particolarmente utile quando si effettuano operazioni come `join()` su chiavi. Utilizzare un `HashPartitioner()` aiuta a ridurre il costo dello shuffle, perché i dati con la stessa chiave finiranno nella stessa partizione. Inoltre, evita lo shuffle tra nodi, se l'*RDD* è già partizionato in modo compatibile.
- **repartition**: è un'operazione costosa che implica un completo reshuffling dei dati, cambiando il numero di partizioni. È utile quando si vuole migliorare la distribuzione dei dati o quando si ha un numero di partizioni iniziale troppo basso. Tuttavia, se non usata con attenzione, può risultare inefficiente per grandi set di dati.
- **coalesce**: è meno costoso rispetto a `repartition()`, poiché cerca di ridurre il numero di partizioni combinando le partizioni esistenti senza fare uno shuffle completo. È utile quando si vuole ridurre il numero di partizioni, ad esempio, prima di scrivere i dati su disco. Tuttavia, non può aumentare il numero di partizioni.

### Broadcast Variables

Le **broadcast variables** sono utili per inviare copie di dati *read-only* a tutti i nodi in modo efficiente, senza la necessità di ricalcolarli ripetutamente durante ogni task. Sono adatte per dati immutabili che sono letti da tutti i task. L'uso di broadcast riduce il traffico di rete e migliora le performance. Tuttavia, bisogna evitare di utilizzare le variabili broadcast con dataset troppo grandi, poiché i dati verranno copiati su tutti i nodi e potrebbero sovraccaricare la memoria di ciascun nodo, riducendo le prestazioni complessive.


## Job Optimized

Una volta capite quali tecniche di ottimizzazione adottare per migliorare le performance, il *main job* è stato modificato ottenendo così il seguente codice.

Differenze rispetto al *job* proposto non ottimizzato:
- **PartitionBy**: si è deciso di utilizzare il meccanismo di partizionamento `partitionBy` passando in ingresso un `HashPartitioner` in modo tale da assicurare che i dati con la stessa chiave finiscano nella stessa partizione. Questo aiuta a ridurre il costo dello shuffle durante operazioni costose come `aggregateByKey` e `join` subito dopo. Inoltre, per rendere il `join` più efficiente si dovrebbe utilizzare lo stesso partizionamento per entrambi gli *RDD* (su cui verrà applicata l'operazione di `join`) ma dato che il dataset è lo stesso, è stato utilizzato il partizionamento una sola volta all'inizio.
- **Cache / Persist**: si è deciso di utilizzare il meccanismo di persistenza con il livello di memorizzazione `MEMORY_AND_DISK_SER` per mantenere in memoria i dati, evitando di rieseguire il `map` e il `partitionBy` per quanto riguarda `rddFlights`. Siccome in questo punto la quantità dei dati non è poca si è scelto di non fare memorizzazione `MEMORY_ONLY`. Anche per quanto riguarda `avgDistances` si è deciso di utilizzare `MEMORY_AND_DISK_SER`, per evitare che venga rieseguita l'operazione di `aggregateByKey`.
- **Broadcast Variables**: si è deciso di utilizzare il meccanismo di broadcast per le variabili `minDistance` e il `range` per la classificazione delle distanze, in quanto i valori sono i medesimi per tutti, così da non doverli ricalcolare per ogni task.

In [19]:
import org.apache.spark.sql.SaveMode
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.HashPartitioner

val numPartitions = sc.defaultParallelism
val p = new HashPartitioner(numPartitions)
val jobOptimized = "../../../output/jobOptimized"
val numClassesO = 3

val rddFlightsO = rawData.flatMap(FlightParser.parseFlightLine)
  // (k,v) => (startingAirport, destinationAirport), (totalTravelDistance, flightDate, totalFare))
  .map(flight => ((flight.startingAirport, flight.destinationAirport),
    (flight.totalTravelDistance, flight.flightMonth, flight.totalFare)))
  .partitionBy(p)
  .persist(MEMORY_AND_DISK_SER)

val avgDistancesO = rddFlightsO
  .aggregateByKey((0.0, 0))(
    (acc, travelDistance) => (acc._1 + travelDistance._1, acc._2 + 1),
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )
  // (k,v) => ((startingAirport, destinationAirport), avgDistance)
  .mapValues { case (sumDistance, count) => sumDistance / count }
  .persist(MEMORY_AND_DISK_SER)

val (minDistanceO, maxDistanceO) = avgDistancesO
  .aggregate((Double.MaxValue, Double.MinValue))(
    (acc, avgDistance) => (Math.min(acc._1, avgDistance._2), Math.max(acc._2, avgDistance._2)),
    (acc1, acc2) => (Math.min(acc1._1, acc2._1), Math.max(acc1._2, acc2._2))
  )

val broadcastStats = sc.broadcast((minDistanceO, (maxDistanceO - minDistanceO) / numClassesO))

val resultJobOptimized = avgDistancesO
  .mapValues { d =>
    val (minDist, range) = broadcastStats.value
    if (d < minDist + range) "short"
    else if (d < minDist + 2 * range) "medium"
    else "long"
  }
  // (k,v) => ((startingAirport, destinationAirport), classification)
  .join(rddFlightsO)
  .map { case (_, (classification, (_, month, totalFare))) => ((month, classification), (totalFare, 1)) }
  .reduceByKey((acc, totalFare) =>
    (BigDecimal(acc._1 + totalFare._1).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble, acc._2 + totalFare._2))
  .map { case ((month, classification), (sumTotalFare, count)) => (month, classification, sumTotalFare / count) }
  .coalesce(1)
  .toDF().write.format("csv").mode(SaveMode.Overwrite).save(jobOptimized)

import org.apache.spark.sql.SaveMode
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.HashPartitioner
numPartitions: Int = 8
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@8
jobOptimized: String = ../../../output/jobOptimized
numClassesO: Int = 3
rddFlightsO: org.apache.spark.rdd.RDD[((String, String), (Double, Int, Double))] = ShuffledRDD[35] at partitionBy at <console>:44
avgDistancesO: org.apache.spark.rdd.RDD[((String, String), Double)] = MapPartitionsRDD[37] at mapValues at <console>:53
minDistanceO: Double = 393.6039441248973
maxDistanceO: Double = 3442.6743515850144
broadcastStats: org.apache.spark.broadcast.Broadcast[(Double, Double)] = Broadcast(18)
resultJobOptimized: Unit = ()


Questa cella, serve a liberare la memoria ed è stata eseguita solo quando necessario per motivi di *debugging*.

In [6]:
// sc.getPersistentRDDs.foreach(_._2.unpersist())