In [1]:
import findspark
from pyspark.sql import SparkSession
# import pyspark
findspark.init()

In [23]:
 from pyspark.sql.functions import *
 import pandas as pd
 from pyspark.ml.regression import LinearRegression
 from pyspark.ml.feature import VectorAssembler,StringIndexer, OneHotEncoder
 from pyspark.ml  import  Pipeline
 from pyspark.ml.evaluation import RegressionEvaluator

Load and explore the data's structure / form

In [3]:
spark = SparkSession.builder.appName("Flights").getOrCreate()

In [4]:
flightData = spark.read.option("header","true").option("inferSchema","true").csv("./flights.csv")

In [5]:
flightData.head(5)

[Row(FL_DATE='2019-01-01', TAIL_NUM='N8974C', CARRIER='9E', ORIGIN='AVL', ORIGIN_CITY_NAME='Asheville, NC', DEST='ATL', DEST_CITY_NAME='Atlanta, GA', DEP_TIME=1658, DEP_DELAY=-7.0, ARR_TIME=1758, ARR_DELAY=-22.0, CANCELLED=0.0, CANCELLATION_CODE=None, DIVERTED=0.0, CARRIER_DELAY=None, WEATHER_DELAY=None, NAS_DELAY=None, SECURITY_DELAY=None, LATE_AIRCRAFT_DELAY=None, _c19=None),
 Row(FL_DATE='2019-01-01', TAIL_NUM='N922XJ', CARRIER='9E', ORIGIN='JFK', ORIGIN_CITY_NAME='New York, NY', DEST='RDU', DEST_CITY_NAME='Raleigh/Durham, NC', DEP_TIME=1122, DEP_DELAY=-8.0, ARR_TIME=1255, ARR_DELAY=-29.0, CANCELLED=0.0, CANCELLATION_CODE=None, DIVERTED=0.0, CARRIER_DELAY=None, WEATHER_DELAY=None, NAS_DELAY=None, SECURITY_DELAY=None, LATE_AIRCRAFT_DELAY=None, _c19=None),
 Row(FL_DATE='2019-01-01', TAIL_NUM='N326PQ', CARRIER='9E', ORIGIN='CLE', ORIGIN_CITY_NAME='Cleveland, OH', DEST='DTW', DEST_CITY_NAME='Detroit, MI', DEP_TIME=1334, DEP_DELAY=-7.0, ARR_TIME=1417, ARR_DELAY=-31.0, CANCELLED=0.0, CANC

Create a temp view, to be able to work easier

In [6]:
flightData.createOrReplaceTempView("flights_table") 

Q1

In [7]:
sqlDF = spark.sql("SELECT avg(DEP_DELAY) as averageDepartureDelay,avg(ARR_DELAY) as averageArrivalDelay FROM flights_table") 
sqlDF.show() 

+---------------------+-------------------+
|averageDepartureDelay|averageArrivalDelay|
+---------------------+-------------------+
|   10.923267333861132|  5.414849168270909|
+---------------------+-------------------+



In [8]:
dataf_way = flightData.agg({"DEP_DELAY": "avg","ARR_DELAY": "avg"}) 
dataf_way.show() 

+------------------+-----------------+
|    avg(DEP_DELAY)|   avg(ARR_DELAY)|
+------------------+-----------------+
|10.923267333861132|5.414849168270909|
+------------------+-----------------+



Q2

For this question I simplified the load of what the results we want, by fetching both the median and avg. Not sure if we wanted to remove the 1% with less rows or values. But I chose to not do that in SQL and do it in-code (cheating), which is slightly faster for me to write. We'd be probably creating a sort of another view from the following tables and perform the queries on top of it. 

In [10]:
sqlDelaysAir = spark.sql("SELECT c.averageDepartureDelay, c.MedianDelay, c.TotalFlights, c.Airline FROM (SELECT avg(DEP_DELAY) as averageDepartureDelay,percentile_approx(DEP_DELAY, 0.5) as MedianDelay, COUNT(DEP_DELAY) as TotalFlights, CARRIER as Airline FROM flights_table GROUP BY CARRIER) as c ").sort(col("TotalFlights").desc())
sqlDelaysAir.show(5)

+---------------------+-----------+------------+-------+
|averageDepartureDelay|MedianDelay|TotalFlights|Airline|
+---------------------+-----------+------------+-------+
|   10.178762481230244|        0.0|     1330598|     WN|
|    8.155754169633253|       -2.0|      990195|     DL|
|   12.114915337571487|       -2.0|      927448|     AA|
|   12.564053392669365|       -3.0|      819738|     OO|
|   13.004563709088917|       -3.0|      620767|     UA|
+---------------------+-----------+------------+-------+
only showing top 5 rows



In [13]:
sqlDelaysAirPorts = spark.sql("SELECT c.averageDepartureDelay, c.MedianDelay, c.TotalFlights, c.Origin FROM (SELECT avg(DEP_DELAY) as averageDepartureDelay,percentile_approx(DEP_DELAY, 0.5) as MedianDelay, COUNT(DEP_DELAY) as TotalFlights, ORIGIN as Origin FROM flights_table GROUP BY ORIGIN) as c").sort(col("TotalFlights").desc())
sqlDelaysAirPorts.show(5)

+---------------------+-----------+------------+------+
|averageDepartureDelay|MedianDelay|TotalFlights|Origin|
+---------------------+-----------+------------+------+
|    8.815516969138223|       -1.0|      392654|   ATL|
|    14.68775798853924|       -2.0|      329472|   ORD|
|   13.077739563367613|       -2.0|      296863|   DFW|
|   13.582163097345848|       -1.0|      247386|   DEN|
|   10.720337668381726|       -2.0|      232062|   CLT|
+---------------------+-----------+------------+------+
only showing top 5 rows



Write the CSVs, but I'm cheating

In [70]:
delaysAirPd = sqlDelaysAir.toPandas()
delaysAirPortsPd = sqlDelaysAirPorts.toPandas()

In [73]:
delaysAirPd[0:int(len(delaysAirPd)*0.9)].to_csv('delaysAirline.csv')
delaysAirPortsPd[0:int(len(delaysAirPortsPd)*0.9)].to_csv('delaysAirPortsPd.csv')

Q3

Cleaning the data, removing Null values if existing

In [14]:
totDelayData = spark.sql("SELECT *, DEP_DELAY + ARR_DELAY as TOT_DELAY FROM flights_table").filter("CARRIER IS NOT NULL and ORIGIN IS NOT NULL and DEP_TIME IS NOT NULL and ARR_DELAY IS NOT NULL")
totDelayData.show(5)

+----------+--------+-------+------+----------------+----+------------------+--------+---------+--------+---------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+----+---------+
|   FL_DATE|TAIL_NUM|CARRIER|ORIGIN|ORIGIN_CITY_NAME|DEST|    DEST_CITY_NAME|DEP_TIME|DEP_DELAY|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|_c19|TOT_DELAY|
+----------+--------+-------+------+----------------+----+------------------+--------+---------+--------+---------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+----+---------+
|2019-01-01|  N8974C|     9E|   AVL|   Asheville, NC| ATL|       Atlanta, GA|    1658|     -7.0|    1758|    -22.0|      0.0|             null|     0.0|         null|         null|     null|          null|               null|null|    -29.0|
|2019-01-01|  N922XJ|     9E|   JFK|

Generate factors from the string variables, to have it available for the model. What I did not do was proper cleaning/remove outliers, due to lacking some time.

In [15]:
carrIndexer = StringIndexer(inputCol="CARRIER", outputCol="CARRIER_I")
carrEncoder = OneHotEncoder(inputCol="CARRIER_I", outputCol="CARRIER_F")

destIndexer = StringIndexer(inputCol="ORIGIN", outputCol="ORIGIN_I")
destEncoder = OneHotEncoder(inputCol="ORIGIN_I", outputCol="ORIGIN_F")

In [16]:
vectorAssembler = VectorAssembler(inputCols = ["ORIGIN_F","CARRIER_F", "DEP_TIME"], outputCol = 'FEATURES')

In [17]:
flights_pipe = Pipeline(stages=[destIndexer, destEncoder, carrIndexer, carrEncoder, vectorAssembler])

In [18]:
piped_data = flights_pipe.fit(totDelayData).transform(totDelayData)

Generate a train-test dataset. Use the data to create, train and test the regression model

In [20]:
splits = piped_data.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [21]:
lr = LinearRegression(featuresCol = 'FEATURES', labelCol="TOT_DELAY")
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [24.051072686092787,36.71517809557618,32.069323746815435,33.248870876038424,26.06521346287511,25.378222274301336,27.42159240730509,25.09763477274653,32.48140856991804,37.896759756543744,28.724947162555015,26.95236179404192,24.19015376328084,32.43733089314453,27.22447301437999,33.98458554437816,29.269221767721632,42.028871952198486,25.739272129875708,27.12591594184418,19.707244602747785,29.4150095626563,32.43639628725368,23.92018578382683,26.966033322661485,31.1272589238254,33.28588320699031,30.331120161764325,30.817381856008062,26.945785455656416,32.28415956962207,30.105303007236884,34.20494035368983,23.098784759819925,22.031265771450194,31.824883602453504,31.79984781046538,27.766782246010248,27.28144938057476,15.447555518441824,23.442243202315776,27.57696152665111,29.008077985730065,30.53965554922803,34.79696622189321,29.118441560022145,23.713596374126706,26.246427111547703,27.83605826231599,29.08421679726241,25.233344790810275,28.997936331851964,26.28725572579549,15.112

Summarising the model

In [22]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 97.537574
r2: 0.022665


In [24]:
testResults = lr_model.transform(test_df)
testResults.select("prediction", "TOT_DELAY", "FEATURES")

evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="TOT_DELAY",metricName="r2")
print("R Squared (R2) on test data = %g" % evaluator.evaluate(testResults))

R Squared (R2) on test data = 0.0223041


To be expected since I did not correctly clean the data :) 

In [27]:
spark.stop()

Q4
That's not to get points or whatever. But as an idea, I think it would be nice, to have very small assignments throughout the lectures for each system (e.g. a very small one each week), so students follow the lectures and get a more practical idea. For instance for this assignment, we could break it in two parts. First week you install and run the query part, second week you work with the models and regression. I think it's nice, cause you will be also getting feedback from the students during the second lecture (when you teach regression). <br>
Hopefully I managed to transfer what I wanted to say properly :D 
Overall I like the course quite a lot, but I'm all worn out from the other assignments that pilled up the last week :(