<br />

# International Flights

##  Big Data Systems and Architectures

## Spark Assignment


---

> Anastasios Theodorou, Student <br />
> Master of Science in Business Analytics <br />
> Department of Management Science and Technology <br />
> Athens University of Economics and Business <br />
> AM: p2822007


In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
import pyspark.ml.evaluation
from pyspark.ml.feature import IndexToString, MinMaxScaler, StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
#read the whole dataset
flight_data = spark.read.option("inferSchema", "true").option("header", "true").csv("671009038_T_ONTIME_REPORTING.csv")
flight_data.count()

7422037

In [4]:
flight_data.show(3)

+----------+--------+-------+------+----------------+----+------------------+--------+---------+--------+---------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+----+
|   FL_DATE|TAIL_NUM|CARRIER|ORIGIN|ORIGIN_CITY_NAME|DEST|    DEST_CITY_NAME|DEP_TIME|DEP_DELAY|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|_c19|
+----------+--------+-------+------+----------------+----+------------------+--------+---------+--------+---------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+----+
|2019-01-01|  N8974C|     9E|   AVL|   Asheville, NC| ATL|       Atlanta, GA|    1658|     -7.0|    1758|    -22.0|      0.0|             null|     0.0|         null|         null|     null|          null|               null|null|
|2019-01-01|  N922XJ|     9E|   JFK|    New York, NY| RDU|Raleigh/Durham, NC

In [5]:
flight_data.createOrReplaceTempView("flight_data")

## Task 1

In [6]:
#replace the null values with 0 number
delay = spark.sql("""
SELECT ifnull(DEP_DELAY, '0') as DEP_DELAY, ifnull(ARR_DELAY, '0') as ARR_DELAY
FROM flight_data 
""")
delay.show()

+---------+---------+
|DEP_DELAY|ARR_DELAY|
+---------+---------+
|     -7.0|    -22.0|
|     -8.0|    -29.0|
|     -7.0|    -31.0|
|     -1.0|     -8.0|
|     -3.0|    -17.0|
|      0.0|     10.0|
|     -5.0|    -16.0|
|    -10.0|    -29.0|
|     -4.0|    -18.0|
|     -4.0|     -6.0|
|     -5.0|    -20.0|
|     -9.0|     -4.0|
|     -6.0|    -18.0|
|     -6.0|    -18.0|
|     -1.0|    -14.0|
|     -5.0|      3.0|
|    124.0|    109.0|
|     -4.0|    -10.0|
|     -4.0|     -7.0|
|     -1.0|     -4.0|
+---------+---------+
only showing top 20 rows



In [7]:
delay.createOrReplaceTempView("delay")

In [8]:
delay = spark.sql("""
SELECT avg(DEP_DELAY) as departures_delay, avg(ARR_DELAY) as arrivals_delay
FROM flight_data
""")
delay.show()

+------------------+-----------------+
|  departures_delay|   arrivals_delay|
+------------------+-----------------+
|10.923267333861132|5.414849168270909|
+------------------+-----------------+



## Task 2

In [9]:
#convert the NULL values with 0 number
flight_data = spark.sql("""
SELECT *, ifnull(DEP_DELAY, '0') as DEP_DELAY1, ifnull(ARR_DELAY, '0') as ARR_DELAY1
FROM flight_data 
""")
flight_data.show(3)

+----------+--------+-------+------+----------------+----+------------------+--------+---------+--------+---------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+----+----------+----------+
|   FL_DATE|TAIL_NUM|CARRIER|ORIGIN|ORIGIN_CITY_NAME|DEST|    DEST_CITY_NAME|DEP_TIME|DEP_DELAY|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|_c19|DEP_DELAY1|ARR_DELAY1|
+----------+--------+-------+------+----------------+----+------------------+--------+---------+--------+---------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+----+----------+----------+
|2019-01-01|  N8974C|     9E|   AVL|   Asheville, NC| ATL|       Atlanta, GA|    1658|     -7.0|    1758|    -22.0|      0.0|             null|     0.0|         null|         null|     null|          null|               null|null|      -7.0|

In [10]:
#drop the previous columns referring to the delays
flight_data = flight_data.drop("DEP_DELAY", "ARR_DELAY")
flight_data.createOrReplaceTempView("flight_data")

In [11]:
flight_data.show(3)

+----------+--------+-------+------+----------------+----+------------------+--------+--------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+----+----------+----------+
|   FL_DATE|TAIL_NUM|CARRIER|ORIGIN|ORIGIN_CITY_NAME|DEST|    DEST_CITY_NAME|DEP_TIME|ARR_TIME|CANCELLED|CANCELLATION_CODE|DIVERTED|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|_c19|DEP_DELAY1|ARR_DELAY1|
+----------+--------+-------+------+----------------+----+------------------+--------+--------+---------+-----------------+--------+-------------+-------------+---------+--------------+-------------------+----+----------+----------+
|2019-01-01|  N8974C|     9E|   AVL|   Asheville, NC| ATL|       Atlanta, GA|    1658|    1758|      0.0|             null|     0.0|         null|         null|     null|          null|               null|null|      -7.0|     -22.0|
|2019-01-01|  N922XJ|     9E|   JFK|    New York, NY| RDU|Raleigh/Du

In [12]:
#create the dataset of the airports (in the DEST column the airports were
#the same as in the origin column)
flights = spark.sql("""
SELECT ORIGIN AS airports, count(ORIGIN) as flight_numbers
FROM flight_data 
GROUP BY ORIGIN
HAVING count(ORIGIN) > (SELECT count(ORIGIN)*0.01 FROM flight_data)
""")
flights.show() #the number of flights for each airport

+--------+--------------+
|airports|flight_numbers|
+--------+--------------+
|     DCA|        139388|
|     IAH|        179688|
|     LGA|        171665|
|     BOS|        150564|
|     EWR|        136081|
|     LAS|        164020|
|     DEN|        252026|
|     SEA|        142857|
|     CLT|        235496|
|     BNA|         82654|
|     MIA|         89214|
|     TPA|         76599|
|     BWI|        104652|
|     PHX|        175328|
|     DFW|        304344|
|     SFO|        170918|
|     ATL|        395009|
|     FLL|         98409|
|     ORD|        339606|
|     MDW|         83763|
+--------+--------------+
only showing top 20 rows



In [13]:
print(flights.count())#the size of the unique airports

28


In [14]:
#create the dataset of the airways
airways = spark.sql("""
SELECT CARRIER AS airways, count(CARRIER) as flight_numbers
FROM flight_data 
GROUP BY CARRIER
HAVING count(CARRIER) > (SELECT count(CARRIER)*0.01 FROM flight_data)
""")
airways.show()

+-------+--------------+
|airways|flight_numbers|
+-------+--------------+
|     UA|        625910|
|     NK|        204845|
|     AA|        946776|
|     EV|        134683|
|     B6|        297411|
|     DL|        991986|
|     OO|        836445|
|     F9|        135543|
|     YV|        227888|
|     MQ|        327007|
|     OH|        289304|
|     HA|         83891|
|     G4|        105305|
|     YX|        329149|
|     AS|        264816|
|     WN|       1363946|
|     9E|        257132|
+-------+--------------+



In [15]:
print(airways.count())#the size of the unique airways

17


In [16]:
#make those datasets views in order to use them in the below queries
flights.createOrReplaceTempView("flights")
airways.createOrReplaceTempView("airways")

### Report 1

In [17]:
#we want the avg for the departures delays for every airport (origin)
avg_airports = spark.sql("""
SELECT fd.ORIGIN as Airports, round(avg(fd.DEP_DELAY1), 3) as Departures_Delay
FROM flight_data as fd, flights as f
WHERE fd.ORIGIN = f.airports
GROUP BY fd.ORIGIN
ORDER BY avg(fd.DEP_DELAY1) DESC""")
avg_airports.show()

+--------+----------------+
|Airports|Departures_Delay|
+--------+----------------+
|     EWR|          17.895|
|     LGA|          14.579|
|     ORD|          14.249|
|     BOS|          13.804|
|     FLL|           13.76|
|     MCO|          13.523|
|     DEN|          13.332|
|     SFO|          13.301|
|     MDW|          13.092|
|     DFW|          12.756|
|     JFK|          12.466|
|     IAH|          12.399|
|     BNA|          10.909|
|     BWI|          10.764|
|     MIA|          10.573|
|     CLT|          10.564|
|     TPA|          10.517|
|     DCA|          10.454|
|     LAS|          10.307|
|     PHL|          10.172|
+--------+----------------+
only showing top 20 rows



In [18]:
avg_airports.toPandas().to_csv('task2-ap-avg.csv', header=False)

### Report 2

In [19]:
#place an iterator
c_airports = spark.sql("""
SELECT ROW_NUMBER() OVER(PARTITION BY ORIGIN 
                            ORDER BY DEP_DELAY1 ASC) AS NUM_ROW,
        ORIGIN, DEP_DELAY1 AS DEP_DELAY
FROM flight_data, flights as f
WHERE ORIGIN = f.airports
""")
c_airports.show()

+-------+------+---------+
|NUM_ROW|ORIGIN|DEP_DELAY|
+-------+------+---------+
|      1|   DCA|     -1.0|
|      2|   DCA|     -1.0|
|      3|   DCA|     -1.0|
|      4|   DCA|     -1.0|
|      5|   DCA|     -1.0|
|      6|   DCA|     -1.0|
|      7|   DCA|     -1.0|
|      8|   DCA|     -1.0|
|      9|   DCA|     -1.0|
|     10|   DCA|     -1.0|
|     11|   DCA|     -1.0|
|     12|   DCA|     -1.0|
|     13|   DCA|     -1.0|
|     14|   DCA|     -1.0|
|     15|   DCA|     -1.0|
|     16|   DCA|     -1.0|
|     17|   DCA|     -1.0|
|     18|   DCA|     -1.0|
|     19|   DCA|     -1.0|
|     20|   DCA|     -1.0|
+-------+------+---------+
only showing top 20 rows



In [20]:
#save the length of the airport's flights
count_airports = spark.sql("""
SELECT ORIGIN, count(ORIGIN) as COR
FROM flight_data, flights as f
WHERE ORIGIN = f.airports
GROUP BY ORIGIN
""")
count_airports.show()

+------+------+
|ORIGIN|   COR|
+------+------+
|   DCA|139388|
|   IAH|179688|
|   LGA|171665|
|   BOS|150564|
|   EWR|136081|
|   LAS|164020|
|   DEN|252026|
|   SEA|142857|
|   BNA| 82654|
|   CLT|235496|
|   MIA| 89214|
|   BWI|104652|
|   TPA| 76599|
|   PHX|175328|
|   DFW|304344|
|   SFO|170918|
|   ATL|395009|
|   FLL| 98409|
|   ORD|339606|
|   MDW| 83763|
+------+------+
only showing top 20 rows



In [21]:
count_airports.createOrReplaceTempView("count_airports")

In [22]:
#find the lines where the medians are
count_airports = spark.sql("""
SELECT ORIGIN, COR, CAST(IF(MOD(COR, 2)=0, COR/2, (COR+1)/2) AS int) AS P,
        CAST(IF(MOD(COR, 2)=0, COR/2+1, 0) AS int) AS P1
FROM count_airports
""")
count_airports.show()

+------+------+------+------+
|ORIGIN|   COR|     P|    P1|
+------+------+------+------+
|   DCA|139388| 69694| 69695|
|   IAH|179688| 89844| 89845|
|   LGA|171665| 85833|     0|
|   BOS|150564| 75282| 75283|
|   EWR|136081| 68041|     0|
|   LAS|164020| 82010| 82011|
|   DEN|252026|126013|126014|
|   SEA|142857| 71429|     0|
|   BNA| 82654| 41327| 41328|
|   CLT|235496|117748|117749|
|   MIA| 89214| 44607| 44608|
|   BWI|104652| 52326| 52327|
|   TPA| 76599| 38300|     0|
|   PHX|175328| 87664| 87665|
|   DFW|304344|152172|152173|
|   SFO|170918| 85459| 85460|
|   ATL|395009|197505|     0|
|   FLL| 98409| 49205|     0|
|   ORD|339606|169803|169804|
|   MDW| 83763| 41882|     0|
+------+------+------+------+
only showing top 20 rows



In [23]:
c_airports.createOrReplaceTempView("c_airports")
count_airports.createOrReplaceTempView("count_airports")

In [24]:
#match to each P value the recording row of the delay
p_set = spark.sql("""
SELECT air.ORIGIN as AIRPORT, DEP_DELAY as DELAY1
FROM c_airports as air, count_airports as c
WHERE air.ORIGIN = c.ORIGIN AND NUM_ROW = P
""")
p_set.show()

+-------+------+
|AIRPORT|DELAY1|
+-------+------+
|    SAN|  -7.0|
|    MDW|   1.0|
|    SFO|  -8.0|
|    CLT|  -7.0|
|    MIA|  -7.0|
|    MCO|  -8.0|
|    PHX|  -8.0|
|    FLL|  -8.0|
|    ATL|  -6.0|
|    BOS|  -7.0|
|    SLC|  -6.0|
|    BNA|  -8.0|
|    DCA|  -7.0|
|    DFW|  -7.0|
|    SEA|  -7.0|
|    EWR|  -8.0|
|    PHL|  -7.0|
|    JFK|  -7.0|
|    DTW|  -6.0|
|    MSP|  -5.0|
+-------+------+
only showing top 20 rows



In [25]:
#match to each P1 value the recording row of the delay
p1_set = spark.sql("""
SELECT air.ORIGIN as AIRPORT, DEP_DELAY as DELAY2
FROM c_airports as air, count_airports as c
WHERE air.ORIGIN = c.ORIGIN AND NUM_ROW = P1
""")
p1_set.show()

+-------+------+
|AIRPORT|DELAY2|
+-------+------+
|    SAN|  -7.0|
|    PHX|  -8.0|
|    DFW|  -7.0|
|    MIA|  -7.0|
|    BOS|  -7.0|
|    BWI|     0|
|    LAX|  -8.0|
|    SFO|  -8.0|
|    DEN|  -9.0|
|    MSP|  -5.0|
|    DTW|  -6.0|
|    DCA|  -7.0|
|    BNA|  -8.0|
|    ORD|  -7.0|
|    LAS|  -8.0|
|    MCO|  -8.0|
|    SLC|  -6.0|
|    IAH|  -6.0|
|    CLT|  -7.0|
+-------+------+



In [26]:
p_set.createOrReplaceTempView("p_set")
p1_set.createOrReplaceTempView("p1_set")

In [27]:
#join the above tables to a final one
med_air = spark.sql("""
SELECT p_set.AIRPORT, DELAY1, DELAY2
FROM p_set
LEFT JOIN p1_set
ON p_set.AIRPORT = p1_set.AIRPORT
""")
med_air.show()

+-------+------+------+
|AIRPORT|DELAY1|DELAY2|
+-------+------+------+
|    DCA|  -7.0|  -7.0|
|    IAH|  -6.0|  -6.0|
|    LGA|  -7.0|  null|
|    BOS|  -7.0|  -7.0|
|    EWR|  -8.0|  null|
|    LAS|  -8.0|  -8.0|
|    DEN|  -9.0|  -9.0|
|    SEA|  -7.0|  null|
|    BNA|  -8.0|  -8.0|
|    CLT|  -7.0|  -7.0|
|    MIA|  -7.0|  -7.0|
|    BWI|     0|     0|
|    TPA|  -7.0|  null|
|    PHX|  -8.0|  -8.0|
|    DFW|  -7.0|  -7.0|
|    SFO|  -8.0|  -8.0|
|    ATL|  -6.0|  null|
|    FLL|  -8.0|  null|
|    ORD|  -7.0|  -7.0|
|    MDW|   1.0|  null|
+-------+------+------+
only showing top 20 rows



In [28]:
med_air.createOrReplaceTempView("med_air")

In [29]:
#find the median and sort it
med_air = spark.sql("""
SELECT AIRPORT, IF((DELAY2 IS NULL), DELAY1, (DELAY1 + DELAY2)/2) AS MEDIAN
FROM med_air
ORDER BY MEDIAN DESC
""")
med_air.show()

+-------+------+
|AIRPORT|MEDIAN|
+-------+------+
|    MDW|   1.0|
|    BWI|   0.0|
|    DEN|  -9.0|
|    MCO|  -8.0|
|    EWR|  -8.0|
|    LAS|  -8.0|
|    PHX|  -8.0|
|    SFO|  -8.0|
|    LAX|  -8.0|
|    FLL|  -8.0|
|    BNA|  -8.0|
|    DCA|  -7.0|
|    BOS|  -7.0|
|    LGA|  -7.0|
|    SEA|  -7.0|
|    MIA|  -7.0|
|    TPA|  -7.0|
|    DFW|  -7.0|
|    CLT|  -7.0|
|    SAN|  -7.0|
+-------+------+
only showing top 20 rows



In [30]:
med_air.toPandas().to_csv('task2-ap-med.csv', header=False)

### Report 3

In [31]:
#we want the avg for the departures delays for every airway (carrier)
avg_airways = spark.sql("""
SELECT fd.CARRIER as Airways, round(avg(fd.DEP_DELAY1), 3) as Departures_Delay
FROM flight_data as fd, airways as a
WHERE fd.CARRIER = a.airways
GROUP BY fd.CARRIER
ORDER BY avg(fd.DEP_DELAY1) DESC""")
avg_airways.show()

+-------+----------------+
|Airways|Departures_Delay|
+-------+----------------+
|     B6|          17.528|
|     EV|          16.451|
|     F9|          14.346|
|     YV|          13.412|
|     UA|          12.898|
|     OO|          12.313|
|     AA|          11.868|
|     NK|          10.756|
|     OH|          10.468|
|     9E|          10.085|
|     G4|          10.065|
|     WN|            9.93|
|     MQ|           8.967|
|     YX|           8.356|
|     DL|           8.141|
|     AS|            4.98|
|     HA|           1.295|
+-------+----------------+



In [32]:
avg_airways.toPandas().to_csv('task2-aw-avg.csv', header=False)

### Report 4

In [33]:
#The same steps as the "Report 2"
c_airways = spark.sql("""
SELECT ROW_NUMBER() OVER(PARTITION BY CARRIER 
                            ORDER BY DEP_DELAY1 ASC) AS NUM_ROW,
        CARRIER, DEP_DELAY1 AS DEP_DELAY
FROM flight_data, airways as a
WHERE CARRIER = a.airways 
""")
c_airways.show()

+-------+-------+---------+
|NUM_ROW|CARRIER|DEP_DELAY|
+-------+-------+---------+
|      1|     UA|     -1.0|
|      2|     UA|     -1.0|
|      3|     UA|     -1.0|
|      4|     UA|     -1.0|
|      5|     UA|     -1.0|
|      6|     UA|     -1.0|
|      7|     UA|     -1.0|
|      8|     UA|     -1.0|
|      9|     UA|     -1.0|
|     10|     UA|     -1.0|
|     11|     UA|     -1.0|
|     12|     UA|     -1.0|
|     13|     UA|     -1.0|
|     14|     UA|     -1.0|
|     15|     UA|     -1.0|
|     16|     UA|     -1.0|
|     17|     UA|     -1.0|
|     18|     UA|     -1.0|
|     19|     UA|     -1.0|
|     20|     UA|     -1.0|
+-------+-------+---------+
only showing top 20 rows



In [34]:
count_airways = spark.sql("""
SELECT CARRIER, count(CARRIER) as CAR
FROM flight_data, airways as a
WHERE CARRIER = a.airways
GROUP BY CARRIER
""")
count_airways.show()

+-------+-------+
|CARRIER|    CAR|
+-------+-------+
|     UA| 625910|
|     NK| 204845|
|     AA| 946776|
|     EV| 134683|
|     B6| 297411|
|     DL| 991986|
|     OO| 836445|
|     F9| 135543|
|     YV| 227888|
|     MQ| 327007|
|     OH| 289304|
|     HA|  83891|
|     G4| 105305|
|     YX| 329149|
|     AS| 264816|
|     WN|1363946|
|     9E| 257132|
+-------+-------+



In [35]:
count_airways.createOrReplaceTempView("count_airways")

In [36]:
count_airways = spark.sql("""
SELECT CARRIER, CAR, CAST(IF(MOD(CAR, 2)=0, CAR/2, (CAR+1)/2) AS int) AS P,
        CAST(IF(MOD(CAR, 2)=0, CAR/2+1, 0) AS int) AS P1
FROM count_airways
""")
count_airways.show()

+-------+-------+------+------+
|CARRIER|    CAR|     P|    P1|
+-------+-------+------+------+
|     UA| 625910|312955|312956|
|     NK| 204845|102423|     0|
|     AA| 946776|473388|473389|
|     EV| 134683| 67342|     0|
|     B6| 297411|148706|     0|
|     DL| 991986|495993|495994|
|     OO| 836445|418223|     0|
|     F9| 135543| 67772|     0|
|     YV| 227888|113944|113945|
|     MQ| 327007|163504|     0|
|     OH| 289304|144652|144653|
|     HA|  83891| 41946|     0|
|     G4| 105305| 52653|     0|
|     YX| 329149|164575|     0|
|     AS| 264816|132408|132409|
|     WN|1363946|681973|681974|
|     9E| 257132|128566|128567|
+-------+-------+------+------+



In [37]:
c_airways.createOrReplaceTempView("c_airways")
count_airways.createOrReplaceTempView("count_airways")

In [38]:
p_set = spark.sql("""
SELECT air.CARRIER as AIRWAY, DEP_DELAY as DELAY1
FROM c_airways as air, count_airways as c
WHERE air.CARRIER = c.CARRIER AND NUM_ROW = P
""")
p_set.show()

+------+------+
|AIRWAY|DELAY1|
+------+------+
|    EV|  -7.0|
|    UA|  -7.0|
|    NK|  -6.0|
|    9E|  -6.0|
|    G4|  -8.0|
|    HA|  -7.0|
|    YX|  -6.0|
|    WN|   0.0|
|    AS|  -7.0|
|    YV|  -7.0|
|    B6|  -8.0|
|    MQ|  -6.0|
|    OO|  -6.0|
|    OH|  -7.0|
|    F9|  -8.0|
|    DL|  -6.0|
|    AA|  -7.0|
+------+------+



In [39]:
p1_set = spark.sql("""
SELECT air.CARRIER as AIRWAY, DEP_DELAY as DELAY2
FROM c_airways as air, count_airways as c
WHERE air.CARRIER = c.CARRIER AND NUM_ROW = P1
""")
p1_set.show()

+------+------+
|AIRWAY|DELAY2|
+------+------+
|    WN|   0.0|
|    9E|  -6.0|
|    DL|  -6.0|
|    YV|  -7.0|
|    AS|  -7.0|
|    OH|  -7.0|
|    UA|  -7.0|
|    AA|  -7.0|
+------+------+



In [40]:
p_set.createOrReplaceTempView("p_set")
p1_set.createOrReplaceTempView("p1_set")

In [41]:
med_air = spark.sql("""
SELECT p_set.AIRWAY, DELAY1, DELAY2
FROM p_set
LEFT JOIN p1_set
ON p_set.AIRWAY = p1_set.AIRWAY
""")
med_air.show()

+------+------+------+
|AIRWAY|DELAY1|DELAY2|
+------+------+------+
|    UA|  -7.0|  -7.0|
|    NK|  -6.0|  null|
|    AA|  -7.0|  -7.0|
|    EV|  -7.0|  null|
|    B6|  -8.0|  null|
|    DL|  -6.0|  -6.0|
|    OO|  -6.0|  null|
|    F9|  -8.0|  null|
|    YV|  -7.0|  -7.0|
|    MQ|  -6.0|  null|
|    OH|  -7.0|  -7.0|
|    HA|  -7.0|  null|
|    G4|  -8.0|  null|
|    YX|  -6.0|  null|
|    AS|  -7.0|  -7.0|
|    WN|   0.0|   0.0|
|    9E|  -6.0|  -6.0|
+------+------+------+



In [42]:
med_air.createOrReplaceTempView("med_air")

In [43]:
med_air = spark.sql("""
SELECT AIRWAY, IF((DELAY2 IS NULL), DELAY1, (DELAY1 + DELAY2)/2) AS MEDIAN
FROM med_air
ORDER BY MEDIAN DESC
""")
med_air.show()

+------+------+
|AIRWAY|MEDIAN|
+------+------+
|    WN|   0.0|
|    B6|  -8.0|
|    G4|  -8.0|
|    F9|  -8.0|
|    UA|  -7.0|
|    AA|  -7.0|
|    EV|  -7.0|
|    YV|  -7.0|
|    OH|  -7.0|
|    HA|  -7.0|
|    AS|  -7.0|
|    NK|  -6.0|
|    MQ|  -6.0|
|    DL|  -6.0|
|    9E|  -6.0|
|    OO|  -6.0|
|    YX|  -6.0|
+------+------+



In [44]:
med_air.toPandas().to_csv('task2-aw-med.csv', header=False)

## Task 3

In [45]:
#combine the above tables to one
data = spark.sql("""
SELECT ORIGIN AS airports, CARRIER AS airways, DEP_TIME AS dep_time, DEP_DELAY1 AS delay
FROM flight_data, flights, airways
WHERE CARRIER = airways AND ORIGIN = airports
""")
data.show()

+--------+-------+--------+-----+
|airports|airways|dep_time|delay|
+--------+-------+--------+-----+
|     DCA|     UA|    1602| -3.0|
|     DCA|     UA|     821| -9.0|
|     DCA|     UA|    1711| 26.0|
|     DCA|     UA|    1649| 24.0|
|     DCA|     UA|    1144| -1.0|
|     DCA|     UA|     602| -8.0|
|     DCA|     UA|     737| -8.0|
|     DCA|     UA|     838| -7.0|
|     DCA|     UA|    1559| -6.0|
|     DCA|     UA|     828| -2.0|
|     DCA|     UA|    1640| -5.0|
|     DCA|     UA|    1634|  9.0|
|     DCA|     UA|     648| -7.0|
|     DCA|     UA|     625| 15.0|
|     DCA|     UA|     740| -5.0|
|     DCA|     UA|    1836|151.0|
|     DCA|     UA|     820|-10.0|
|     DCA|     UA|    1656| 11.0|
|     DCA|     UA|    1649| 24.0|
|     DCA|     UA|     644|-11.0|
+--------+-------+--------+-----+
only showing top 20 rows



In [46]:
data.createOrReplaceTempView("data")

* Prepare feature vectors.

In [47]:
#place 0 when the time had 3 digits and select only the hour
data = spark.sql("""
SELECT airports,
airways, CAST(SUBSTRING(IF(0<(dep_time DIV 1000) AND (dep_time DIV 1000)<=9, dep_time, CONCAT(0,dep_time)),1,2) AS int) AS dep_time,
delay
FROM data
""")
data.show()
data.createOrReplaceTempView("data")

+--------+-------+--------+-----+
|airports|airways|dep_time|delay|
+--------+-------+--------+-----+
|     DCA|     UA|      16| -3.0|
|     DCA|     UA|       8| -9.0|
|     DCA|     UA|      17| 26.0|
|     DCA|     UA|      16| 24.0|
|     DCA|     UA|      11| -1.0|
|     DCA|     UA|       6| -8.0|
|     DCA|     UA|       7| -8.0|
|     DCA|     UA|       8| -7.0|
|     DCA|     UA|      15| -6.0|
|     DCA|     UA|       8| -2.0|
|     DCA|     UA|      16| -5.0|
|     DCA|     UA|      16|  9.0|
|     DCA|     UA|       6| -7.0|
|     DCA|     UA|       6| 15.0|
|     DCA|     UA|       7| -5.0|
|     DCA|     UA|      18|151.0|
|     DCA|     UA|       8|-10.0|
|     DCA|     UA|      16| 11.0|
|     DCA|     UA|      16| 24.0|
|     DCA|     UA|       6|-11.0|
+--------+-------+--------+-----+
only showing top 20 rows



In [48]:
#create the "df" for the one-hot encoding
df = data.na.fill(0)

df.select("dep_time").distinct().orderBy("dep_time").show(5)

+--------+
|dep_time|
+--------+
|       0|
|       1|
|       2|
|       3|
|       4|
+--------+
only showing top 5 rows



* One - Hot - Encoding

In [49]:
#one hot encoding of all the string columns
categorical_columns= ['airports', 'airways', 'dep_time']

# The index of string vlaues multiple columns
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns
]

# The encode of indexed vlaues multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]

# Vectorizing encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

pipeline = Pipeline(stages=indexers + encoders+[assembler])
model=pipeline.fit(df)
newdf = model.transform(df)
newdf.show(5)

+--------+-------+--------+-----+----------------+---------------+----------------+------------------------+-----------------------+------------------------+--------------------+
|airports|airways|dep_time|delay|airports_indexed|airways_indexed|dep_time_indexed|airports_indexed_encoded|airways_indexed_encoded|dep_time_indexed_encoded|            features|
+--------+-------+--------+-----+----------------+---------------+----------------+------------------------+-----------------------+------------------------+--------------------+
|     DCA|     UA|      16| -3.0|            16.0|            3.0|             9.0|         (28,[16],[1.0])|         (17,[3],[1.0])|          (25,[9],[1.0])|(70,[16,31,54],[1...|
|     DCA|     UA|       8| -9.0|            16.0|            3.0|             0.0|         (28,[16],[1.0])|         (17,[3],[1.0])|          (25,[0],[1.0])|(70,[16,31,45],[1...|
|     DCA|     UA|      17| 26.0|            16.0|            3.0|             7.0|         (28,[16],[1.0

In [50]:
newdf.printSchema()

root
 |-- airports: string (nullable = true)
 |-- airways: string (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- delay: string (nullable = false)
 |-- airports_indexed: double (nullable = false)
 |-- airways_indexed: double (nullable = false)
 |-- dep_time_indexed: double (nullable = false)
 |-- airports_indexed_encoded: vector (nullable = true)
 |-- airways_indexed_encoded: vector (nullable = true)
 |-- dep_time_indexed_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)



In [51]:
newdf.count()

4632951

* Training - Test Sets

In [52]:
#we are going to need features and delay columns
lrdata = newdf.select('airports', 'airways', 'dep_time','features', newdf.delay.cast("float"))
lrdata.show(5)

+--------+-------+--------+--------------------+-----+
|airports|airways|dep_time|            features|delay|
+--------+-------+--------+--------------------+-----+
|     DCA|     UA|      16|(70,[16,31,54],[1...| -3.0|
|     DCA|     UA|       8|(70,[16,31,45],[1...| -9.0|
|     DCA|     UA|      17|(70,[16,31,52],[1...| 26.0|
|     DCA|     UA|      16|(70,[16,31,54],[1...| 24.0|
|     DCA|     UA|      11|(70,[16,31,50],[1...| -1.0|
+--------+-------+--------+--------------------+-----+
only showing top 5 rows



In [53]:
#split to training and test sets
training, test = lrdata.randomSplit(weights = [0.70, 0.30], seed = 1)
print("Size of training set: " + str(training.count()))
print("Size of test set: " + str(test.count()))

Size of training set: 3244238
Size of test set: 1388713


* Create Linear Regression Model

In [54]:
#create the linear regression model
lr = LinearRegression(featuresCol = 'features', labelCol='delay', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(training)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-2.5095520308126877,3.2798728865289153,0.7703989217405849,0.17208236459496357,-0.7916723847024338,-1.8631365549360148,0.0,-1.9779656987909404,3.4422780018382517,0.0,-2.1986825471654723,0.0,0.0,1.4679644021154479,0.5089598092216664,-1.8177257233461326,0.0,5.573663496164053,0.0,0.0,-3.38710535109035,0.0,0.0,-0.7361105636361487,0.0,0.0,0.0,0.0,0.0,-1.4463920074517074,0.0,0.0,0.0,2.77317160891493,-3.15606104220751,-4.798075508572511,-2.0310058858863123,-1.062197305559125,-0.14886754164384464,0.0,1.503232764240975,0.0,0.0,0.0,0.0,-6.643712988665897,-4.47551993337573,5.028108777226659,0.0,-2.682604107407095,-1.6208124642905375,-7.636206359192928,2.430600216768728,-0.6063945873935096,1.6125604775000009,0.0,3.964418970873359,5.216521680387868,0.0,-9.221566746189062,11.742318207199745,12.133337497713047,-8.896448176575875,-9.599529385910342,37.931061818522366,75.9804466787146,10.2598878586563,88.16745716551331,68.7302414737115,47.65280331904116]
Intercept: 11.248573553586644


In [55]:
#summary and goodness of fit
training.select('delay','dep_time').describe().show()

+-------+------------------+------------------+
|summary|             delay|          dep_time|
+-------+------------------+------------------+
|  count|           3244238|           3244238|
|   mean|11.538565912858427|13.313012177281692|
| stddev|46.530599534866546|5.3244586509628915|
|    min|             -59.0|                 0|
|    max|            2672.0|                24|
+-------+------------------+------------------+



In [56]:
trainingSummary = lr_model.summary
print("R Squared (R2) on training data = %f" % trainingSummary.r2)
print("Root Mean Squared Error (RMSE) on training data = %f" % trainingSummary.rootMeanSquaredError)

R Squared (R2) on training data = 0.052316
Root Mean Squared Error (RMSE) on training data = 45.297093


* Predictions with test set

In [57]:
lr_predictions = lr_model.transform(test)
lr_predictions.select("prediction","delay",'airports', 'airways', 'dep_time').distinct().show(10)

+--------------------+-----+--------+-------+--------+
|          prediction|delay|airports|airways|dep_time|
+--------------------+-----+--------+-------+--------+
| -0.8605078631363856|  0.0|     ATL|     UA|       0|
|   96.90647868828727|403.0|     ATL|     UA|       2|
|   77.46926299648545|242.0|     ATL|     UA|       3|
|   77.46926299648545|243.0|     ATL|     UA|       3|
|   77.46926299648545|273.0|     ATL|     UA|       3|
|-0.15742665380191845| -8.0|     ATL|     UA|       5|
|-0.15742665380191845| -6.0|     ATL|     UA|       5|
|-0.15742665380191845| -4.0|     ATL|     UA|       5|
|-0.15742665380191845| -3.0|     ATL|     UA|       5|
|-0.15742665380191845| -2.0|     ATL|     UA|       5|
+--------------------+-----+--------+-------+--------+
only showing top 10 rows



* Model evaluation

In [58]:
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="delay",metricName="r2")
test_eval = lr_model.evaluate(test)
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
print("Root Mean Squared Error (RMSE) on test data = %g" % test_eval.rootMeanSquaredError)

R Squared (R2) on test data = 0.0532136
Root Mean Squared Error (RMSE) on test data = 45.3351


In [59]:
test.select('delay','dep_time').describe().show()

+-------+------------------+------------------+
|summary|             delay|          dep_time|
+-------+------------------+------------------+
|  count|           1388713|           1388713|
|   mean|11.551303257044472|13.322641179278945|
| stddev|46.591704364518684| 5.320371820639896|
|    min|             -48.0|                 0|
|    max|            1790.0|                24|
+-------+------------------+------------------+



In [60]:
sc.stop()