# ML on NYCFlights with PySpark ML

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, dayofweek, dayofmonth, month
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/31 17:54:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load the data

In [2]:
flights = spark.read.parquet("data/flights.parquet")
weather = spark.read.parquet("data/weather.parquet")

In [3]:
flights.show(10)

+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|
+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+
|2013|    1|  1|     517|           515|      2.0|     830|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|  1400.0| 5.0|  15.0|2013-01-01 10:00:00|
|2013|    1|  1|     533|           529|      4.0|     850|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|  1416.0| 5.0|  29.0|2013-01-01 10:00:00|
|2013|    1|  1|     542|           540|      2.0|     923|           850|     33.0|     AA|  1141| N619AA|   JFK| MIA|   160.0|  1089.0| 5.0|  40.0|2

In [4]:
weather.show(10)

+------+----+-----+---+----+-----+-----+-----+--------+------------------+---------+------+--------+-----+-------------------+
|origin|year|month|day|hour| temp| dewp|humid|wind_dir|        wind_speed|wind_gust|precip|pressure|visib|          time_hour|
+------+----+-----+---+----+-----+-----+-----+--------+------------------+---------+------+--------+-----+-------------------+
|   EWR|2013|    1|  1|   1|39.02|26.06|59.37|   270.0|10.357019999999999|     null|   0.0|  1012.0| 10.0|2013-01-01 06:00:00|
|   EWR|2013|    1|  1|   2|39.02|26.96|61.63|   250.0|           8.05546|     null|   0.0|  1012.3| 10.0|2013-01-01 07:00:00|
|   EWR|2013|    1|  1|   3|39.02|28.04|64.43|   240.0|           11.5078|     null|   0.0|  1012.5| 10.0|2013-01-01 08:00:00|
|   EWR|2013|    1|  1|   4|39.92|28.04|62.21|   250.0|12.658579999999999|     null|   0.0|  1012.2| 10.0|2013-01-01 09:00:00|
|   EWR|2013|    1|  1|   5|39.02|28.04|64.43|   260.0|12.658579999999999|     null|   0.0|  1011.9| 10.0|2013-

---

## Merge & Clean training data

In [15]:
t = (
    flights
    .withColumn("delayed", col("arr_delay") >= 30)
    .withColumn("date", col("time_hour").cast("date"))
    .join(weather, ["origin", "time_hour"])
    .select(
        "dep_time", "flight", "origin", "dest", "air_time", "distance", "carrier", "date",
        "temp", "dewp", "humid", "wind_dir", "wind_speed", "precip", "pressure", "visib",
        "delayed"
    )
    .dropna()
    .withColumn("date_dow", dayofweek(col("date")))
    .withColumn("date_month", month(col("date")))
    .withColumn("xmas_eve", when((month(col("date")) == 12) & (dayofmonth(col("date")) == 24), True).otherwise(False))
    .withColumn("xmas", when((month(col("date")) == 12) & (dayofmonth(col("date")) == 25), True).otherwise(False))
    .withColumn("july4", when((month(col("date")) == 7) & (dayofmonth(col("date")) == 4), True).otherwise(False))
    .drop("date")
)

t.show(10)

+--------+------+------+----+--------+--------+-------+-----+-----+-----+--------+------------------+------+--------+-----+-------+--------+----------+--------+-----+-----+
|dep_time|flight|origin|dest|air_time|distance|carrier| temp| dewp|humid|wind_dir|        wind_speed|precip|pressure|visib|delayed|date_dow|date_month|xmas_eve| xmas|july4|
+--------+------+------+----+--------+--------+-------+-----+-----+-----+--------+------------------+------+--------+-----+-------+--------+----------+--------+-----+-----+
|     517|  1545|   EWR| IAH|   227.0|  1400.0|     UA|39.02|28.04|64.43|   260.0|12.658579999999999|   0.0|  1011.9| 10.0|  false|       3|         1|   false|false|false|
|     533|  1714|   LGA| IAH|   227.0|  1416.0|     UA|39.92|24.98|54.81|   250.0|14.960139999999999|   0.0|  1011.4| 10.0|  false|       3|         1|   false|false|false|
|     542|  1141|   JFK| MIA|   160.0|  1089.0|     AA|39.02|26.96|61.63|   260.0|14.960139999999999|   0.0|  1012.1| 10.0|   true|    

In [16]:
(train, test) = t.randomSplit([0.2, 0.8])

---

## Build a Pipeline

In [29]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [56]:
pipeline = Pipeline(
    stages=[
        StringIndexer(
            inputCols=["origin", "dest", "carrier"],
            outputCols=["origin_indexed", "dest_indexed", "carrier_indexed"]
        )
    ]
)

In [69]:
pipeline_model = pipeline.fit(train)

In [70]:
pipeline_model.stages

[StringIndexerModel: uid=StringIndexer_2bfc016cc733, handleInvalid=error, numInputCols=3, numOutputCols=3]

In [71]:
result = pipeline_model.transform(train)

---

## Fit a Model

In [72]:
import xgboost as xgb

In [73]:
model = xgb.XGBClassifier(tree_method="hist", enable_categorical=True, objective="binary:hinge")

In [74]:
df = result.toPandas()
X = df.drop(["delayed", "origin", "dest", "carrier"], axis=1)
y = df["delayed"]

Unnamed: 0,dep_time,flight,air_time,distance,temp,dewp,humid,wind_dir,wind_speed,precip,pressure,visib,date_dow,date_month,xmas_eve,xmas,july4,origin_indexed,dest_indexed,carrier_indexed
0,1,839,199.0,1576.0,69.98,66.92,90.02,0.0,0.00000,0.00,1015.1,10.0,1,7,False,False,False,1.0,57.0,1.0
1,1,839,202.0,1576.0,24.98,8.06,48.03,320.0,4.60312,0.00,1024.4,10.0,3,12,False,False,False,1.0,57.0,1.0
2,1,1703,36.0,200.0,59.00,53.96,83.34,120.0,12.65858,0.17,1010.1,9.0,4,4,False,False,False,0.0,3.0,0.0
3,2,739,193.0,1617.0,46.94,30.92,53.47,260.0,18.41248,0.00,1023.8,10.0,5,1,False,False,False,1.0,71.0,1.0
4,2,1503,200.0,1598.0,71.06,69.08,93.47,180.0,10.35702,0.00,1018.1,9.0,3,7,False,False,False,1.0,18.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
56341,2400,363,101.0,733.0,91.94,68.00,45.63,270.0,10.35702,0.00,1013.5,10.0,1,7,False,False,False,2.0,1.0,4.0
56342,2400,425,129.0,1005.0,75.92,73.04,90.80,170.0,10.35702,0.00,1025.4,10.0,1,7,False,False,False,1.0,15.0,1.0
56343,2400,745,201.0,1617.0,73.94,69.08,84.80,230.0,5.75390,0.00,1020.4,10.0,4,8,False,False,False,1.0,71.0,1.0
56344,2400,985,37.0,187.0,69.98,62.96,78.41,190.0,12.65858,0.00,1012.5,10.0,3,6,False,False,False,1.0,3.0,3.0


In [75]:
model.fit(X, y)

---

## Evaluate the Model

In [79]:
test_result = pipeline_model.transform(test)

In [81]:
df_test = result.toPandas()
X_test = df_test.drop(["delayed", "origin", "dest", "carrier"], axis=1)
y_test = df_test["delayed"]

In [82]:
y_predict = model.predict(X_test)

In [83]:
from sklearn.metrics import accuracy_score

In [84]:
accuracy_score(y_test, y_predict)

0.9041280658786782

## Is that good?

In [85]:
import numpy as np

In [86]:
accuracy_score(y_test, np.zeros_like(y_test))

0.8564760586377028