# Progetto Big Data - e-commerce analysis

## Il dataset
Il [dataset analizzato](https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store) riporta gli eventi, generati da utenti, all'interno di un portale di e-commerce nel mese di novembre 2019. 
In esso sono riportati diversi tipi di eventi generati dagli utenti del portale, tra cui:
- visualizzazione di un prodotto
- aggiunta al carrello di prodotto
- acquisto di un prodotto
- rimozione dal carrello di un prodotto

Ogni riga del dataset rappresenta un evento e, per ognuno di essi, sono riportati l’utente che l’ha generato e diverse informazioni relative al prodotto, tra cui categoria, prezzo e brand.

#### Upload del dataset su S3
A causa di una scarsa connessione in upload (300 KB/s), è stato difficoltoso caricare il file su S3. Inizialmente si è cercato di sfruttare il meccanismo di s3 “multipart upload”. Questo era però disponibile solamente mediante AWS CLI e, probabilmente per la mancanza dei permessi necessari, non è stato possibile usufruirne. 

Il dataset è perciò stato prima splittato in partizioni da 128Mb mediante il comando **split** nativo dei sistemi unix, e poi caricato.

```split -d -a3 -b 128M 2019-Nov.csv 2019-Nov- ```

Poter spezzare l'upload in più parti ha reso più agevole gestire i casi di fallimento, non dovendo ricaricare il dataset per intero ma solo le partizioni mancanti.

## Obbiettivi dell'analisi
L'obbiettivo del progetto è analizzare il dataset in modalità batch, al fine di rilevare informazioni utili sui dati raccolti. Di seguito vengono riportate le domande che ci si è posti e a cui si è cercato di dare una risposta analizzando il dataset
- numero di utenti che hanno visualizzato un prodotto
- numero di utenti che hanno acquistato un prodotto
- numero di acquisti effettuati
- in media, quanti acquisti per utente
- in media, costo medio degli acquisti
- brand più venduti in termini di unità e di fatturato
- categorie più vendute in termini di unità e fatturato
- confronto tra prodotti visti e prodotti venduti
- confronto tra prodotti aggiunti al carrello e prodotti venduti
- classificando i brand sulla base del fatturato, individuare i fatturati totali ripartiti sulle varie categorie

## Configurazione del cluster

### Prima opzione
Configurazione del cluster di default, utilizzata anche durante i laboratori. Esso è quindi composto da 1 master e 2 nodi, entrambi con 4 core e 16 GB di RAM. Si è scelto quindi di mantenere gli stessi setting per il cluster:

- 2 executor con 3 core ciascuno (viene lasciato 1 core per gli altri servizi della macchina)
- 8GB di memoria per executor

### Seconda opzione
Si è scelto di utilizzare una configurazione alternativa, questo per approfondire e sperimentare sul tuning di CPU e memoria nella configurazione del cluster
- **Master node**: m5.xlarge (4 core e 16GB di ram)
- **Core node**: m5.x2large (8 core e 32GB di ram)

Il setting del cluster è stato definito secondo le linee guida viste a lezione, per cui si è cercato di stabilire la configurazione migliore per quanto riguarda il numero degli executor e la quantità di memoria allocata a ciascuno di essi. 

#### Tuning CPU
Per la CPU allocata, si è tenuto in considerazione il numero di nodi disponibili e il numero di core disponibili su ciascuno. Ogni nodo ha 8 core, seguendo la linea guida di 3-5 core per executor, sono state prese in considerazioni due opzioni:
- 3 core per executor
- 5 core per executor

La prima opzione consente di avere (8 - 1)/3 = 2 executor, mentre la seconda consente (8-1)/5 = 1 executor (per macchina). 
La seconda opzione permette di sfruttare al meglio le variabili broadcast, in quanto esse sono condivise in ogni executor, mentre la prima permette in termini assoluti di sfruttare un core in più per nodo. 

#### Tuning memoria
Dotare gli executor di troppa memoria può portare a problemi di garbage collection, perciò è buona prassi non superare i 64 GB per executor, anche se non è questo il problema in questo caso, dal momento che si ha a che fare con macchine dotate di 32 GB di memoria. 
Si è cercato di lasciare abbastanza memoria per il sistema operativo e i servizi hadoop perciò si è seguito il calcolo visto in teoria.

**3 core per executor, 2 executor per nodo** 
32 * 0.75 * 0.9 / 2 = 10.8 GB RAM per nodo

**5 core per executor, 1 executor per nodo**
32 * 0.75 * 0.9 / 1 = 21.6 GB RAM per nodo

La configurazione sceltà è la prima, in quanto viene utilizzato 1 core in più per nodo, quindi:

- 4 executor (2 per nodo) con 3 core ciascuno
- 8GB di memoria per executor

In [3]:
%%configure -f
{"numExecutors":4, "executorMemory":"8G",  "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1658749683351_0002,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1658749683351_0002,spark,idle,Link,Link,,✔


## Preprocessing dei dati

In [4]:
import java.util.UUID
import org.apache.spark.sql.SaveMode

object EcommerceEventParser {

  case class Event(time: String,
                   kind: String,
                   productID: Long,
                   categoryID: Long,
                   categoryCode: String,
                   brand: String,
                   price: Double,
                   userID: Long,
                   userSession: UUID)

  def apply(line: String): Option[Event] = {
    try{
      val input = line.split(',')
      Some(
        Event(
          time = input(0),
          kind = input(1),
          productID = input(2).toLong,
          categoryID = input(3).toLong,
          categoryCode = input(4),
          brand = input(5),
          price = input(6).toDouble,
          userID = input(7).toLong,
          userSession = UUID.fromString(input(8))
        )
      )
    } catch {
      case _: Throwable => None
    }
  }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import java.util.UUID
import org.apache.spark.sql.SaveMode
defined object EcommerceEventParser


In [91]:
val bucketname = "unibo-bd2122-fmazzini"
val path_ecommerce_events = "s3a://"+bucketname+"/datasets/ecommerce/*"

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

//val rddEvents = sc.textFile(path_ecommerce_events).flatMap(EcommerceEventParser(_))
val rddEvents = sc.textFile(path_ecommerce_events).flatMap(EcommerceEventParser(_)).coalesce(60)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bd2122-fmazzini
path_ecommerce_events: String = s3a://unibo-bd2122-fmazzini/datasets/ecommerce/*
res225: String = SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1658749683351_0002/


## Riflessioni sul numero di partizioni
Dal momento che il dataset è molto grande, si è subito valutato il numero di partizioni in cui era suddiviso. 
Come precedentemente affermato, il dataset è stato suddiviso in blocchi di 128 Mb al fine di caricarlo su S3, tuttavia Spark, per l'intero dataset, crea 269 partizioni (di circa 32 Mb ciascuna). 

In [6]:
rddEvents.getNumPartitions

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res6: Int = 60


### Riduzione delle partizioni
Mediante coalesce si è ridimensionato il numero di partizioni a 60, in modo tale che, circa, ogni core avesse a che fare con 5 partizioni anzichè 22. 
Si sono valutate le prestazioni nel primo semplice job durante l'esplorazione del dataset e non sono state notate differenze significative in termini di prestazioni. 

![60 vs 269 partizioni.png](images/60_vs_269_partizioni.png)

## Sulle dimensioni del dataset

Trattandosi di un dataset più grande rispetto a quanto visto precedentemente in laboratorio, inizialmente sono state svolte diverse analisi sull'occupazione di memoria e sui tempi di risposta dell'applicativo per svolgere semplici job di count. 


In [41]:
import org.apache.spark.util.SizeEstimator

// RDD con parsing in oggetti Event
val rddEventsCached = rddEvents.cache()

rddEventsCached.count()

//Stima grandezza RDD, visibile anche dalla Spark UI
SizeEstimator.estimate(rddEventsCached)  + " bytes"

sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.util.SizeEstimator
rddEventsCached: rddEvents.type = CoalescedRDD[3] at coalesce at <console>:29
res110: Long = 67501914
res113: String = 19117464 bytes


Uno dei primi problemi che ci si è ritrovati ad affrontare è la grande differenza in occupazione di memoria del dataset raw csv memorizzato su S3, rispetto all'RDD creato dal parsing dello stesso. 

Il dataset originale è un file csv di 8.4 Gb, mentre lo stesso come RDD in cache ha dimensione di circa 16Gb.
Per mostrare quanto ottenuto sulla Spark UI, il job di count è stato eseguito due volte
![job count intero dataset non cachato.png](images/job_count_intero_dataset_non_cachato.png)
![job count intero dataset cachato.png](images/job_count_intero_dataset_cachato.png)

Inizialmente si pensava che, avendo ogni executor 8Gb di memoria da utilizzare per caching, variabili broadcast e analisi dati, la memoria fosse sufficiente. Al caching di altri RDD più piccoli però, come ad esempio il caching dei soli acquisti, si sono notati strani overhead nella memoria e nei tempi di esecuzione.

### Storage Memory vs Execution Memory
Questo ha portato ad approfondire com'è suddivisa la memoria per ogni executor. Questa, sebbene sia 8 Gb, in realtà la memoria utilizzata al fine di storage è di circa la metà. Questo perchè l'executor memory (8Gb) è [suddivisa a sua volta in due parti](https://medium.com/analytics-vidhya/apache-spark-memory-management-49682ded3d42): 
- Storage Memory 
- Execution Memory

La memoria è comunque [ripartizionata tra le due parti](https://spark.apache.org/docs/latest/tuning.html#:~:text=%20When%20no%20execution%20memory%20is%20used,%20storage%20can%20acquire%20all%20the%20available%20memory%20and%20vice%20versa.) quando la storage memory o l'execution memory non sono utilizzate, ma in sostanza si pensa che la storage memory degli executor non fosse abbastanza per lo storage del dataset in memoria quando l'executor veniva usato anche per eseguire computazioni.

### Il testing

Si è cercato di verificare il tutto in un cluster con più memoria, in cui il numero di core per executor e il numero di executor è volutamente mantenuto invariato. A tal proposito si riporta il cluster 

- **Master node**: m5.xlarge (4 core e 16GB di ram)
- **Core node**: m5.x4large (16 core e 64GB di ram)

e la configurazione scelta
- 4 executor (2 per nodo) con 3 core ciascuno
- 16GB di memoria per executor


Per ognuna delle due diverse configurazioni di memoria si è eseguito il seguente scenario.
- Dichiarazione di un RDD riguardante tutto il file, suddiviso in 60 partizioni
- Creazione di 3 differenti RDD, riguardanti articoli visualizzati, aggiunti al carrello e acquistati
- Dichiarazione del caching per i 3 RDD ed effettiva invocazione del caching tramite count()
- Ripetizione della count() sui 3 RDD cachati

I 3 RDD sono uno più piccolo dell'altro, dal momento che le visualizzazione per i prodotti sono molte di più rispetto alle aggiunte al carrello e le visualizzazioni. 

In [42]:
val rddViewsTestingCached = rddEvents.filter(_.kind == "view").cache()
val rddCartsTesting = rddEvents.filter(_.kind == "cart").cache()
val rddPurchasesTesting = rddEvents.filter(_.kind == "purchase").cache()

rddViewsTestingCached.count()
rddCartsTestingCached.count()
rddPurchasesTestingCached.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddViewsTesting: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[14] at filter at <console>:38
rddCartsTesting: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[15] at filter at <console>:38
rddPurchasesTesting: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[16] at filter at <console>:38
res117: Long = 63556058
res118: Long = 3028917
res119: Long = 916939


### Risultati ottenuti
#### Tempi job senza caching
I tempi senza caching sono alquanto paragonabili

**8Gb per executor**
![32Gb memoria no cache.png](images/testing_memory/32Gb_memoria_no_cache.png)
**16Gb per executor**
![64Gb memoria no cache.png](images/testing_memory/64Gb_memoria_no_cache.png)

In [43]:
rddViewsTestingCached.count()
rddCartsTestingCached.count()
rddPurchasesTestingCached.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res120: Long = 63556058
res121: Long = 3028917
res122: Long = 916939


#### Tempi job con caching
Si nota come i tempi con dati cachati siano nettamente inferiori quando gli executor hanno più memoria, per il semplice fatto che questi effettivamente riescono a mantenere in cache gli RDD utili e a riutilizzarli quando servono. Quando invece gli executor non hanno memoria per mantenere gli RDD richiesti in cache, devono ricalcolarli sempre al bisogno, rileggendo quindi i dati da S3)

**8Gb per executor**
![32Gb memoria cache.png](images/testing_memory/32Gb_memoria_cache.png)
**16Gb per executor**
![64Gb memoria cache.png](images/testing_memory/64Gb_memoria_cache.png)

#### Dati in input job con caching
Successivamente si è andati nel dettaglio di ogni job. Si nota come i dati in input nei job eseguiti da executor con più memoria utilizzino effettivamente dati cachati, mentre nel primo caso, i dati in input sono sempre 8.4 Gb (eccezione per il primo, in cui effettivamente si sono usati dati cachati). 

Da qui, si evince come Spark vada a rileggere i dati da S3, probabilmente perchè ha dovuto togliere gli RDD cachati per mancanza di spazio. 

**8Gb per executor - Count delle visualizzazioni**
![32Gb - cache viewCount.png](images/testing_memory/32Gb_cache_viewCount.png)
**16Gb per executor - Count delle visualizzazioni**
![64Gb - cache viewCount.png](images/testing_memory/64Gb_cache-viewCount.png)

**8Gb per executor - Count delle aggiunte al carrello**
![32Gb - cache cartCount.png](images/testing_memory/32Gb_cache_cartCount.png)
**16Gb per executor - Count delle aggiunte al carrello**
![64Gb - cache cartCount.png](images/testing_memory/64Gb_cache_cartCount.png)

**8Gb per executor - Count degli acquisti**
![32Gb - cache purchaseCount.png](images/testing_memory/32Gb_cache_purchaseCount.png)
**16Gb per executor - Count delle aggiunte al carrello**
![64Gb - cache purchaseCount.png](images/testing_memory/64Gb_cache_purchaseCount.png)


Si è comunque scelto di mantenere il cluster configurato come deciso, perchè si pensa, che per il dataset in questione la seconda configurazione sia eccessiva. Nel corso dell'analisi si è perciò cercato di ottimizzare al meglio lo spazio in cache e le computazioni. 

Per prima cosa, si è smesso di effettuare il caching del dataset quando conteneva oggetti "Event" (ad eccezioni di piccoli RDD), ma solamente un sottoinsieme composto dai soli campi utili al momento.

In [45]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Dataset exploration

In [46]:
val rddViewsCached = rddEvents.map(v => (v.productID, v.userID, v.kind)).filter(_._3 == "view").cache()

/** Numero di visualizzazioni effettuate */
val viewsCount = rddViewsCached.count()

/** Numero di prodotti visti, distinti */
val distinctItemsViewed = rddViewsCached.map(_._1).distinct().count()

/** Numero di utenti che hanno interagito con l'e-commerce */
val interactionUsers = rddViewsCached.map(_._2).distinct().count()

println(s"Numero di visualizzazioni effettuate: ${viewsCount}")
println(s"Numero di prodotti visti, distinti: ${distinctItemsViewed}")
println(s"Numero di utenti che hanno interagito con l'e-commerce: ${interactionUsers}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddViewsCached: org.apache.spark.rdd.RDD[(Long, Long, String)] = MapPartitionsRDD[18] at filter at <console>:38
viewsCount: Long = 63556058
distinctItemsViewed: Long = 190662
interactionUsers: Long = 3695598
Numero di visualizzazioni effettuate: 63556058
Numero di prodotti visti, distinti: 190662
Numero di utenti che hanno interagito con l'e-commerce: 3695598


In [47]:
/** Numero di prodotti aggiunti al carrello */
val cartCount = rddEvents.filter(_.kind == "cart").count()
println(s"Numero di prodotti aggiunti al carrello: ${cartCount}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

cartCount: Long = 3028917
Numero di prodotti aggiunti al carrello: 3028917


In [83]:
val rddPurchasesCached = rddEvents.filter(_.kind == "purchase").cache()

/** Numero di prodotti acquistati */
val purchaseCount = rddPurchasesCached.count()

/** Numero di ordini effettuati */
val purchases = rddPurchasesCached.map(_.userSession).distinct().count()

///** Numero di utenti che hanno acquistato un prodotto */
val purchasingUsers = rddPurchasesCached.map(_.userID).distinct().count()

println(s"Numero di prodotti acquistati: ${purchaseCount}")
println(s"Numero totale di acquisti effettuati: ${purchases}")
println(s"In media, articoli acquistati per ogni ordine: ${purchaseCount / purchases.toDouble} ")
println(s"Numero di utenti che hanno acquistato un prodotto: ${purchasingUsers}")
println(s"Rapporto tra clienti e utenti visualizzatori: ${purchasingUsers * 100 / interactionUsers}%")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddPurchasesCached: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[226] at filter at <console>:38
purchaseCount: Long = 916939
purchases: Long = 773214
purchasingUsers: Long = 441638
Numero di prodotti acquistati: 916939
Numero totale di acquisti effettuati: 773214
In media, articoli acquistati per ogni ordine: 1.1858799763066887 
Numero di utenti che hanno acquistato un prodotto: 441638
Rapporto tra clienti e utenti visualizzatori: 11%


## Costo medio di un acquisto 
Un acquisto può comprendere più prodotti, se nella stessa sessione. 
<br> Prima di tutto si sono quindi aggregati i dati per sessione utente, al fine di accorpare i prodotti acquistati nella stessa sessione. Successivamente, è stata effettuata un'aggregazione per recuperare il totale monetario degli acquisti e il numero degli stessi. 

In [90]:
val avgPurchaseWorth = rddPurchasesCached.
        map(event => (event.userSession, event.price)).
        reduceByKey(_ + _).
        aggregate(0.0, 0)((agg, v) => (agg._1 + v._2, agg._2 + 1), (agg1, agg2) => (agg1._1 + agg2._1, agg1._2 + agg2._2))

println(s"Fatturato totale: ${avgPurchaseWorth._1}")
println(s"Totale acquisti: ${avgPurchaseWorth._2}")
println(s"In media, costo degli acquisti: ${avgPurchaseWorth._1 / avgPurchaseWorth._2}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

avgPurchaseWorth: (Double, Int) = (2.7519489049999905E8,773214)
Fatturato totale: 2.7519489049999905E8
Totale acquisti: 773214
In media, costo degli acquisti: 355.91038250729946


## Brand più venduti in termini di fatturato e unità

I record sono stati filtrati, mappati e aggregati sulla base del brand. Sono state raccolte, dall'aggregazione, le somme dei vari prezzi dei prodotti venduti e il numero degli stessi.

In [35]:
println("Risultato cosi' composto: brand, fatturato, numero di vendite \n")
val topSellerBrand = rddPurchasesCached.
        filter(_.brand.nonEmpty).
        map(e => (e.brand, (e.price, 1))).
        reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)).
        map(v => (v._1, v._2._1, v._2._2)).
        sortBy(_._2, false).
        collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Risultato cosi' composto: brand, fatturato, numero di vendite 

topSellerBrand: Array[(String, Double, Int)] = Array((apple,1.2751252488000034E8,166064), (samsung,5.486988086999983E7,200027), (xiaomi,1.1259865960000014E7,68292), (lg,5239018.759999999,12879), (huawei,4780682.35,23703), (sony,3862886.3000000017,10309), (lucente,3527545.570000001,14559), (oppo,3488540.7599999965,15080), (acer,3347306.5300000003,6402), (lenovo,2698106.299999999,6547), (bosch,1832717.5000000002,8010), (asus,1665811.5899999999,3062), (artel,1642215.0400000005,9267), (hp,1341546.2399999993,4106), (indesit,1306939.3300000005,5187), (haier,1110829.0399999998,3847), (beko,904347.0499999999,3884), (dauscher,870044.89,4221), (cordiant,787330.8600000002,16983), (philips,658690.0099999999,5539), (midea,626837.9899999999,4004), (vivo,516173.52,1914), (canon,511642.11000000016,1097...


## Categorie più vendute in termini di fatturato e unità
Sono presenti categorie e sotto categorie, divise da un ".", come ad esempio "electronics.smartphone". 
<br> Si è deciso di flattare le categorie, quindi sommando i risultati non si trovano il fatturato e le vendite reali.
<br> Ad esempio, l'acquisto di uno smartphone è fatto ricadere nella categoria smartphone, ma anche nella categoria elettronica

In [50]:
val topSellerCategories = rddPurchasesCached.
            filter(_.categoryID != 0).
            flatMap(e => e.categoryCode.split('.').map((_, (e.price, 1)))).
            reduceByKey((cat1, cat2) => (cat1._1 + cat2._1, cat1._2 + cat2._2)).
            filter(_._1.nonEmpty).
            sortBy(_._2._2, false).
            collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

topSellerCategories: Array[(String, (Double, Int))] = Array((electronics,(2.0525011820999885E8,493639)), (smartphone,(1.778216616099989E8,382647)), (appliances,(1.864050166999997E7,99026)), (kitchen,(1.4119915599999988E7,64365)), (audio,(6396802.7900000075,46302)), (headphone,(5669502.490000011,40834)), (computers,(1.399433069000001E7,34477)), (video,(1.2670405990000006E7,30766)), (tv,(1.2457151160000002E7,30274)), (environment,(3678727.6999999993,25975)), (clocks,(6552737.249999996,23237)), (washer,(5801906.330000002,19772)), (notebook,(1.0678429710000006E7,18433)), (vacuum,(2762311.62,18193)), (apparel,(1181025.1199999996,14215)), (refrigerators,(4722657.300000004,13042)), (accessories,(1473207.8399999999,12873)), (shoes,(1070973.0999999996,12527)), (furniture,(2543351.42,11546)), (au...


## Query più complesse

Vengono tolti dalla memoria tutti gli RDD di cui si era fatto cache() (per i problemi sopra citati) e si lascia spazio all'RDD utile in questa fase. 
Esso contiene tutti gli eventi, ma solamente per i campi utili. 

In [82]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}

val rddEventsCached = rddEvents.map(e => (e.productID, e.kind, e.brand, e.price, e.categoryCode)).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Prodotti visti, aggiunti al carrello e acquistati
Per ogni prodotto, si riporta il numero di visualizzazioni, di aggiunte al carrello e di acquisti.
Purtroppo si riporta solamente il productID, essendo il nome del prodotto mancante all'interno del dataset.
<br>Questa query vede un solo passo di shuffle, oltre che una cache, quindi vi è poco da ottimizzare. 

In [53]:
val path_output_items_metrics = "s3a://"+bucketname+"/spark/ecommerce/itemsMetrics"

rddEventsCached.
            //per ogni record, inserisco un 1 nella giusta posizione in base alla tipologia di evento
            map(e => (e._1, e._2 match {
              case "view" => (1, 0, 0)
              case "cart" => (0, 1, 0)
              case "purchase" => (0, 0, 1)
            })). //productId, (views, carts, purchases)
            reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3)). //productId, (views, carts, purchases)
            map(x => (x._1, x._2._1, x._2._2, x._2._3)).
            coalesce(1).
            toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_output_items_metrics)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

path_output_items_metrics: String = s3a://unibo-bd2122-fmazzini/spark/ecommerce/itemsMetrics


## Per ogni classe di brand, fatturati totali ripartiti sulle varie categorie

Su consiglio del professore, il seguente job ha lo scopo di individuare una classificazione dei brand sulla base del fatturato e successivamente capire come il fatturato totale di ciascuna classe di brand è ripartito sulle varie categorie di prodotti venduti all'interno dell'ecommerce.  

Per motivi di complessità, si è scelto di lavorare sull'intero dataset degli eventi e non solo sugli acquisti (per avere un numero dei dati maggiore e quindi un dataset di partenza più corposo). Questo è ovviamente semanticamente sbagliato, definire la somma di tutti i prezzi degli eventi "fatturato" non è corretto, ma in questo modo si hanno abbastanza dati per rendere il job corposo e valutare le performance. 

Si è quindi assunto, al fine di avere molti dati, che ogni record del dataset rappresentasse un acquisto (purchase)

In [55]:
val path_category_classbrand = "s3a://"+bucketname+"/spark/ecommerce/brandclass_category"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

path_category_classbrand: String = s3a://unibo-bd2122-fmazzini/spark/ecommerce/brandclass_category


### Classificazione dei brand
Inizialmente si è cercato di capire come classificare i brand sulla base del fatturato. Per farlo, si è esportato un dataset riguardante i brand e i relativi fatturati.

In [56]:
val path_brand_revenue = "s3a://"+bucketname+"/spark/ecommerce/brand_revenue"

rddEventsCached.
    map(e => (e._3, e._4)). // brand, price
    reduceByKey(_ + _). //brand, revenue
    sortBy(_._2).
    coalesce(1).
    toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_brand_revenue)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

path_brand_revenue: String = s3a://unibo-bd2122-fmazzini/spark/ecommerce/brand_revenue


Successivamente sono stati osservati i fatturati e ripartiti su 4 categorie

- **Top brand**: fatturato superiore a 10 M
- **Popular brand**: fatturato superiore a 1 M e inferiore a 10M
- **Normal brand**: fatturato superiore a 100k e inferiore a 1M
- **Niche brand**: fatturato minore a 100k

In [57]:
val brandClassification = rddEventsCached.
    map(e => (e._3, e._4)). // brand, price
    reduceByKey(_ + _).
    map(e => (e._1, e._2 match {
      case f if f >= 10000000 => "Top brand" 
      case f if f < 10000000 && f >= 1000000 => "Popular"  
      case f if f < 1000000 && f >= 100000 => "Normal" 
      case _ => "Niche"
      })) //.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

brandClassification: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[73] at map at <console>:41


#### Primo workflow

In questo primo tentativo si riporta quanto definito inizialmente, la bozza di job iniziale. 

Questo job presenta vari problemi, a cominciare dal fatto di eseguire la join tra due rdd molto corposi. Di seguito i passi eseguiti:

- Map per brand
- Join eventi con classi di brand (join effettuata su chiave <brand>)
- Flatmap su categorie (con relativa esplosione delle categorie) e mapping su <categoria, classeBrand>
- Aggregazione per <categoria, classeBrand>

In [80]:
rddEventsCached.
    map(p => (p._3, (p._4, p._5))). // brand, (price, categorycode)
    join(brandClassification). // brand, ((price, categoryCode), brandClass)
    flatMap(p => p._2._1._2.split('.').map(x => ((x, p._2._2), p._2._1._1))). // (categoryCode, brandClass), price
    aggregateByKey((0.0,0))((agg, v) => (agg._1 + v, agg._2 + 1),((agg1,agg2) => (agg1._1 + agg2._1, agg1._2 + agg2._2))). // (categoryCode, brandClass), price, count
    map(x => (x._1._1, x._1._2, x._2._1, x._2._2)). // categoryCode, brandClass, price, count
    //collect()
    coalesce(1).
    toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_category_classbrand)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res186: Array[(String, String, Double, Int)] = Array((belt,Top brand,177111.0199999989,2743), (swing,Popular,3122453.580000097,25448), (refrigerators,Normal,4192127.089999985,14195), (medicine,Top brand,25597.250000000127,621), (drill,Niche,166645.96000000017,1751), (swing,Top brand,2010676.6500000872,11119), (keds,Niche,212003.22000000047,3530), (kids,Popular,5.690961849999924E7,312629), (moccasins,Popular,625346.1100000151,7833), (tablet,Popular,1.096769988999922E7,82325), (soldering,Popular,26607.570000000167,1350), (kettle,Normal,833903.9700000795,35427), (jumper,Popular,653905.8299999486,23424), (subwoofer,Top brand,1.2747114819999406E7,152745), (camera,Normal,1040772.94000002,9839), (saw,Normal,634464.4800000029,8281), (parktronic,Top brand,47699.39000000052,1096), (skates,Normal,...


**Tempo di esecuzione medio:** 1.5 min 
    
In generale non si è fatto particolare attenzione all'aggregare prima del join e della flatmap sulle categorie, questo ha causato un generale aumento del volume dei dati, il quale si è tradotto in maggior tempo per eseguire il job. Un'altro aspetto da considerare è l'elevato numero di stage, il chè è dovuto all'elevato numero di shuffle. 
    
Mediante lo shuffling i dati sono ridistribuiti tra le varie partizioni, questa è una tra le operazioni più costose perchè prevede il passaggio dei dati in rete, I/O su disco e (de)/serializzazione dei dati. 


Si nota come venga effettuato il join tra i due data set, entrambi di dimensione 15 Gb. 
Lo stage in cui si perde più tempo è il 123, dove avviene la join, non avendo aggregato in precedenza è computazionalmente dispendioso. 
![job con join.png](images/job_join.png)

#### Utilizzo del broadcast
Anzichè utilizzare join, è stata usata una variabile broadcast mantenuta in sola lettura da tutti gli executor. Attraverso questa variabile condivisa in sola lettura, non sarà necessario eseguire task di join. 

In [76]:
val brandClass = sc.broadcast(brandClassification.collectAsMap())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

brandClass: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(103)


#### Secondo workflow

**Tempo di esecuzione medio:** 17 secondi

Versione ottimizzata con variabili broadcast, mediante le quali non vengono fatte join. Di seguito i passi eseguiti:

- FlatMap per <categoria, classeBrand> utilizzando broadcast (utilizzato flatmap, e non map, per togliere subito l'opzionale derivante dal get su map del broadcast)
- Flatmap su categorie (con relativa esplosione delle categorie) e mapping su <categoria, classeBrand>
- Aggregazione per <categoria, classeBrand>

In [73]:
rddEventsCached.
    flatMap(p => brandClass.value.get(p._3).map(bc => (p._5, bc, p._4))). // ((categoryCode, brandClass), price)
    flatMap(x => x._1.split('.').map(y => ((y, x._2), x._3))). // ((categoryCode, brandClass), price)
    aggregateByKey((0.0, 0))((agg, v) => (agg._1 + v, agg._2 + 1), ((agg1, agg2) => (agg1._1 + agg2._1, agg1._2 + agg2._2))). // ((categoryCode, brandClass), price, count)
    map(x => (x._1._1, x._1._2, x._2._1, x._2._2)). //categoryCode, brandClass, price, count
    //collect()
    coalesce(1).
    toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_category_classbrand)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res173: Array[(String, String, Double, Int)] = Array((belt,Top brand,177111.01999999993,2743), (swing,Popular,3122453.579999992,25448), (refrigerators,Normal,4192127.090000001,14195), (medicine,Top brand,25597.25,621), (drill,Niche,166645.95999999993,1751), (swing,Top brand,2010676.6500000013,11119), (keds,Niche,212003.21999999994,3530), (kids,Popular,5.690961850000027E7,312629), (moccasins,Popular,625346.1100000001,7833), (tablet,Popular,1.096769989000005E7,82325), (soldering,Popular,26607.570000000007,1350), (kettle,Normal,833903.9700000024,35427), (jumper,Popular,653905.8300000008,23424), (subwoofer,Top brand,1.2747114820000025E7,152745), (saw,Normal,634464.4800000004,8281), (camera,Normal,1040772.9400000003,9839), (parktronic,Top brand,47699.38999999999,1096), (skates,Normal,3571789...


In questo workflow, facendo uso di variabili broadcast ho meno stage. Un'unico per l'analisi e uno, dopo la coalesce, per scrivere il risultato ottenuto su S3.
In input ho la stessa quantità di dati, ma mediante la aggregate questi si riducono notevolmente. Inoltre, non si hanno stage per il join, facendo uso di variabili broadcast. 

![job con broadcast.png](images/job_broadcast.png)

#### Terzo workflow

**Tempo di esecuzione medio:** 11 secondi 

Versione ottimizzata con variabili broadcast e aggregazioni prima di utilizzarle. Di seguito i passi eseguiti:

- Map per <brand, categoria>
- Aggregazione per <brand, categoria>
- Flatmap su categorie (con relativa esplosione delle categorie) e mapping su <categoria, classeBrand>
- Aggregazione per <categoria, classeBrand>

In [81]:
rddEventsCached.
    map(e => ((e._5, e._3), (e._4, 1))). // ((categoryCode, brand), (price, count))
    reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)). // ((categoryCode, brand), (price, count))
    flatMap(e => e._1._1.split('.').map(cat => ((cat, brandClass.value.get(e._1._2)), e._2))). // ((categoryCode, brandClass), (price, count))
    reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)). // ((categorycode, brandClass), (price, count))
    map(x => (x._1._1, x._1._2, x._2._1, x._2._2)). // categoryCode, brandClass, price, count
    //collect()
    coalesce(1).
    toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_category_classbrand)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In questa ultima e migliore versione del job, viene eseguita un'aggregazione sui dati prima di utilizzare il broadcast per recuperare la classe di ogni brand. 
E' possibile vedere come, mentre prima la variabile broadcast era acceduta per 15Gb (circa 67milioni di record) , ora lo è per soli 9.4Mb (circa 300k record). 

Questo perchè nel secondo workflow, si accedeva alla variabile broadcast per ogni record, dove un record era un evento. In questo caso invece (terzo workflow), aggrego prima i dati sulla base del brand e della categoria, dal momento che tra brand e classeBrand vi è una relazione 1 a 1.
![dag job con aggregazione e broadcast.png](images/dag_job_aggregazione_broadcast.png) 
![job con aggregazione e broadcast.png](images/job_aggregazione_broadcast_2.png)

## Risultati ottenuti

Di seguito si riporta quanto ottenuto con Tableau, una heatmap dove, per ogni categoria il fatturato è suddiviso sulle diverse classi di brand. 

Il risultato è stato ricalcolato sulla base delle vendite reali e non su tutti gli eventi, in modo tale da avere un'analisi veritiera su categorie, classi di brand e vendite effettive. Le analisi rimangono invariate, sono solamente i dati di partenza ad essere cambiati.

#### Analisi classi brand

In [84]:
val path_real_brand_revenue = "s3a://"+bucketname+"/spark/ecommerce/real_brand_revenue"

rddPurchasesCached.
    map(e => (e.brand, e.price)). // brand, price
    reduceByKey(_ + _). //brand, revenue
    sortBy(_._2).
    coalesce(1).
    toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_real_brand_revenue)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

path_real_brand_revenue: String = s3a://unibo-bd2122-fmazzini/spark/ecommerce/real_brand_revenue


#### Classificazione dei brand, ripartizione dei fatturati sulle categorie, salvataggio su S3

In [87]:
val path_real_category_classbrand = "s3a://"+bucketname+"/spark/ecommerce/real_brandclass_category"

val real_brandClassification = rddPurchasesCached.
    map(e => (e.brand, e.price)). // brand, price
    reduceByKey(_ + _).
    map(e => (e._1, e._2 match {
      case f if f >= 1000000 => "Top brand" 
      case f if f < 1000000 && f >= 100000 => "Popular"  
      case f if f < 100000 && f >= 10000 => "Normal" 
      case _ => "Niche"
      })) //.cache()


val real_brandClass = sc.broadcast(real_brandClassification.collectAsMap())

rddPurchasesCached.
    map(e => ((e.categoryCode, e.brand), (e.price, 1))). // ((categoryCode, brand), (price, count))
    reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)). // ((categoryCode, brand), (price, count))
    flatMap(e => e._1._1.split('.').map(cat => ((cat, real_brandClass.value.get(e._1._2)), e._2))). // ((categoryCode, brandClass), (price, count))
    reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)). // ((categorycode, brandClass), (price, count))
    map(x => (x._1._1, x._1._2, x._2._1, x._2._2)). // categoryCode, brandClass, price, count
    //collect()
    coalesce(1).
    toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_real_category_classbrand)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

path_real_category_classbrand: String = s3a://unibo-bd2122-fmazzini/spark/ecommerce/real_brandclass_category
real_brandClassification: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[247] at map at <console>:42
real_brandClass: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(131)


Questa immagine è un frammento della heatmap ottenuta, la quale è stata allegata con il resto del materiale. Si nota come, per alcune categorie, la maggior parte del fatturato non arrivi dai top brand, bensì da brand più piccoli. Questo è vero soprattutto per categorie meno "commerciali" rispetto ad altre. Nelle categorie tecnologiche invece, i top brand dominano le vendite.

![tableau_results.png](images/tableau_results.png)