<h2>Big Data Systems and Architectures - Spark Assignment 2021</h2>
<h3>Exploring International Flights in 2017 Data</h3>

---
> Georgia Vlassi p2822001<br />
> Business Anlytics <br />
> Athens University of Economics and Business <br/>

---

The main scope of this assignment is to analyse a dataset about international flights in 2017, using Apache Spark to reveal insights about these data. 

You can find the aforementioned data here:
http://andrea.imis.athena-innovation.gr/aueb-master/flights.csv.zip


### <h3>Task 3</h3>
As a final task, your supervisor assigned to you to investigate if it is possible to train a linear 
regression model that could predict the departure delay a flight may have by using, as input, 
its origin (column “ORIGIN”), its airways (column “CARRIER”), and its departure time (column 
“DEP_TIME”). Again you should use Python and DataFrames, this time with MLlib. You should 
pay attention to transform the string-based input features using the proper representation 
format, and you should explain your choices. Special attention should be given to the
“DEP_TIME” column: your supervisor told you that you should only consider the 
corresponding hour of the day (which, of course, should be transformed in a one-hot 
representation). For your training and testing workflow you should first remove outliers (see 
Task 2) and then split your dataset into two parts, one that will be used for training reasons 
and it will contain 70% of the entries, and a second one containing the remaining entries and 
which will be used for the assessment of the model. No need to implement a more 
sophisticated evaluation process (e.g., based on k-fold) is required in this phase. Your code 
should (a) prepare the feature vectors, (b) prepare the training and testing datasets, (c) train 
the model, (d) print on the screen the first 10 predictions, i.e., pairs of feature vectors (in 
compact format) and predicted outputs, on the screen, and (e) evaluate the accuracy of the 
model and display the corresponding metric on the screen

In [1]:
import findspark
findspark.init()

#Import the following
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.readwriter import DataFrameReader
from pyspark.sql.types import IntegerType, Row, StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import col,percent_rank,count,avg,expr,desc,round,when,isnan

#Import ML
import pyspark.mllib
import pyspark.mllib.regression
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
from pyspark.sql import types as T

#Import Linear Regression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

Our first step is to create a temporary view to read the data.

In [2]:
spark = SparkSession.builder.appName("FlightsAssignment").getOrCreate()
spark

After initialising the view we have to load our data. The data file named '671009038_T_ONTIME_REPORTING.csv', will be read by using using spark and save them into flights_data variable. In order to access the data download the file mentioned on description and unzip it.

In [3]:
#load data
flights_data = spark.read\
.option("header","true")\
.option("inferSchema","true")\
.csv("671009038_T_ONTIME_REPORTING.csv")

#Clear dataset from null values
flights_data=flights_data.filter(flights_data.DEP_DELAY.isNotNull())

In [4]:
#Keep only the four columns
flights_data = flights_data.select("DEP_DELAY","ORIGIN","CARRIER","DEP_TIME")
flights_data.show(10)

#Count unique values of ORIGIN 
flights_data.select("ORIGIN").distinct().count()

#Count unique values of CARRIER 
flights_data.select("CARRIER").distinct().count()

#Check types
flights_data.dtypes

+---------+------+-------+--------+
|DEP_DELAY|ORIGIN|CARRIER|DEP_TIME|
+---------+------+-------+--------+
|     -7.0|   AVL|     9E|    1658|
|     -8.0|   JFK|     9E|    1122|
|     -7.0|   CLE|     9E|    1334|
|     -1.0|   BHM|     9E|    1059|
|     -3.0|   GTF|     9E|    1057|
|      0.0|   GRB|     9E|     855|
|     -5.0|   AGS|     9E|     800|
|    -10.0|   CLT|     9E|    1350|
|     -4.0|   MEM|     9E|    1441|
|     -4.0|   MSP|     9E|     847|
+---------+------+-------+--------+
only showing top 10 rows



[('DEP_DELAY', 'double'),
 ('ORIGIN', 'string'),
 ('CARRIER', 'string'),
 ('DEP_TIME', 'int')]

In [5]:
#Change the type of DEP_TIME to string
flights_data = flights_data.withColumn("DEP_TIME",flights_data["DEP_TIME"].cast(T.StringType()))

# converted DEP_TIME
flights_data.dtypes

[('DEP_DELAY', 'double'),
 ('ORIGIN', 'string'),
 ('CARRIER', 'string'),
 ('DEP_TIME', 'string')]

In [6]:
#Fill with zeros if length = 3 digits 
flights_data = flights_data.withColumn('DEP_TIME', F.format_string("%04d", F.col('DEP_TIME').cast("int"))) 
flights_data.show(10)

#Convert it type to string
flights_data = flights_data.withColumn("DEP_TIME",flights_data["DEP_TIME"].cast(T.StringType()))
print(flights_data.dtypes)

+---------+------+-------+--------+
|DEP_DELAY|ORIGIN|CARRIER|DEP_TIME|
+---------+------+-------+--------+
|     -7.0|   AVL|     9E|    1658|
|     -8.0|   JFK|     9E|    1122|
|     -7.0|   CLE|     9E|    1334|
|     -1.0|   BHM|     9E|    1059|
|     -3.0|   GTF|     9E|    1057|
|      0.0|   GRB|     9E|    0855|
|     -5.0|   AGS|     9E|    0800|
|    -10.0|   CLT|     9E|    1350|
|     -4.0|   MEM|     9E|    1441|
|     -4.0|   MSP|     9E|    0847|
+---------+------+-------+--------+
only showing top 10 rows

[('DEP_DELAY', 'double'), ('ORIGIN', 'string'), ('CARRIER', 'string'), ('DEP_TIME', 'string')]


In [7]:
# create new column with only two digts
flights_data = flights_data.withColumn("DEP_TIME_HOUR", flights_data.DEP_TIME.substr(1,2))
flights_data.show()

flights_data.dtypes

flights_data.select("DEP_TIME_HOUR").distinct().count()  

#Replace if the hour=24 to 00
flights_data = flights_data.withColumn("DEP_TIME_HOUR", F.regexp_replace("DEP_TIME_HOUR", "24", "00"))

# see the nu value 
flights_data.select("DEP_TIME_HOUR").distinct().show()

+---------+------+-------+--------+-------------+
|DEP_DELAY|ORIGIN|CARRIER|DEP_TIME|DEP_TIME_HOUR|
+---------+------+-------+--------+-------------+
|     -7.0|   AVL|     9E|    1658|           16|
|     -8.0|   JFK|     9E|    1122|           11|
|     -7.0|   CLE|     9E|    1334|           13|
|     -1.0|   BHM|     9E|    1059|           10|
|     -3.0|   GTF|     9E|    1057|           10|
|      0.0|   GRB|     9E|    0855|           08|
|     -5.0|   AGS|     9E|    0800|           08|
|    -10.0|   CLT|     9E|    1350|           13|
|     -4.0|   MEM|     9E|    1441|           14|
|     -4.0|   MSP|     9E|    0847|           08|
|     -5.0|   ATL|     9E|    1856|           18|
|     -9.0|   AGS|     9E|    1427|           14|
|     -6.0|   ATL|     9E|    1259|           12|
|     -6.0|   CVG|     9E|    0834|           08|
|     -1.0|   RDU|     9E|    1324|           13|
|     -5.0|   GSO|     9E|    1155|           11|
|    124.0|   BIS|     9E|    0809|           08|


In [8]:
#Handle outliers at ORIGIN and CARRIER columns
#ORIGIN
percentiles_ap = flights_data.groupBy("ORIGIN").count()\
                .select("ORIGIN","count", percent_rank().over(Window.partitionBy().orderBy("count")).alias("percent_rank_ap"))
                
percentiles_ap.show()

flights_percentiles_ap = flights_data.alias('flights').join(percentiles_ap.alias('percentiles'), col('percentiles.ORIGIN') == col('flights.ORIGIN'))\
            .select([col('flights.'+xx) for xx in flights_data.columns] + [col('percentiles.percent_rank_ap')])

flights_percentiles_ap.show()

#remove outliers (aka < 0.01) and store the rest on a new dataframe
flights_data = flights_percentiles_ap.where("percent_rank_ap > 0.01")


+------+-----+--------------------+
|ORIGIN|count|     percent_rank_ap|
+------+-----+--------------------+
|   AKN|   61|                 0.0|
|   PGV|   77|0.002785515320334262|
|   DLG|   82|0.005571030640668524|
|   GST|   82|0.005571030640668524|
|   HYA|   83|0.011142061281337047|
|   ADK|   98|0.013927576601671309|
|   OWB|  101|0.016713091922005572|
|   OGD|  104|0.019498607242339833|
|   PPG|  120|0.022284122562674095|
|   STC|  130|0.025069637883008356|
|   BFM|  152|0.027855153203342618|
|   HGR|  182| 0.03064066852367688|
|   SMX|  193|0.033426183844011144|
|   XWA|  205|0.036211699164345405|
|   ART|  210| 0.03899721448467967|
|   BKG|  221| 0.04178272980501393|
|   GUC|  238| 0.04456824512534819|
|   WYS|  264| 0.04735376044568245|
|   OTH|  361| 0.05013927576601671|
|   PSM|  423|0.052924791086350974|
+------+-----+--------------------+
only showing top 20 rows

+---------+------+-------+--------+-------------+------------------+
|DEP_DELAY|ORIGIN|CARRIER|DEP_TIME|DEP_TI

In [9]:
#CARRIER
#calculate percentiles
percentiles_aw = flights_data.groupBy("CARRIER").count()\
                .select("CARRIER","count", percent_rank().over(Window.partitionBy().orderBy("count")).alias("percent_rank_aw"))
                
percentiles_aw.show()

#join percentiles_aw to initial dataset
flights_percentiles_aw = flights_data.alias('flights').join(percentiles_aw.alias('percentiles'), col('percentiles.CARRIER') == col('flights.CARRIER'))\
            .select([col('flights.'+xx) for xx in flights_data.columns] + [col('percentiles.percent_rank_aw')])

flights_percentiles_aw.show()


#remove outliers (aka < 0.01) and store the rest on a new dataframe
flights_data = flights_percentiles_aw.where("percent_rank_aw > 0.01")

+-------+-------+---------------+
|CARRIER|  count|percent_rank_aw|
+-------+-------+---------------+
|     HA|  83772|            0.0|
|     G4| 104703|         0.0625|
|     EV| 128711|          0.125|
|     F9| 133393|         0.1875|
|     NK| 201390|           0.25|
|     YV| 221422|         0.3125|
|     9E| 253104|          0.375|
|     AS| 261693|         0.4375|
|     OH| 282897|            0.5|
|     B6| 293761|         0.5625|
|     MQ| 316146|          0.625|
|     YX| 321887|         0.6875|
|     UA| 620767|           0.75|
|     OO| 819738|         0.8125|
|     AA| 927448|          0.875|
|     DL| 990195|         0.9375|
|     WN|1330598|            1.0|
+-------+-------+---------------+

+---------+------+-------+--------+-------------+------------------+---------------+
|DEP_DELAY|ORIGIN|CARRIER|DEP_TIME|DEP_TIME_HOUR|   percent_rank_ap|percent_rank_aw|
+---------+------+-------+--------+-------------+------------------+---------------+
|      1.0|   MSY|     UA|    

In [10]:
# string indexer ORIGIN

indexer = StringIndexer()\
.setInputCol("ORIGIN")\
.setOutputCol("ORIGIN_INDEXED")

# one hot encoder ORIGIN
encoder = OneHotEncoder(dropLast=False)\
.setInputCols(["ORIGIN_INDEXED"])\
.setOutputCols(["ORIGIN_ENCODED"])


# string indexer CARRIER

indexer2 = StringIndexer()\
.setInputCol("CARRIER")\
.setOutputCol("CARRIER_INDEXED")

# one hot encoder CARRIER
encoder2 = OneHotEncoder(dropLast=False)\
.setInputCols(["CARRIER_INDEXED"])\
.setOutputCols(["CARRIER_ENCODED"])


# string indexer DEP_TIME_HOUR

indexer3 = StringIndexer()\
.setInputCol("DEP_TIME_HOUR")\
.setOutputCol("DEP_TIME_HOUR_INDEXED")


# one hot encoder DEP_TIME_HOUR
encoder3 = OneHotEncoder(dropLast=False)\
.setInputCols(["DEP_TIME_HOUR_INDEXED"])\
.setOutputCols(["DEP_TIME_HOUR_ENCODED"])


# vector_assembler
vector_assembler = VectorAssembler()\
.setInputCols(["ORIGIN_ENCODED", "CARRIER_ENCODED", "DEP_TIME_HOUR_ENCODED"])\
.setOutputCol("FEATURES")


In [11]:
# Make the pipeline  
pipe = Pipeline(stages=[indexer, encoder, indexer2, encoder2, indexer3, encoder3, vector_assembler])


# Fit and transform the data
piped_data = pipe.fit(flights_data).transform(flights_data)


In [12]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.7, .3])

In [13]:
#from pyspark.ml.regression import LinearRegression 
lr = LinearRegression(featuresCol ='FEATURES', labelCol ='DEP_DELAY')
#lr = LinearRegression(featuresCol ='FEATURES', labelCol ='DEP_DELAY',regParam=0.7, elasticNetParam=0.8)

In [14]:
#Fit the train and show the summary of the model
lrModel = lr.fit(training)
summary = lrModel.summary
summary.rootMeanSquaredError

47.85794831476784

In [15]:
#Make predictions on test dataset
predictions = lrModel.transform(test)
predictions.show()

+---------+------+-------+--------+-------------+-------------------+---------------+--------------+-----------------+---------------+---------------+---------------------+---------------------+--------------------+-------------------+
|DEP_DELAY|ORIGIN|CARRIER|DEP_TIME|DEP_TIME_HOUR|    percent_rank_ap|percent_rank_aw|ORIGIN_INDEXED|   ORIGIN_ENCODED|CARRIER_INDEXED|CARRIER_ENCODED|DEP_TIME_HOUR_INDEXED|DEP_TIME_HOUR_ENCODED|            FEATURES|         prediction|
+---------+------+-------+--------+-------------+-------------------+---------------+--------------+-----------------+---------------+---------------+---------------------+---------------------+--------------------+-------------------+
|    -30.0|   RDU|     UA|    2035|           20| 0.9108635097493036|           0.75|          32.0| (355,[32],[1.0])|            4.0| (16,[4],[1.0])|                 14.0|      (24,[14],[1.0])|(395,[32,359,385]...| 25.074535279359033|
|    -28.0|   SPN|     UA|    0847|           08|0.06128

In [16]:
#TBD can be omitted
final_df = predictions.select("DEP_DELAY","ORIGIN","CARRIER","DEP_TIME_HOUR","prediction")
final_df.show(10)

+---------+------+-------+-------------+-------------------+
|DEP_DELAY|ORIGIN|CARRIER|DEP_TIME_HOUR|         prediction|
+---------+------+-------+-------------+-------------------+
|    -30.0|   RDU|     UA|           20| 25.074535279359033|
|    -28.0|   SPN|     UA|           08|-3.1074951342738046|
|    -28.0|   SPN|     UA|           08|-3.1074951342738046|
|    -28.0|   SPN|     UA|           08|-3.1074951342738046|
|    -27.0|   RNO|     UA|           17|  9.733120707562449|
|    -27.0|   STT|     UA|           14|  6.529847541057964|
|    -26.0|   HDN|     UA|           14| 20.402847201672678|
|    -26.0|   PSP|     UA|           22| 29.841986075473077|
|    -26.0|   SAV|     UA|           13| 14.700379478700103|
|    -25.0|   KOA|     UA|           13|  0.663420075830917|
+---------+------+-------+-------------+-------------------+
only showing top 10 rows



In [17]:
lr_predictions = lrModel.transform(test)
lr_predictions.select("prediction","DEP_DELAY","FEATURES").show(10)

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="DEP_DELAY",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

test_result = lrModel.evaluate(test)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

+-------------------+---------+--------------------+
|         prediction|DEP_DELAY|            FEATURES|
+-------------------+---------+--------------------+
| 25.074535279359033|    -30.0|(395,[32,359,385]...|
|-3.1074951342738046|    -28.0|(395,[336,359,372...|
|-3.1074951342738046|    -28.0|(395,[336,359,372...|
|-3.1074951342738046|    -28.0|(395,[336,359,372...|
|  9.733120707562449|    -27.0|(395,[64,359,374]...|
|  6.529847541057964|    -27.0|(395,[163,359,381...|
| 20.402847201672678|    -26.0|(395,[253,359,381...|
| 29.841986075473077|    -26.0|(395,[89,359,388]...|
| 14.700379478700103|    -26.0|(395,[73,359,383]...|
|  0.663420075830917|    -25.0|(395,[121,359,383...|
+-------------------+---------+--------------------+
only showing top 10 rows

R Squared (R2) on test data = 0.0516929
Root Mean Squared Error (RMSE) on test data = 47.8972


In [18]:
#Pandas
#pip install pandas
test.describe().toPandas()[["summary","DEP_DELAY"]]

Unnamed: 0,summary,DEP_DELAY
0,count,2163876.0
1,mean,11.07030763315458
2,stddev,49.18529507156865
3,min,-52.0
4,max,2064.0
