In [0]:
import org.apache.spark.sql.types._

// Explicit schema for the raw flight CSV files: one (column name, Spark type)
// pair per column, in file order. Every column is declared nullable.
val schema = StructType(
  Seq[(String, DataType)](
    "departure_datetime" -> TimestampType,
    "arrival_datetime"   -> TimestampType,
    "airlines"           -> StringType,
    "travel_time"        -> IntegerType,
    "origin"             -> StringType,
    "destination"        -> StringType,
    "layover_n"          -> IntegerType,
    "layover_time"       -> IntegerType,
    "layover_location"   -> StringType,
    "price_eur"          -> IntegerType,
    "price_trend"        -> StringType,
    "price_value"        -> StringType,
    "access_date"        -> StringType,
    "one_way"            -> BooleanType,
    "has_train"          -> BooleanType,
    "days_advance"       -> IntegerType
  ).map { case (columnName, columnType) => StructField(columnName, columnType, true) }
)

In [1]:
// Load every CSV file under the flight-data directory with the explicit schema.
// The first line of each file is treated as a header, and the parse mode is
// set to PERMISSIVE.
val directoryPath = "/user/zc2398_nyu_edu/flight-data/"
val rawDF = spark.read
  .schema(schema)
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .csv(directoryPath)


In [2]:
// Sanity check: print the first 50 rows to confirm the data loaded properly.
rawDF.show(50)

In [3]:
// Number of rows read from the raw CSV files (before any filtering).
rawDF.count

In [4]:
// Keep only the columns needed downstream, then discard rows that have no
// departure timestamp.
val usefulDF = rawDF
  .select(
    "departure_datetime",
    "arrival_datetime",
    "airlines",
    "origin",
    "destination",
    "layover_n",
    "layover_location",
    "price_eur",
    "access_date"
  )
  .where(col("departure_datetime").isNotNull)

In [5]:
// Number of rows remaining after dropping rows without a departure timestamp.
usefulDF.count

In [6]:
// basic cleaning: remove codeshare, remove null
import org.apache.spark.sql.functions.udf

// UDF that reduces the raw `airlines` cell to a single primary airline name.
//
// The cell is stripped of a leading '[', a trailing ']' and all single quotes,
// then split on ", "; the first piece is taken as the primary airline.
//   - "Jet" is expanded to "JetBlue".
//   - "Separate tickets booked together" is mapped to "" (no single airline).
//   - A null cell, or a cell that yields no pieces after splitting, also maps
//     to "" instead of throwing, so the row can be removed by the subsequent
//     `length(col("airline")) > 0` filter rather than failing the whole job.
val primary = udf((airlines: String) => {
    // Option(...) turns a null cell into None; headOption guards against
    // String.split returning an empty array (e.g. for the input ", ").
    val first = Option(airlines)
        .flatMap(_.replaceAll("^\\[|\\]$|'", "").split(", ").headOption)
        .getOrElse("")

    first match {
        case "Jet" => "JetBlue"
        case "Separate tickets booked together" => ""
        case other => other
    }
})

// UDF that wraps a layover location in an array. A null value and the
// placeholder strings "min" and "Change of airports" become an empty array;
// any other value becomes a one-element array.
val cleanLocation = udf((loc: String) => loc match {
    case null | "min" | "Change of airports" => Array.empty[String]
    case place => Array(place)
})



// Apply the cleaning UDFs and settle on the final column names:
//   airlines         -> airline        (primary airline; rows with an empty name are dropped)
//   layover_location -> layovers       (array produced by cleanLocation)
//   layover_n        -> layover_number
val cleanDF = usefulDF
  .withColumn("airlines", primary(col("airlines")))
  .withColumnRenamed("airlines", "airline")
  .where(length(col("airline")) > 0)
  .withColumn("layover_location", cleanLocation(col("layover_location")))
  .withColumnRenamed("layover_location", "layovers")
  .withColumnRenamed("layover_n", "layover_number")
cleanDF.show(50)

In [7]:
// Load and clean the airport reference data: keep only large and medium
// airports that have an IATA code, and retain just the country code,
// municipality and IATA code columns.
val airportPath = "/user/zc2398_nyu_edu/airport-codes_csv.csv"
val airportDF = spark.read
  .option("header", true)
  .csv(airportPath)
  .where(
    col("iata_code").isNotNull &&
      (col("type") === "large_airport" || col("type") === "medium_airport")
  )
  .select("iso_country", "municipality", "iata_code")


In [8]:
// Lookup table from two-letter country code to English country name.
// Only the codes listed here are covered; callers decide what to do with
// codes that are missing.
val countryMap: Map[String, String] = List(
  ("US", "United States"), ("GB", "United Kingdom"), ("CA", "Canada"),
  ("DE", "Germany"), ("FR", "France"), ("JP", "Japan"),
  ("CN", "China"), ("IN", "India"), ("BR", "Brazil"),
  ("RU", "Russia"), ("AU", "Australia"), ("ZA", "South Africa"),
  ("NZ", "New Zealand"), ("ES", "Spain"), ("IT", "Italy"),
  ("MX", "Mexico"), ("KR", "South Korea"), ("SE", "Sweden"),
  ("NO", "Norway"), ("FI", "Finland"), ("DK", "Denmark"),
  ("PL", "Poland"), ("NG", "Nigeria"), ("EG", "Egypt"),
  ("SA", "Saudi Arabia"), ("AE", "United Arab Emirates"), ("SG", "Singapore"),
  ("TH", "Thailand"), ("MY", "Malaysia"), ("PH", "Philippines"),
  ("AT", "Austria"), ("BE", "Belgium"), ("CH", "Switzerland"),
  ("CZ", "Czech Republic"), ("GR", "Greece"), ("HU", "Hungary"),
  ("IE", "Ireland"), ("NL", "Netherlands"), ("PT", "Portugal"),
  ("RO", "Romania"), ("SK", "Slovakia"), ("SI", "Slovenia"),
  ("UA", "Ukraine"), ("HR", "Croatia"), ("LV", "Latvia"),
  ("LT", "Lithuania"), ("EE", "Estonia"), ("BG", "Bulgaria"),
  ("IS", "Iceland")
).toMap

In [9]:
// Translate country codes to full country names.
// The lookup table is broadcast once so the UDF reads the shared copy on each
// executor; codes missing from the table are reported as "Unknown".
val broadcastCountryMap = sc.broadcast(countryMap)
val toFullName = udf { code: String =>
    val namesByCode = broadcastCountryMap.value
    namesByCode.getOrElse(code, "Unknown")
}


// Resolve the origin airport code to a city and country. This is an inner
// join, so flights whose origin code is not in airportDF are dropped.
val originDF = cleanDF
  .join(airportDF, cleanDF("origin") === airportDF("iata_code"))
  .withColumn("iso_country", toFullName(col("iso_country")))
  .withColumnRenamed("iso_country", "origin_country")
  .withColumnRenamed("municipality", "origin_city")
  .drop("iata_code", "origin")
  .cache()

// Same resolution for the destination airport code.
val transformedDF = originDF
  .join(airportDF, originDF("destination") === airportDF("iata_code"))
  .withColumn("iso_country", toFullName(col("iso_country")))
  .withColumnRenamed("iso_country", "destination_country")
  .withColumnRenamed("municipality", "destination_city")
  .drop("iata_code", "destination")

In [10]:
// Preview the first 5 rows after both airport joins.
transformedDF.show(5)

In [11]:
// get currency table
//
// Loads the exchange-rate CSV as (date, eur) rows, drops rows whose date is
// null (for example lines that could not be parsed as a date) and caches the
// result because it is reused in the price conversion join.

val currencyDataPath = "/user/zc2398_nyu_edu/USD_EUR.csv"
val currSchema = StructType(Array(
    StructField("date", DateType, true),
    StructField("eur", DoubleType, true)
    ))
// The CSV reader option is spelled "header". The previous key "headers" is not
// a recognised option, so the header line was being parsed as a data row and
// was only removed incidentally by the null-date filter below.
val currencyDF = spark.read.option("header", "true").schema(currSchema).csv(currencyDataPath).filter(col("date").isNotNull).cache()
currencyDF.show(10)



In [12]:
// Convert the EUR price to USD using the exchange rate of the access date.
// access_date is parsed into a proper date so it can be matched against the
// currency table's date column.
val transformedAccessDF = transformedDF.withColumn("access_date_as_date", to_date(col("access_date")))

// Divides a EUR amount by the rate.
// NOTE(review): presumably `eur` in the currency table is the EUR value of
// 1 USD, so amount / rate yields USD; confirm against USD_EUR.csv.
val getUSD = udf((amountEur: Double, rate: Double) => amountEur / rate)

// Left join: flights keep their row even when no rate exists for their access
// date. The helper columns and the original EUR price are dropped once the
// rounded (2 decimals) `price` column has been computed.
val finalDF = transformedAccessDF
  .join(currencyDF, transformedAccessDF("access_date_as_date") === currencyDF("date"), "left")
  .withColumn("price", round(getUSD(col("price_eur"), col("eur")), 2))
  .drop("eur", "access_date_as_date", "date", "price_eur")
  .cache()



In [13]:
// Persist the cleaned data set as Parquet, replacing any existing output.
finalDF.write
  .format("parquet")
  .mode("overwrite")
  .save("/user/zc2398_nyu_edu/flight-data-cleaned")

In [14]:
// Export a CSV copy. The `layovers` array column is replaced by its first
// element before writing.
// NOTE(review): `layovers` can be an empty array; confirm that element_at
// returns null (rather than raising) for that case under this cluster's
// Spark version and ANSI setting.
finalDF
  .withColumn("layovers", element_at(col("layovers"), 1))
  .write
  .format("csv")
  .option("header", "true") // include column headers
  .option("sep", ",")
  .mode("overwrite") // replace any existing files
  .save("/user/zc2398_nyu_edu/flight-data-cleaned-csv")