# Flight Delay Analysis

**Objective**:
- Group by **Airline** to classify them as **Good** / **Average** / **Bad** based on the average departure delay.
- Join the classification back to the original dataset to tag each flight with its airline’s class.
- Group by **Route** (Origin→Dest) and **Class** to count the number of flights and uncover patterns.

There are **two** Spark pipelines in Scala:
1. **Non‑optimized version**
2. **Optimized version**


## 1. Setup Spark Session

In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import java.io.{File, PrintWriter}
import java.util.Calendar

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.148.1:4040
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1749068934338)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import java.io.{File, PrintWriter}
import java.util.Calendar


In [2]:
val spark = SparkSession.builder()
    .appName("FlightDelayAnalysis")
    .getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6a998ea9


In [3]:
val path_to_output = "../../../src/main/outputs/"
val path_to_samples = "../../../datasets/samples/"

val path_2019_ds = path_to_samples + "Combined_Flights_2019_sample.csv"
val path_2020_ds = path_to_samples + "Combined_Flights_2020_sample.csv"
val path_2021_ds = path_to_samples + "Combined_Flights_2021_sample.csv"
val path_2022_ds = path_to_samples + "Combined_Flights_2022_sample.csv"

path_to_output: String = ../../../src/main/outputs/
path_to_samples: String = ../../../datasets/samples/
path_2019_ds: String = ../../../datasets/samples/Combined_Flights_2019_sample.csv
path_2020_ds: String = ../../../datasets/samples/Combined_Flights_2020_sample.csv
path_2021_ds: String = ../../../datasets/samples/Combined_Flights_2021_sample.csv
path_2022_ds: String = ../../../datasets/samples/Combined_Flights_2022_sample.csv


## 2. Load & Inspect a Sample for Debugging

In [None]:
val sc = spark.sparkContext

In [4]:
def logJobExecution(startTime: Long, jobName: String, outputPath: String): Unit = {

    val endTime = System.currentTimeMillis()
    val duration = endTime - startTime
    val dateTime = Calendar.getInstance().getTime()

    println(s"[$dateTime] Job '$jobName' completed in $duration ms.")

    val writer = new PrintWriter(new File(outputPath))
    writer.write(s"[$dateTime] Job '$jobName' completed in $duration ms.\n")
    writer.close()
}

logJobExecution: (startTime: Long, jobName: String, outputPath: String)Unit


In [5]:
def parseFlights(row: String) = {
    
    def getInt(s: String): Int = {
        val str = if (s == null) "" else s.trim
        if (str.matches("-?\\d+")) str.toInt else 0
    }

    def getDouble(s: String): Double = {
        val str = if (s == null) "" else s.trim
        if (str.matches("-?\\d+(\\.\\d+)?")) str.toDouble else 0.0
    }

    def getBoolean(s: String): Boolean =
        Option(s).map(_.trim.toLowerCase).contains("true")

    val columns = row.split(",", -1).map(_.trim.stripPrefix("\"").stripSuffix("\""))

    val flightDate = columns(0)
    val airline = columns(1)
    val origin = columns(2)
    val dest = columns(3)
    val cancelled = getBoolean(columns(4))
    val diverted = getBoolean(columns(5))
    val depTime = getInt(columns(7))
    val depDelay = getDouble(columns(9))
    val airTime = getDouble(columns(12))
    val year = getInt(columns(16))
    val month = getInt(columns(17))

    (flightDate, airline, origin, dest, cancelled, diverted, depTime, depDelay, airTime, year, month)
}

parseFlights: (row: String)(String, String, String, String, Boolean, Boolean, Int, Double, Double, Int, Int)


In [6]:
def loadYear(path: String) = {
    val raw = sc.textFile(path)
    val header = raw.first()
    raw.filter(_ != header)
}

loadYear: (path: String)org.apache.spark.rdd.RDD[String]


In [7]:
val raw2019 = loadYear(path_2019_ds)
val raw2020 = loadYear(path_2020_ds)
val raw2021 = loadYear(path_2021_ds)
val raw2022 = loadYear(path_2022_ds)

raw2019: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:37
raw2020: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:37
raw2021: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at filter at <console>:37
raw2022: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:37


In [8]:
val flights2019 = raw2019.map(x => parseFlights(x))
val flights2020 = raw2020.map(x => parseFlights(x))
val flights2021 = raw2021.map(x => parseFlights(x))
val flights2022 = raw2022.map(x => parseFlights(x))

flights2019: org.apache.spark.rdd.RDD[(String, String, String, String, Boolean, Boolean, Int, Double, Double, Int, Int)] = MapPartitionsRDD[12] at map at <console>:38
flights2020: org.apache.spark.rdd.RDD[(String, String, String, String, Boolean, Boolean, Int, Double, Double, Int, Int)] = MapPartitionsRDD[13] at map at <console>:39
flights2021: org.apache.spark.rdd.RDD[(String, String, String, String, Boolean, Boolean, Int, Double, Double, Int, Int)] = MapPartitionsRDD[14] at map at <console>:40
flights2022: org.apache.spark.rdd.RDD[(String, String, String, String, Boolean, Boolean, Int, Double, Double, Int, Int)] = MapPartitionsRDD[15] at map at <console>:41


## 3. Non‑Optimized Pipeline (Sample)

In [19]:
def runYear(
        flights: RDD[(String,String,String,String,Boolean,Boolean,Int,Double,Double,Int,Int)],
        flightYear: String
): RDD[String] = {
    
    // Average depDelay per airline
    val airlineStats = flights
        .map { case (_, airline, _, _, _, _, _, depDelay, _, _, _) => (airline, (depDelay, 1))}
        .groupByKey()

    val arlineMap = airlineStats
        .mapValues { iter => iter.foldLeft((0.0, 0)) { case ((sum, cnt), (delay, one)) => (sum + delay, cnt + one)}}

    val airlineAvgDelay = arlineMap.mapValues { case (sum, cnt) => sum / cnt }

    // Build class
    val airlineClass = airlineAvgDelay.mapValues {
        case d if d < 5.0  => "Good"
        case d if d < 15.0 => "Average"
        case _             => "Bad"
    }

    // Re-key flights by airline
    val flightsByAirline = flights
        .map { case (date, airline, origin, dest, cancelled, diverted, depTime, depDelay, airTime, year, month) =>
            (airline, (date, origin, dest, cancelled, diverted, depTime, depDelay, airTime, year, month))
        }

    // Join
    val taggedFlights = flightsByAirline
        .join(airlineClass) 
        .map { case (_, (flight, cls)) =>
            val (date, origin, dest, cancelled, diverted, depTime, depDelay, airTime, year, month) = flight
            ((origin, dest, cls), 1)
        }

    // Count by route+class with groupByKey
    val routeClassCounts = (taggedFlights
        .groupByKey()
        .mapValues(_.size)
    )

    val perClassLists = routeClassCounts.map {
        case ((o, d, c), cnt) => (c, List(((o, d), cnt)))
    }

    val classOrder = Map("Bad" -> 1, "Average" -> 2, "Good" -> 3)

    // Top5 with reduceByKey
    val top5List = perClassLists
        .reduceByKey { (l1, l2) => (l1 ++ l2)
            .sortBy(-_._2).take(5)
        }
        .sortBy { case (cls, _) => classOrder(cls) }
    
    val top5 = top5List
        .flatMap { case (cls, list) =>
            list.map { case ((o, d), cnt) =>
                f"$cls%-7s | $o%-4s - $d%-4s | Count: $cnt%6d"
            }
        }

    top5
}


runYear: (flights: org.apache.spark.rdd.RDD[(String, String, String, String, Boolean, Boolean, Int, Double, Double, Int, Int)], flightYear: String)org.apache.spark.rdd.RDD[String]


In [24]:
val startNonOpt = System.currentTimeMillis()

val res2019 = runYear(flights2019, "2019")
val res2020 = runYear(flights2020, "2020")
val res2021 = runYear(flights2021, "2021")
val res2022 = runYear(flights2022, "2022")

val allResults = sc.union(res2019, res2020, res2021, res2022)

println(s"\n=== NON-OPTIMIZED TOP 5 ROUTES PER CLASS ===")
allResults.collect().foreach(println)

logJobExecution(startNonOpt, "NonOptimized-2019-2022", path_to_output + "non_optimized_job_execution_times.txt")


=== NON-OPTIMIZED TOP 5 ROUTES PER CLASS ===
Bad     | Inc. - EWR  | Count:   3999
Bad     | LLC d/b/a United Express - DTW  | Count:   2847
Bad     | LLC d/b/a United Express - ORD  | Count:   2133
Bad     | Inc. - IAD  | Count:   1891
Bad     | LLC d/b/a United Express - DEN  | Count:   1589
Average | LAX  - SFO  | Count:   3420
Average | SFO  - LAX  | Count:   3238
Average | LGA  - ORD  | Count:   2942
Average | ORD  - LGA  | Count:   2877
Average | LAX  - LAS  | Count:   2569
Good    | OGG  - HNL  | Count:   1961
Good    | HNL  - OGG  | Count:   1949
Good    | HNL  - KOA  | Count:   1401
Good    | KOA  - HNL  | Count:   1340
Good    | HNL  - LIH  | Count:   1295
Average | LLC d/b/a United Express - ORD  | Count:   1574
Average | Inc. - IAD  | Count:    718
Average | Inc. - EWR  | Count:    614
Average | Inc. - IAH  | Count:    349
Average | LLC d/b/a United Express - EWR  | Count:    260
Good    | SFO  - LAX  | Count:    889
Good    | LAX  - SFO  | Count:    

startNonOpt: Long = 1749072101535
res2019: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[337] at flatMap at <console>:89
res2020: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[357] at flatMap at <console>:89
res2021: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[377] at flatMap at <console>:89
res2022: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[397] at flatMap at <console>:89
allResults: org.apache.spark.rdd.RDD[String] = UnionRDD[398] at union at <console>:54


## 4. Optimized Pipeline (Sample)

In [9]:
// Union the three year‑specific RDDs
val allFlights = sc.union(flights2019, flights2020, flights2021, flights2022).cache()

allFlights: org.apache.spark.rdd.RDD[(String, String, String, String, Boolean, Boolean, Int, Double, Double, Int, Int)] = UnionRDD[16] at union at <console>:39


In [23]:
def runAllFlights(
        allFlights: RDD[(String,String,String,String,Boolean,Boolean,Int,Double,Double,Int,Int)]
): Unit = {

    // Average depDelay per airline
    val airlineAvgDelay = allFlights
        .map { case (_, airline, _, _, _, _, _, depDelay, _, _, _) => (airline, (depDelay, 1)) }
        .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
        .mapValues { case (sum, cnt) => sum / cnt }
    
    // Build class
    val airlineClass = airlineAvgDelay.mapValues {
        case d if d < 5.0  => "Good"
        case d if d < 15.0 => "Average"
        case _             => "Bad"
    }

    // Re-key flights by airline
    val flightsByAirline = allFlights
        .map { case (_, airline, origin, dest, _, _, _, _, _, _, _) =>
            (airline, (origin, dest))
        }.join(airlineClass)
        .map { case (_, ((o, d), cls)) => ((o, d, cls), 1) }

    // Count by route+class with reduceByKey
    val routeClassCounts = flightsByAirline
        .reduceByKey(_ + _)

    val perClassLists = routeClassCounts
        .map { case ((o, d, cls), cnt) => (cls, ((o, d), cnt))}

    val classOrder = Map("Bad" -> 1, "Average" -> 2, "Good" -> 3)

    // Top5 with aggregateByKey
    val top5 = perClassLists
        .aggregateByKey(List.empty[((String, String), Int)])(
            (acc, v) => (acc :+ v)
                .sortBy(-_._2)
                .take(5),
            (acc1, acc2) => (acc1 ++ acc2)
                .sortBy(-_._2)
                .take(5)
        ).mapValues(_.toSeq)

    // Top5 print
    println(s"\n=== OPTIMIZED TOP5 ROUTES PER CLASS ===")
    top5
        .flatMap { case (cls, seqRoutes) =>
            seqRoutes.map { case ((o, d), cnt) =>
                f"$cls%-7s | $o%-4s - $d%-4s | Count: $cnt%6d"
            }
        }
        .collect()
        .foreach(println)

    // Cleanup cached RDDs
    allFlights.unpersist()
}


runAllFlights: (allFlights: org.apache.spark.rdd.RDD[(String, String, String, String, Boolean, Boolean, Int, Double, Double, Int, Int)])Unit


In [25]:
val startOpt = System.currentTimeMillis()
runAllFlights(allFlights)
logJobExecution(startOpt, "Optimized-2019-2022", path_to_output + "optimized_job_execution_times.txt")


=== OPTIMIZED TOP5 ROUTES PER CLASS ===
Good    | HNL  - OGG  | Count:   3481
Good    | OGG  - HNL  | Count:   3415
Good    | SEA  - PDX  | Count:   2838
Good    | PDX  - SEA  | Count:   2767
Good    | HNL  - KOA  | Count:   2507
Average | ORD  - LGA  | Count:   4878
Average | LAX  - SFO  | Count:   4864
Average | LGA  - ORD  | Count:   4864
Average | SFO  - LAX  | Count:   4780
Average | LAX  - LAS  | Count:   4304
Bad     | LLC d/b/a United Express - ORD  | Count:   5700
Bad     | Inc. - EWR  | Count:   4818
Bad     | Inc. - IAH  | Count:   4171
Bad     | Inc. - IAD  | Count:   3436
Bad     | LLC d/b/a United Express - DTW  | Count:   3122
[Wed Jun 04 23:22:43 CEST 2025] Job 'Optimized-2019-2022' completed in 17627 ms.


startOpt: Long = 1749072146046
