###### Il progetto è visibile su github al seguente indirizzo: https://github.com/albianto97/bigdataFile.

Il dataset analizzato (https://www.kaggle.com/datasets/shuhengmo/uber-nyc-forhire-vehicles-trip-data-2021), contiene informazioni sui viaggi fatti da varie licenze "taxi" nella città di New York.
La grandezza totale del dataset è di 5 GB e contiene molte informazioni sui luoghi di partenza, arrivo, prezzo e varie tempistiche come l'orario e giorno di partenza e quelli d'arrivo.

In [2]:
%%configure -f
{"executorMemory":"5G", "numExecutors":2, "executorCores":3, "conf":
{"spark.dynamicAllocation.enabled": "false"}}


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1686252650686_0002,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1686252650686_0002,spark,idle,Link,Link,,✔


In [9]:
val bucketname = "unibo-bd-antonelli2023"
val path_ml_tripdata =
"s3a://"+bucketname+"/datasets/fhvhv_tripdata_2021-01.parquet"
val path_ml_tripdata2 =
"s3a://"+bucketname+"/datasets/fhvhv_tripdata_2021-02.parquet"

sc.applicationId

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bd-antonelli2023
path_ml_tripdata: String = s3a://unibo-bd-antonelli2023/datasets/fhvhv_tripdata_2021-01.parquet
path_ml_tripdata2: String = s3a://unibo-bd-antonelli2023/datasets/fhvhv_tripdata_2021-02.parquet
res13: String = application_1686252650686_0002
res15: String = SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1686252650686_0002/


I dati sono presenti nel formato Parquet; dopo essere stati importati, tutti i job sono stati fatti dopo una successiva conversione in RDD. I dati sono raggruppati mensilmente, per un totale di 12 file Parquet. 

Importazione dei dati in formato Parquet.

In [10]:
import spark.implicits._
val parquetFileDF = spark.read.parquet(path_ml_tripdata)
val parquetFileDF2 = spark.read.parquet(path_ml_tripdata2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import spark.implicits._
parquetFileDF: org.apache.spark.sql.DataFrame = [hvfhs_license_num: string, dispatching_base_num: string ... 22 more fields]
parquetFileDF2: org.apache.spark.sql.DataFrame = [hvfhs_license_num: string, dispatching_base_num: string ... 22 more fields]


# Analisi dei campi del dataset
* licenseClass: tipo stringa che rappresenta la licenza de taxi.
	* HV0002: Juno
    * HV0003: Uber
    * HV0004: Via
    * HV0005: Lyft
* license: tipo Stringa che rappresenta il numero del taxi.
* request: tipo Timestamp che rappresenta la data/ora della richiesta del viaggio.
* pickup: tipo Timestamp  che rappresenta la data/ora dell'inizio del viaggio.
* dropoff: tipo Timestamp che rappresenta la data/ora della fine del viaggio.
* distance: tipo Double che rappresenta la distanza totale in miglia percorsa dal taxi per questo viaggio.
* startloc: tipo Long che rappresenta la zona di partenza del viaggio.
* endloc: tipo Long che rappresenta la zona di arrivo del viaggio.
* fare: tipo Double che rappresenta il prezzo del viaggio.

In [4]:
import java.sql.Timestamp
case class TaxiTrip(
  licenseClass:String,
  license:String,  
  request:Timestamp,
  pickup:Timestamp,
  dropoff:Timestamp,
  distance:Double,
  startloc:Long,
  endloc:Long,
  //time:Long,
  fare:Double,
)

object TaxiTrip {
    def extract(row:org.apache.spark.sql.Row) = {
        val licenseClass = row.getString(0)
        val license = row.getString(1)
        val request = row.getTimestamp(4)
        val pickup = row.getTimestamp(5)
        val dropoff = row.getTimestamp(6)
        val distance = row.getDouble(9)
        val startloc = row.getLong(7)
        val endloc = row.getLong(8)
        //val time = row.getLong(10)
        val fare = row.getDouble(11)
        
        new TaxiTrip(licenseClass,license,request,pickup,dropoff,distance,startloc,endloc,fare)
        

        //new TaxiTrip(license,request,pickup,dropoff,startloc,endloc,distance,time,fare)
    }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import java.sql.Timestamp
defined class TaxiTrip
defined object TaxiTrip
Companions must be defined together; you may wish to use :paste mode for this.


La seguente funzione serve per convertire da formato timestamp a calendar, in modo da poter estrarre informazioni come giorno del mese, ora del giorno oppure giorno della settimana. <br>
Si può notare che il dataset conteneva "trip_time" ossia il totale del tempo in secondi che i passeggeri hanno passato sul taxi durante il viaggio. <br>Ho preferito definire una funzione perchè sarebbe stata utile eventualmente per tutti i tempi importati dal dataset (request, pickup e dropoff).

In [5]:
import java.util.Calendar
def getDayTime(time: Long): Calendar = {
    var date:Calendar = Calendar.getInstance();
    date.setTimeInMillis(time);
    date
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import java.util.Calendar
getDayTime: (time: Long)java.util.Calendar


# Creazione e caching del RDD a partire dal file Parquet.
Caching per migliorare le prestazioni in vista di futuri utilizzi. Si è optato per la scelta tenere due ripartizioni (quindi due task per core). Essendoci due executor con tre core ciascuno, il numero totale di core è sei. Quindi si è scelta una ripartizione di 12, anche se guardando i tempi di esecuzione il miglioramento è solo di qualche secondo.

In [12]:
import org.apache.spark.HashPartitioner
val p = new HashPartitioner(12)
val data_rdd = parquetFileDF.rdd
val data_rdd2 = parquetFileDF2.rdd
val rddTaxiTrip = data_rdd.map(TaxiTrip.extract)
val rddTaxiTrip2 = data_rdd2.map(TaxiTrip.extract)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.HashPartitioner
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@c
data_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[9] at rdd at <console>:36
data_rdd2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[15] at rdd at <console>:36
rddTaxiTrip: org.apache.spark.rdd.RDD[TaxiTrip] = MapPartitionsRDD[18] at map at <console>:39
rddTaxiTrip2: org.apache.spark.rdd.RDD[TaxiTrip] = MapPartitionsRDD[19] at map at <console>:39


# File di input
Per tutte le seguenti query verranno utilizzati due file, i quali verranno uniti successivamente attraverso una union: il primo contenente i dati per il mese di gennaio 2021, il secondo contente i dati di febbraio 2021.
<br>Avendo caricato tutti i file nella cartella è possibile fare confronti anche su mesi diversi, ed avendo Kaggle anche i dataset di anni precedenti la stessa analisi può essere effettutata anche su anni differenti. (Attenzione ad eventuali campi mancanti in dataset di anni precedenti). 
<br>La grandezza di input è di circa 380/390 MB a file.


### Numero totale di record

In [13]:
rddTaxiTrip.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res16: Long = 11908468


### Numero totale di zone di arrivo diverse
Nota: Lo stesso numero sono quelle di partenza.

In [14]:
rddTaxiTrip.map(x => x.endloc).distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res17: Long = 262


### Stampa delle classi di licenza 
Nel caso ne mancasse qualcuna significa che non è presente nel periodo di tempo considerato.

In [15]:
rddTaxiTrip.map(x => x.licenseClass).distinct().collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res18: Array[String] = Array(HV0004, HV0005, HV0003)


### VIAGGI PIU LUNGHI IN TERMINE DI KM, IN RELAZIONE CON LA DURATA DEL VIAGGIO


In [83]:
val query1 = rddTaxiTrip.map(x => (x.distance, (x.dropoff.getTime() - x.pickup.getTime())/(60*1000)))
val query1802 = rddTaxiTrip2.map(x => (x.distance, (x.dropoff.getTime() - x.pickup.getTime())/(60*1000)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query1: org.apache.spark.rdd.RDD[(Double, Long)] = MapPartitionsRDD[264] at map at <console>:81
query1802: org.apache.spark.rdd.RDD[(Double, Long)] = MapPartitionsRDD[265] at map at <console>:81


In [84]:
val query1union = query1.union(query1802).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query1union: org.apache.spark.rdd.RDD[(Double, Long)] = UnionRDD[266] at union at <console>:82


In [85]:
query1union.sortByKey(false).take(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res187: Array[(Double, Long)] = Array((738.95,666), (527.11,575), (512.5,541), (480.73,512), (454.49,452), (432.359,379), (417.77,469), (408.19,440), (389.65,370), (381.95,444))


### VIAGGI PIU COSTOSI, IN RELAZIONE CON LA SOMMA TRA LUNGHEZZA E LA DURATA DEL VIAGGIO
Aggiunto un filtro sulla distanza minima. Questo comporterà solo la stampa dei viaggi più costosi visto che ci si aspetta che l'andamento sia proporzionale tra distanza e costo.

In [86]:
val query2 = rddTaxiTrip.filter(_.distance > 20).map(x => (x.fare, ((x.dropoff.getTime() - x.pickup.getTime())/(60*1000) + x.distance)))
val query2802 = rddTaxiTrip2.filter(_.distance > 20).map(x => (x.fare, ((x.dropoff.getTime() - x.pickup.getTime())/(60*1000) + x.distance)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query2: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[274] at map at <console>:81
query2802: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[276] at map at <console>:81


In [87]:
val query2union = query2.union(query2802).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query2union: org.apache.spark.rdd.RDD[(Double, Double)] = UnionRDD[277] at union at <console>:82


In [88]:
query2union.sortByKey(false).take(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res188: Array[(Double, Double)] = Array((1532.67,1404.95), (1374.73,906.49), (1361.13,759.65), (1308.56,886.77), (1215.83,825.95), (1159.66,626.4), (1149.87,992.73), (1149.03,452.39), (1082.69,536.21), (1073.88,629.78))


### TEMPO DI VIAGGIO MEDIO, IN BASE ALLA ZONA DI PARTENZA
Il codice esegue le seguenti operazioni:
* Viene eseguita una map per estrarre la zona di partenza e il tempo di viaggio;
* Successivamente viene fatta un aggregazione in base alla chiave per andare a calcolare la somma dei tempi e il conteggio dei viaggi;

Queste azioni sono state svolte per entrambi gli RDD dei mesi.

* Viene eseguita una union per raggruppare i due valori;

Il risultato finale che si sta cercando è quello di vedere per ogni zona quanto è il tempo medio di viaggio

In [89]:
val query3 = rddTaxiTrip.map(x => (x.startloc, (x.dropoff.getTime() - x.pickup.getTime())/(60*1000))).
    aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).partitionBy(p)
val query31 = rddTaxiTrip2.map(x => (x.startloc, (x.dropoff.getTime() - x.pickup.getTime())/(60*1000))).
    aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).partitionBy(p)
val query3union = query3.union(query31).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query3: org.apache.spark.rdd.RDD[(Long, (Double, Double))] = ShuffledRDD[286] at partitionBy at <console>:83
query31: org.apache.spark.rdd.RDD[(Long, (Double, Double))] = ShuffledRDD[289] at partitionBy at <console>:83
query3union: org.apache.spark.rdd.RDD[(Long, (Double, Double))] = PartitionerAwareUnionRDD[290] at union at <console>:82


##### Risultato raggruppato per vedere la differenza tra i due mesi rispetto alla zona di partenza

In [90]:
val q = query3union.map({case(k,v) => (k,v._1/v._2)}).groupByKey().map({case(k,v) => (v,k)}).sortByKey(false).take(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

q: Array[(Iterable[Double], Long)] = Array((CompactBuffer(35.26170798898072, 37.86046511627907),1), (CompactBuffer(29.887379973950182, 30.010265920054298),132), (CompactBuffer(29.043478260869566, 14.444444444444445),2), (CompactBuffer(26.88235294117647, 25.642857142857142),110), (CompactBuffer(23.692460239286607, 25.2119881087581),138), (CompactBuffer(22.56801909307876, 22.26112759643917),27), (CompactBuffer(22.0, 17.0),199), (CompactBuffer(21.416037008481112, 21.901016009852217),202), (CompactBuffer(20.15281650864473, 20.65938242280285),46), (CompactBuffer(19.722795265101055, 21.199589471142236),117))


#### Attenzione

In [68]:
//CAPIRE QUALE è IL RISULTATO
val query6 = rddTaxiTrip.map(x => (x.startloc, (x.dropoff.getTime() - x.pickup.getTime())/(60*1000)))
val query61 = rddTaxiTrip2.map(x => (x.startloc, (x.dropoff.getTime() - x.pickup.getTime())/(60*1000)))
val query6union = query5.union(query51).cache()
//query5union.take(3)
query6union.aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).partitionBy(p)
query6union.map({case(k,v) => (k/v)}).take(10)
query6union.map({case(k,v) => (k,v._1/v._2)}).sortByKey(false).take(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query6: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[203] at map at <console>:30
query61: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[204] at map at <console>:29
query6union: org.apache.spark.rdd.RDD[(Long, Long)] = UnionRDD[205] at union at <console>:30


### IN MEDIA QUANTI VIAGGI FA CIASCUNA LICENZA

Inizialmente, la funzione "map" viene utilizzata per estrarre le informazioni relative al numero di licenze e al numero di viaggi effettuati. Infine, la funzione "aggregate" viene utilizzata per calcolare la somma totale dei viaggi e il numero totale di licenze. Infine, viene calcolata la media.

In [92]:
val query4 = rddTaxiTrip.map(x => (x.license,1)).
reduceByKey(_+_).
aggregate((0,0))((a,v)=>(a._1+v._2, a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
"Media: " + (query4._1/query4._2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query4: (Int, Int) = (11908468,32)
res190: String = Media: 372139


### CLASSE DI LICENZA CHE FA PIU VIAGGI, CON LA PRESENZA ANCHE DELLA TARIFFA CLIENTE

Questo codice stampa i risultati delle classi di licenze che fa più viaggi in relazione anche al guadagno. In particolare, il codice mappa ogni elemento in una tupla in cui la chiave è la licenza, contenente il guadagno e il numero di viaggi di ogni licenza. Poi, i dati vengono aggregati tramite la funzione reduceByKey che somma i valori associati alla stessa chiave. Successivamente, il risultato viene mappato in una tupla contenente il nome della licenza, il guadagno totale e il numero di viaggi totali e viene ordinato in modo decrescente in base al guadagno. Infine, vengono stampati gli elementi della lista di risultati ottenuti dalla funzione collect().

In [93]:
val query5 = rddTaxiTrip.map(x => (x.licenseClass,(x.fare,1))).
reduceByKey((t1,t2) => (t1._1+t2._1, t1._2+t2._2)).
map(v => (v._1,v._2._1, v._2._2)).
sortBy(_._2,false).
collect().foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(HV0003,1.468278860287933E8,8704128)
(HV0005,5.509768345985989E7,3094325)
(HV0004,2816007.860000914,110015)
query5: Unit = ()


### LICENZA CHE FA PIU VIAGGI, CON LA PRESENZA ANCHE DELLA TARIFFA CLIENTE

Qui viene eseguita una replica della query precedente, basandosi su ogni singola licenza.

In [94]:
val query6 = rddTaxiTrip.map(x => (x.license,(x.fare,1))).
reduceByKey((t1,t2) => (t1._1+t2._1, t1._2+t2._2)).
map(v => (v._1,v._2._1, v._2._2)).
sortBy(_._2,false).
collect().foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(B02510,5.503748608986006E7,3091000)
(B02764,1.709378720999917E7,1009388)
(B02872,1.5509604710001156E7,924960)
(B02875,1.2014741050004447E7,735450)
(B02765,1.0189792220006326E7,591242)
(B02869,7744772.440006703,452098)
(B02887,5608155.9400040135,333768)
(B02871,5581241.000004057,330085)
(B02682,5456731.730003582,321599)
(B02866,5284853.590003659,309274)
(B02864,5203100.810003353,316395)
(B02878,5125333.480003258,312013)
(B02617,4884453.930003084,281432)
(B02883,4582633.870002626,268391)
(B02884,4354322.950002186,257674)
(B02882,4097472.360001952,241988)
(B02876,3929723.280001779,230732)
(B02867,3730036.2400015946,217449)
(B02879,3572925.7100014733,216993)
(B02877,3490842.3500013095,208986)
(B02835,3355979.0300012426,200129)
(B02888,3032522.3300008504,177542)
(B02800,2776613.2300008354,108146)
(B02889,2500286.6500002183,149398)
(B02836,2385524.3700000723,140365)
(B02395,2056890.0899998054,124107)
(B02880,2039189.9299998167,119173)
(B02870,1768859.4399998628,106975)
(B02865,1385092.24999

### MAP PER QUERY GROUP BY LOCATION/TIME, REDUCE BY KEY PER CONTARE LE OCCORRENZE DEI RECORD, MAP PER SCAMBIARE CHIAVE-VALORE PER POTER ORDINARE I VALORI PER VEDERE LE ZONE/PERIODI PIU GETTONATI


In [95]:
//rddTaxiTrip.map(x => (x.startloc, 1)).reduceByKey((x,y) => x + y).map({case(k,v) => (v,k)}).sortByKey(false).collect()

val query7 = rddTaxiTrip.map(x => (getDayTime(x.pickup.getTime()).get(Calendar.DAY_OF_WEEK), 1)).partitionBy(p)
val query7802 = rddTaxiTrip2.map(x => (getDayTime(x.pickup.getTime()).get(Calendar.DAY_OF_WEEK), 1)).partitionBy(p)

val query7union = query7.union(query7802).cache()

query7union.reduceByKey((x,y) => x + y).map({case(k,v) => (v,k)}).sortByKey(false).collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query7: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[318] at partitionBy at <console>:85
query7802: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[320] at partitionBy at <console>:83
query7union: org.apache.spark.rdd.RDD[(Int, Int)] = PartitionerAwareUnionRDD[321] at union at <console>:83
res195: Array[(Int, Int)] = Array((4107150,6), (4102794,7), (3425392,1), (3205149,5), (3142246,4), (2869346,3), (2670333,2))


### DISTANZA MEDIA PERCORSA DAI TAXI IN UN SINGOLO VIAGGIO, IN RELAZIONE ALLA ZONA

In [96]:
val query8 = rddTaxiTrip.filter(_.distance < 100).map(x => (x.startloc,x.distance)).
    aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).partitionBy(p)
val query81 = rddTaxiTrip2.filter(_.distance < 100).map(x => (x.startloc,x.distance)).
    aggregateByKey((0.0,0.0))((a,v)=>(a._1+v,a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).partitionBy(p)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query8: org.apache.spark.rdd.RDD[(Long, (Double, Double))] = ShuffledRDD[330] at partitionBy at <console>:83
query81: org.apache.spark.rdd.RDD[(Long, (Double, Double))] = ShuffledRDD[334] at partitionBy at <console>:83


In [97]:
val query8union = query8.union(query81).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

query8union: org.apache.spark.rdd.RDD[(Long, (Double, Double))] = PartitionerAwareUnionRDD[335] at union at <console>:82


In [98]:
val q8 = query8union.map({case(k,v) => (k,v._1/v._2)}).map({case(k,v) => (v,k)}).sortByKey(false).take(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

q8: Array[(Double, Long)] = Array((20.136333333333326,1), (19.978372093023246,1), (15.583862644195618,132), (14.864652072083206,132), (11.889173913043477,2), (10.915017273061434,138), (10.765875894988065,27), (10.74933302682418,138), (10.515,199), (10.507823529411766,110))


In [99]:
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}

// Crea la sessione Spark
val spark = SparkSession.builder.getOrCreate()

// Definisci lo schema del DataFrame
val schema = StructType(Seq(
  StructField("Value", DoubleType, nullable = true),
  StructField("Key", LongType, nullable = true)
))

// Crea un array di oggetti Row
val rows = q8.map { case (v, k) => Row(v, k) }

// Crea il DataFrame utilizzando lo schema e l'array di righe
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Salva il DataFrame come file CSV
df.write.format("csv").mode(SaveMode.Overwrite).save("s3a://"+bucketname+"/datasets/project/output/avgDistance")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@31366754
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Value,DoubleType,true),StructField(Key,LongType,true))
rows: Array[org.apache.spark.sql.Row] = Array([20.136333333333326,1], [19.978372093023246,1], [15.583862644195618,132], [14.864652072083206,132], [11.889173913043477,2], [10.915017273061434,138], [10.765875894988065,27], [10.74933302682418,138], [10.515,199], [10.507823529411766,110])
df: org.apache.spark.sql.DataFrame = [Value: double, Key: bigint]


SQL

In [100]:
parquetFileDF.createOrReplaceTempView("parquetFile")
val parkDF = spark.sql("Select * from parquetFile")
parkDF

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

parkDF: org.apache.spark.sql.DataFrame = [hvfhs_license_num: string, dispatching_base_num: string ... 22 more fields]
res208: org.apache.spark.sql.DataFrame = [hvfhs_license_num: string, dispatching_base_num: string ... 22 more fields]


### ANALISI SUL GUADAGNO MEDIO DI OGNI SINGOLO VIAGGIO PER OGNI GIORNO
Il codice ha lo scopo di calcolare il guadagno medio di ogni classe di licenza in base al giorno in cui si è svolto il viaggio. Il risultato è stato poi salvato in un file CSV.<br>
Sono state eseguite le seguenti operazioni:
* Viene eseguito creato un RDD (avgFareByDay) che associa a ciascuna coppia (classe di licenza, Giorno) una tupla contenente la somma del guadagno e il numero di viaggi durante il giorno.
* Viene quindi eseguita una riduzione per chiave (reduceByKey) per aggregare i dati raggruppati per classe di licenza e giorno.
* Successivamente, viene eseguita una mappatura (mapValues) per calcolare il guadagno medio di ogni viaggio per ogni giorno.
* In seguito, viene eseguita una mappatura (map) per riorganizzare i dati e raggrupparli per classe di licenza.
* Vengono poi eseguite ulteriori operazioni di trasformazione (flatMap e coalesce) per preparare i dati per il salvataggio su file CSV.
* Infine, viene convertito l'RDD in un DataFrame e salvato in un file CSV.


In [101]:
val avgFareByDay = rddTaxiTrip.map(x => ((x.licenseClass, getDayTime(x.pickup.getTime()).get(Calendar.DAY_OF_WEEK)), (x.fare, 1))).
  reduceByKey((accum, value) => (accum._1 + value._1, accum._2 + value._2)).
  mapValues(sumCount => sumCount._1 / sumCount._2).
  map(item => (item._1._1, item._2, item._1._2))
  //collect() //(licenza,avg,giorno)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

avgFareByDay: org.apache.spark.rdd.RDD[(String, Double, Int)] = MapPartitionsRDD[347] at map at <console>:88


In [102]:
// Raggruppo per licenza
val avgFareByLicense = avgFareByDay.groupBy(_._1).
  mapValues(_.map(item => (item._2, item._3)))//.collect()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

avgFareByLicense: org.apache.spark.rdd.RDD[(String, Iterable[(Double, Int)])] = MapPartitionsRDD[350] at mapValues at <console>:86


In [103]:
//flatmap con chiave prodotto e anno
val avgFareByLicenseAndDay = avgFareByLicense.flatMap { 
    case (license, dayFare) => dayFare.map { 
        case (avgFare, day) => ((license, day),avgFare)
  }
}.coalesce(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

avgFareByLicenseAndDay: org.apache.spark.rdd.RDD[((String, Int), Double)] = CoalescedRDD[352] at coalesce at <console>:89


In [104]:
import org.apache.spark.sql.SaveMode
// Conversione in DataFrame con stringa
val df = avgFareByLicenseAndDay.map{case ((license, day), avgFare) => (license,day,avgFare)}.toDF()

// Salvataggio su file CSV
df.write.format("csv").mode(SaveMode.Overwrite).save("s3a://"+bucketname+"/datasets/project/output/avgFare")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.SaveMode
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int ... 1 more field]


In [105]:
//rddTaxiTrip.map(x => x.startloc).distinct().count()
//rddTaxiTrip.map(x => x.license).distinct().count()
//rddTaxiTrip2.map(x => x.startloc).distinct().count()
//rddTaxiTrip2.map(x => x.license).distinct().count()
//rddTaxiTrip.map(x => x.startloc).take(5)
//rddTaxiTrip.map(x => x.license).take(5)
//rddTaxiTrip.map(x => x.request).take(5)
//rddTaxiTrip.map(x => x.request.getTime()).take(5)
//rddTaxiTrip.map(x => x.fare).take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…