In [27]:
from pyspark.sql import SparkSession

## Initiate a Spark Session & Load data

In [28]:
spark= SparkSession.builder.master('local[*]').appName("test").getOrCreate()

In [29]:
print(spark.version)

2.3.2


In [30]:
!wget 'https://assets.datacamp.com/production/repositories/3918/datasets/c601f67a55e03400acb6c72ac0937c4b95906e88/flights.csv'

--2020-03-17 20:52:58--  https://assets.datacamp.com/production/repositories/3918/datasets/c601f67a55e03400acb6c72ac0937c4b95906e88/flights.csv
Resolving assets.datacamp.com (assets.datacamp.com)... 99.86.57.3, 99.86.57.98, 99.86.57.120, ...
Connecting to assets.datacamp.com (assets.datacamp.com)|99.86.57.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1756579 (1.7M) [text/csv]
Saving to: ‘flights.csv.1’


2020-03-17 20:52:58 (12.7 MB/s) - ‘flights.csv.1’ saved [1756579/1756579]



In [31]:
flights=spark.read.csv("flights.csv", sep=',', header=True, inferSchema=True, nullValue='NA')

In [32]:
flights.show(3)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 3 rows



In [33]:
print("Ther data contain %d records."% flights.count())

Ther data contain 50000 records.


In [34]:
flights.dtypes

[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int')]

## Data Cleaning and wrangling

#### Delete columns and remove NA data

In [35]:
flights=flights.drop("flight")

In [36]:
flights.filter("DELAY IS NULL").count()

2978

In [37]:
flights=flights.filter('DELAY IS NOT NULL')

In [38]:
flights=flights.dropna()

#### Column manipulation

In [40]:
from pyspark.sql.functions import round

In [41]:
flights.columns

['mon', 'dom', 'dow', 'carrier', 'org', 'mile', 'depart', 'duration', 'delay']

In [44]:
flights_km= flights.withColumn('km', round(flights.mile * 1.60934, 0)).drop('mile')

According to FAA, a flight is considered delayed when it arrives 15 mins or more after its scheduled time 

We create a label boolean column that labels flights as delayed (1) or not delayed (0)

In [45]:
flights_km= flights_km.withColumn('label', (flights_km.delay >= 15).cast("integer"))

In [47]:
flights_km.show()

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|
|  1| 16|  6|     UA|ORD|   8.0|     232|   -7|2317.0|    0|
|  1| 22|  5|     UA|SJC|  7.98|     250|  -13|2943.0|    0|
| 11|  8|  1|     OO|SFO|  7.77|      60|   88| 254.0|    1|
|  4| 26|  1|     AA|SFO| 13.25|     210|  -10|2356.0|    0|
|  4| 25|  0|     AA|ORD| 13.75|     160|   31|1574.0|    1|
|  8| 30|  2|     UA|ORD| 13.28|     151|   16|1157.0|    1|
|  3| 16|  3|     UA|ORD|   9.0|     264|    3|2808.0|    0|
|  0|  3|  4|     AA|LGA| 17.08|     190|   32|1765.0|    1|
|  5|  9|  1|     UA|SFO

#### Categorical variables

It's important to note that the StringOrderType is reversed, so the 0.0 index is the most frequent index

In [48]:
from pyspark.ml.feature import StringIndexer

In [49]:
indexer=StringIndexer(inputCol="carrier", outputCol="carrier_idx")

In [64]:
indexer_model=indexer.fit(flights_km)

In [65]:
flights_indexed=indexer_model.transform(flights_km)

In [66]:
flights_indexed= StringIndexer(inputCol="org", outputCol="org_idx").fit(flights_indexed).transform(flights_indexed)

In [67]:
flights_indexed.show()

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|        0.0|    1.0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|        1.0|    0.0|
|  1| 16|  6|     UA|ORD|   8.0|     232|   -7|2317.0|    0|        0.0|    0.0|
|  1| 22|  5|     UA|SJC|  7.98|     250|  -13|2943.0|    0|        0.0|    5.0|
| 11|  8|  1|     OO|SFO|  7.77|      60|   88| 254.0|    1|        2.0|    1.0|
|  4| 26|  1|     AA|SFO| 13.25|     210|  -10|2356.0|    0|        1.0|    1.0|
|  4| 25|  0|     AA|ORD| 13

In spark, in order to run a machine learning tasks, the features should all be assembled into one column using vectorassembler class

In [57]:
from pyspark.ml.feature import VectorAssembler

In [61]:
assembler= VectorAssembler(inputCols=['mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'], outputCol='features')

In [71]:
flights_assembled = assembler.transform(flights_indexed)

In [73]:
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[0.0,22.0,2.0,0.0,0.0,509.0,16.33,82.0]  |30   |
|[2.0,20.0,4.0,0.0,1.0,542.0,6.17,82.0]   |-8   |
|[9.0,13.0,1.0,1.0,0.0,1989.0,10.33,195.0]|-5   |
|[5.0,2.0,1.0,0.0,1.0,885.0,7.98,102.0]   |2    |
|[7.0,2.0,6.0,1.0,0.0,1180.0,10.83,135.0] |54   |
+-----------------------------------------+-----+
only showing top 5 rows



### Build ML: Decision Trees

In [74]:
flights_train, flights_test= flights_assembled.randomSplit([0.8, 0.2], seed=17)

In [75]:
training_ratio=flights_train.count()/flights_test.count()

In [76]:
print(training_ratio)

3.9522906793048973


In [77]:
from pyspark.ml.classification import DecisionTreeClassifier

In [78]:
tree=DecisionTreeClassifier()

In [80]:
tree_model=tree.fit(flights_train)

In [82]:
prediction=tree_model.transform(flights_test)

In [83]:
prediction.select("label", "prediction", "probability").show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |1.0       |[0.4902093180283592,0.5097906819716408] |
|1    |1.0       |[0.39567511906294245,0.6043248809370575]|
|1    |1.0       |[0.31828757561656584,0.6817124243834342]|
|1    |1.0       |[0.31828757561656584,0.6817124243834342]|
|1    |1.0       |[0.31828757561656584,0.6817124243834342]|
+-----+----------+----------------------------------------+
only showing top 5 rows



#### Create a confusion matrix

In [84]:
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1218|
|    0|       0.0| 2422|
|    1|       1.0| 3606|
|    0|       1.0| 2249|
+-----+----------+-----+



In [86]:
TN= prediction.filter('prediction= 0 AND label=0').count()
TP= prediction.filter('prediction= 1 AND label=1').count()
FN= prediction.filter('prediction= 0 AND label=1').count()
FP= prediction.filter('prediction= 1 AND label=0').count()


In [89]:
accuracy=(TN+TP)/(TP+TN+FP+FN)
print(accuracy)

0.6348604528699315


### Build ML: Logistic Regression

In [90]:
from pyspark.ml.classification import LogisticRegression

In [91]:
logistic=LogisticRegression().fit(flights_train)

In [92]:
prediction_l=logistic.transform(flights_test)

In [93]:
prediction.groupBy("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1218|
|    0|       0.0| 2422|
|    1|       1.0| 3606|
|    0|       1.0| 2249|
+-----+----------+-----+



In [96]:
TN_= prediction.filter('prediction= 0 AND label=0').count()
TP_= prediction.filter('prediction= 1 AND label=1').count()
FN_= prediction.filter('prediction= 0 AND label=1').count()
FP_= prediction.filter('prediction= 1 AND label=0').count()


In [95]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [104]:
precision=TP_/(TP_+FP_) #how precise are you in predicting a positive outcome
# for all the flights predicted to be delayed, what proportion id actually delayed

In [105]:
recall=TP_/(TP_+FN_) #did the machine learn how to account for unexpected events
#for all the flights delayed, what proportion is correctly predicted by the model

In [103]:
print('precision = {:.2f} \n recall= {:.2f}'.format(precision, recall))

precision = 0.62 
 recall= 0.75


The issue here is the precision and recall are formulated in the positive target class, alternatively, we could use a weighted versions of these metrics

In [106]:
multi_evaluator=MulticlassClassificationEvaluator()

Depending on the business context, the better metric would be chosen. However, for the purpose of this project, I would assume recall is more important than precision as
flights predicted to be delayed but are not or flights not predicted to be delayed but actually get delayed cause major disruption for passengers 

In [110]:
weighted_recall= multi_evaluator.evaluate(prediction_l, {multi_evaluator.metricName: 'weightedRecall'})
print(weighted_recall)

0.6126382306477093


In [113]:
binary_evalator=BinaryClassificationEvaluator()
auc=binary_evalator.evaluate(prediction_l, {binary_evalator.metricName: 'areaUnderROC'})
print(auc)

0.6501216177018289


If you notice, the index values we have are not the right representation of our cateogrical variables

In [114]:
from pyspark.ml.feature import OneHotEncoderEstimator

In [115]:
flights_assembled.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|[0.0,22.0,2.0,0.0...|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|[2.0,20.0,4.0,0.0...|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|[9.0,13.0,1.0,1.0...|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|        0.0|    1.0|[5.0,2.0,1.0,0.0,...|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|        1.0|    0.0|[7.0,2.0,6.0,1.0,...|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
only showing top 5 rows



In [120]:
one_hot=OneHotEncoderEstimator(inputCols=['org_idx'], outputCols=['org_dummy'])

In [121]:
onehot=one_hot.fit(flights_assembled)

In [122]:
flights_onehot=onehot.transform(flights_assembled)

In [126]:
flights_onehot.select('org', 'org_idx', 'org_dummy').distinct().sort('org_idx').show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SMF|    4.0|(7,[4],[1.0])|
|SJC|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



### Predicting Duration of flight

In [183]:
subset=flights_assembled.select("km", "duration")

In [195]:
subset=flights_onehot.select("km","org_dummy",  "duration")
subset=VectorAssembler(inputCols=["km", "org_dummy"], outputCol="features").transform(subset)

In [196]:
subset.show()

+------+-------------+--------+--------------------+
|    km|    org_dummy|duration|            features|
+------+-------------+--------+--------------------+
| 509.0|(7,[0],[1.0])|      82|(8,[0,1],[509.0,1...|
| 542.0|(7,[1],[1.0])|      82|(8,[0,2],[542.0,1...|
|1989.0|(7,[0],[1.0])|     195|(8,[0,1],[1989.0,...|
| 885.0|(7,[1],[1.0])|     102|(8,[0,2],[885.0,1...|
|1180.0|(7,[0],[1.0])|     135|(8,[0,1],[1180.0,...|
|2317.0|(7,[0],[1.0])|     232|(8,[0,1],[2317.0,...|
|2943.0|(7,[5],[1.0])|     250|(8,[0,6],[2943.0,...|
| 254.0|(7,[1],[1.0])|      60|(8,[0,2],[254.0,1...|
|2356.0|(7,[1],[1.0])|     210|(8,[0,2],[2356.0,...|
|1574.0|(7,[0],[1.0])|     160|(8,[0,1],[1574.0,...|
|1157.0|(7,[0],[1.0])|     151|(8,[0,1],[1157.0,...|
|2808.0|(7,[0],[1.0])|     264|(8,[0,1],[2808.0,...|
|1765.0|(7,[3],[1.0])|     190|(8,[0,4],[1765.0,...|
|1556.0|(7,[1],[1.0])|     158|(8,[0,2],[1556.0,...|
|2792.0|(7,[0],[1.0])|     265|(8,[0,1],[2792.0,...|
|1291.0|(7,[0],[1.0])|     160|(8,[0,1],[1291.

In [212]:
subset.features[0]

Column<b'features[0]'>

In [None]:
subset=VectorAssembler(inputCols=["km"], outputCol="features").transform(subset)

In [188]:
from pyspark.ml.regression import LinearRegression 
from pyspark.ml.evaluation import RegressionEvaluator

In [199]:
flights_train, flights_test= subset.randomSplit([0.8, 0.2], seed=17)

In [200]:
regression=LinearRegression(labelCol="duration").fit(flights_train)

In [201]:
predictions=regression.transform(flights_test)

In [192]:
predictions.select('duration', 'prediction').show(26 ,False)

+--------+-----------------+
|duration|prediction       |
+--------+-----------------+
|44      |52.31997678812364|
|46      |52.31997678812364|
|47      |52.31997678812364|
|47      |52.31997678812364|
|47      |52.31997678812364|
|47      |52.31997678812364|
|48      |52.31997678812364|
|48      |52.31997678812364|
|48      |52.31997678812364|
|48      |52.31997678812364|
|49      |52.31997678812364|
|49      |52.31997678812364|
|49      |52.31997678812364|
|51      |52.31997678812364|
|53      |52.31997678812364|
|39      |53.53016578145956|
|39      |53.53016578145956|
|40      |53.53016578145956|
|40      |53.53016578145956|
|41      |53.53016578145956|
|42      |53.53016578145956|
|42      |53.53016578145956|
|42      |53.53016578145956|
|42      |53.53016578145956|
|43      |53.53016578145956|
|43      |53.53016578145956|
+--------+-----------------+
only showing top 26 rows



In [202]:
predictions.select('duration', 'prediction').show(5, False)

+--------+-----------------+
|duration|prediction       |
+--------+-----------------+
|44      |52.33681808115269|
|46      |52.33681808115269|
|47      |52.33681808115269|
|47      |52.33681808115269|
|47      |52.33681808115269|
+--------+-----------------+
only showing top 5 rows



In [203]:
RegressionEvaluator(labelCol='duration').evaluate(predictions)

11.050019756733828

In [194]:
inter=regression.intercept
print(inter) #average mins on the ground

44.151201083106216


In [161]:
coefs=regression.coefficients
print(coefs)
minutes_per_km= regression.coefficients[0]

[0.0756368120834947]


In [163]:
avg_speed=60/minutes_per_km
print(avg_speed)

793.2645275129605


In [214]:
inter=regression.intercept
print(inter) #average mins on the ground
avg_ground_jfk= inter + regression.coefficients[3]
print(avg_ground_jfk)
avg_ground_lga= inter + regression.coefficients[4]
print(avg_ground_lga)

15.64805785642961
68.20907647901029
62.39772859362184
