In [1]:
import time

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, Bucketizer
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import LinearRegression

In [3]:
start = time.time()

In [4]:
spark = SparkSession.builder.master('local[*]').appName('test').getOrCreate() # local cluster with all available nodes

In [5]:
print(spark.version)

3.0.0


## Load Data

In [6]:
data = spark.read.csv('flights.csv', header=True, inferSchema=True, nullValue='NA')

In [7]:
data.count() # number of records

275000

In [8]:
data.dtypes # columns and types

[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int')]

In [9]:
data.show(8)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
|  3| 28|  1|     B6|   377|LGA|1076| 13.33|     182|   70|
|  5| 28|  6|     B6|   904|ORD| 740|  9.58|     130|   47|
|  1| 19|  2|     UA|   820|SFO| 679| 12.75|     123|  135|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 8 rows



Data dictionary:
- mon — month (integer between 1 and 12)
- dom — day of month (integer between 1 and 31)
- dow — day of week (integer; 1 = Monday and 7 = Sunday)
- org — origin airport (IATA code)
- mile — distance (miles)
- carrier — carrier (IATA code)
- depart — departure time (decimal hour)
- duration — expected duration (minutes)
- delay — delay (minutes)

## Data Wrangling

### Drop columns and nulls

We want to predict delay, let's drop the flight column because is an uninformative column.

In [10]:
data2 = data.drop('flight')

We want only the records with no null entries in the delay column

In [11]:
data2.filter('delay IS NULL').count()

16711

In [12]:
data2 = data2.filter('delay IS NOT NULL')

In [13]:
data2.count()

258289

In [14]:
data2.dropna().count()

258289

There are no more nulls in the data

### Create target column

The Federal Aviation Administration (FAA) considers a flight to be "delayed" when it arrives 15 minutes or more after its scheduled time.

Let's create a boolean column indicating whether or not a flight was delayed

In [15]:
data2 = data2.withColumn('delayed', (data2.delay >= 15).cast('integer'))

In [16]:
data2.show(5)

+---+---+---+-------+---+----+------+--------+-----+-------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|delayed|
+---+---+---+-------+---+----+------+--------+-----+-------+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|      1|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|      0|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|      1|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|      1|
|  3| 28|  1|     B6|LGA|1076| 13.33|     182|   70|      1|
+---+---+---+-------+---+----+------+--------+-----+-------+
only showing top 5 rows



### Categorical columns

#### String Indexer

In [17]:
data_idx = StringIndexer(inputCol='carrier', outputCol='carrier_idx').fit(data2).transform(data2)
# the order for the categories is assingn according to frequency: most to least.
# Use stringOrderType to change it.

In [18]:
data_idx = StringIndexer(inputCol='org', outputCol='org_idx').fit(data_idx).transform(data_idx)

In [19]:
data_idx.show(5)

+---+---+---+-------+---+----+------+--------+-----+-------+-----------+-------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|delayed|carrier_idx|org_idx|
+---+---+---+-------+---+----+------+--------+-----+-------+-----------+-------+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|      1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|      0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|      1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|      1|        3.0|    5.0|
|  3| 28|  1|     B6|LGA|1076| 13.33|     182|   70|      1|        4.0|    3.0|
+---+---+---+-------+---+----+------+--------+-----+-------+-----------+-------+
only showing top 5 rows



#### One hot encoding

In [20]:
data_1hot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy']).fit(data_idx).transform(data_idx)

In [21]:
data_1hot = OneHotEncoder(inputCols=['carrier_idx'], outputCols=['carrier_dummy']).fit(data_1hot).transform(data_1hot)

In [22]:
data_1hot.show(5) # the dummy columns are DenseVector: indicates the len of the vector and the index and values not 0

+---+---+---+-------+---+----+------+--------+-----+-------+-----------+-------+-------------+-------------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|delayed|carrier_idx|org_idx|    org_dummy|carrier_dummy|
+---+---+---+-------+---+----+------+--------+-----+-------+-----------+-------+-------------+-------------+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|      1|        2.0|    0.0|(7,[0],[1.0])|(8,[2],[1.0])|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|      0|        2.0|    0.0|(7,[0],[1.0])|(8,[2],[1.0])|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|      1|        4.0|    2.0|(7,[2],[1.0])|(8,[4],[1.0])|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|      1|        3.0|    5.0|(7,[5],[1.0])|(8,[3],[1.0])|
|  3| 28|  1|     B6|LGA|1076| 13.33|     182|   70|      1|        4.0|    3.0|(7,[3],[1.0])|(8,[4],[1.0])|
+---+---+---+-------+---+----+------+--------+-----+-------+-----------+-------+-------------+-------------+
only showing top 5 

### Bucketing

In [23]:
# Create buckets at 3 hour intervals through the day
buckets = Bucketizer(splits=[0,3,6,9,12,15,18,21,24], inputCol='depart', outputCol='depart_bucket')

In [24]:
bucketed = buckets.transform(data_1hot)
bucketed.select('depart','depart_bucket').show(5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
|  8.18|          2.0|
|  7.17|          2.0|
| 21.17|          7.0|
| 12.92|          4.0|
| 13.33|          4.0|
+------+-------------+
only showing top 5 rows



In [25]:
data_1hot_buck = OneHotEncoder(inputCols=['depart_bucket'], outputCols=['depart_dummy']).fit(bucketed).transform(bucketed)
data_1hot_buck.select('depart','depart_bucket','depart_dummy').show(5)

+------+-------------+-------------+
|depart|depart_bucket| depart_dummy|
+------+-------------+-------------+
|  8.18|          2.0|(7,[2],[1.0])|
|  7.17|          2.0|(7,[2],[1.0])|
| 21.17|          7.0|    (7,[],[])|
| 12.92|          4.0|(7,[4],[1.0])|
| 13.33|          4.0|(7,[4],[1.0])|
+------+-------------+-------------+
only showing top 5 rows



### Assamble columns
consolidate all of the predictor columns into a single column. Note: not using the buckets yet

In [26]:
assembler = VectorAssembler(inputCols=['mon', 'dom', 'dow', 'carrier_dummy', 'org_dummy', 'mile', 'depart', 'duration'], outputCol='features')

In [27]:
data_assambled = assembler.transform(data_1hot)

In [28]:
data_assambled.select('mon', 'dom', 'dow', 'carrier_dummy', 'org_dummy', 'mile', 'depart', 'duration', 'features','delayed').show(5, truncate=True)

+---+---+---+-------------+-------------+----+------+--------+--------------------+-------+
|mon|dom|dow|carrier_dummy|    org_dummy|mile|depart|duration|            features|delayed|
+---+---+---+-------------+-------------+----+------+--------+--------------------+-------+
| 10| 10|  1|(8,[2],[1.0])|(7,[0],[1.0])| 157|  8.18|      51|(21,[0,1,2,5,11,1...|      1|
| 11| 22|  1|(8,[2],[1.0])|(7,[0],[1.0])| 738|  7.17|     127|(21,[0,1,2,5,11,1...|      0|
|  2| 14|  5|(8,[4],[1.0])|(7,[2],[1.0])|2248| 21.17|     365|(21,[0,1,2,7,13,1...|      1|
|  5| 25|  3|(8,[3],[1.0])|(7,[5],[1.0])| 386| 12.92|      85|(21,[0,1,2,6,16,1...|      1|
|  3| 28|  1|(8,[4],[1.0])|(7,[3],[1.0])|1076| 13.33|     182|(21,[0,1,2,7,14,1...|      1|
+---+---+---+-------------+-------------+----+------+--------+--------------------+-------+
only showing top 5 rows



## Machine Learning

### Split train-test data

In [29]:
train, test = data_assambled.randomSplit([0.8,0.2], seed=42)

### Decision tree

In [30]:
tree = DecisionTreeClassifier(labelCol='delayed', featuresCol='features').fit(train)

In [31]:
predictions = tree.transform(test)

In [32]:
predictions.select('delayed', 'prediction', 'probability').show(5, truncate=False)

+-------+----------+---------------------------------------+
|delayed|prediction|probability                            |
+-------+----------+---------------------------------------+
|0      |1.0       |[0.3350510235134203,0.6649489764865797]|
|1      |1.0       |[0.3350510235134203,0.6649489764865797]|
|1      |0.0       |[0.5567498633630898,0.4432501366369102]|
|1      |1.0       |[0.3350510235134203,0.6649489764865797]|
|1      |1.0       |[0.3350510235134203,0.6649489764865797]|
+-------+----------+---------------------------------------+
only showing top 5 rows



#### Evaluation

Confusion matrix

In [33]:
predictions.groupBy('delayed', 'prediction').count().show()

+-------+----------+-----+
|delayed|prediction|count|
+-------+----------+-----+
|      1|       0.0| 8753|
|      0|       0.0|15219|
|      1|       1.0|17265|
|      0|       1.0|10453|
+-------+----------+-----+



In [34]:
TN = predictions.filter('prediction = 0 AND delayed = prediction').count()
TP = predictions.filter('prediction = 1 AND delayed = prediction').count()
FN = predictions.filter('prediction = 0 AND delayed <> prediction').count()
FP = predictions.filter('prediction = 1 AND delayed <> prediction').count()

In [35]:
accuracy = (TN + TP)/ (TN + TP + FN + FP)
print(accuracy)

0.628438769587928


In [36]:
precision = TP/ (TP + FP)
recall = TP / (TP + FN)
print(precision, recall)

0.6228804387040912 0.663579060650319


## Logistic Regression

In [37]:
log = LogisticRegression(labelCol='delayed', featuresCol='features').fit(train)

In [38]:
preds = log.transform(test)

#### Evaluation

Confusion matrix

In [39]:
preds.groupBy('delayed', 'prediction').count().show()

+-------+----------+-----+
|delayed|prediction|count|
+-------+----------+-----+
|      1|       0.0| 9188|
|      0|       0.0|14693|
|      1|       1.0|16830|
|      0|       1.0|10979|
+-------+----------+-----+



Weighted Precision

In [40]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'delayed')
weighted_precision = multi_evaluator.evaluate(preds, {multi_evaluator.metricName: "weightedPrecision"})
print(weighted_precision)

0.6101957069746587


AUC (area under ROC)

In [41]:
binary_evaluator = BinaryClassificationEvaluator(labelCol = 'delayed')
auc = binary_evaluator.evaluate(preds, {binary_evaluator.metricName:'areaUnderROC'})
print(auc)

0.653045229480245


## Linear Regression to predict the duration

- Start with just one feature: distance

In [42]:
assembler2 = VectorAssembler(inputCols=['mile'], outputCol='features')

In [43]:
df_assambled = assembler2.transform(data_1hot)

In [44]:
df_train, df_test = df_assambled.randomSplit([0.8,0.2], seed=42)

In [45]:
reg = LinearRegression(labelCol='duration').fit(df_train)

In [46]:
pred_dur = reg.transform(df_test)
pred_dur.select('duration', 'prediction').show(5, False)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|245     |213.57145459809462|
|160     |133.36577053199244|
|155     |137.14445777522826|
|190     |177.73487235579364|
|255     |213.32766832433748|
+--------+------------------+
only showing top 5 rows



In [47]:
RegressionEvaluator(labelCol='duration').evaluate(pred_dur) #RMSE

17.08160179005299

Coefficients

In [48]:
# Intercept (average minutes on ground)
inter = reg.intercept
print(inter)

44.018101199997126


In [49]:
# Coefficients: just one. Indicates the average minutes per mile (slope for distance)
coefs = reg.coefficients
print(coefs)

[0.12189313687857477]


Let's see if it makes sense. To transform to minutes by km: divide by 1.60934. To transform to hours per km: divide by 60. And then take the inverse to have km/hr

In [50]:
1/(0.12189313687857477 / 1.60934/60)

792.1725740489371

According to google this makes sense.

- Add Origin

In [51]:
assembler3 = VectorAssembler(inputCols=['mile','org_dummy'], outputCol='features')
df3_assambled = assembler3.transform(data_1hot)
df3_train, df3_test = df3_assambled.randomSplit([0.8,0.2], seed=42)

In [52]:
reg3 = LinearRegression(labelCol='duration').fit(df3_train)

In [53]:
pred_dur2 = reg3.transform(df3_test)
pred_dur2.select('duration', 'prediction').show(5, False)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|245     |234.81889722676988|
|160     |150.3855457396908 |
|155     |154.0903953711782 |
|190     |193.88765109005902|
|255     |228.78494439310168|
+--------+------------------+
only showing top 5 rows



In [54]:
RegressionEvaluator(labelCol='duration').evaluate(pred_dur2) #RMSE

11.124669149751762

coefficients

In [55]:
coefs = reg3.coefficients
print(coefs)

[0.11951127843507756,28.40886411142309,20.39563635811636,52.69779736409431,46.902867087296265,15.80421429487965,18.154485106121992,18.020384912431833]


To understand them we need to see how was converted the org column.

In [56]:
data_1hot.select('org', 'org_idx', 'org_dummy').distinct().sort('org_idx').show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SMF|    4.0|(7,[4],[1.0])|
|SJC|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



So:
- 0: Miles
- 1: ORD
- 2: SFO
- 3: JFK
- 4: LGA
- 5: SMF
- 6: SJC
- 7: TUS

and the baseline is OGG.

In [57]:
# Intercept (average minutes on ground at IGG)
inter = reg3.intercept
print(inter)

15.880911559482685


In [58]:
# average speed
1/(reg3.coefficients[0] / 1.60934/60)

807.9605645960416

In [59]:
# Average minutes on ground at JFK
reg3.intercept + reg3.coefficients[3]

68.578708923577

- Add bucked departure time

In [60]:
assembler4 = VectorAssembler(inputCols=['mile', 'org_dummy', 'depart_dummy'], outputCol='features')
df4_assambled = assembler4.transform(data_1hot_buck)
df4_train, df4_test = df4_assambled.randomSplit([0.8,0.2], seed=42)

In [61]:
reg4 = LinearRegression(labelCol='duration').fit(df4_train)
pred_dur3 = reg4.transform(df4_test)
pred_dur3.select('duration', 'prediction').show(5, False)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|245     |233.31621375729964|
|160     |152.92426907580506|
|155     |151.81915743234356|
|190     |196.45808858909226|
|255     |231.38082292392698|
+--------+------------------+
only showing top 5 rows



In [62]:
RegressionEvaluator(labelCol='duration').evaluate(pred_dur3) #RMSE

10.799165629335906

coefficients

In [63]:
coefs = reg4.coefficients
print(coefs)
print(len(coefs))

[0.11959840525628343,27.25835676711819,20.157493166573733,51.94647921354594,46.03664842895673,15.242213454835511,17.434676924001568,17.537128241574294,-14.365281516247915,0.6723170044798787,4.217389056086441,7.233363970245014,4.816414500763589,9.030051262492709,9.039585895977948]
15


To understand them we need to see how was converted the depart column.

In [64]:
data_1hot_buck.select('depart_bucket', 'depart_dummy').distinct().sort('depart_bucket').show()

+-------------+-------------+
|depart_bucket| depart_dummy|
+-------------+-------------+
|          0.0|(7,[0],[1.0])|
|          1.0|(7,[1],[1.0])|
|          2.0|(7,[2],[1.0])|
|          3.0|(7,[3],[1.0])|
|          4.0|(7,[4],[1.0])|
|          5.0|(7,[5],[1.0])|
|          6.0|(7,[6],[1.0])|
|          7.0|    (7,[],[])|
+-------------+-------------+



So:
- 0: Miles
- 1: ORD
- 2: SFO
- 3: JFK
- 4: LGA
- 5: SMF
- 6: SJC
- 7: TUS
- 8: 00:00 to 03:00
- 9: 03:00 to 06:00
- 10: 06:00 to 09:00
- 11: 09:00 to 12:00
- 12: 12:00 to 15:00
- 13: 15:00 to 18:00
- 14: 18:00 to 21:00

and the baseline is OGG and 21:00 to 24:00

In [65]:
# Average minutes on ground at OGG for flights departing between 21:00 and 24:00
reg4.intercept

10.191938331499856

In [66]:
# Average minutes on ground at JFK for flights departing between 00:00 and 03:00
reg4.intercept + reg4.coefficients[3] + reg4.coefficients[8]

47.77313602879788

In [67]:
# average speed
1/(reg4.coefficients[0] / 1.60934/60)

807.3719694931043

- 5 features

In [68]:
data_1hot5 = OneHotEncoder(inputCols=['dow'], outputCols=['dow_dummy']).fit(data_1hot_buck).transform(data_1hot_buck)

In [70]:
data_1hot5 = OneHotEncoder(inputCols=['mon'], outputCols=['mon_dummy']).fit(data_1hot5).transform(data_1hot5)

In [74]:
assembler5 = VectorAssembler(inputCols=['mile', 'org_dummy', 'depart_dummy', 'dow_dummy', 'mon_dummy'], outputCol='features')
df5_assambled = assembler5.transform(data_1hot5)
df5_train, df5_test = df5_assambled.randomSplit([0.8,0.2], seed=42)

In [75]:
reg5 = LinearRegression(labelCol='duration').fit(df5_train)
pred_dur5 = reg5.transform(df5_test)
pred_dur5.select('duration', 'prediction').show(5, False)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|245     |234.30441937768907|
|160     |153.88787618453668|
|155     |152.80073903831067|
|190     |197.43318110430903|
|255     |232.36512900698355|
+--------+------------------+
only showing top 5 rows



In [76]:
RegressionEvaluator(labelCol='duration').evaluate(pred_dur5) #RMSE

10.717450371936241

In [77]:
reg5.coefficients

DenseVector([0.1196, 27.4282, 20.28, 52.1327, 46.2526, 15.3396, 17.5738, 17.6311, -14.7498, 0.3762, 4.1211, 7.1466, 4.7367, 8.9168, 8.9682, 0.0175, -0.0392, -0.0501, 0.0005, 0.0252, -0.0171, -1.6671, -1.8515, -1.9208, -3.3453, -3.9572, -3.8475, -3.8998, -3.9325, -3.7864, -2.5797, -0.3695])

### Regularization

- Lasso

Many of the coefficients in the resulting model will be set to zero. This means that only a subset of the predictors actually contribute to the model.



In [78]:
reg6 = LinearRegression(labelCol='duration', regParam=1, elasticNetParam=1).fit(df5_train)

In [79]:
pred_dur_L1 = reg6.transform(df5_test)
pred_dur_L1.select('duration', 'prediction').show(5, False)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|245     |231.81039426128888|
|160     |148.01060178884518|
|155     |150.62527959619297|
|190     |191.07545176011865|
|255     |225.6219797590523 |
+--------+------------------+
only showing top 5 rows



In [80]:
RegressionEvaluator(labelCol='duration').evaluate(pred_dur_L1) #RMSE

11.673318969531302

In [82]:
reg6.coefficients

DenseVector([0.1183, 5.5958, 0.0, 29.2854, 22.2807, -2.0903, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0529, 1.2351, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

A lot of the coefficients are zero now

In [84]:
sum([beta == 0 for beta in reg6.coefficients])

25

In [85]:
len(reg6.coefficients)

32

Only 7 coefficients are not null

- Ridge

In [86]:
reg7 = LinearRegression(labelCol='duration', regParam=1, elasticNetParam=0).fit(df5_train)
pred_dur_L2 = reg7.transform(df5_test)
pred_dur_L2.select('duration', 'prediction').show(5, False)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|245     |233.36732473040763|
|160     |153.40662281668952|
|155     |152.59141875049886|
|190     |196.41404397325738|
|255     |230.91450270325134|
+--------+------------------+
only showing top 5 rows



In [87]:
RegressionEvaluator(labelCol='duration').evaluate(pred_dur_L2) #RMSE

10.850811950754553

In [88]:
reg7.coefficients

DenseVector([0.1182, 18.1529, 11.1271, 43.0449, 36.5799, 5.7506, 8.0254, 8.068, -13.6228, 0.103, 3.8725, 6.8231, 4.1019, 8.3504, 8.4395, 0.039, -0.0121, -0.0175, 0.0132, 0.0185, -0.0208, -1.3692, -1.5407, -1.5126, -2.9169, -3.508, -3.4036, -3.5037, -3.5622, -3.4392, -2.2871, -0.1227])

Ridge performs a little better but we have a lot more of coefficients.

In [71]:
localall = 115.71719408035278

In [72]:
local1 = 185.51063537597656

In [73]:
#spark.stop() # close the conection is a good practice