# Progetto Big Data - Big Sales Data

## Il dataset

Il [dataset analizzato "Liquor Sales.csv"](https://www.kaggle.com/datasets/pigment/big-sales-data), contiene informazioni sulle vendite di bevande alcoliche effettuate da diversi negozi in America nel periodo che va da Gennaio 2012 a Settembre 2020. La grandezza del dataset è di 4.77 GB e contiene molte informazioni sulle transazioni di vendita, tra cui il tipo di bevanda alcolica venduta, il prezzo di vendita, la quantità venduta, la data e l'ora della transazione e il negozio in cui la transazione è stata effettuata.

Il dataset può essere utilizzato per analizzare il mercato delle bevande alcoliche in America e per comprendere le tendenze di vendita nel corso del tempo. Ad esempio, si può valutare la popolarità di diversi tipi di bevande alcoliche in base alla loro quota di mercato, oppure si può esaminare come le vendite di alcune bevande alcoliche sono influenzate da eventi stagionali come le festività.

Inoltre, il dataset può essere utilizzato per fare previsioni sulle vendite future di bevande alcoliche o sul loro prezzo medio, ad esempio utilizzando tecniche di analisi delle serie temporali per identificare tendenze e stagionalità.

In generale, il dataset "Liquor Sales.csv" è una risorsa preziosa per coloro che vogliono studiare il mercato delle bevande alcoliche in America o per coloro che vogliono effettuare analisi di mercato e previsioni.


## Obbiettivi dell'analisi

L'obiettivo del progetto è analizzare il dataset al fine di rilevare informazioni utili sui dati raccolti. Di seguito vengono riportate le query esplorative che ci siamo posti e a cui si è cercato di dare una risposta analizzando il dataset:

- Capire che il numero degli stores number e degli stores name è diverso (es: negozio1(nome1,nome2,nome3));
- Capire che il numero degli items number e degli items name è diverso;
- Capire che il numero degli country number e degli country name è diverso;
- Capire che il numero degli city number e degli city name è diverso;
- Capire che il numero degli category number e degli category name è diverso;
- Capire che il numero degli vendor number e degli vendor name è diverso;

Successivamente abbiamo svolto delle query più complesse:

- In Media, quanti prodotti sono stati venduti per negozio?;
- In Media, quanti litri sono stati venduti per stato?;
- In Media, quante bottiglie sono state vendute per categoria?;
- Items più venduti in termini di fatturato e unità;
- Categorie più vendute in termini di fatturato e unità;

Per poi terminare con i due job:

- (job1) Svolgere un’analisi sul prezzo medio di ogni singolo prodotto per ogni anno;
- (job2) Svolgere una classificazione dei prodotti in base al fatturato e calcolare il fatturato totale per ogni categoria e fascia di prezzo.

## Configurazione del cluster

Configurazione del cluster di default, utilizzata anche durante i laboratori. Esso è composto da 1 master e 2 nodi slave, entrambi con 4 core e 16 GB di RAM. In questo caso si è scelto di mantenere gli stessi setting per il cluster in quanto ottimali per quanto visto in teoria:

- 1 master
- 2 executor con 4 core ciascuno
- 16GB di memoria per executor


## Preprocessing dei dati

In [1]:
val bucketname = "unibo-bigdata-project"
val pathSales = "s3a://"+bucketname+"/datasets/Liquor_Sales.csv"

sc.applicationId

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1678372684627_0001,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bigdata-project

pathSales: String = s3a://unibo-bigdata-project/datasets/Liquor_Sales.csv

res3: String = application_1678372684627_0001

res5: String = SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1678372684627_0001/


In [2]:
import org.apache.spark.sql.SaveMode

object LiquorSalesParser {
    
  /** Function to parse stores records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing storeNumber, storeName,, productsID none in case of input errors
   */
    
  def parseStores(line: String) : Option[(Long, String, String)] = {
    try {
      val input = line.split(',')
      Some(input(2).trim.toLong, input(3),  input(16))
    } catch {
      case _: Exception => None
    }
  }  
    
    
  /** Function to parse items (products) records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing itemNumber, itemName, Sales($), categoryName none in case of input errors
   */
    
    def parseItems(line: String) : Option[(Long, String, Double, String, Int)] = {
      try {
        val input = line.split(',')
        val date = input(1)
        val year = date.split("/").last.toInt
        Some(input(14).trim.toLong, input(15), input(22).trim.toDouble, input(11), year)
      } catch {
        case _: Exception => None
      }
    } 


/** Function to parse country records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing countryNumber, countryName, VolumeSold(L), Sales($), date
   */
    
  def parseCountries(line: String) : Option[(String, String, Double, Double, String)] = {
    try {
      val input = line.split(',')
      Some(input(9), input(10), input(23).trim.toDouble, input(22).trim.toDouble, input(2))
    } catch {
      case _: Exception => None
    }
  }  
    
    
/** Function to parse city records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing zipCode, cityName
   */
    
  def parseCities(line: String) : Option[(String, Long)] = {
    try {
      val input = line.split(',')
      Some(input(6), input(7).trim.toLong)
    } catch {
      case _: Exception => None
    }
  }
    
    
/** Function to parse categories records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing categoryNumber, categoryName, bottleSold,  Sales($)
   */
    
  def parseCategories(line: String) : Option[(Long, String, Long, Double)] = {
    try {
      val input = line.split(',')
      Some(input(11).trim.toLong, input(12), input(21).trim.toLong, input(22).trim.toDouble)
    } catch {
      case _: Exception => None
    }
  }
    
    
/** Function to parse vendors records
   *
   *  @param line line that has to be parsed
   *  @return tuple containing vendorNumber, vendorName
   */
    
  def parseVendors(line: String) : Option[( Long, String)] = {
    try {
      val input = line.split(',')
      Some(input(13).trim.toLong, input(14))
    } catch {
      case _: Exception => None
    }
  }    
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.SaveMode

defined object LiquorSalesParser


In [3]:
val rddStores = sc.textFile(pathSales).flatMap(LiquorSalesParser.parseStores)
val rddItems = sc.textFile(pathSales).flatMap(LiquorSalesParser.parseItems)
val rddCountries = sc.textFile(pathSales).flatMap(LiquorSalesParser.parseCountries)
val rddCities = sc.textFile(pathSales).flatMap(LiquorSalesParser.parseCities)
val rddCategories = sc.textFile(pathSales).flatMap(LiquorSalesParser.parseCategories)
val rddVendors = sc.textFile(pathSales).flatMap(LiquorSalesParser.parseVendors)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddStores: org.apache.spark.rdd.RDD[(Long, String, String)] = MapPartitionsRDD[2] at flatMap at <console>:26

rddItems: org.apache.spark.rdd.RDD[(Long, String, Double, String, Int)] = MapPartitionsRDD[5] at flatMap at <console>:26

rddCountries: org.apache.spark.rdd.RDD[(String, String, Double, Double, String)] = MapPartitionsRDD[8] at flatMap at <console>:26

rddCities: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[11] at flatMap at <console>:26

rddCategories: org.apache.spark.rdd.RDD[(Long, String, Long, Double)] = MapPartitionsRDD[14] at flatMap at <console>:26

rddVendors: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[17] at flatMap at <console>:26


## Dataset exploration

In [4]:
val rddStoresCached = rddStores.cache()
"Number of Stores group by their name: " + rddStoresCached.map(x => (x._2)).distinct().count()
"Number of Stores group by their code: " + rddStoresCached.map(x => (x._1)).distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddStoresCached: rddStores.type = MapPartitionsRDD[2] at flatMap at <console>:26

res9: String = Number of Stores group by their name: 2632

res10: String = Number of Stores group by their code: 2484


In [5]:
val rddItemsCached = rddItems.cache()
"Number of Items group by their name: " + rddItemsCached.map(x => (x._2)).distinct().count()
"Number of Items group by their code: " + rddItemsCached.map(x => (x._1)).distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddItemsCached: rddItems.type = MapPartitionsRDD[5] at flatMap at <console>:26

res13: String = Number of Items group by their name: 7834

res14: String = Number of Items group by their code: 8322


In [6]:
val rddCountriesCached = rddCountries.cache()
"Number of countries group by their name: " + rddCountriesCached.map(x => (x._2)).distinct().count()
"Number of countries group by their code: " + rddCountriesCached.map(x => (x._1)).distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCountriesCached: rddCountries.type = MapPartitionsRDD[8] at flatMap at <console>:26

res15: String = Number of countries group by their name: 277

res16: String = Number of countries group by their code: 298


In [7]:
val rddCitiesCached = rddCities.cache()
"Number of cities group by their name: " + rddCitiesCached.map(x => (x._2)).distinct().count()
"Number of cities group by their code: " + rddCitiesCached.map(x => (x._1)).distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCitiesCached: rddCities.type = MapPartitionsRDD[11] at flatMap at <console>:26

res17: String = Number of cities group by their name: 218

res18: String = Number of cities group by their code: 354


In [8]:
val rddCategoriesCached = rddCategories.cache()
"Number of categories group by their name: " + rddCategoriesCached.map(x => (x._2)).distinct().count()
"Number of categories group by their code: " + rddCategoriesCached.map(x => (x._1)).distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCategoriesCached: rddCategories.type = MapPartitionsRDD[14] at flatMap at <console>:26

res19: String = Number of categories group by their name: 128

res20: String = Number of categories group by their code: 100


In [9]:
val rddVendorsCached = rddVendors.cache()
"Number of vendors group by their name: " + rddVendorsCached.map(x => (x._2)).distinct().count()
"Number of vendors group by their code: " + rddVendorsCached.map(x => (x._1)).distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddVendorsCached: rddVendors.type = MapPartitionsRDD[17] at flatMap at <console>:26

res21: String = Number of vendors group by their name: 377

res22: String = Number of vendors group by their code: 262


## Query complesse

### Media prodotti venduti per negozio
Viene calcolato il numero medio di prodotti venduti per negozio attraverso l'utilizzo di una serie di operazioni sui dati contenuti nel dataset. In particolare, vengono mappati gli identificativi dei negozi e per ognuno di essi, viene creato un valore pari a 1. Questi valori vengono poi sommati insieme utilizzando la funzione "reduceByKey". Con la funzione "aggregate" si ottiene una struttara del (numeroItemsVenduti, numeroNegozi). Infine basta dividere il numeroItemsVenduti per numeroNegozi ed ottenere così la media dei prodotti venduti per singolo negozio

In [10]:
val rddStoresCached = rddStores.cache()
val avgProductSoldPerShop = rddStoresCached.map(x => (x._1,1)).reduceByKey(_+_).aggregate((0,0))((a,v)=>(a._1+v._2, a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
"Average number of products sold per shop: " + (avgProductSoldPerShop._1/avgProductSoldPerShop._2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddStoresCached: rddStores.type = MapPartitionsRDD[2] at flatMap at <console>:26

avgProductSoldPerShop: (Int, Int) = (19666763,2484)

res24: String = Average number of products sold per shop: 7917


### Media litri venduti per stato
Inizialmente, la funzione "map" viene utilizzata per estrarre le informazioni relative al numero del paese e al numero di litri venduti. Successivamente, la funzione "countByKey" viene utilizzata per calcolare il numero di occorrenze di ogni paese nel dataset. Infine, la funzione "aggregate" viene utilizzata per calcolare la somma totale dei litri venduti e il numero totale di paesi presenti nel dataset. Infine, viene calcolata la media dei litri venduti per paese.

In [11]:
val rddCountiresCached = rddCountries.cache()
val avgLitersSoldPerCountry = rddCountriesCached.
    map(x => (x._1,x._3)). /*(countryNumber,liters)*/
    countByKey().
    aggregate((0.0,0.0))((a,v)=>(a._1+v._2, a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
"Media di litri venduti per country: " + (avgLitersSoldPerCountry._1/avgLitersSoldPerCountry._2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCountiresCached: rddCountries.type = MapPartitionsRDD[8] at flatMap at <console>:26

avgLitersSoldPerCountry: (Double, Double) = (1.9666762E7,298.0)

res27: String = Media di litri venduti per country: 65995.84563758389


### Media bottiglie vendute per categoria
Inizialmente, la funzione "map" viene utilizzata per estrarre le informazioni relative al numero della categoria e al numero di bottiglie vendute. Successivamente, la funzione "countByKey" viene utilizzata per calcolare il numero di occorrenze di ogni categoria nel dataset. Infine, la funzione "aggregate" viene utilizzata per calcolare la somma totale delle bottiglie vendute e il numero totale di categorie presenti nel dataset. Infine, viene calcolata la media delle bottiglie vendute per categoria.

In [12]:
val rddCategoriesCached = rddCategories.cache()
val avgBottlesPerCategory = rddCategoriesCached.
    map(x => (x._1,x._3)).
    countByKey().
    aggregate((0.0,0.0))((a,v)=>(a._1+v._2, a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
"Media di bottiglie vendute per categoria: " + (avgBottlesPerCategory._1/avgBottlesPerCategory._2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddCategoriesCached: rddCategories.type = MapPartitionsRDD[14] at flatMap at <console>:26

avgBottlesPerCategory: (Double, Double) = (1559988.0,100.0)

res30: String = Media di bottiglie vendute per categoria: 15599.88


### Items più venduti in termini di fatturato e unità
Inizialmente, la funzione "map" viene utilizzata per estrarre le informazioni relative al nome dell'item, al suo fatturato e al numero di vendite. Successivamente, la funzione "reduceByKey" viene utilizzata per aggregare le informazioni raggruppando i valori con lo stesso nome dell'item.
Successivamente, la funzione "sortBy" viene utilizzata per ordinare gli elementi in base al fatturato in modo decrescente.
Infine, viene utilizzata la funzione "collect" per raccogliere i risultati del calcolo e la funzione "take" per selezionare solo i primi 10 risultati. Infine, viene utilizzata la funzione "foreach" per stampare a schermo le informazioni relative ai primi 10 elementi del RDD, ovvero il nome dell'item, il suo fatturato e il numero di vendite.

In [13]:
println("Risultato cosi' composto: nomeItem, fatturato, numero di vendite \n")
val rddItemsCached = rddItems.cache()
rddItemsCached.
        map(e => (e._2, (e._3, 1))).
        reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)).
        map(v => (v._1, v._2._1, v._2._2)).
        sortBy(_._2, false).
        collect().take(10).foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Risultato cosi' composto: nomeItem, fatturato, numero di vendite 



rddItemsCached: rddItems.type = MapPartitionsRDD[5] at flatMap at <console>:26

(Hawkeye Vodka,7275708.29,411211)

(Black Velvet,5242031.529999999,220957)

(Captain Morgan Spiced Rum,4018092.1699999985,188778)

(Titos Handmade Vodka,3770229.4600000014,166741)

(Barton Vodka,3219200.8399999994,112894)

(Five O'clock Vodka,2997823.2799999984,226502)

(Phillips Vodka,2366347.33,107766)

(Jack Daniels Old #7 Black Lbl,2306382.9500000007,159353)

(Admiral Nelson Spiced Rum,1944860.9900000014,177337)

(Fireball Cinnamon Whiskey,1850195.199999996,122337)


### Categorie più vendute in termini di fatturato e unità
Questo codice stampa i risultati delle categorie più vendute in termini di fatturato e di numero di vendite. In particolare, dopo aver memorizzato la cache dei dati delle categorie, il codice mappa ogni elemento in una tupla in cui la chiave è il nome della categoria e il valore è un'altra tupla contenente il fatturato e il numero di vendite della categoria. Poi, i dati vengono aggregati tramite la funzione reduceByKey che somma i valori associati alla stessa chiave. Successivamente, il risultato viene mappato in una tupla contenente il nome della categoria, il fatturato totale e il numero di vendite totali e viene ordinato in modo decrescente in base al fatturato. Infine, vengono stampati i primi 10 elementi della lista di risultati ottenuti dalla funzione collect().

In [9]:
println("Risultato cosi' composto: nomeCategor, fatturato, numero di vendite \n")
val rddCategoriesCached = rddCategories.cache()
rddCategoriesCached.
        map(e => (e._2, (e._4, 1))).
        reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)).
        map(v => (v._1, v._2._1, v._2._2)).
        sortBy(_._2, false).
        collect().take(10).foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Risultato cosi' composto: nomeCategor, fatturato, numero di vendite 



rddCategoriesCached: rddCategories.type = MapPartitionsRDD[14] at flatMap at <console>:26

(American Vodkas,1.4152316429999966E7,140485)

(Canadian Whiskies,1.389965760000001E7,103166)

(VODKA 80 PROOF,9530538.619999986,109531)

(Imported Brandies,8827292.330000015,30078)

(Spiced Rum,7252489.699999989,49083)

(SPICED RUM,7068700.709999999,44743)

(Whiskey Liqueur,6887928.550000002,50702)

(CANADIAN WHISKIES,6197296.109999994,47384)

(TEQUILA,4861197.939999997,35800)

(Straight Bourbon Whiskies,4779353.42,47715)


## Job 1
### Svolgere un’analisi sul prezzo medio di ogni singolo prodotto per ogni anno
Il codice ha lo scopo di calcolare il prezzo medio di ogni prodotto per ogni anno e di salvare il risultato in un file CSV.
In particolare, il codice esegue le seguenti operazioni:

- Inizialmente, viene eseguita una cache del RDD rddItems e viene creato un nuovo RDD (avgPricesByYear) che associa a ciascuna coppia (nomeProdotto, anno) una tupla contenente la somma dei prezzi e il numero di prodotti venduti durante l'anno considerato.
- Viene quindi eseguita una riduzione per chiave (reduceByKey) per aggregare i dati raggruppati per nome del prodotto e anno.
- Successivamente, viene eseguita una mappatura (mapValues) per calcolare il prezzo medio di ogni prodotto per ogni anno.
- In seguito, viene eseguita una mappatura (map) per riorganizzare i dati e raggrupparli per nome del prodotto.
- Vengono poi eseguite ulteriori operazioni di trasformazione (flatMap e coalesce) per preparare i dati per il salvataggio su file CSV.
- Infine, viene convertito l'RDD in un DataFrame e salvato in un file CSV.

In [None]:
val bucketname = "unibo-bigdata-project"
val path_itemsForYear_revenue = "s3a://"+bucketname+"/risultati/satates_revenue"

// calcolare il prezzo medio di ogni prodotto per ogni anno
val rddItemsCached = rddItems.cache()

val avgPricesByYear = rddItemsCached.map(item => ((item._2, item._5), (item._3, 1))).//((itemName,anno), (price,1))
  reduceByKey((accum, value) => (accum._1 + value._1, accum._2 + value._2)).
  mapValues(sumCount => sumCount._1 / sumCount._2).
  map(item => (item._1._1, item._2, item._1._2))
  //collect() //(itemName,avg,year)

// Raggruppo per nomeItem
val avgPricesByProduct = avgPricesByYear.groupBy(_._1).
  mapValues(_.map(item => (item._2, item._3)))//.collect()

//flatmap con chiave prodotto e anno
val avgPricesByProductAndYear = avgPricesByProduct.flatMap { case (itemName, yearPrices) =>
  yearPrices.map { case (avgPrice, year) =>
    ((itemName, year), avgPrice)
  }
}.coalesce(1)

// Conversione in DataFrame con stringa
val df = avgPricesByProductAndYear.map{case ((itemName, year), avgPrice) => (itemName,year,avgPrice)}.toDF()

// Salvataggio su file CSV
df.write.format("csv").mode(SaveMode.Overwrite).save(path_itemsForYear_revenue)

## Job 2
### Svolgere una classificazione dei prodotti in base al fatturato e calcolare il fatturato totale per ogni categoria e fascia di prezzo.
Questo codice esegue l'analisi delle vendite di prodotti e calcola il fatturato per categoria e fascia di prezzo dei prodotti. Innanzitutto, il codice memorizza la RDD rddItems nella variabile rddItemsCached, che serve a migliorare le prestazioni durante l'elaborazione. Successivamente, il codice esegue una serie di operazioni su rddItemsCached:
- map: mappa ogni record dell'RDD in una coppia di chiave-valore in cui la chiave è una tupla formata dal nome del prodotto e il nome della categoria, mentre il valore è una tupla formata dal prezzo del prodotto e 1 (ovvero la quantità di prodotto venduta).
- reduceByKey: raggruppa le tuple per chiave e somma il prezzo del prodotto e la quantità venduta, restituendo una nuova coppia di chiave-valore in cui la chiave è ancora la tupla prodotto-categoria, mentre il valore è una tupla formata dalla somma dei prezzi e la somma delle quantità vendute.
- map: mappa la coppia di chiave-valore ottenuta nel passaggio precedente in una tupla formata dal nome del prodotto, il nome della categoria, il fatturato totale e il numero totale di prodotti venduti, in modo da avere un formato più facile da leggere.
- sortBy: ordina le tuple per fatturato in ordine decrescente.

Successivamente, il codice esegue altre operazioni sul RDD risultante:

- map: mappa la coppia di chiave-valore prodotto-fatturato in una coppia di chiave-valore in cui la chiave è il nome del prodotto, mentre il valore è il fatturato totale del prodotto.
- reduceByKey: raggruppa le tuple per chiave e somma il fatturato per prodotto.
- map: mappa la coppia di chiave-valore risultante nel passaggio precedente in una coppia di chiave-valore in cui la chiave è il nome del prodotto, mentre il valore è la fascia di prezzo del prodotto. La fascia di prezzo viene assegnata in base al fatturato totale del prodotto e viene mappata utilizzando una serie di espressioni case che controllano se il fatturato del prodotto supera una soglia specifica.

Successivamente vengono eseguite le seguenti operazioni:

- join: unisce l'RDD risultante con il RDD che contiene il nome del prodotto, il nome della categoria e il fatturato totale per prodotto e categoria.
- map: mappa la coppia di chiave-valore risultante dalla join in una nuova coppia di chiave-valore in cui la chiave è una tupla formata dal nome della categoria e la fascia di prezzo del prodotto, mentre il valore è il fatturato totale del prodotto per quella categoria e il Tier.
- aggregateByKey: raggruppa le tuple per chiave e calcola la somma dei ricavi e il numero di prodotti venduti per categoria e fascia di prezzo.
- map: mappa la coppia di chiave-valore risultante dalla operazione di aggregazione in una tupla formata dal nome della categoria, la fascia di prezzo, il fatturato totale e il numero totale di prodotti venduti

In [7]:
val bucketname = "unibo-bigdata-project"
val path_category_revenue = "s3a://"+bucketname+"/risultati/categories_revenue"

val rddItemsCached = rddItems.cache()
val v = rddItemsCached.
        map(e => ((e._2, e._4), (e._3, 1))). //(nameItem, categoryName),(price,1)
        reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)). //(nameItem,categoryName),(fatturato,unitàVendute)
        map(v => (v._1._1, v._1._2, v._2._1, v._2._2)). //nomeItem, categoryName, fatturato, unitàVendute
        sortBy(_._3, false)


val itemsClassification = v.
    map(e => (e._1, e._3)). // nomeItem, fatturato
    reduceByKey(_ + _).
    map(e => (e._1, e._2 match {
      case f if f >= 1000000 => "Top Tier"
      case f if f < 1000000 && f >= 100000 => "Mid Tier"  
      case _ => "Low Tier"
      })) //nomeItem, Tier


val revenueByCategory = v.
    map(p => (p._1, (p._3, p._2))). // nomeItem, (fatturato, categoryName)
    join(itemsClassification). // nomeItem, ((fatturato,categoryName), Tier)
    map(p => ((p._2._1._2, p._2._2), p._2._1._1)). // (categoryName, Tier), fatturato
    aggregateByKey((0.0,0))((agg, v) => (agg._1 + v, agg._2 + 1),((agg1,agg2) => (agg1._1 + agg2._1, agg1._2 + agg2._2))). // (categoryName, Tier), price, count
    map(x => (x._1._1, x._1._2, x._2._1, x._2._2)).// categoryName, Tier, revenue, count
    coalesce(1)

revenueByCategory.map(x => x.productIterator.mkString(",")).saveAsTextFile(path_category_revenue)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bigdata-project

path_category_revenue: String = s3a://unibo-bigdata-project/risultati/categories_revenue

rddItemsCached: rddItems.type = MapPartitionsRDD[5] at flatMap at <console>:26

v: org.apache.spark.rdd.RDD[(String, String, Double, Int)] = MapPartitionsRDD[58] at sortBy at <console>:29

itemsClassification: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[61] at map at <console>:29

revenueByCategory: org.apache.spark.rdd.RDD[(String, String, Double, Int)] = CoalescedRDD[69] at coalesce at <console>:33
