# Intro

In [2]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import java.time.LocalTime
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.expressions.Window


// import spark.implicits._

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

import spark.sqlContext.implicits._


val bucket = "dataproc-temp-us-central1-1044206227610-i54vpwyj"
spark.conf.set("temporaryGcsBucket", bucket)

println("Scala language: "+util.Properties.versionString)

// spark.sparkContext.version
spark.version


Scala language: version 2.12.18


spark = org.apache.spark.sql.SparkSession@493ed6e4
bucket = dataproc-temp-us-central1-1044206227610-i54vpwyj


3.3.2

---

# Read

In [3]:
val dropList = List("OP_CARRIER_AIRLINE_ID", "OP_CARRIER_FL_NUM")

val df_flights = spark.read.format("bigquery")
    .option("table","flight-project-401118.silver.flights")
    .load()
    .drop(dropList : _*) 
    .withColumnRenamed("ORIGIN_AIRPORT_ID", "DEP_AIRPORT_ID")
    .withColumnRenamed("DEST_AIRPORT_ID", "ARR_AIRPORT_ID")

print(df_flights.columns.size, df_flights.count())

(8,17920326)

dropList = List(OP_CARRIER_AIRLINE_ID, OP_CARRIER_FL_NUM)
df_flights = [DEP_AIRPORT_ID: bigint, ARR_AIRPORT_ID: bigint ... 6 more fields]


[DEP_AIRPORT_ID: bigint, ARR_AIRPORT_ID: bigint ... 6 more fields]

In [4]:
df_flights.show(3)

+--------------+--------------+-------------+----------------+-------------+---------+-------------------+----------+
|DEP_AIRPORT_ID|ARR_AIRPORT_ID|ARR_DELAY_NEW|CRS_ELAPSED_TIME|WEATHER_DELAY|NAS_DELAY|        FL_DATETIME|IS_DELAYED|
+--------------+--------------+-------------+----------------+-------------+---------+-------------------+----------+
|         12889|         11298|           15|             165|            0|        0|2013-07-01 15:40:00|         0|
|         10397|         10431|            0|              59|            0|        0|2013-07-01 09:05:00|         0|
|         13930|         14492|            6|             118|            0|        0|2013-07-01 06:37:00|         0|
+--------------+--------------+-------------+----------------+-------------+---------+-------------------+----------+
only showing top 3 rows



In [5]:
val df_weather =
  (spark.read.format("bigquery")
  .option("table","flight-project-401118.silver.weather")
  .load()
  .cache())

print(df_weather.columns.size, df_weather.count())

(24,32631312)

df_weather = [WBAN: bigint, Date: bigint ... 22 more fields]


[WBAN: bigint, Date: bigint ... 22 more fields]

In [6]:
val df_wban = spark.read.format("bigquery")
  .option("table","flight-project-401118.silver.wban")
  .load()

print(df_wban.columns.size, df_wban.count())

(3,305)

df_wban = [AirportID: bigint, WBAN: bigint ... 1 more field]


[AirportID: bigint, WBAN: bigint ... 1 more field]

---

# Preprocessing & feat. engineering

https://stackoverflow.com/questions/53765077/rounding-hours-of-datetime-in-pyspark

## flight

In [7]:
val df_flights_wban = df_flights

    // join the WBAN datasets 2 times for DEP & ARR airports
    .join(df_wban, df_wban("AirportID") === df_flights("ARR_AIRPORT_ID"), "inner")
    .withColumnRenamed("WBAN", "WBAN_ARR")
    .withColumnRenamed("TimeZone", "TimeZone_ARR")
    .drop("AirportID")
    .join(df_wban, df_wban("AirportID") === df_flights("DEP_AIRPORT_ID"), "inner")
    .withColumnRenamed("WBAN", "WBAN_DEP")
    .withColumnRenamed("TimeZone", "TimeZone_DEP")
    .drop("AirportID")

    // round departure date time to the nearest hour
    .withColumnRenamed("FL_DATETIME", "LOCAL_DEP_DT")
    .withColumn("DEP_hour", hour((round(unix_timestamp(col("LOCAL_DEP_DT")) / 3600) * 3600).cast("timestamp")))
    .withColumn("LOCAL_DEP_DT_RND", unix_timestamp(to_date(col("LOCAL_DEP_DT"),"yyyy-MM-dd")))
    .withColumn("LOCAL_DEP_DT_RND", col("LOCAL_DEP_DT_RND") + col("DEP_hour") * 60 * 60)
    .withColumn("LOCAL_DEP_DT_RND", to_timestamp(from_unixtime(col("LOCAL_DEP_DT_RND"), "yyyy-MM-dd HH:mm:ss")))
    .drop(col("DEP_hour"))

    // compute LOCAL arrival datetime
    .withColumn("DEP_DT", col("LOCAL_DEP_DT"))
    .withColumn("DEP_DT", unix_timestamp(date_format(col("DEP_DT"), "yyyy-MM-dd HH:mm:ss")))
    .withColumn("LOCAL_ARR_DT", col("DEP_DT") + col("CRS_ELAPSED_TIME") * 60  + col("ARR_DELAY_NEW") * 60 + (col("TimeZone_ARR") - col("TimeZone_DEP")) * 60 * 60)
    .withColumn("LOCAL_ARR_DT", to_timestamp(from_unixtime(col("LOCAL_ARR_DT"), "yyyy-MM-dd HH:mm:ss")))
    .drop(col("DEP_DT")) 

    // round to the hour
    .withColumn("ARR_hour", hour((round(unix_timestamp(col("LOCAL_ARR_DT")) / 3600) * 3600).cast("timestamp")))
    .withColumn("LOCAL_ARR_DT_RND", unix_timestamp(to_date(col("LOCAL_ARR_DT"),"yyyy-MM-dd HH:mm:ss")))
    .withColumn("LOCAL_ARR_DT_RND", col("LOCAL_ARR_DT_RND") + col("ARR_hour") * 60 * 60)
    .withColumn("LOCAL_ARR_DT_RND", to_timestamp(from_unixtime(col("LOCAL_ARR_DT_RND"), "yyyy-MM-dd HH:mm:ss")))
    .withColumn("LOCAL_ARR_DT_RND", (col("LOCAL_ARR_DT_RND")))
    .drop(col("ARR_hour")) 


df_flights_wban.write.parquet("gs://dataset-flight/final-df_flights_wban.parquet")

// .drop(List("DEP_AIRPORT_ID", "ARR_AIRPORT_ID", "AirportID") : _*)  // ajouter TimeZone_DEP & TimeZone_ARR après
// df_flights_wban.drop(col("WEATHER_DELAY")).drop(col("WBAN_ARR")).drop(col("WBAN_DEP")).filter(col("TimeZone_ARR") =!= col("TimeZone_DEP")).show(3) // 2012-01-10 00:05:00

df_flights_wban = [DEP_AIRPORT_ID: bigint, ARR_AIRPORT_ID: bigint ... 13 more fields]


[DEP_AIRPORT_ID: bigint, ARR_AIRPORT_ID: bigint ... 13 more fields]

In [8]:
df_flights_wban.show(3)

+--------------+--------------+-------------+----------------+-------------+---------+-------------------+----------+--------+------------+--------+------------+-------------------+-------------------+-------------------+
|DEP_AIRPORT_ID|ARR_AIRPORT_ID|ARR_DELAY_NEW|CRS_ELAPSED_TIME|WEATHER_DELAY|NAS_DELAY|       LOCAL_DEP_DT|IS_DELAYED|WBAN_ARR|TimeZone_ARR|WBAN_DEP|TimeZone_DEP|   LOCAL_DEP_DT_RND|       LOCAL_ARR_DT|   LOCAL_ARR_DT_RND|
+--------------+--------------+-------------+----------------+-------------+---------+-------------------+----------+--------+------------+--------+------------+-------------------+-------------------+-------------------+
|         12889|         11298|           15|             165|            0|        0|2013-07-01 15:40:00|         0|    3927|          -6|   23169|          -8|2013-07-01 16:00:00|2013-07-01 20:40:00|2013-07-01 21:00:00|
|         10397|         10431|            0|              59|            0|        0|2013-07-01 09:05:00|      

columns à conserver pour ML:

In [None]:
"DEP_AIRPORT_ID", "ARR_AIRPORT_ID", "LOCAL_DEP_DT_RND", "LOCAL_ARR_DT_RND", "IS_DELAYED"

## weather

In [9]:
println(df_weather.count(), df_weather.columns.size)
df_weather.select("WBAN", "Date", "Time", "StationType", "SkyCondition", "Visibility", "WeatherType", "DryBulbFarenheit", "DryBulbCelsius", "RecordType", "HourlyPrecip").show(3)

(32631312,24)
+-----+--------+----+-----------+-------------+----------+-----------+----------------+--------------+----------+------------+
| WBAN|    Date|Time|StationType| SkyCondition|Visibility|WeatherType|DryBulbFarenheit|DryBulbCelsius|RecordType|HourlyPrecip|
+-----+--------+----+-----------+-------------+----------+-----------+----------------+--------------+----------+------------+
| 3069|20130402|1755|          0|SCT045 OVC060|        10|           |            null|          null|        AA|        null|
|93232|20120912| 953|          0|          CLR|        10|           |            null|          null|        AA|        null|
|53934|20120521| 335|          0|BKN070 BKN120|        10|           |            null|          null|        AA|        null|
+-----+--------+----+-----------+-------------+----------+-----------+----------------+--------------+----------+------------+
only showing top 3 rows



In [10]:
val windowSpec  = Window.partitionBy("WBAN").orderBy("DATETIME_RND")

val df_weather_clean = df_weather
    // select usefull columns
    .select("WBAN", "Date", "Time", "DryBulbFarenheit", "SkyCondition", "Visibility", "WindDirection", "WindSpeed", "StationPressure", "WetBulbFarenheit", "DewPointCelsius")

    // convert date & time to datetime with rounded hours
    .withColumn("StationPressure", round(col("StationPressure"), 2))
    .withColumn("Date", unix_timestamp(from_unixtime(unix_timestamp(col("Date").cast("string"), "yyyyMMdd"))))
    .withColumn("DATETIME_RND", col("Date") + col("Time") * 60)
    .withColumn("DATETIME_RND", to_timestamp(from_unixtime(col("DATETIME_RND"), "yyyy-MM-dd HH:mm:ss")))
    .withColumn("hour", hour((round(unix_timestamp(col("DATETIME_RND")) / 3600) * 3600).cast("timestamp")))
    .withColumn("DATETIME_RND", unix_timestamp(to_date(col("DATETIME_RND"),"yyyy-MM-dd")))
    .withColumn("DATETIME_RND", col("DATETIME_RND") + col("hour") * 60 * 60)
    .withColumn("DATETIME_RND", to_timestamp(from_unixtime(col("DATETIME_RND"), "yyyy-MM-dd HH:mm:ss")))
    .drop(col("hour"))

    // convert Skycondition OHE
    .withColumn("clearSky", when(col("SkyCondition").contains("CLR"), 1).otherwise(0))
    .withColumn("fewClouds", when(col("SkyCondition").contains("FEW"), 1).otherwise(0))
    .withColumn("scatterClouds", when(col("SkyCondition").contains("SCT"), 1).otherwise(0))
    .withColumn("brokenClouds", when(col("SkyCondition").contains("BKN"), 1).otherwise(0))
    .withColumn("overCast", when(col("SkyCondition").contains("OVC"), 1).otherwise(0))
    .withColumn("obscuredSky", when(col("SkyCondition").contains("VV"), 1).otherwise(0))
    .withColumn("partiallyObscuredSky", when(col("SkyCondition").contains("VV"), 1).otherwise(0))
    
    // drop useless, duplicates, & nulls
    .drop(List("Date", "Time", "SkyCondition") : _*)
    .dropDuplicates(Seq("WBAN", "DATETIME_RND"))
    .na.drop(10) // drop rows that have less than thresh non-null values.

    // create lags
    .withColumn("WindDirection1", lag("WindDirection", 1).over(windowSpec))
    .withColumn("WindSpeed1", lag("WindSpeed", 1).over(windowSpec))
    .withColumn("StationPressure1", lag("StationPressure", 1).over(windowSpec))
    .withColumn("WetBulbFarenheit1", lag("WetBulbFarenheit", 1).over(windowSpec))
    .withColumn("DewPointCelsius1", lag("DewPointCelsius", 1).over(windowSpec))
    .withColumn("clearSky1", lag("clearSky", 1).over(windowSpec))
    .withColumn("fewClouds1", lag("fewClouds", 1).over(windowSpec))
    .withColumn("scatterClouds1", lag("scatterClouds", 1).over(windowSpec))
    .withColumn("brokenClouds1", lag("brokenClouds", 1).over(windowSpec))
    .withColumn("overCast1", lag("overCast", 1).over(windowSpec))
    .withColumn("obscuredSky1", lag("obscuredSky", 1).over(windowSpec))
    .withColumn("partiallyObscuredSky1", lag("partiallyObscuredSky", 1).over(windowSpec))

    .withColumn("DryBulbFarenheit2", lag("DryBulbFarenheit", 2).over(windowSpec))
    .withColumn("Visibility2", lag("Visibility", 2).over(windowSpec))
    .withColumn("WindDirection2", lag("WindDirection", 2).over(windowSpec))
    .withColumn("WindSpeed2", lag("WindSpeed", 2).over(windowSpec))
    .withColumn("StationPressure2", lag("StationPressure", 2).over(windowSpec))
    .withColumn("WetBulbFarenheit2", lag("WetBulbFarenheit", 2).over(windowSpec))
    .withColumn("DewPointCelsius2", lag("DewPointCelsius", 2).over(windowSpec))
    .withColumn("clearSky2", lag("clearSky", 2).over(windowSpec))
    .withColumn("fewClouds2", lag("fewClouds", 2).over(windowSpec))
    .withColumn("scatterClouds2", lag("scatterClouds", 2).over(windowSpec))
    .withColumn("brokenClouds2", lag("brokenClouds", 2).over(windowSpec))
    .withColumn("overCast2", lag("overCast", 2).over(windowSpec))
    .withColumn("obscuredSky2", lag("obscuredSky", 2).over(windowSpec))
    .withColumn("partiallyObscuredSky2", lag("partiallyObscuredSky", 2).over(windowSpec))

    .withColumn("DryBulbFarenheit3", lag("DryBulbFarenheit", 3).over(windowSpec))
    .withColumn("Visibility3", lag("Visibility", 3).over(windowSpec))
    .withColumn("WindDirection3", lag("WindDirection", 3).over(windowSpec))
    .withColumn("WindSpeed3", lag("WindSpeed", 3).over(windowSpec))
    .withColumn("StationPressure3", lag("StationPressure", 3).over(windowSpec))
    .withColumn("WetBulbFarenheit3", lag("WetBulbFarenheit", 3).over(windowSpec))
    .withColumn("DewPointCelsius3", lag("DewPointCelsius", 3).over(windowSpec))
    .withColumn("clearSky3", lag("clearSky", 3).over(windowSpec))
    .withColumn("fewClouds3", lag("fewClouds", 3).over(windowSpec))
    .withColumn("scatterClouds3", lag("scatterClouds", 3).over(windowSpec))
    .withColumn("brokenClouds3", lag("brokenClouds", 3).over(windowSpec))
    .withColumn("overCast3", lag("overCast", 3).over(windowSpec))
    .withColumn("obscuredSky3", lag("obscuredSky", 3).over(windowSpec))
    .withColumn("partiallyObscuredSky3", lag("partiallyObscuredSky", 3).over(windowSpec))

    // drop les rows avec moins de 45 feat non nulls
    .na.drop(45)


println(df_weather_clean.count(), df_weather_clean.columns.size)
println(df_weather_clean.columns)

(9276991,56)
[Ljava.lang.String;@5ebcffd2


windowSpec = org.apache.spark.sql.expressions.WindowSpec@73020648
df_weather_clean = [WBAN: bigint, DryBulbFarenheit: bigint ... 54 more fields]


[WBAN: bigint, DryBulbFarenheit: bigint ... 54 more fields]

In [11]:
df_weather_clean.write.parquet("gs://dataset-flight/final-df_weather_clean.parquet")

split de SkyCondition à faire !

In [None]:
// df_weather_clean.select("SkyCondition", "clearSky", "fewClouds", "scatterClouds", "brokenClouds", "overCast", "obscuredSky", "partiallyObscuredSky").show(50)

In [12]:
df_weather_clean.select("WBAN", "DryBulbFarenheit", "Visibility", "WindDirection", "WindSpeed", "StationPressure", "WetBulbFarenheit", "DewPointCelsius", "DATETIME_RND").show(3)

+----+----------------+----------+-------------+---------+---------------+----------------+------------------+-------------------+
|WBAN|DryBulbFarenheit|Visibility|WindDirection|WindSpeed|StationPressure|WetBulbFarenheit|   DewPointCelsius|       DATETIME_RND|
+----+----------------+----------+-------------+---------+---------------+----------------+------------------+-------------------+
|3024|              32|        10|          290|       10|          27.25|              28|-6.099999904632568|2012-01-01 06:00:00|
|3024|              32|        10|          300|     null|          27.26|              28|-6.099999904632568|2012-01-01 08:00:00|
|3024|              30|        10|          270|     null|          27.28|              26|-6.699999809265137|2012-01-01 09:00:00|
+----+----------------+----------+-------------+---------+---------------+----------------+------------------+-------------------+
only showing top 3 rows



https://sparkbyexamples.com/pyspark/pyspark-lag-function/

In [13]:
df_weather_clean.columns

Array(WBAN, DryBulbFarenheit, Visibility, WindDirection, WindSpeed, StationPressure, WetBulbFarenheit, DewPointCelsius, DATETIME_RND, clearSky, fewClouds, scatterClouds, brokenClouds, overCast, obscuredSky, partiallyObscuredSky, WindDirection1, WindSpeed1, StationPressure1, WetBulbFarenheit1, DewPointCelsius1, clearSky1, fewClouds1, scatterClouds1, brokenClouds1, overCast1, obscuredSky1, partiallyObscuredSky1, DryBulbFarenheit2, Visibility2, WindDirection2, WindSpeed2, StationPressure2, WetBulbFarenheit2, DewPointCelsius2, clearSky2, fewClouds2, scatterClouds2, brokenClouds2, overCast2, obscuredSky2, partiallyObscuredSky2, DryBulbFarenheit3, Visibility3, WindDirection3, WindSpeed3, StationPressure3, WetBulbFarenheit3, DewPointCelsius3, clearSky3, fewClouds3, scatt...

---

# Join

In [14]:
val df_weather_clean = spark
    .read.parquet("gs://dataset-flight/final-df_weather_clean.parquet")
    .repartition(col("WBAN"), col("DATETIME_RND"))

println(df_weather_clean.count(), df_weather_clean.columns.size)

(9276927,56)


df_weather_clean = [WBAN: bigint, DryBulbFarenheit: bigint ... 54 more fields]


[WBAN: bigint, DryBulbFarenheit: bigint ... 54 more fields]

In [15]:
val df_flights_wban = spark
    .read.parquet("gs://dataset-flight/final-df_flights_wban.parquet")
    .repartition(col("WBAN_DEP"), col("LOCAL_DEP_DT_RND"))
    .select("WBAN_DEP", "WBAN_ARR", "LOCAL_DEP_DT_RND", "LOCAL_ARR_DT_RND", "IS_DELAYED")

println(df_flights_wban.count(), df_flights_wban.columns.size)

(16970218,5)


df_flights_wban = [WBAN_DEP: bigint, WBAN_ARR: bigint ... 3 more fields]


[WBAN_DEP: bigint, WBAN_ARR: bigint ... 3 more fields]

In [16]:
df_flights_wban.show(3)

+--------+--------+-------------------+-------------------+----------+
|WBAN_DEP|WBAN_ARR|   LOCAL_DEP_DT_RND|   LOCAL_ARR_DT_RND|IS_DELAYED|
+--------+--------+-------------------+-------------------+----------+
|   22521|   23234|2013-10-10 23:00:00|2013-10-11 07:00:00|         0|
|   14922|   13874|2013-10-21 18:00:00|2013-10-21 21:00:00|         0|
|   22521|   21504|2013-10-26 05:00:00|2013-10-26 06:00:00|         0|
+--------+--------+-------------------+-------------------+----------+
only showing top 3 rows



In [17]:
df_flights_wban.summary().show()

+-------+------------------+------------------+--------------------+
|summary|          WBAN_DEP|          WBAN_ARR|          IS_DELAYED|
+-------+------------------+------------------+--------------------+
|  count|          16970218|          16970218|            16970218|
|   mean|  28823.8713732493|28823.737506554127|0.030953579971689228|
| stddev|30022.687050380417|30016.363293676113| 0.17319196755670122|
|    min|              3017|              3017|                   0|
|    25%|             13739|             13739|                   0|
|    50%|             14819|             14819|                   0|
|    75%|             23234|             23234|                   0|
|    max|             94910|             94910|                   1|
+-------+------------------+------------------+--------------------+



In [18]:
val finalDropList = List("WBAN_DEP", "WBAN_ARR", "LOCAL_DEP_DT_RND", "LOCAL_ARR_DT_RND", "DEP_DATETIME_RND", "DEP_WBAN", "ARR_DATETIME_RND", "ARR_WBAN")
          
val df_final = df_flights_wban

    // join the weather dataset 2 times
    .join(df_weather_clean, df_weather_clean("WBAN") === df_flights_wban("WBAN_DEP") && df_weather_clean("DATETIME_RND") === df_flights_wban("LOCAL_DEP_DT_RND"), "inner")

    // rename all weather columns
    .withColumnRenamed("WBAN", "DEP_WBAN")
    .withColumnRenamed("DATETIME_RND", "DEP_DATETIME_RND")

    .withColumnRenamed("DryBulbFarenheit", "DEP_DryBulbFarenheit")
    .withColumnRenamed("Visibility", "DEP_Visibility")
    .withColumnRenamed("WindDirection", "DEP_WindDirection")
    .withColumnRenamed("WindSpeed", "DEP_WindSpeed")
    .withColumnRenamed("StationPressure", "DEP_StationPressure")
    .withColumnRenamed("WetBulbFarenheit", "DEP_WetBulbFarenheit")
    .withColumnRenamed("DewPointCelsius", "DEP_DewPointCelsius")
    .withColumnRenamed("clearSky", "DEP_clearSky")
    .withColumnRenamed("fewClouds", "DEP_fewClouds")
    .withColumnRenamed("scatterClouds", "DEP_scatterClouds")
    .withColumnRenamed("brokenClouds", "DEP_brokenClouds")
    .withColumnRenamed("overCast", "DEP_overCast")
    .withColumnRenamed("obscuredSky", "DEP_obscuredSky")
    .withColumnRenamed("partiallyObscuredSky", "DEP_partiallyObscuredSky")

    .withColumnRenamed("DryBulbFarenheit1", "DEP_DryBulbFarenheit1")
    .withColumnRenamed("Visibility1", "DEP_Visibility1")
    .withColumnRenamed("WindDirection1", "DEP_WindDirection1")
    .withColumnRenamed("WindSpeed1", "DEP_WindSpeed1")
    .withColumnRenamed("StationPressure1", "DEP_StationPressure1")
    .withColumnRenamed("WetBulbFarenheit1", "DEP_WetBulbFarenheit1")
    .withColumnRenamed("DewPointCelsius1", "DEP_DewPointCelsius1")
    .withColumnRenamed("clearSky1", "DEP_clearSky1")
    .withColumnRenamed("fewClouds1", "DEP_fewClouds1")
    .withColumnRenamed("scatterClouds1", "DEP_scatterClouds1")
    .withColumnRenamed("brokenClouds1", "DEP_brokenClouds1")
    .withColumnRenamed("overCast1", "DEP_overCast1")
    .withColumnRenamed("obscuredSky1", "DEP_obscuredSky1")
    .withColumnRenamed("partiallyObscuredSky1", "DEP_partiallyObscuredSky1")

    .withColumnRenamed("DryBulbFarenheit2", "DEP_DryBulbFarenheit2")
    .withColumnRenamed("Visibility2", "DEP_Visibility2")
    .withColumnRenamed("WindDirection2", "DEP_WindDirection2")
    .withColumnRenamed("WindSpeed2", "DEP_WindSpeed2")
    .withColumnRenamed("StationPressure2", "DEP_StationPressure2")
    .withColumnRenamed("WetBulbFarenheit2", "DEP_WetBulbFarenheit2")
    .withColumnRenamed("DewPointCelsius2", "DEP_DewPointCelsius2")
    .withColumnRenamed("clearSky2", "DEP_clearSky2")
    .withColumnRenamed("fewClouds2", "DEP_fewClouds2")
    .withColumnRenamed("scatterClouds2", "DEP_scatterClouds2")
    .withColumnRenamed("brokenClouds2", "DEP_brokenClouds2")
    .withColumnRenamed("overCast2", "DEP_overCast2")
    .withColumnRenamed("obscuredSky2", "DEP_obscuredSky2")
    .withColumnRenamed("partiallyObscuredSky2", "DEP_partiallyObscuredSky2")

    .withColumnRenamed("DryBulbFarenheit3", "DEP_DryBulbFarenheit3")
    .withColumnRenamed("Visibility3", "DEP_Visibility3")
    .withColumnRenamed("WindDirection3", "DEP_WindDirection3")
    .withColumnRenamed("WindSpeed3", "DEP_WindSpeed3")
    .withColumnRenamed("StationPressure3", "DEP_StationPressure3")
    .withColumnRenamed("WetBulbFarenheit3", "DEP_WetBulbFarenheit3")
    .withColumnRenamed("DewPointCelsius3", "DEP_DewPointCelsius3")
    .withColumnRenamed("clearSky3", "DEP_clearSky3")
    .withColumnRenamed("fewClouds3", "DEP_fewClouds3")
    .withColumnRenamed("scatterClouds3", "DEP_scatterClouds3")
    .withColumnRenamed("brokenClouds3", "DEP_brokenClouds3")
    .withColumnRenamed("overCast3", "DEP_overCast3")
    .withColumnRenamed("obscuredSky3", "DEP_obscuredSky3")
    .withColumnRenamed("partiallyObscuredSky3", "DEP_partiallyObscuredSky3")

    // repartition
    .repartition(col("WBAN_ARR"), col("LOCAL_ARR_DT_RND"))

    // join the weather dataset 2 times
    .join(df_weather_clean, df_weather_clean("WBAN") === df_flights_wban("WBAN_ARR") && df_weather_clean("DATETIME_RND") === df_flights_wban("LOCAL_ARR_DT_RND"), "inner")

    // rename all weather columns
    .withColumnRenamed("WBAN", "ARR_WBAN")
    .withColumnRenamed("DATETIME_RND", "ARR_DATETIME_RND")

    .withColumnRenamed("DryBulbFarenheit", "ARR_DryBulbFarenheit")
    .withColumnRenamed("Visibility", "ARR_Visibility")
    .withColumnRenamed("WindDirection", "ARR_WindDirection")
    .withColumnRenamed("WindSpeed", "ARR_WindSpeed")
    .withColumnRenamed("StationPressure", "ARR_StationPressure")
    .withColumnRenamed("WetBulbFarenheit", "ARR_WetBulbFarenheit")
    .withColumnRenamed("DewPointCelsius", "ARR_DewPointCelsius")
    .withColumnRenamed("clearSky", "ARR_clearSky")
    .withColumnRenamed("fewClouds", "ARR_fewClouds")
    .withColumnRenamed("scatterClouds", "ARR_scatterClouds")
    .withColumnRenamed("brokenClouds", "ARR_brokenClouds")
    .withColumnRenamed("overCast", "ARR_overCast")
    .withColumnRenamed("obscuredSky", "ARR_obscuredSky")
    .withColumnRenamed("partiallyObscuredSky", "ARR_partiallyObscuredSky")

    .withColumnRenamed("DryBulbFarenheit1", "ARR_DryBulbFarenheit1")
    .withColumnRenamed("Visibility1", "ARR_Visibility1")
    .withColumnRenamed("WindDirection1", "ARR_WindDirection1")
    .withColumnRenamed("WindSpeed1", "ARR_WindSpeed1")
    .withColumnRenamed("StationPressure1", "ARR_StationPressure1")
    .withColumnRenamed("WetBulbFarenheit1", "ARR_WetBulbFarenheit1")
    .withColumnRenamed("DewPointCelsius1", "ARR_DewPointCelsius1")
    .withColumnRenamed("clearSky1", "ARR_clearSky1")
    .withColumnRenamed("fewClouds1", "ARR_fewClouds1")
    .withColumnRenamed("scatterClouds1", "ARR_scatterClouds1")
    .withColumnRenamed("brokenClouds1", "ARR_brokenClouds1")
    .withColumnRenamed("overCast1", "ARR_overCast1")
    .withColumnRenamed("obscuredSky1", "ARR_obscuredSky1")
    .withColumnRenamed("partiallyObscuredSky1", "ARR_partiallyObscuredSky1")

    .withColumnRenamed("DryBulbFarenheit2", "ARR_DryBulbFarenheit2")
    .withColumnRenamed("Visibility2", "ARR_Visibility2")
    .withColumnRenamed("WindDirection2", "ARR_WindDirection2")
    .withColumnRenamed("WindSpeed2", "ARR_WindSpeed2")
    .withColumnRenamed("StationPressure2", "ARR_StationPressure2")
    .withColumnRenamed("WetBulbFarenheit2", "ARR_WetBulbFarenheit2")
    .withColumnRenamed("DewPointCelsius2", "ARR_DewPointCelsius2")
    .withColumnRenamed("clearSky2", "ARR_clearSky2")
    .withColumnRenamed("fewClouds2", "ARR_fewClouds2")
    .withColumnRenamed("scatterClouds2", "ARR_scatterClouds2")
    .withColumnRenamed("brokenClouds2", "ARR_brokenClouds2")
    .withColumnRenamed("overCast2", "ARR_overCast2")
    .withColumnRenamed("obscuredSky2", "ARR_obscuredSky2")
    .withColumnRenamed("partiallyObscuredSky2", "ARR_partiallyObscuredSky2")

    .withColumnRenamed("DryBulbFarenheit3", "ARR_DryBulbFarenheit3")
    .withColumnRenamed("Visibility3", "ARR_Visibility3")
    .withColumnRenamed("WindDirection3", "ARR_WindDirection3")
    .withColumnRenamed("WindSpeed3", "ARR_WindSpeed3")
    .withColumnRenamed("StationPressure3", "ARR_StationPressure3")
    .withColumnRenamed("WetBulbFarenheit3", "ARR_WetBulbFarenheit3")
    .withColumnRenamed("DewPointCelsius3", "ARR_DewPointCelsius3")
    .withColumnRenamed("clearSky3", "ARR_clearSky3")
    .withColumnRenamed("fewClouds3", "ARR_fewClouds3")
    .withColumnRenamed("scatterClouds3", "ARR_scatterClouds3")
    .withColumnRenamed("brokenClouds3", "ARR_brokenClouds3")
    .withColumnRenamed("overCast3", "ARR_overCast3")
    .withColumnRenamed("obscuredSky3", "ARR_obscuredSky3")
    .withColumnRenamed("partiallyObscuredSky3", "ARR_partiallyObscuredSky3")

    // drop useless cols
    .drop(finalDropList : _*) 


print(df_final.count(), df_final.columns.size)

(2800110,109)

finalDropList = List(WBAN_DEP, WBAN_ARR, LOCAL_DEP_DT_RND, LOCAL_ARR_DT_RND, DEP_DATETIME_RND, DEP_WBAN, ARR_DATETIME_RND, ARR_WBAN)
df_final = [IS_DELAYED: bigint, DEP_DryBulbFarenheit: bigint ... 107 more fields]


[IS_DELAYED: bigint, DEP_DryBulbFarenheit: bigint ... 107 more fields]

In [19]:
df_final.select("IS_DELAYED").summary().show()

+-------+--------------------+
|summary|          IS_DELAYED|
+-------+--------------------+
|  count|             2800110|
|   mean|0.030283453150054818|
| stddev| 0.17136620466983207|
|    min|                   0|
|    25%|                   0|
|    50%|                   0|
|    75%|                   0|
|    max|                   1|
+-------+--------------------+



In [20]:
df_final.write.parquet("gs://dataset-flight/final-for-ml.parquet")

In [22]:
df_final.write.format("bigquery")
  .option("table","gold.final-for-ml")
  .save()

null