# Flights Price Analysis 

The goal of this notebook is to run some analysis on a dataset that contains one-way flights found on Expedia between 2022-04-16 and 2022-10-05 (you can find it at this [link](https://www.kaggle.com/datasets/dilwong/flightprices)).

In [None]:
%%configure -f
{"executorMemory":"8G", "numExecutors":2, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

In [1]:
val bucketname = "unibo-bd2223-paolopenazzi"
// val bucketname = "unibo-bd2223-vfolin"

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1677420161242_0001,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bd2223-paolopenazzi


## Data Preparation

The following columns will be kept and created:
- flightID: Identifier for the flight.
- searchDate: Date the record was obtained from Expedia.
- searchMonth: Month the record was obtained from Expedia.
- searchDay: Day the record was obtained from Expedia.
- flightDate: The date of the flight.
- flightMonth: The month of the flight.
- flightDay: The day of the flight.
- startingAirport: 3-letter code for the starting airport.
- destinationAirport: 3-letter code for the destination airport.
- duration: Travel duration in minutes.
- isEconomy: Is basic economy?
- isRefundable: Is the ticket refundable?
- isNonStop: Is the flight non-stop?
- baseFare: Price of the ticket (not including taxes).
- totalFare: Price of the ticket, including taxes and fees.
- seatsRemaining: Number of remaining seats.
- travelDistance: The total travel distance in miles.

In [2]:
case class FlightData(
    flightID:String,
    searchDate:String,
    searchMonth:String,
    searchDay:String,
    flightDate:String,
    flightMonth:String,
    flightDay:String,
    startingAirport:String,
    destinationAirport:String,
    duration:Int,
    isEconomy:Boolean,
    isRefundable:Boolean,
    isNonStop:Boolean,
    baseFare:Double,
    totalFare:Double,
    seatsRemaining:Int,
    travelDistance:Int
)

object FlightData {

    def parse(line:String) = {
        val input = line.split(",")
        val flightID = input(0)
        val searchDate = input(1)
        val searchMonth = searchDate.substring(5,7)
        val searchDay = searchDate.substring(8,10)
        val flightDate = input(2)
        val flightMonth = flightDate.substring(5,7)
        val flightDay = flightDate.substring(8,10)
        val startingAirport = input(3)
        val destinationAirport = input(4)
        val dur = input(6).replace("P","").replace("T","").split("D|H|M").map(x => x.toInt)
        val duration = dur.length match {
            case 3 => dur(0) * 1440 + dur(1) * 60 + dur(2)
            case 2 => dur(0) * 60 + dur(1)
            case 1 => dur(0)
        }
        val isEconomy = input(8).toBoolean
        val isRefundable = input(9).toBoolean
        val isNonStop = input(10).toBoolean
        val baseFare = input(11).toDouble
        val totalFare = input(12).toDouble
        val seatsRemaining = input(13).toInt
        val travelDistance = input(14) match {
            case "" => 0
            case _ => input(14).toInt
        }
        
        new FlightData(flightID,searchDate,searchMonth,searchDay,flightDate,flightMonth,flightDay,startingAirport,
                       destinationAirport,duration,isEconomy,isRefundable,isNonStop,baseFare,totalFare,
                       seatsRemaining,travelDistance)
    }
}

// Create an RRD from the csv file.
val rdd = sc.textFile("s3a://"+bucketname+"/datasets/itineraries.csv")
// Extract header from RDD and parse every row
val header = rdd.first(); 
val rddFlights = rdd.filter(row => row != header).map(FlightData.parse)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

defined class FlightData
defined object FlightData
Companions must be defined together; you may wish to use :paste mode for this.
rdd: org.apache.spark.rdd.RDD[String] = s3a://unibo-bd2223-paolopenazzi/datasets/itineraries.csv MapPartitionsRDD[1] at textFile at <console>:29
header: String = legId,searchDate,flightDate,startingAirport,destinationAirport,fareBasisCode,travelDuration,elapsedDays,isBasicEconomy,isRefundable,isNonStop,baseFare,totalFare,seatsRemaining,totalTravelDistance,segmentsDepartureTimeEpochSeconds,segmentsDepartureTimeRaw,segmentsArrivalTimeEpochSeconds,segmentsArrivalTimeRaw,segmentsArrivalAirportCode,segmentsDepartureAirportCode,segmentsAirlineName,segmentsAirlineCode,segmentsEquipmentDescription,segmentsDurationInSeconds,segmentsDistance,segmentsCabinCode
rddFlights: org.apache.spark.rdd.RDD[FlightData] = MapPartitionsRDD[3] at map at <console>:29


#### Visualize data as a DataFrame

In [3]:
import spark.implicits._

val columns = Seq("flightID",
                  "searchDate",
                  "searchMonth",
                  "searchDay",
                  "flightDate",
                  "flightMonth",
                  "flightDay",
                  "startingAirport",
                  "destinationAirport",
                  "duration",
                  "isEconomy",
                  "isRefundable",
                  "isNonStop",
                  "baseFare",
                  "totalFare",
                  "seatsRemaining",
                  "travelDistance")

val flightDataframe = rddFlights.toDF(columns:_*)
flightDataframe.show(3,truncate=40,vertical=true)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import spark.implicits._
columns: Seq[String] = List(flightID, searchDate, searchMonth, searchDay, flightDate, flightMonth, flightDay, startingAirport, destinationAirport, duration, isEconomy, isRefundable, isNonStop, baseFare, totalFare, seatsRemaining, travelDistance)
flightDataframe: org.apache.spark.sql.DataFrame = [flightID: string, searchDate: string ... 15 more fields]
-RECORD 0----------------------------------------------
 flightID           | 9ca0e81111c683bec1012473feefd28f 
 searchDate         | 2022-04-16                       
 searchMonth        | 04                               
 searchDay          | 16                               
 flightDate         | 2022-04-17                       
 flightMonth        | 04                               
 flightDay          | 17                               
 startingAirport    | ATL                              
 destinationAirport | BOS                              
 duration           | 149                              
 isEc

#### Define a function to compute the number of days between the searchDate and the flightDate

In [4]:
import java.time.temporal.ChronoUnit.DAYS
import java.time.LocalDate

def daysBetween(x: FlightData): Long = {
        val searchDate = LocalDate.parse(x.searchDate);
        val flightDate = LocalDate.parse(x.flightDate);
        val daysBetween = DAYS.between(searchDate, flightDate);
        daysBetween
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import java.time.temporal.ChronoUnit.DAYS
import java.time.LocalDate
daysBetween: (x: FlightData)Long


## Data Exploration

#### Cache the dataset to run futher analysis and queries later

In [5]:
val rddFlightsCached = rddFlights.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddFlightsCached: rddFlights.type = MapPartitionsRDD[3] at map at <console>:29


#### Perform the following queries to explore the dataset:

- Number of searches performed 
- Number of distinct flights
- Number of distinct startingAirport

In [13]:
val rddDistinctFlightsCached = rddFlightsCached.
    map(x => (x.flightID, x.startingAirport, x.flightMonth, x.isNonStop, x.isEconomy, x.destinationAirport)).
    distinct().
    cache()

println("\nNumber of searches performed: " + rddFlightsCached.count())
println("Number of distinct flights: " + rddDistinctFlightsCached.count())
println("Number of distinct startingAirport: " + rddDistinctFlightsCached.map(x => (x._2)).distinct().count())

/*
Number of searches performed: 82138753
Number of distinct flights: 6741921
Number of distinct startingAirport: 16
*/

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddDistinctFlightsCached: org.apache.spark.rdd.RDD[(String, String, String, Boolean, Boolean, String)] = MapPartitionsRDD[62] at distinct at <console>:32

Number of searches performed: 82138753
Number of distinct flights: 6741921
Number of distinct startingAirport: 16


- Number of flights by starting airport
- Number of flights by month
- Number of routes, that are the possible combinations of starting and destination airport

In [21]:
println("Number of flights by starting airport")
val flightsByAirport = rddDistinctFlightsCached.
    map(x => (x._2, 1)).
    reduceByKey(_+_).
    collect()

println("\nNumber of flights by month")
val flightsByMonth = rddDistinctFlightsCached.
    map(x => (x._3, 1)).
    reduceByKey(_+_).
    collect()

println("\nNumber of routes: " + rddDistinctFlightsCached.map(x => (x._2, x._6)).distinct().count()) 

/*
Number of flights by starting airport
flightsByAirport: Array[(String, Int)] = Array((LAX,647762), (CLT,439551), (JFK,321474), (BOS,503551),
(OAK,322976), (LGA,489480), (ATL,411753), (MIA,386155), (DTW,369802), (PHL,399106), (SFO,568692),
(EWR,329291), (ORD,416925), (DEN,359393), (IAD,342082), (DFW,433928))

Number of flights by month
flightsByMonth: Array[(String, Int)] = Array((04,205745), (05,768008), (06,1022075), (07,1080180),
(08,1203901), (09,1192040), (10,880474), (11,389498))

Number of routes: 235
*/

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of flights by starting airport
flightsByAirport: Array[(String, Int)] = Array((LAX,647762), (CLT,439551), (JFK,321474), (BOS,503551), (OAK,322976), (LGA,489480), (ATL,411753), (MIA,386155), (DTW,369802), (PHL,399106), (SFO,568692), (EWR,329291), (ORD,416925), (DEN,359393), (IAD,342082), (DFW,433928))

Number of flights by month
flightsByMonth: Array[(String, Int)] = Array((04,205745), (05,768008), (06,1022075), (07,1080180), (08,1203901), (09,1192040), (10,880474), (11,389498))

Number of routes: 235


- Number of flights that have an available economy ticket
- Number of flights that have an available no-economy ticket
- Number of direct flights

In [22]:
println("Number of flights that offer an economy fare: " + rddDistinctFlightsCached.filter(x => x._5).count())
println("Number of flights that offer a non economy fare: " + rddDistinctFlightsCached.filter(x => !x._5).count())
println("Number of direct flights: " + rddDistinctFlightsCached.filter(x => x._4).count())

/*
Number of flights that offer an economy fare: 1189149
Number of flights that offer a non economy fare: 5552772
Number of direct flights: 809106
*/

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of flights that offer an economy fare: 1189149
Number of flights that offer a non economy fare: 5552772
Number of direct flights: 809106


## Jobs

In [24]:
import org.apache.spark.sql.SaveMode

val output_path = "s3a://"+bucketname+"/spark/bdexam"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.SaveMode
path_output: String = s3a://unibo-bd2223-vfolin/spark/bdexam


### 1 - Veronika Folin

The change that occurs in the ticket price in 14 days prior to flight departure is calculated, dividing the result between economy and no-economy tickets.

The correlation between that change in price and the number of seats remaining on that flight is then displayed.

#### We filter data to obtain searches made in the last 30 days before flight departure

The filtering operation allows us to reduce the number of records and therefore to optimize subsequent queries.

In [22]:
val rddFlightsFiltered = rddFlights.
    filter(x => daysBetween(x) <= 30).
    cache()

//"Number of filtered flights: " + rddFlightsFiltered.count()
// 47674280

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddFlightsFiltered: org.apache.spark.rdd.RDD[FlightData] = MapPartitionsRDD[132] at filter at <console>:33


#### We divide the result between economy and no-economy tickets

In [23]:
val rddEconomyFlights = rddFlightsFiltered.filter(_.isEconomy)
val rddNoEconomyFlights = rddFlightsFiltered.filter(!_.isEconomy)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEconomyFlights: org.apache.spark.rdd.RDD[FlightData] = MapPartitionsRDD[133] at filter at <console>:30
rddNoEconomyFlights: org.apache.spark.rdd.RDD[FlightData] = MapPartitionsRDD[134] at filter at <console>:30


#### We calculate the average ticket price in a given number of days before departure

In [26]:
val rddAveragePriceEconomy = rddEconomyFlights.
    map(x => (daysBetween(x), x.baseFare)).
    aggregateByKey((0.0,0.0))((x,y)=>(x._1+y, x._2 + 1) , (x,y)=>(x._1 + y._1, x._2 + y._2)).
    map({case(k,v) => (k, v._1/v._2)})

val rddAveragePriceNoEconomy = rddNoEconomyFlights.
    map(x => (daysBetween(x), x.baseFare)).
    aggregateByKey((0.0,0.0))((x,y)=>(x._1+y, x._2 + 1) , (x,y)=>(x._1 + y._1, x._2 + y._2)).
    map({case(k,v) => (k, v._1/v._2)})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddAveragePriceEconomy: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[137] at map at <console>:36
rddAveragePriceNoEconomy: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[140] at map at <console>:37


#### We calculate the average number of seats remaining on flights over a given number of days before departure

In [27]:
val rddAverageSeatsRemainingEconomy = rddEconomyFlights.
    map(x => (daysBetween(x), x.seatsRemaining)).
    aggregateByKey((0.0,0.0))((x,y)=>(x._1+y, x._2 + 1) , (x,y)=>(x._1 + y._1, x._2 + y._2)).
    map({case(k,v) => (k, v._1/v._2)})

val rddAverageSeatsRemainingNoEconomy = rddNoEconomyFlights.
    map(x => (daysBetween(x), x.seatsRemaining)).
    aggregateByKey((0.0,0.0))((x,y)=>(x._1+y, x._2 + 1) , (x,y)=>(x._1 + y._1, x._2 + y._2)).
    map({case(k,v) => (k, v._1/v._2)})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddAverageSeatsRemainingEconomy: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[143] at map at <console>:36
rddAverageSeatsRemainingNonEconomy: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[146] at map at <console>:37


#### We join average ticket prices with average number of remaining seats, and then we save the result on S3 to reuse it

In [28]:
val rddEconomyResult = rddAverageSeatsRemainingEconomy.join(rddAveragePriceEconomy).
    map({case(k,v) => (k, v._1, v._2)}).
    coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_output)
val rddNoEconomyResult = rddAverageSeatsRemainingNonEconomy.join(rddAveragePriceNoEconomy).
    map({case(k,v) => (k, v._1, v._2)}).
    coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_output)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEconomyResult: Unit = ()
rddNoEconomyResult: Unit = ()


#### We calculate the correlation matrix between the attributes AverageSeatsRemaining and AveragePrice

In [11]:
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg._

val economyData = rddEconomyResult.map({case(days,(averageSeatsRemaining, averagePrice)) => 
    Vectors.dense(averageSeatsRemaining, averagePrice)}).cache()
val correlMatrixEconomy: Matrix = Statistics.corr(economyData)

val noEconomyData = rddNonEconomyResult.map({case(days,(averageSeatsRemaining, averagePrice)) => 
    Vectors.dense(averageSeatsRemaining, averagePrice)}).cache()
val correlMatrixNoEconomy: Matrix = Statistics.corr(noEconomyData)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg._
economyData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[34] at map at <console>:36
correlMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0                  -0.8847960167193927
-0.8847960167193927  1.0
nonEconomyData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[43] at map at <console>:36
correlMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0                  -0.5823660225408362
-0.5823660225408362  1.0


### 2 - Paolo Penazzi

The goal is to identify the cheapest flights departing in a given period, grouped according to the starting airport, calculating the following values:
- The average price recorded in the 14 days before the departure of direct flights only.
- The previous result in relation to the distance traveled by the flight. (the flights that take us as far as possible for less money)
- The cheapest airport to travel from, based on the previous result.

Data is filtered to obtain the searches made in the last 14 days before the flight departure. 

Records with the travelDistance of the flight equal to 0 are removed.

The filtering operation is the first step done because it allows to reduce the number of records and therefore to optimize subsequent queries.

In [24]:
val rddSearchesFiltered = rddFlights.
    filter(x => daysBetween(x) <= 14).
    filter(_.travelDistance != 0).
    cache()

println("\nNumber of searches: " + rddSearchesFiltered.count())

// Number of searches: 21736294

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddSearchesFiltered: org.apache.spark.rdd.RDD[FlightData] = MapPartitionsRDD[139] at filter at <console>:34

Number of searches: 21736294


In [26]:
def computeWeek(flightDate: String): Long = {
    val firstDate = LocalDate.parse("2022-04-16");
    val actualDate = LocalDate.parse(flightDate);
    DAYS.between(firstDate, actualDate)/7.toInt;
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

computeWeek: (flightDate: String)Long


In [37]:
val rddDistinctFlightsFiltered = rddSearchesFiltered.
    map(x => (x.flightID, x.flightDate, x.travelDistance, x.startingAirport)).
    distinct().
    cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddDistinctFlightsFiltered: org.apache.spark.rdd.RDD[(String, String, Int, String)] = MapPartitionsRDD[159] at distinct at <console>:32


In [31]:
val rddDistinctFlightsWithPeriod = rddDistinctFlightsFiltered.
    map(x => (x._1, computeWeek(x._2)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddDistinctFlightsWithPeriod: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[144] at map at <console>:33


In [33]:
val rddAveragePricePerFlight = rddSearchesFiltered.
    map(x => (x.flightID, x.baseFare)).
    aggregateByKey((0.0,0.0))((x,y)=>(x._1+y, x._2 + 1) , (x,y)=>(x._1 + y._1, x._2 + y._2)).
    map({case(k,v) => (k, v._1/v._2)})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddAveragePricePerFlight: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[147] at map at <console>:33


In [39]:
val rddAveragePricePerDistance = rddDistinctFlightsFiltered.
    map(x => (x._1, x._3)).
    join(rddAveragePricePerFlight).
    map({case(x,y) => (x, y._2/y._1)})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddAveragePricePerDistance: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[164] at map at <console>:35


In [40]:
val rddAveragePricePerAirport = rddDistinctFlightsFiltered.
    map(x => (x._1, x._4)).
    join(rddAveragePricePerDistance)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddAveragePricePerAirport: org.apache.spark.rdd.RDD[(String, (String, Double))] = MapPartitionsRDD[168] at join at <console>:34


In [41]:
val rddFinal = rddAveragePricePerAirport.
    map({case(x,y) => (y._1, y._2)}).
    aggregateByKey((0.0,0.0))((x,y)=>(x._1+y, x._2 + 1) , (a,b)=>(a._1 + b._1, a._2 + b._2)).
    map({case(k,v) => (v._1/v._2, k)}).
    sortByKey()

/*
distinct 300 sec
map      625 sec
map     16 sec
map      373 sec
map     18 sec
map      400 sec
map     14 sec
sortbyK 0,5 sec
*/

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddFinal: org.apache.spark.rdd.RDD[(Double, String)] = ShuffledRDD[174] at sortByKey at <console>:34


In [45]:
val df = rddFinal.toDF("Airport", "AvgPricePerDistance")
df.head(16)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

df: org.apache.spark.sql.DataFrame = [Airport: double, AvgPricePerDistance: string]
res159: Array[org.apache.spark.sql.Row] = Array([0.17988029904856137,LAX], [0.19393624443351556,SFO], [0.2002686589880659,DEN], [0.20032604686862723,MIA], [0.21269134538073378,DFW], [0.23640432106133333,OAK], [0.24453020718848176,ORD], [0.24990981053289352,LGA], [0.2590611607820318,BOS], [0.26611110688969863,EWR], [0.2780071246501974,ATL], [0.30096046871780907,PHL], [0.3044091070553833,CLT], [0.3113123198244222,DTW], [0.31312544853160823,JFK], [0.3312865798692019,IAD])
