# Big Data Project
Developed by: 
- Grushchak Denys: mat. 0001027862
- Bacca Riccardo: mat. 0001045029

Original dataset (reduced to 7 GB for the scope of this project): https://www.kaggle.com/datasets/dilwong/flightprices

## Configuration

In [None]:
%%configure -f
{"executorMemory":"8G", "numExecutors":2, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

In [None]:
val bucketname ="unibo-bd2223-rbacca"

val path_flights_db = "s3a://"+bucketname+"/bigdata-project/xaa"

sc.applicationId

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

### Parsing
This parser flattens the original dataset so that each leg of a flight becomes its own row.

For example, a flight with two legs is split into 2 rows.

Each row has the following properties:
* duplicated fields: `id`, `flightDate`, `startingAirport`, `destinationAirport`, `isNonStop`, `isBasicEconomy`.
* separated fields (one value per leg of the original flight): `airplaneType`, `airlineName`, `segmentDistance`, `segmentDuration`.
* `totalFare` is recalculated for each row in proportion to `segmentDistance`. Summing the rows of one flight must return the original value of `totalFare`
* every time-based value is converted to minutes

`id` is a new numeric field that replaces the original text identifier.
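
The fare split described above can be pictured with a small standalone sketch. `FareSplitSketch`, `splitFare` and `toMinutes` are hypothetical names used only for illustration, and the sketch assumes that source durations are expressed in seconds, as the division by 60 in the parser suggests.

```scala
object FareSplitSketch {
  // Hypothetical helper (not part of the notebook): split a ticket's total
  // fare across its legs in proportion to each leg's distance in miles.
  def splitFare(totalFare: Double, legDistances: Seq[Int]): Seq[Double] = {
    val distanceSum = legDistances.sum.toDouble
    legDistances.map(d => d / distanceSum * totalFare)
  }

  // Assumption: durations arrive in seconds and are kept as whole minutes.
  def toMinutes(seconds: Int): Int = seconds / 60
}
```

For a 200 $ ticket with legs of 250 and 750 miles, `FareSplitSketch.splitFare(200.0, Seq(250, 750))` gives 50.0 and 150.0, which add up to the original fare.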

In [None]:
type MyTuple = (String, String, String, String, String, Boolean, Boolean, Double, Int, Int)

case class FlatFlightData(
    id: Long,
    flightDate: String,   
    startingAirport: String,
    destinationAirport: String,
    airplaneType: String,
    airlineName: String,
    isNonStop: Boolean,
    isBasicEconomy: Boolean,
    totalFare: Double,
    segmentDuration: Int,
    segmentDistance: Int
) extends Serializable{
    // return a tuple of the object
    def un() = FlatFlightData.unapply(this).get 
}


object FlatParser{
    import scala.collection.mutable.ListBuffer
    
    val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"
    val doubleVerticalLine = "\\|\\|"
    
    def convertAirplaneType(value: String): Array[String] = {
        //airplane type [String||String] 
        val result = value.split(doubleVerticalLine)
        if (result.contains("")) Array.empty
        else result
    }
    
    def convertAirlineName(value: String): Array[String] = {
        // airline name  [String||String]
        val result = value.split(doubleVerticalLine)
        if (result.contains("")) Array.empty
        else result
    }
    
    //return array of duration of each leg in minutes
    def convertSegmentsDuration(value: String): Array[Int] = {
        val result = value.split(doubleVerticalLine)
        if (result.contains("")) Array.empty
        else result.map(_.toInt/60)
    }
    
    def convertSegmentsDistance(value: String): Array[Int] = {
        // segment distance in miles: Vector of [Int||Int] can be [None||None]
        val result = value.split(doubleVerticalLine)
        if(result.contains("") || result.head == "None") Array.empty
        else result.map(_.toInt)
    }
   
    def parseFlightInformationLine(line: String): Option[ListBuffer[MyTuple]] = {
        try {
            val input = line.split(commaRegex)
            
            val airplanes = convertAirplaneType(input(23))
            val airline = convertAirlineName(input(21))
            val segmentsDuration = convertSegmentsDuration(input(24))
            val segmentsDistance = convertSegmentsDistance(input(25))
            if (airplanes.isEmpty || 
                airline.isEmpty || 
                segmentsDuration.isEmpty || 
                segmentsDistance.isEmpty|| 
                airplanes.length != airline.length ||
                airplanes.length != segmentsDuration.length ||
                airplanes.length != segmentsDistance.length ) None
            else {
                val arr: ListBuffer[MyTuple] = ListBuffer()
                val distanceSum = segmentsDistance.reduce(_+_).toDouble
                for (i <- 0 until airplanes.length){
                    val legPriceBasedOnDistance: Double = (segmentsDistance(i).toDouble/distanceSum) * input(12).toDouble // input(12): totalFare in $
                    arr += ((input(2), //flightdate
                             input(3), //starting airport
                             input(4), //destination airport
                             airplanes(i),
                             airline(i),
                             input(10).toBoolean,  //isNonStop
                             input(8).toBoolean,  //isBasicEconomy
                             legPriceBasedOnDistance,
                             segmentsDuration(i),
                             segmentsDistance(i))
                    )
                }
                Some(arr)
            }
        } catch {
            case _: Exception => None
        }
    }
}
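
To make the two split patterns used by the parser easier to follow, here is a minimal standalone sketch. `CsvSplitSketch`, `splitCsvLine` and `splitLegs` are hypothetical names; the regular expressions are copied from `FlatParser`.

```scala
object CsvSplitSketch {
  // Same pattern as FlatParser.commaRegex: a comma counts as a separator
  // only when an even number of double quotes follows it on the line.
  val commaRegex = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"

  // Same pattern as FlatParser.doubleVerticalLine: legs are joined by "||".
  val doubleVerticalLine = "\\|\\|"

  def splitCsvLine(line: String): Array[String] = line.split(commaRegex)

  def splitLegs(field: String): Array[String] = field.split(doubleVerticalLine)
}
```

`CsvSplitSketch.splitCsvLine("a,\"b,c\",d")` returns three fields, with the quoted field `"b,c"` kept whole (quotes included), and `CsvSplitSketch.splitLegs("Delta||United")` returns the two leg values.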

### Tolerance for duplicated data
The original dataset has many duplicated records.
We initially decided to create a new dataset in the original format without duplicated records and to use it as the main data source for processing.

After processing 10 GB of data we found that only 1.2 GB was unique, which is not enough to meet the project requirements. We therefore continued to use the original dataset with duplicates, even though this is not strictly correct.

### Setup
The purposes of this code are:
1. Read and parse the original data
2. Create and append a new numeric identifier
3. Load the data into the `FlatFlightData` class
4. Cache the result

We call `persist()` twice because `zipWithIndex()` itself triggers a Spark job; as a consequence, without an intermediate cache the original data would be parsed twice before the final version is cached. The intermediate disk-only cache saves us from the second parsing.
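
Steps 2 and 3 can be pictured with plain Scala collections standing in for the RDD. This is only a sketch: `LegIdSketch` and its sample data are hypothetical, and each leg is reduced to an airline name instead of the full tuple used in the notebook.

```scala
object LegIdSketch {
  // Each element stands for one parsed source line: the list of its legs
  // (here just airline names, a simplification of the notebook's tuples).
  val parsedLines: Seq[Seq[String]] = Seq(Seq("Delta", "United"), Seq("JetBlue"))

  // Number the lines first, then flatten, so every leg keeps its line's id.
  val legsWithId: Seq[(Long, String)] =
    parsedLines.zipWithIndex.flatMap { case (legs, i) =>
      legs.map(leg => (i.toLong, leg))
    }
}
```

Here both legs of the first line share id 0 and the single leg of the second line gets id 1, which is why later queries count flights with `distinct` on `id`.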

In [None]:
import org.apache.spark.storage.StorageLevel._

sc.getPersistentRDDs.foreach(_._2.unpersist()) 

val parsedData = sc.textFile(path_flights_db).flatMap(FlatParser.parseFlightInformationLine).persist(DISK_ONLY)
val rddFlightsCached = parsedData.
    zipWithIndex(). // Introduce numeric id for each flight row
    map({case (v, i) => (i, v)}).
    flatMapValues(m => m).
    map({case (k, v) => FlatFlightData.apply(k, v._1, v._2, v._3, v._4, v._5, v._6, v._7, v._8, v._9, v._10)}).persist(MEMORY_AND_DISK)

In [None]:
// To verify that parsing process was correct
rddFlightsCached.map(_.un).take(20).foreach(println)

val numberOfDistinctRecords = rddFlightsCached.map(_.id).distinct.count

"number of total (undistinct) records: " + rddFlightsCached.count

// Keeps two decimal places; note that toInt truncates rather than rounds
def round(v: Double): Double = {
    (v*100).toInt/100.toDouble
}

## Explorative queries

1. How many distinct airports and aircraft models
2. Average travel duration for each airline
3. Percentage of basic economy tickets, based on all tickets
4. Percentage of non-stop flights (flights with one leg)
5. Average and price range of tickets
6. Average ticket price for each airline
7. Average and range of travel distance
8. Top 10 airports with the most arriving flights


In [None]:
//1.How many distinct airports and aircraft models
val distinctAirports = rddFlightsCached.
    map(x => x.startingAirport).
    distinct.
    union(rddFlightsCached.map(x => x.destinationAirport).distinct).
    distinct.count
// map (not flatMap): flatMap on a String would count distinct characters instead of models
val distinctAircraftModels = rddFlightsCached.map(x => x.airplaneType).distinct.count

In [None]:
//2. Average travel duration for each airline
val distinctAirlines = rddFlightsCached.
    map(x => (x.airlineName, x.segmentDuration)).
    aggregateByKey((0.0, 0.0))((a,v)=>(a._1+v, a._2+1), (a1,a2)=>(a1._1+a2._1, a1._2+a2._2)).
    map({case(k,v)=>(k,v._1/v._2)}).
    collect().
    foreach({case (airline, value) => println(airline + " => " + round(value) + " avg minutes")})

In [None]:
//3. Percentage of basic economy tickets, based on all tickets
"Basic economy tickets: " + round(
    (rddFlightsCached.filter(_.isBasicEconomy).map(_.id).distinct.count.toDouble/numberOfDistinctRecords).toDouble*100) + " %"

In [None]:
//4. Percentage of non-stop flights
"Non-stop flights: " + round(
    (rddFlightsCached.filter(_.isNonStop).map(_.id).distinct.count.toDouble/numberOfDistinctRecords).toDouble*100) + " %"

In [None]:
//5. Average and price range of tickets
val ticketPrices = rddFlightsCached.map(x => (x.id,x.totalFare)).reduceByKey(_+_).map(_._2)
"Range of prices: "  + ticketPrices.min + "$ to " + ticketPrices.max + "$"
"Avg price: " + round((ticketPrices.sum/numberOfDistinctRecords).toDouble) + "$"

In [None]:
//6. Average ticket price for each airline
val avgTicketPricePerAirline = rddFlightsCached.
    map(x => (x.airlineName,(x.totalFare, 1))).
    reduceByKey((a,b) => (a._1+b._1, a._2+b._2)).
    map(m => (m._1,m._2._1/m._2._2)).
    collect.foreach({case (name, value) => println(name + " => " + round(value) + "$")})

In [None]:
//7. Average and range of travel distance
val travelDistances = rddFlightsCached.map(x => (x.id, x.segmentDistance)).reduceByKey(_+_).map(_._2)
"Range of distances: "  + travelDistances.min + " to " + travelDistances.max + " miles"
"Avg travel distance: " + round((travelDistances.sum/numberOfDistinctRecords).toDouble) + " miles"

In [None]:
//8. Top 10 airports with the most arriving flights
val topAirports = rddFlightsCached.
    map(x => (x.id, x.destinationAirport)).
    distinct().
    map(m => (m._2, 1)).
    reduceByKey(_+_).    
    sortBy(_._2, false).
    take(10).
    foreach({case (name, value) => println(name + " => " + value)})

## Main queries


In [None]:
val path_output = "s3a://"+bucketname+"/spark/avgRatPerMovie" //todo

### Riccardo

> I aggregate on "aircraft models" to compute the discrete ranking of the most used models for each "airline".
I join with the original dataset, then aggregate again on "travel duration" and on the ranking above.
The result is the average duration of each flight for each airline (plus the number of flights and the total hours) on its most used aircraft model.
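
As a rough picture of what this query computes, here is a minimal sketch on plain Scala collections rather than RDDs. `MostUsedAircraftSketch` and its sample legs are hypothetical and only illustrate the logic; they are not the Spark implementations in the cells of this section.

```scala
object MostUsedAircraftSketch {
  // Hypothetical sample legs: (airline, airplane, durationInMinutes).
  val legs: Seq[(String, String, Int)] = Seq(
    ("Delta", "Boeing 737", 120),
    ("Delta", "Boeing 737", 180),
    ("Delta", "Airbus A320", 90),
    ("United", "Airbus A320", 60)
  )

  // For each airline: (airplane, number of legs, total hours, average hours per leg).
  val result: Map[String, (String, Int, Double, Double)] =
    legs.groupBy(_._1).map { case (airline, airlineLegs) =>
      // Rank the models by how many legs used them and keep the top one.
      val (airplane, modelLegs) = airlineLegs.groupBy(_._2).maxBy(_._2.size)
      val totalHours = modelLegs.map(_._3).sum / 60.0
      (airline, (airplane, modelLegs.size, totalHours, totalHours / modelLegs.size))
    }
}
```

With this sample, Delta's most used model is the Boeing 737 with 2 legs, 5.0 total hours and 2.5 hours on average.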

In [None]:
// Print results
def printQueryResult(result: ((String, String),(Int, Double, Double))): Unit = {
    result match {
        case ((compagnia, aereo),(numeroVoli, totOre, mediaVolo)) => 
            println(compagnia + " ====> " + aereo + " ====> " + "number of flights: " + numeroVoli + " - total hours: " + totOre + " - average flight time (hours): " + mediaVolo)
    }
}
    

In [None]:
// first case: reduce all the data first, then join (smaller join)

val mostUsedAircraftForAirline = rddFlightsCached.map(x => ((x.airlineName,x.airplaneType), 1)).
        reduceByKey(_+_).
        map({case ((airlineName, airplaneType),count) => (airlineName, (airplaneType,count))}).
        reduceByKey((a,b) => if (a._2 > b._2) a else b).
        map({case (airlineName, (airplaneType, count)) => ((airlineName, airplaneType),count)})

val totalFlightDurationForAirlineAirplane = rddFlightsCached.map(x => ((x.airlineName, x.airplaneType),x.segmentDuration)).reduceByKey(_+_)

val result = mostUsedAircraftForAirline.join(totalFlightDurationForAirlineAirplane).
    map({case (k, (count, sum)) => (k, (count, round(sum.toDouble/60), round((sum.toDouble/count)/60)))}).
    collect().
    foreach(printQueryResult)

In [None]:
// second case: larger join, less data to map, and the sum of the durations is computed afterwards

val mostUsedAircraftForAirline = rddFlightsCached.map(x => ((x.airlineName,x.airplaneType), 1)).
        reduceByKey(_+_).
        map({case ((airlineName, airplaneType),count) => (airlineName, (airplaneType,count))}).
        reduceByKey((a,b) => if (a._2 > b._2) a else b).
        map({case (airlineName, (airplaneType, count)) => ((airlineName, airplaneType),count)})

val totalFlightDurationForAirlineAirplane = rddFlightsCached.map(x => ((x.airlineName, x.airplaneType),x.segmentDuration))

val result = mostUsedAircraftForAirline.join(totalFlightDurationForAirlineAirplane).reduceByKey((a,b) => (a._1, (a._2 + b._2))).
    map({case (k, (count, sum)) => (k, (count, round(sum.toDouble/60), round((sum.toDouble/count)/60)))}).
    collect().
    foreach(printQueryResult)

In [None]:
// third case: use aggregate
val mostUsedAircraftForAirline = rddFlightsCached.map(x => ((x.airlineName,x.airplaneType), x.segmentDuration)).
        aggregateByKey((0.0, 0.0))((a,v)=>(a._1+v, a._2+1), (a1,a2)=>(a1._1+a2._1, a1._2+a2._2)).
        map({case ((airline, airplane), (sum, count)) => (airline, (airplane, sum, count))}).
        reduceByKey((a,b) => if (a._3 > b._3) a else b)

val result = mostUsedAircraftForAirline.
    map({case (airline, (airplane, sum, count)) => ((airline, airplane), (count.toInt, round(sum.toDouble/60), round((sum.toDouble/count)/60)))}).
    collect().
    foreach(printQueryResult)

### Denys
I will calculate the velocity of each airplane type.
> Denys: I aggregate on "airplane model" to compute the total "segment distance" flown by each model, then I do a self-join and an aggregation to determine the "travel duration" for each "airplane model". Finally I determine the average velocity of each model from the aggregated data and display the 10 airplanes with the highest average velocity.

In [None]:
//return velocity in miles per hour
def getVelocity(distance: Int, durationInMinutes: Int): Double = {
    round(distance.toDouble/(durationInMinutes.toDouble/60))
}

The first solution follows the problem description: distance and duration are aggregated separately, and a self-join then unites the aggregated data.

In [None]:
val totalAirplanesDistance = rddFlightsCached.map(m => (m.airplaneType, m.segmentDistance)).reduceByKey(_+_)
val totalAirplanesDuration = rddFlightsCached.map(m => (m.airplaneType, m.segmentDuration)).reduceByKey(_+_)

totalAirplanesDistance.join(totalAirplanesDuration).
    map({case (name, (dis, dur)) =>  (name, getVelocity(dis, dur))}).
    sortBy(_._2,false).
    take(10).
    foreach({case (a, v) => println(a + " ----> " + v + " mph")})

In the next implementation I tried repartitioning the keyed data before aggregating (note that `repartition(12)` redistributes rows evenly rather than by key), but it performs worse because the repartitioning overhead is much larger than the amount of data shuffled in the first query. The aggregation in the first query produces very few result rows, which can be shuffled quickly, whereas repartitioning shuffles all the selected data.

For example, on the 7 GB dataset the first query has only ~1 MB of shuffle reads and writes, while this solution has ~460 MB of shuffle writes and ~920 MB of shuffle reads.
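
The difference in shuffle volume can be approximated with a simple counting model on plain Scala collections. This is only a sketch under strong assumptions: `ShuffleVolumeSketch` and its data are hypothetical, and it counts records, not bytes, so it does not reproduce the figures measured above.

```scala
object ShuffleVolumeSketch {
  type Record = (String, (Int, Int)) // (airplaneType, (distance, duration))

  // Hypothetical data: two partitions of three records each.
  val partitions: Seq[Seq[Record]] = Seq(
    Seq(("A320", (500, 70)), ("A320", (300, 50)), ("B737", (900, 120))),
    Seq(("B737", (400, 60)), ("B737", (600, 80)), ("A320", (200, 40)))
  )

  // reduceByKey-style: values are combined inside each partition first,
  // so only one record per key and partition has to be shuffled.
  val shuffledAfterCombine: Int = partitions.map(_.map(_._1).distinct.size).sum

  // repartition-style: every record is moved before any aggregation happens.
  val shuffledByRepartition: Int = partitions.map(_.size).sum
}
```

In this toy case 4 records are shuffled after combining against 6 with repartitioning; with only a handful of airplane types and millions of rows per partition the gap becomes far larger.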

In [None]:
val repartitionedValues = rddFlightsCached.map(m => (m.airplaneType, (m.segmentDistance, m.segmentDuration))).repartition(12)

val totalAirplanesDistance = repartitionedValues.map(m => (m._1, m._2._1)).reduceByKey(_+_)
val totalAirplanesDuration = repartitionedValues.map(m => (m._1, m._2._2)).reduceByKey(_+_)

totalAirplanesDistance.join(totalAirplanesDuration).
    map({case (name, (dis, dur)) =>  (name, getVelocity(dis, dur))}).
    sortBy(_._2,false).
    take(10).
    foreach({case (a, v) => println(a + " ----> " + v + " mph")})

The fastest way to calculate the velocity is to aggregate the two fields together.

The performance gain comes from aggregating the data only once and from the absence of a join operation.

In [None]:
rddFlightsCached.
    map(m => (m.airplaneType, (m.segmentDistance, m.segmentDuration))).
    reduceByKey({case ((dis1, dur1), (dis2, dur2)) => (dis1+dis2, dur1+dur2)}).
    map({case (name, (dis, dur)) =>  (name, getVelocity(dis, dur))}).
    sortBy(_._2, false).
    take(10).
    foreach({case (a, v) => println(a + " ----> " + v + " mph")})

Alternative velocity evaluation (mean of means): first we calculate the mean velocity of each leg (row), then the mean of those velocities for each airplane type. The structure of this query is very similar to the previous one, so the two have very similar performance. This kind of evaluation cannot be done with separate value aggregations as in the first two queries.

Obviously, the query result is different.
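
A small worked example shows why the two evaluations differ. `SpeedAveragesSketch` and its two sample legs are hypothetical; the sketch uses plain Scala collections instead of RDDs.

```scala
object SpeedAveragesSketch {
  // Hypothetical legs of one airplane type: (distance in miles, duration in minutes).
  val legs: Seq[(Int, Int)] = Seq((100, 60), (600, 90))

  // Ratio of totals: total distance divided by total hours.
  val ratioOfTotals: Double = legs.map(_._1).sum / (legs.map(_._2).sum / 60.0)

  // Mean of means: average of the per-leg velocities.
  val meanOfMeans: Double =
    legs.map { case (dis, dur) => dis / (dur / 60.0) }.sum / legs.size
}
```

The ratio of totals is 700 miles over 2.5 hours, i.e. 280.0 mph, while the mean of the per-leg velocities (100 mph and 400 mph) is 250.0 mph, because the second form gives every leg the same weight regardless of its duration.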

In [None]:
rddFlightsCached.
    map(m => (m.airplaneType, (m.segmentDistance.toDouble, m.segmentDuration.toDouble/60))).
    aggregateByKey((0.0,0.0))((a,v)=>(a._1+(v._1/v._2), a._2+1),(a1,a2)=>(a1._1+a2._1, a1._2+a2._2)).
    map({case (name, (velocitySum, count)) =>  (name, round(velocitySum/count))}).
    sortBy(_._2, false).
    take(10).
    foreach({case (a, v) => println(a + " ----> " + v + " mph")})