# Debug Notebook

This notebook is used to debug and test the Spark environment and configurations.

## Setup

In [1]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.io.{File, PrintWriter}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.io.{File, PrintWriter}


In [2]:
val path_to_datasets = "../../../../datasets/"
val path_to_output = "../../../../src/main/outputs/"

val path_to_sample = path_to_datasets + "itineraries_sample.csv"

val spark = SparkSession.builder
  .appName("Debug")
  .getOrCreate()

path_to_datasets: String = ../../../../datasets/
path_to_output: String = ../../../../src/main/outputs/
path_to_sample: String = ../../../../datasets/itineraries_sample.csv
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@568f509d


## Data loading, parsing, and cleaning

The structure of the dataset is as follows:
- **legId**: An identifier for the flight.
- **searchDate**: The date (`YYYY-MM-DD`) on which this entry was taken from Expedia.
- **flightDate**: The date (`YYYY-MM-DD`) of the flight.
- **startingAirport**: Three-character IATA airport code for the initial location.
- **destinationAirport**: Three-character IATA airport code for the arrival location.
- **fareBasisCode**: The fare basis code.
- **travelDuration**: The travel duration in hours and minutes.
- **elapsedDays**: The number of elapsed days (usually 0).
- **isBasicEconomy**: Boolean indicating whether the ticket is for basic economy.
- **isRefundable**: Boolean indicating whether the ticket is refundable.
- **isNonStop**: Boolean indicating whether the flight is non-stop.
- **baseFare**: The price of the ticket (in USD).
- **totalFare**: The price of the ticket (in USD) including taxes and other fees.
- **seatsRemaining**: Integer indicating the number of seats remaining.
- **totalTravelDistance**: The total travel distance in miles. This data is sometimes missing.
- **segmentsDepartureTimeEpochSeconds**: String containing the departure time (Unix time) for each leg of the trip, separated by `||`.
- **segmentsDepartureTimeRaw**: String containing the departure time (ISO 8601 format: `YYYY-MM-DDThh:mm:ss.000±[hh]:00`) for each leg of the trip, separated by `||`.
- **segmentsArrivalTimeEpochSeconds**: String containing the arrival time (Unix time) for each leg of the trip, separated by `||`.
- **segmentsArrivalTimeRaw**: String containing the arrival time (ISO 8601 format: `YYYY-MM-DDThh:mm:ss.000±[hh]:00`) for each leg of the trip, separated by `||`.
- **segmentsArrivalAirportCode**: String containing the IATA airport code for the arrival location for each leg of the trip, separated by `||`.
- **segmentsDepartureAirportCode**: String containing the IATA airport code for the departure location for each leg of the trip, separated by `||`.
- **segmentsAirlineName**: String containing the name of the airline that services each leg of the trip, separated by `||`.
- **segmentsAirlineCode**: String containing the two-letter airline code that services each leg of the trip, separated by `||`.
- **segmentsEquipmentDescription**: String containing the type of airplane used for each leg of the trip (e.g., "Airbus A321" or "Boeing 737-800"), separated by `||`.
- **segmentsDurationInSeconds**: String containing the duration of the flight (in seconds) for each leg of the trip, separated by `||`.
- **segmentsDistance**: String containing the distance traveled (in miles) for each leg of the trip, separated by `||`.
- **segmentsCabinCode**: String containing the cabin class for each leg of the trip (e.g., "coach"), separated by `||`.


In [3]:
case class Flight(
    legId: String,
    searchDate: LocalDate,
    flightDate: LocalDate,
    startingAirport: String,
    destinationAirport: String,
    fareBasisCode: String,
    travelDuration: Int,
    elapsedDays: Int,
    isBasicEconomy: Boolean,
    isRefundable: Boolean,
    isNonStop: Boolean,
    baseFare: Double,
    totalFare: Double,
    seatsRemaining: Int,
    totalTravelDistance: Double,
    segmentsDepartureTimeEpochSeconds: String,
    segmentsDepartureTimeRaw: String,
    segmentsArrivalTimeEpochSeconds: String,
    segmentsArrivalTimeRaw: String,
    segmentsArrivalAirportCode: String,
    segmentsDepartureAirportCode: String,
    segmentsAirlineName: String,
    segmentsAirlineCode: String,
    segmentsEquipmentDescription: String,
    segmentsDurationInSeconds: String,
    segmentsDistance: String,
    segmentsCabinCode: String
)

defined class Flight


In [4]:
def parseLine(line: String): Option[Flight] = {
    try {
        val cols = line.split(",", -1).map(_.trim)
        val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

        def parseDurationToMinutes(duration: String): Int = {
            val hourPattern = "PT(\\d+)H".r
            val minutePattern = "PT(?:\\d+H)?(\\d+)M".r

            val hours = hourPattern.findFirstMatchIn(duration).map(_.group(1).toInt).getOrElse(0)
            val minutes = minutePattern.findFirstMatchIn(duration).map(_.group(1).toInt).getOrElse(0)

            hours * 60 + minutes
        }

        def toIntSafe(s: String): Int = try { s.toInt } catch { case _: Exception => 0 }
        def toDoubleSafe(s: String): Double = try { s.toDouble } catch { case _: Exception => 0.0 }
        def toBooleanSafe(s: String): Boolean = try { s.toBoolean } catch { case _: Exception => false }

        val flight = Flight(
            legId = cols(0),
            searchDate = LocalDate.parse(cols(1), dateFormatter),
            flightDate = LocalDate.parse(cols(2), dateFormatter),
            startingAirport = cols(3),
            destinationAirport = cols(4),
            fareBasisCode = cols(5),
            travelDuration = parseDurationToMinutes(cols(6)),
            elapsedDays = toIntSafe(cols(7)),
            isBasicEconomy = toBooleanSafe(cols(8)),
            isRefundable = toBooleanSafe(cols(9)),
            isNonStop = toBooleanSafe(cols(10)),
            baseFare = toDoubleSafe(cols(11)),
            totalFare = toDoubleSafe(cols(12)),
            seatsRemaining = toIntSafe(cols(13)),
            totalTravelDistance = toDoubleSafe(cols(14)),
            segmentsDepartureTimeEpochSeconds = cols(15),
            segmentsDepartureTimeRaw = cols(16),
            segmentsArrivalTimeEpochSeconds = cols(17),
            segmentsArrivalTimeRaw = cols(18),
            segmentsArrivalAirportCode = cols(19),
            segmentsDepartureAirportCode = cols(20),
            segmentsAirlineName = cols(21),
            segmentsAirlineCode = cols(22),
            segmentsEquipmentDescription = cols(23),
            segmentsDurationInSeconds = cols(24),
            segmentsDistance = cols(25),
            segmentsCabinCode = cols(26)
        )

        Some(flight)
    } catch {
        case e: Exception =>
            println(s"[PARSE ERROR] Line: $line\nError: ${e.getMessage}")
            None
    }
}

parseLine: (line: String)Option[Flight]


### Data parsing

In [5]:
var rddRaw = spark.sparkContext.textFile(path_to_sample).filter(line => !line.startsWith("legId"))
var rddParsed = rddRaw.flatMap(parseLine)

rddRaw: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:32
rddParsed: org.apache.spark.rdd.RDD[Flight] = MapPartitionsRDD[3] at flatMap at <console>:33


## Jobs

- **Average Flight Price per Route with Classification:** Calculate the average ticket price for each departure-destination pair, then classify each route as "Cheap," "Moderate," or "Expensive" based on the calculated average price.
- **Direct vs. Connecting Flights Price Comparison:**  Compare average ticket prices between non-stop and connecting flights on each route to analyze fare differences by flight type.

### Non-optimized version

In [6]:
def nonOptimizedFlightPriceAnalysis(flights: RDD[Flight]): Unit = {

    // Job 1: Average Price per Route and classification
    val routeAvg = flights.map(f => ((f.startingAirport, f.destinationAirport), f.totalFare))
        .groupByKey()
        .mapValues { fares =>
            val (sum, count) = fares.foldLeft((0.0, 0)) { case ((s, c), fare) => (s + fare, c + 1) }
            sum / count
        }

    val routeClass = routeAvg.map { case ((origin, dest), avgFare) =>
            val category = if (avgFare < 200) "Cheap"
            else if (avgFare <= 500) "Moderate"
            else "Expensive"
            ((origin, dest), category)
        }

    val joinedFlights = flights
        .map(f => ((f.startingAirport, f.destinationAirport), f))
        .join(routeClass)

    val flightsByClass = joinedFlights.map { case (_, (flight, routeClass)) =>
            (routeClass, flight.totalFare)
        }
        .groupByKey()
        .mapValues { fares =>
            val (sum, count) = fares.foldLeft((0.0, 0)) { case ((s, c), fare) => (s + fare, c + 1) }
            (sum / count, count)
        }

    println("\n--- Flight count and average price per route class: ---")
    flightsByClass.map { case (routeClass, (avgFare, totalCount)) =>
            s"Class: $routeClass | Avg Fare: $$${"%.2f".format(avgFare)} | Flights: $totalCount"
        }.foreach(println)

    // Job 2: Average Price by Direct vs. Connecting Flights
    val routeNonStopPairs = flights.map { flight =>
            ((flight.startingAirport, flight.destinationAirport, flight.isNonStop), flight.totalFare)
        }

    val routeNonStopAverage = routeNonStopPairs
        .groupByKey()
        .mapValues { fares =>
            val (sum, count) = fares.foldLeft((0.0, 0)) { case ((s, c), fare) => (s + fare, c + 1) }
            sum / count
        }

    val routeGrouped = routeNonStopAverage.map { case ((origin, dest, isNonStop), avgFare) =>
            ((origin, dest), (isNonStop, avgFare))
        }
        .groupByKey()

    println("\n--- Average flight price per route with Direct and Connecting prices: ---")
    routeGrouped.map { case ((origin, dest), faresIterable) =>
            val fares = faresIterable.toMap
            val directFare = fares.getOrElse(true, Double.NaN)
            val connectingFare = fares.getOrElse(false, Double.NaN)
            s"$origin -> $dest | Direct: $$${"%.2f".format(directFare)} | Connecting: $$${"%.2f".format(connectingFare)}"
        }.foreach(println)
}

nonOptimizedFlightPriceAnalysis: (flights: org.apache.spark.rdd.RDD[Flight])Unit


In [16]:
var startTimeNonOptimized = System.currentTimeMillis()

nonOptimizedFlightPriceAnalysis(rddParsed)

var endTimeNonOptimized = System.currentTimeMillis()
var durationNonOptimized = endTimeNonOptimized - startTimeNonOptimized


--- Flight count and average price per route class: ---
Class: Cheap | Avg Fare: $176,02 | Flights: 143305
Class: Expensive | Avg Fare: $577,63 | Flights: 201958
Class: Moderate | Avg Fare: $328,91 | Flights: 2117951

--- Average flight price per route with Direct and Connecting prices: ---
IAD -> LGA | Direct: $178,13 | Connecting: $427,71
LAX -> CLT | Direct: $453,48 | Connecting: $449,64
LGA -> SFO | Direct: $NaN | Connecting: $477,02
CLT -> SFO | Direct: $518,22 | Connecting: $508,04
IAD -> EWR | Direct: $217,14 | Connecting: $347,08
LAX -> DEN | Direct: $190,38 | Connecting: $287,37
ORD -> JFK | Direct: $211,79 | Connecting: $467,43
BOS -> IAD | Direct: $138,86 | Connecting: $317,00
DTW -> ORD | Direct: $196,25 | Connecting: $268,04
DFW -> ORD | Direct: $247,17 | Connecting: $296,25
CLT -> PHL | Direct: $257,07 | Connecting: $289,65
EWR -> LAX | Direct: $331,18 | Connecting: $364,27
ORD -> PHL | Direct: $207,05 | Connecting: $282,56
OAK -> JFK | Direct: $NaN | C

startTimeNonOptimized: Long = 1751658889333
endTimeNonOptimized: Long = 1751658917951
durationNonOptimized: Long = 28618


### Optimized version


In [8]:
def optimizedFlightPriceAnalysis(flights: RDD[Flight]): Unit = {

    val routeAggregates = flights
        .map { flight =>
            val routeKey = (flight.startingAirport, flight.destinationAirport)
            val fare = flight.totalFare
            val isDirect = flight.isNonStop
            ((routeKey), (fare, 1, Map(isDirect -> (fare, 1))))
        }
        .reduceByKey { case ((fareSum1, count1, map1), (fareSum2, count2, map2)) =>
            val mergedMap = map1 ++ map2.map { case (k, v) =>
                k -> (v._1 + map1.getOrElse(k, (0.0, 0))._1, v._2 + map1.getOrElse(k, (0.0, 0))._2)
            }
            (fareSum1 + fareSum2, count1 + count2, mergedMap)
        }
        .cache()

    // Extract results per route (Job 2) + classify (Job 1)
    val perRouteResults = routeAggregates.map { case ((origin, dest), (totalFare, totalCount, directMap)) =>
            val routeAvg = totalFare / totalCount
            val routeClass = if (routeAvg < 200) "Cheap"
            else if (routeAvg <= 500) "Moderate"
            else "Expensive"

            val directAvg = directMap.get(true).map { case (sum, cnt) => sum / cnt }.getOrElse(Double.NaN)
            val connectingAvg = directMap.get(false).map { case (sum, cnt) => sum / cnt }.getOrElse(Double.NaN)

            ((origin, dest), routeClass, routeAvg, directAvg, connectingAvg, totalCount)
        }

    // Prepare Job 1 aggregates (class averages and counts)
    val classAggregates = perRouteResults
        .map { case (_, routeClass, routeAvg, _, _, totalCount) =>
            (routeClass, (routeAvg * totalCount, totalCount))
        }
        .reduceByKey { case ((sum1, count1), (sum2, count2)) =>
            (sum1 + sum2, count1 + count2)
        }
        .mapValues { case (totalSum, totalCount) =>
            (totalSum / totalCount, totalCount)
        }

    println("\n--- Flight count and average price per route class: ---")
    classAggregates.map { case (routeClass, (avgFare, count)) =>
            s"Class: $routeClass | Avg Fare: $$${"%.2f".format(avgFare)} | Flights: $count"
        }.foreach(println)

    println("\n--- Average flight price per route with Direct and Connecting prices: ---")
    perRouteResults.map { case ((origin, dest), _, _, directAvg, connectingAvg, _) =>
            s"$origin -> $dest | Direct: $$${"%.2f".format(directAvg)} | Connecting: $$${"%.2f".format(connectingAvg)}"
        }.foreach(println)

    routeAggregates.unpersist() // Clean up cached RDD
}

optimizedFlightPriceAnalysis: (flights: org.apache.spark.rdd.RDD[Flight])Unit


In [9]:
var startTimeOptimized = System.currentTimeMillis()

optimizedFlightPriceAnalysis(rddParsed)

var endTimeOptimized = System.currentTimeMillis()
var durationOptimized = endTimeOptimized - startTimeOptimized


--- Flight count and average price per route class: ---
Class: Cheap | Avg Fare: $176,02 | Flights: 143305
Class: Expensive | Avg Fare: $577,63 | Flights: 201958
Class: Moderate | Avg Fare: $328,91 | Flights: 2117951

--- Average flight price per route with Direct and Connecting prices: ---
SFO -> BOS | Direct: $444,36 | Connecting: $442,00
EWR -> ORD | Direct: $200,38 | Connecting: $239,28
DEN -> LGA | Direct: $261,81 | Connecting: $272,63
MIA -> EWR | Direct: $202,70 | Connecting: $323,18
DEN -> ORD | Direct: $259,38 | Connecting: $351,12
DFW -> ORD | Direct: $247,17 | Connecting: $296,25
ORD -> ATL | Direct: $217,99 | Connecting: $228,12
EWR -> LAX | Direct: $331,18 | Connecting: $364,27
ATL -> LGA | Direct: $205,85 | Connecting: $198,48
JFK -> DFW | Direct: $224,92 | Connecting: $469,71
SFO -> CLT | Direct: $568,13 | Connecting: $515,32
SFO -> DFW | Direct: $305,07 | Connecting: $336,95
BOS -> EWR | Direct: $128,78 | Connecting: $254,83
IAD -> ORD | Direct: $228,

startTimeOptimized: Long = 1751658786557
endTimeOptimized: Long = 1751658791937
durationOptimized: Long = 5380


## Time comparison and log from both jobs

In [11]:
def logOutput(jobName: String, outputPath: String, duration: Long): Unit = {
    val writer = new PrintWriter(new File(outputPath))
    writer.write(s"Job '$jobName' completed in $duration ms.\n")
    writer.close()
}

logOutput: (jobName: String, outputPath: String, duration: Long)Unit


In [14]:
val path_non_optimized = path_to_output + "non_optimized_job.log"

println(s"\n [TIME] Non-optimized job duration: ${durationNonOptimized} ms")
logOutput("Non-Optimized", path_non_optimized, durationNonOptimized)


 [TIME] Non-optimized job duration: 34657 ms


path_non_optimized: String = ../../../../src/main/outputs/non_optimized_job.log


In [15]:
val path_optimized = path_to_output + "optimized_job.log"

println(s"\n [TIME] Optimized job duration: ${durationOptimized} ms")
logOutput("Optimized", path_optimized, durationOptimized)


 [TIME] Optimized job duration: 5380 ms


path_optimized: String = ../../../../src/main/outputs/optimized_job.log
