## Lazy Flights
    "flights.csv" holds a list of all flight records collected somehow from the US Department of Transportation.
    
#### 1. RDDs, Transformations and Actions

In [1]:
val flights = "flights.csv"
val flightsRDD = sc.textFile(flights)

flights = flights.csv
flightsRDD = flights.csv MapPartitionsRDD[1] at textFile at <console>:28


flights.csv MapPartitionsRDD[1] at textFile at <console>:28

In [2]:
// definition of class Flight
// all flight Strings in the flightsRDD will be transformed into an object of this class

import org.joda.time.LocalDate

case class Flight (
    journeyDate: LocalDate,
    airline: String,
    flightNumber: String,
    origin: String,
    destiny: String,
    departureTime: LocalDate,
    departureDelay: Double,
    arrivalTime: LocalDate,
    arrivalDelay: Double,
    airTime: Double,
    distance: Double
)

defined class Flight


In [3]:
// a parser that converts each line of the "flights.csv" into a Flight object

import org.joda.time.format.DateTimeFormat

def parseFlight(line: String): Flight = {
    val columns = line.split(",")
    val datePattern = DateTimeFormat.forPattern("YYYY-mm-dd")
    val timePattern = DateTimeFormat.forPattern("HHmm")
    
    val journeyDate: LocalDate = datePattern.parseDateTime(columns(0)).toLocalDate()
    val airline: String = columns(1)
    val flightNumber: String = columns(2)
    val origin: String = columns(3)
    val destiny: String = columns(4)
    val departureTime: LocalDate = timePattern.parseDateTime(columns(5)).toLocalDate()
    val departureDelay: Double = columns(6).toDouble
    val arrivalTime: LocalDate = timePattern.parseDateTime(columns(7)).toLocalDate()
    val arrivalDelay: Double = columns(8).toDouble
    val airTime: Double = columns(9).toDouble
    val distance: Double = columns(10).toDouble
    
    Flight(
        journeyDate, airline, flightNumber, origin, destiny,
        departureTime, departureDelay, arrivalTime, arrivalDelay,
        airTime, distance
    )
}

parseFlight: (line: String)Flight


In [4]:
// Transformation - map

// transforming RDD of Strings into an RDD of Flight Objects
val flightObjects = flightsRDD.map(parseFlight)

flightObjects = MapPartitionsRDD[2] at map at <console>:39


MapPartitionsRDD[2] at map at <console>:39

In [5]:
// calculating the average distance travelled by any flight

// Action - count
// total number of Flight objects that the RDD is holding
val flightsCount = flightObjects.count().toDouble

// Transformations - map, reduce
// total distance covered by all the flights
val totalDistance = flightObjects.map(_.distance).reduce(_+_)

// Average Distance
val avgDistance = totalDistance / flightsCount

flightsCount = 476881.0
totalDistance = 3.79052917E8
avgDistance = 794.8585013871385


794.8585013871385

In [6]:
// calculating percentage of flights that usually delay to take off (departureDelay)

// Transformation - filter
// Action - count
// total number of flights that are lazy to take off
val delayedFlights = flightObjects.filter(_.departureDelay > 0).count()

// Percentage of Flights
val delayedFlightsPercentage = (delayedFlights / flightsCount) * 100

delayedFlights = 179015
delayedFlightsPercentage = 37.53871510922012


37.53871510922012

In [7]:
// calculating the average delay in departure of any flight

// Transformation - map
// Action - aggregate
// an RDD that contains only the departureDelay column is created
// the values in this RDD is aggregated such the values are reduced to a tuple - (sum, count)
// sum holds the sum of all the departureDelays
// count holds the total number of delayed flights
val sumCountOfDelay = flightObjects.map(_.departureDelay).aggregate(0.0, 0)(
    (sumCount, delay) => (sumCount._1 + delay, sumCount._2 + 1),
    (sumCount1, sumCount2) => (sumCount1._1 + sumCount2._1, sumCount1._2 + sumCount2._2)
)

// Average Delay
val avgDelay = sumCountOfDelay._1 / sumCountOfDelay._2

sumCountOfDelay = (3964730.0,476881)
avgDelay = 8.313877046894298


8.313877046894298

In [8]:
// ** this is a lil intro to a PairRDD **

// creating a frequency distribution on the delayedFlights
// it is an RDD of tuples
// sample tuple - (0, 453)
// 0 - range of minutes delayed, should be understood as 0 to 1
// 453 - number of flights accounted to that much delay

// Action - countByValue
// FrequencyDistribution
val frequencyDistribution = flightObjects.map(flight => (flight.departureDelay / 60).toInt).countByValue()

frequencyDistribution = Map(0 -> 452963, 5 -> 249, 10 -> 15, 24 -> 3, 25 -> 1, 14 -> 13, 20 -> 4, 1 -> 16016, 6 -> 113, 28 -> 1, 21 -> 3, 9 -> 26, 13 -> 15, 2 -> 4893, 17 -> 2, 12 -> 9, 7 -> 66, 3 -> 1729, 11 -> 12, 8 -> 43, 4 -> 701, 15 -> 4)


Map(0 -> 452963, 5 -> 249, 10 -> 15, 24 -> 3, 25 -> 1, 14 -> 13, 20 -> 4, 1 -> 16016, 6 -> 113, 28 -> 1, 21 -> 3, 9 -> 26, 13 -> 15, 2 -> 4893, 17 -> 2, 12 -> 9, 7 -> 66, 3 -> 1729, 11 -> 12, 8 -> 43, 4 -> 701, 15 -> 4)

#### 2. PairRDDs, Transformations and Actions

In [9]:
val airportwiseDelay = flightObjects.map(flight => (flight.origin, flight.departureDelay))
val airportwiseDelaySum = airportwiseDelay.reduceByKey(_+_)
val airportwiseDelayCount = airportwiseDelay.mapValues(airport => 1).reduceByKey(_+_)
val airportwiseSumCountOfDelay = airportwiseDelaySum.join(airportwiseDelayCount)
val airportwiseAvgDelay1 = airportwiseSumCountOfDelay.mapValues(airport => airport._1 / airport._2)

airportwiseDelay = MapPartitionsRDD[10] at map at <console>:38
airportwiseDelaySum = ShuffledRDD[11] at reduceByKey at <console>:39
airportwiseDelayCount = ShuffledRDD[13] at reduceByKey at <console>:40
airportwiseSumCountOfDelay = MapPartitionsRDD[16] at join at <console>:41
airportwiseAvgDelay1 = MapPartitionsRDD[17] at mapValues at <console>:42


MapPartitionsRDD[17] at mapValues at <console>:42

In [10]:
val airportwiseAvgDelay2 = airportwiseDelay.combineByKey(
    delay => (delay, 1),
    (sumCount: (Double, Int), delay: Double) => (sumCount._1 + delay, sumCount._2 + 1),
    (sumCount1: (Double, Int), sumCount2: (Double, Int)) => (sumCount1._1 + sumCount2._1, sumCount1._2 + sumCount2._2)
).mapValues(airport => airport._1 / airport._2)

airportwiseAvgDelay2 = MapPartitionsRDD[19] at mapValues at <console>:44


MapPartitionsRDD[19] at mapValues at <console>:44

In [11]:
airportwiseAvgDelay2.sortBy(-_._2).take(10)

[(PPG,56.25), (EGE,32.0), (OTH,24.533333333333335), (LAR,18.892857142857142), (RDD,18.55294117647059), (MTJ,18.363636363636363), (PUB,17.54), (EWR,16.478549005929544), (CIC,15.931034482758621), (RST,15.6993006993007)]

In [12]:
def isHeader(line: String): Boolean = {
    line.contains("Description")
}

def parseAirport(line: String): (String, String) = {
    val columns = line.replace("\"", "").split(",")
    (columns(0), columns(1))
}

val airportsRDD = sc.textFile("airports.csv").filter(line => !isHeader(line)).map(parseAirport)

airportsRDD = MapPartitionsRDD[28] at map at <console>:38


isHeader: (line: String)Boolean
parseAirport: (line: String)(String, String)


MapPartitionsRDD[28] at map at <console>:38

In [13]:
airportsRDD.lookup("PPG")

WrappedArray(Pago Pago)

In [14]:
val airportLookup = airportsRDD.collectAsMap

airportLookup = Map(CLI -> Clintonville, DBM -> Debra Marcos, MTT -> Minatitlan, SZP -> Santa Paula, CVM -> Ciudad Victoria, CKM -> Clarksdale, LFO -> Kelafo, ETS -> Enterprise, BHH -> Bisha, BRL -> Burlington, BGL -> Baglung, YGP -> Gaspe, QRO -> Queretaro, NRI -> Afton, QQS -> Eastsound, CLR -> Calipatria, PPO -> Powell Point, CAR -> Caribou, IGN -> Iligan, BUR -> Burbank, CKV -> Clarksville, BJR -> Bahir Dar, CA7 -> Tracy, ARM -> Armidale, BTV -> Burlington, BGU -> Bangassou, TTD -> Troutdale, LIC -> Limon, DIB -> Dibrugarh, DHF -> Al Dhafra, SSI -> Brunswick, YYE -> Fort Nelson, SHI -> Shimojishima, YNE -> Norway House, PFB -> Passo Fundo, RED -> Reedsville, LIL -> Lille, KSH -> Kermanshah, KHH -> Kaohsiung, ELP -> El Paso, DVL -> Devils Lake, AT...


Map(CLI -> Clintonville, DBM -> Debra Marcos, MTT -> Minatitlan, SZP -> Santa Paula, CVM -> Ciudad Victoria, CKM -> Clarksdale, LFO -> Kelafo, ETS -> Enterprise, BHH -> Bisha, BRL -> Burlington, BGL -> Baglung, YGP -> Gaspe, QRO -> Queretaro, NRI -> Afton, QQS -> Eastsound, CLR -> Calipatria, PPO -> Powell Point, CAR -> Caribou, IGN -> Iligan, BUR -> Burbank, CKV -> Clarksville, BJR -> Bahir Dar, CA7 -> Tracy, ARM -> Armidale, BTV -> Burlington, BGU -> Bangassou, TTD -> Troutdale, LIC -> Limon, DIB -> Dibrugarh, DHF -> Al Dhafra, SSI -> Brunswick, YYE -> Fort Nelson, SHI -> Shimojishima, YNE -> Norway House, PFB -> Passo Fundo, RED -> Reedsville, LIL -> Lille, KSH -> Kermanshah, KHH -> Kaohsiung, ELP -> El Paso, DVL -> Devils Lake, ATE -> Antlers, DIK -> Dickinson, CHG -> Chaoyang, CRK -> Luzon Island, WFD -> Woodford, CGK -> Jakarta, VZD -> Campbell Lagoon, YYN -> Swift Current, WEH -> Dashuipo, SHR -> Sheridan, XXJ -> Doha, RGN -> Yangon, YCN -> Cochrane, KSQ -> Karshi, RQR -> Harbor

In [15]:
airportLookup("RGA")

Rio Grande

In [16]:
val airportwiseAvgDelay3 = airportwiseAvgDelay1.map(x => (airportLookup(x._1), x._2))

airportwiseAvgDelay3 = MapPartitionsRDD[31] at map at <console>:50


MapPartitionsRDD[31] at map at <console>:50

In [17]:
val airportLookupBC = sc.broadcast(airportLookup)
val airportwiseAvgDelay4 = airportwiseAvgDelay1.map(x => (airportLookupBC.value(x._1), x._2))

airportLookupBC = Broadcast(14)
airportwiseAvgDelay4 = MapPartitionsRDD[32] at map at <console>:51


MapPartitionsRDD[32] at map at <console>:51