In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
spark = SparkSession.builder.appName('ChicagoFoodInspection').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])

#print spark configuration settings
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.yarn.jars',
  'local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/jars/*,local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/*'),
 ('spark.yarn.appMasterEnv.MKL_NUM_THREADS', '1'),
 ('spark.sql.queryExecutionListeners',
  'com.cloudera.spark.lineage.NavigatorQueryListener'),
 ('spark.driver.port', '37971'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'md01.rcc.local,md02.rcc.local'),
 ('spark.lineage.log.dir', '/var/log/spark/lineage'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.executorEnv.PYTHONPATH',
  '/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip<CPS>/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/lib/py4j-0.10.7-src.zip<CPS>/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/lib/pyspark.zip'),


In [3]:
import pandas as pd
pd.set_option('display.max_columns', None)

## Data Exploration

In [4]:
df = spark.read.csv("/user/junruiw/dta.csv",inferSchema=True, header=True )

In [5]:
df.dtypes

[('Year', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('DepTime', 'double'),
 ('CRSDepTime', 'int'),
 ('ArrTime', 'double'),
 ('CRSArrTime', 'int'),
 ('UniqueCarrier', 'string'),
 ('FlightNum', 'int'),
 ('TailNum', 'string'),
 ('ActualElapsedTime', 'double'),
 ('CRSElapsedTime', 'double'),
 ('AirTime', 'double'),
 ('ArrDelay', 'double'),
 ('DepDelay', 'string'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Distance', 'double'),
 ('TaxiIn', 'double'),
 ('TaxiOut', 'double'),
 ('Cancelled', 'double'),
 ('CancellationCode', 'string'),
 ('Diverted', 'double'),
 ('CarrierDelay', 'double'),
 ('WeatherDelay', 'double'),
 ('NASDelay', 'double'),
 ('SecurityDelay', 'double'),
 ('LateAircraftDelay', 'double')]

In [6]:
#number of partitions
df.rdd.getNumPartitions()

103

In [7]:
# extracting number of rows from the Dataframe
row = df.count()
   
# extracting number of columns from the Dataframe
col = len(df.columns)

# printing
print(f'Dimension of the Dataframe is: {(row,col)}')
print(f'Number of Rows are: {row}')
print(f'Number of Columns are: {col}')

Dimension of the Dataframe is: (123534969, 29)
Number of Rows are: 123534969
Number of Columns are: 29


## filter rows which have non-empty origin and dest airport (no matter cancel or not)
* df: the raw dataset
* df3: the dataset with non-empty 'origin' and 'dest'

In [8]:
df2=df.filter(F.col("Origin").cast("int").isNull())
df3=df2.filter(F.col("Dest").cast("int").isNull())

In [9]:
122379717/123534969

0.9906483807026333

## check missing values of df 3

* Find the number of missing values for each column

In [10]:
from pyspark.sql.functions import isnan, when, count, col
df3.select([count(when(df3[c].isNull(), c)).alias(c) for c in df3.columns]).show()


+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+--------+-----------------+--------------+--------+--------+--------+------+----+--------+--------+--------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum| TailNum|ActualElapsedTime|CRSElapsedTime| AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|  TaxiIn| TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+--------+-----------------+--------------+--------+--------+--------+------+----+--------+--------+--------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|   0|    0|         0|        0|2104357|         0|2384266|         0|        

* find the percentage of missing values

In [11]:
# missing value percentage
#from pyspark.sql.functions import *
#df3_miss_perc = df3.select([(count(when(isnan(c) | col(c).isNull(), c))/count(lit(1))).alias(c) for c in df3.columns])
#df3_miss_perc.toPandas()

## drop useless columns

* drop scheduled time cuz we have the actual time and time delay (actual - schedule = delay)

In [12]:
df4 = df3.drop('CRSDepTime', 'CRSArrTime', 'CRSElapsedTime')

* flight number has the same use as tailnum, then drop tailnum

In [13]:
df5 = df4.drop('TailNum')

* cancellation code has 99% missing values

In [14]:
df6 = df5.drop('CancellationCode')

In [15]:
#df6_miss_perc = df6.select([(count(when(isnan(c) | col(c).isNull(), c))/count(lit(1))).alias(c) for c in df6.columns])
#df6_miss_perc.toPandas()

# 1. check df w/ cancelled = 0

In [16]:
# in our project, we mainly focus on cancellation and delay.


In [17]:
df3.groupBy('Cancelled').count().show()

+---------+---------+
|Cancelled|    count|
+---------+---------+
|      0.0|120274172|
|      1.0|  2105545|
+---------+---------+



In [18]:
2105545/122379717

0.01720501608939004

In [19]:
# check missing values when cancellation = 0
df_no_cancel = df6.filter("Cancelled == '0.0'")

In [20]:
# extracting number of rows from the Dataframe
row = df_no_cancel.count()
   
# extracting number of columns from the Dataframe
col = len(df_no_cancel.columns)

# printing
print(f'Dimension of the Dataframe is: {(row,col)}')
print(f'Number of Rows are: {row}')
print(f'Number of Columns are: {col}')

Dimension of the Dataframe is: (120274172, 24)
Number of Rows are: 120274172
Number of Columns are: 24


In [21]:
from pyspark.sql.functions import isnan, when, count, col
df_no_cancel.select([count(when(col(c).isNull(), c)).alias(c) for c in df_no_cancel.columns]).show()

+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+--------+--------+--------+------+----+--------+--------+--------+---------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|ArrTime|UniqueCarrier|FlightNum|ActualElapsedTime| AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|  TaxiIn| TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+--------+--------+--------+------+----+--------+--------+--------+---------+--------+------------+------------+--------+-------------+-----------------+
|   0|    0|         0|        0|      0| 278721|            0|        0|           281772|37012689|  281772|       0|     0|   0|  199960|36840464|36826249|        0|       0|    86665731|    86665731|86665731|     86665731|         86665731|
+----+-----+----------+-

### filter rows with cancelled = 0, but has missing values

In [22]:
df_no_cancel = df_no_cancel.dropna(subset=('ArrTime','ActualElapsedTime','ActualElapsedTime','Distance'))

In [23]:
pdf = df_no_cancel.select([count(when(df_no_cancel[c].isNull(), c)).alias(c) for c in df_no_cancel.columns])
pdf.toPandas()
# df.cancelled = 0  is cleaned!

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,0,0,0,0,0,36537615,0,0,0,0,0,36537494,36537494,0,0,86252984,86252984,86252984,86252984,86252984


In [24]:
df_no_cancel.count()

119793199

In [25]:
#seems like there is little difference between Airtime and TaxiIn, make the amount of missing values equal
#( as we know later, most of the missing values are before 1995)

In [26]:
df_ncancel1 = df_no_cancel.filter(df_no_cancel.Year.cast("int") <1995)

In [27]:
df_ncancel2 = df_no_cancel.filter(df_no_cancel.Year.cast("int") >=1995)

In [28]:
pdf = df_ncancel1.select([count(when(df_ncancel1[c].isNull(), c)).alias(c) for c in df_ncancel1.columns])
pdf.toPandas()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,0,0,0,0,0,36537494,0,0,0,0,0,36537494,36537494,0,0,36537494,36537494,36537494,36537494,36537494


In [29]:
pdf = df_ncancel2.select([count(when(df_ncancel2[c].isNull(), c)).alias(c) for c in df_ncancel2.columns])
pdf.toPandas()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,0,0,0,0,0,121,0,0,0,0,0,0,0,0,0,49715490,49715490,49715490,49715490,49715490


In [30]:
df_ncancel2 = df_ncancel2.na.drop(subset=["AirTime"])

In [31]:
df_no_cancel2 = df_ncancel1.union(df_ncancel2)

# 2. check the dataset w/ cancelled = 1

In [32]:
df_cancel = df6.filter(df6.Cancelled == '1.0')


In [33]:
# extracting number of rows from the Dataframe
row = df_cancel.count()
   
# extracting number of columns from the Dataframe
col = len(df_cancel.columns)

# printing
print(f'Dimension of the Dataframe is: {(row,col)}')
print(f'Number of Rows are: {row}')
print(f'Number of Columns are: {col}')

Dimension of the Dataframe is: (2105545, 24)
Number of Rows are: 2105545
Number of Columns are: 24


In [34]:
pdf = df_cancel.select([count(when(df_cancel[c].isNull(), c)).alias(c) for c in df_cancel.columns])
pdf.toPandas()
# pdcancel.sort_values(by=['Year'])
# df.cancelled = 0  is cleaned!

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,2104357,2105545,0,0,2105545,2053497,2105545,2104357,0,0,2040,556831,556455,0,0,1508450,1508450,1508450,1508450,1508450


normally speaking, when flight is cancelled, depart time, arrive time and air time should be Null.
* airtime: 2053497
* rows: 2105545

### drop rows in the dataset w/ cancelled = 1

* if flight is cancelled, there is no reason having airtime

In [35]:
df_cancel2 = df_cancel.filter(df_cancel.AirTime.isNull())

In [36]:
df_cancel2.count()

2053497

In [37]:
df_cancel2.select([count(when(df_cancel2[c].isNull(), c)).alias(c) for c in df_cancel2.columns]).show()

+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|ArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+
|   0|    0|         0|        0|2052309|2053497|            0|        0|          2053497|2053497| 2053497| 2052309|     0|   0|    2040|556831| 556455|        0|       0|     1456402|     1456402| 1456402|      1456402|          1456402|
+----+-----+----------+---------+-------

still some non-missing value in Deptime and Depdelay
* deptime: 2052309
* rows: 2053497

In [38]:
df_cancel3 = df_cancel2.filter(df_cancel2.DepTime.isNull())
pdf = df_cancel3.select([count(when(df_cancel3[c].isNull(), c)).alias(c) for c in df_cancel3.columns])
pdf.toPandas()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,2052309,2052309,0,0,2052309,2052309,2052309,2052309,0,0,2040,555643,555643,0,0,1455214,1455214,1455214,1455214,1455214


In [39]:
df_cancel3 = df_cancel3.na.drop(subset=["Distance"])

In [40]:
pdf = df_cancel3.select([count(when(df_cancel3[c].isNull(), c)).alias(c) for c in df_cancel3.columns])
pdf.toPandas()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,2050269,2050269,0,0,2050269,2050269,2050269,2050269,0,0,0,553679,553679,0,0,1453174,1453174,1453174,1453174,1453174


In [41]:
# now dataset with cancelled = 1 is cleaned. all rows with missing values = 2050269

# 3. combine dataset with cancelled = 1 and  cancelled = 0

In [42]:
df_clean = df_no_cancel2.union(df_cancel3)

In [43]:
df_clean.count()

121843347

In [44]:
2052309+119793199-121-2040 
# 121 drop from the AirTime after 1995 when flight cancelled = 0
# 2040 drop from the distance when flight cancelled = 1

121843347

In [45]:
df_clean.dtypes

[('Year', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('DepTime', 'double'),
 ('ArrTime', 'double'),
 ('UniqueCarrier', 'string'),
 ('FlightNum', 'int'),
 ('ActualElapsedTime', 'double'),
 ('AirTime', 'double'),
 ('ArrDelay', 'double'),
 ('DepDelay', 'string'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Distance', 'double'),
 ('TaxiIn', 'double'),
 ('TaxiOut', 'double'),
 ('Cancelled', 'double'),
 ('Diverted', 'double'),
 ('CarrierDelay', 'double'),
 ('WeatherDelay', 'double'),
 ('NASDelay', 'double'),
 ('SecurityDelay', 'double'),
 ('LateAircraftDelay', 'double')]

## 1995 data (1995~2008 w/o Taxi_IN & Taxi_OUT missing values)

In [46]:
df1995 = df_clean.filter(df_clean.Year.cast("int") >=1995)

In [47]:
df1995.count()

84888420

In [48]:
pdf = df1995.select([count(when(df1995[c].isNull(), c)).alias(c) for c in df1995.columns])
pd1995 =pdf.toPandas()
pd1995.sort_values(by=['Year'])

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,1632836,1632836,0,0,1632836,1632836,1632836,1632836,0,0,0,136246,136246,0,0,50751110,50751110,50751110,50751110,50751110


In [49]:
#still has 136246 taxi_in and taxi_out

In [50]:
136246/84888420

0.0016050010119165843

In [51]:
df1995 = df1995.na.drop(subset=["TaxiIn"])

In [52]:
pdf = df1995.select([count(when(df1995[c].isNull(), c)).alias(c) for c in df1995.columns])
pd1995 =pdf.toPandas()
pd1995.sort_values(by=['Year'])

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,1496590,1496590,0,0,1496590,1496590,1496590,1496590,0,0,0,0,0,0,0,50614864,50614864,50614864,50614864,50614864


In [53]:
#further study
# if considering no deptime/arrtime null value
1496590/884888420
#it is reasonable to drop 

0.0016912753813639013

In [54]:
# df1995 = df1995.na.drop(subset=["DepTime"])

In [55]:
pdf = df1995.select([count(when(df1995[c].isNull(), c)).alias(c) for c in df1995.columns])
pd1995 =pdf.toPandas()
pd1995.sort_values(by=['Year'])

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,1496590,1496590,0,0,1496590,1496590,1496590,1496590,0,0,0,0,0,0,0,50614864,50614864,50614864,50614864,50614864


In [56]:
# now df1995 is cleaned!!!!

# 2004 data (2004~2007 delay dataset)

In [57]:
df2004 = df_clean.filter((df_clean.Year.cast("int") >=2004) & (df_clean.Year.cast("int") <=2007) )

In [58]:
df2004.count()

28803826

In [59]:
pdf = df2004.select([count(when(df2004[c].isNull(), c)).alias(c) for c in df2004.columns])
pd2004 =pdf.toPandas()
pd2004.sort_values(by=['Year'])

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,544169,544169,0,0,544169,544169,544169,544169,0,0,0,0,0,0,0,0,0,0,0,0


In [60]:
# the perc of missing values
544169/28803826

0.018892247161887452

In [61]:
df2004 = df2004.na.drop(subset=["DepTime"])

In [62]:
pdf = df2004.select([count(when(df2004[c].isNull(), c)).alias(c) for c in df2004.columns])
pd2004 =pdf.toPandas()
pd2004.sort_values(by=['Year'])

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,ArrTime,UniqueCarrier,FlightNum,ActualElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [63]:
#summary

# we have 5 dataset
# 1st: the raw dataset: df
# 2nd: the dataset that has no 'Origin' & 'Dest' missing values:  df6
# 3rd: the cleaned dataset: df_clean
# 4th: the dataset that based on df_clean and has no 'taxi_in' and 'taxi_out' missing values: df 1995
# 5th: the dataset that based on df_clean and used to do delay analysis: df2004

# Model Building - Cancelled - 1995

## One Hot Encoding

In [64]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
indexer1 = StringIndexer(inputCol = "UniqueCarrier", outputCol = "UniqueCarrierIdx").setHandleInvalid("skip")
indexer2 = StringIndexer(inputCol = "Origin", outputCol = "OriginIdx").setHandleInvalid("skip")
indexer3 = StringIndexer(inputCol = "Dest", outputCol = "DestIdx").setHandleInvalid("skip")
inputs = [indexer1.getOutputCol(), indexer2.getOutputCol(), indexer3.getOutputCol()]
encoder = OneHotEncoderEstimator(inputCols = inputs, outputCols = ["UniqueCarrierVec", "OriginVec", "DestVec"])
pipeline = Pipeline(stages = [indexer1, indexer2, indexer3, encoder])
encodedData1995 = pipeline.fit(df1995).transform(df1995)
encodedData1995.show(5)

+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+----------------+---------+-------+----------------+---------------+----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|ArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|UniqueCarrierIdx|OriginIdx|DestIdx|UniqueCarrierVec|      OriginVec|         DestVec|
+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+----------------+---------+-------+----------------+---------------+----------------+
|1995|    1|         6|  

In [65]:
assembler = VectorAssembler(inputCols = ['UniqueCarrierVec', 'OriginVec', 'DestVec', 'Year', 'Month', 'DayofMonth', \
                                         'DayOfWeek', 'Distance'], \
                            outputCol = 'features')
encodedData1995 = assembler.transform(encodedData1995)

In [66]:
encodedData1995 = encodedData1995.drop("_c0")
train_df1995 = encodedData1995.sampleBy("Cancelled", fractions = {0: 0.8, 1: 0.8}, seed = 1)

# Undersampling data with non-cancelled
train_df1995_non_cancelled = train_df1995.filter(train_df1995.Cancelled == 0).sample(withReplacement = False, fraction = 0.02, \
                                                                                    seed = 1)
train_df1995_cancelled = train_df1995.filter(train_df1995.Cancelled == 1)
train_df1995 = train_df1995_non_cancelled.union(train_df1995_cancelled)

test_df1995 = encodedData1995.subtract(train_df1995)
train_df1995.show(1)

+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+----------------+---------+-------+----------------+---------------+----------------+--------------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|ArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|UniqueCarrierIdx|OriginIdx|DestIdx|UniqueCarrierVec|      OriginVec|         DestVec|            features|
+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+----------------+---------+-------+----------------+---------------+-

## Logistic Regression

In [67]:
%%time
from pyspark.ml.classification import LogisticRegression
lgr = LogisticRegression(featuresCol = 'features', labelCol = 'Cancelled')
lgrm = lgr.fit(train_df1995)
predictions = lgrm.transform(test_df1995)

CPU times: user 21 ms, sys: 8.93 ms, total: 30 ms
Wall time: 1min 24s


In [68]:
predictions.select("Year", "Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "Origin", "Dest", "Distance", "Cancelled", \
                   "prediction").show(20)

+----+-----+----------+---------+-------------+------+----+--------+---------+----------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|Origin|Dest|Distance|Cancelled|prediction|
+----+-----+----------+---------+-------------+------+----+--------+---------+----------+
|1995|    1|         1|        7|           CO|   CHS| ATL|   259.0|      0.0|       0.0|
|1995|    1|         5|        4|           US|   PIT| BGM|   251.0|      0.0|       1.0|
|1995|    1|         9|        1|           WN|   MCI| PHX|  1044.0|      0.0|       0.0|
|1995|    1|        11|        3|           WN|   BHM| MSY|   321.0|      0.0|       0.0|
|1995|    1|        15|        7|           AA|   ORD| LAX|  1745.0|      0.0|       0.0|
|1995|    1|        18|        3|           DL|   RNO| FAT|   188.0|      0.0|       0.0|
|1995|    1|        19|        4|           HP|   ONT| PHX|   325.0|      0.0|       1.0|
|1995|    1|        26|        4|           US|   TPA| BOS|  1185.0|      0.0|       0.0|
|1995|    

In [69]:
predictions.filter(predictions.Cancelled == 1).select("Year", "Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "Origin", \
                                                      "Dest", "Distance", "Cancelled", "prediction") \
.show(20)

+----+-----+----------+---------+-------------+------+----+--------+---------+----------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|Origin|Dest|Distance|Cancelled|prediction|
+----+-----+----------+---------+-------------+------+----+--------+---------+----------+
|1997|    8|        17|        7|           UA|   DEN| SFO|   967.0|      1.0|       0.0|
|2000|    7|        14|        5|           NW|   DTW| IND|   231.0|      1.0|       1.0|
|2006|    2|        19|        7|           WN|   HOU| DAL|   239.0|      1.0|       1.0|
|1996|    5|        16|        4|           UA|   ORD| LAX|  1745.0|      1.0|       1.0|
|1998|   12|         9|        3|           WN|   ONT| PHX|   325.0|      1.0|       0.0|
|1999|    1|        10|        7|           DL|   ATL| MCO|   403.0|      1.0|       0.0|
|2000|    2|        28|        1|           AS|   SEA| PDX|   129.0|      1.0|       1.0|
|2001|    6|        13|        3|           DL|   RDU| ATL|   356.0|      1.0|       1.0|
|2003|    

In [70]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol = "Cancelled", predictionCol = "prediction")
print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))
evaluator_auc = BinaryClassificationEvaluator(labelCol = "Cancelled", rawPredictionCol = "prediction", \
                                              metricName = "areaUnderROC")
print(evaluator_auc.evaluate(predictions))

0.6634329367015867
0.7942959877280834
0.6250242205921233


## Random Forest

In [71]:
%%time
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(labelCol = "Cancelled", predictionCol = "prediction", seed = 1)
rfcm = rfc.fit(train_df1995)
predictions = rfcm.transform(test_df1995)

CPU times: user 47.1 ms, sys: 12.7 ms, total: 59.8 ms
Wall time: 4min 43s


In [72]:
predictions.select("Year", "Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "Origin", "Dest", "Distance", "Cancelled", \
                   "prediction").show(20)

+----+-----+----------+---------+-------------+------+----+--------+---------+----------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|Origin|Dest|Distance|Cancelled|prediction|
+----+-----+----------+---------+-------------+------+----+--------+---------+----------+
|1995|    1|         2|        1|           WN|   SAN| SMF|   480.0|      0.0|       0.0|
|1995|    1|         2|        1|           WN|   LIT| DAL|   296.0|      0.0|       0.0|
|1995|    1|         5|        4|           DL|   ROC| ATL|   749.0|      0.0|       0.0|
|1995|    1|         6|        5|           US|   GSO| PIT|   304.0|      0.0|       0.0|
|1995|    1|        12|        4|           DL|   SHV| DFW|   190.0|      0.0|       0.0|
|1995|    1|        13|        5|           UA|   DEN| PHX|   602.0|      0.0|       1.0|
|1995|    1|        22|        7|           NW|   ATL| DTW|   594.0|      0.0|       1.0|
|1995|    1|        25|        3|           DL|   PHX| CVG|  1569.0|      0.0|       0.0|
|1995|    

In [73]:
predictions.filter(predictions.Cancelled == 1).select("Year", "Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "Origin", \
                                                      "Dest", "Distance", "Cancelled", "prediction") \
.show(20)

+----+-----+----------+---------+-------------+------+----+--------+---------+----------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|Origin|Dest|Distance|Cancelled|prediction|
+----+-----+----------+---------+-------------+------+----+--------+---------+----------+
|1996|    8|        15|        4|           UA|   ORD| DAY|   240.0|      1.0|       1.0|
|1998|    2|         3|        2|           HP|   LAX| LAS|   236.0|      1.0|       0.0|
|2004|    8|        14|        6|           DL|   MCO| EWR|   938.0|      1.0|       0.0|
|2005|    1|        17|        1|           WN|   HOU| HRL|   276.0|      1.0|       0.0|
|2005|    9|         5|        1|           CO|   MSY| IAH|   305.0|      1.0|       0.0|
|2006|   12|        14|        4|           EV|   ATL| OKC|   761.0|      1.0|       0.0|
|1996|    7|        31|        3|           DL|   BOS| LGA|   185.0|      1.0|       1.0|
|1998|    1|         2|        5|           US|   PIT| CRW|   164.0|      1.0|       0.0|
|1999|    

In [74]:
print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))
print(evaluator_auc.evaluate(predictions))

0.8963414347898413
0.9419079168613793
0.5642038037400553


# Model 1 Building - Delayed - 2004 (Without Delay Reason)

In [75]:
from pyspark.sql.functions import expr
df2004 = df2004.withColumn("dep_hour", expr("substring(DepTime, 1, length(DepTime)-4)"))
df2004 = df2004.withColumn("arr_hour", expr("substring(ArrTime, 1, length(ArrTime)-4)"))

In [76]:
df2004 = df2004.withColumn("dep_hour", df2004.dep_hour.cast('int'))
df2004 = df2004.withColumn("arr_hour", df2004.arr_hour.cast('int'))
df2004 = df2004.withColumn("DepDelay", df2004.arr_hour.cast('int'))

In [77]:
encodedData2004 = pipeline.fit(df2004).transform(df2004)
encodedData2004.show(5)

+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+--------+--------+----------------+---------+-------+----------------+---------------+----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|ArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|dep_hour|arr_hour|UniqueCarrierIdx|OriginIdx|DestIdx|UniqueCarrierVec|      OriginVec|         DestVec|
+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+--------+--------+----------------+---------+-------+----------------+-----

In [78]:
assembler = VectorAssembler(inputCols = ['UniqueCarrierVec', 'OriginVec', 'DestVec', 'Year', 'Month', 'DayofMonth', \
                                         'DayOfWeek', 'dep_hour', 'arr_hour', 'DepDelay', 'Distance', 'Diverted'], \
                           outputCol = 'features')
encodedData2004 = assembler.transform(encodedData2004)

In [79]:
encodedData2004 = encodedData2004.drop("_c0")
train_df2004, test_df2004 = encodedData2004.randomSplit([0.8, 0.2], seed = 1)
encodedData2004.show(1)

+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+--------+--------+----------------+---------+-------+----------------+---------------+----------------+--------------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|ArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|dep_hour|arr_hour|UniqueCarrierIdx|OriginIdx|DestIdx|UniqueCarrierVec|      OriginVec|         DestVec|            features|
+----+-----+----------+---------+-------+-------+-------------+---------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+--------+--------+---------------

## Linear Regression

In [80]:
%%time
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol = 'ArrDelay', regParam = 0.3, elasticNetParam = 0.8, maxIter = 10)
lrm = lr.fit(train_df2004)
predictions = lrm.transform(test_df2004)

Py4JJavaError: An error occurred while calling o3157.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 59 in stage 503.0 failed 4 times, most recent failure: Lost task 59.3 in stage 503.0 (TID 141758, hd02.rcc.local, executor 243): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<UniqueCarrierVec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,OriginVec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,DestVec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,Year_double_VectorAssembler_5791872009df:double,Month_double_VectorAssembler_5791872009df:double,DayofMonth_double_VectorAssembler_5791872009df:double,DayOfWeek_double_VectorAssembler_5791872009df:double,dep_hour_double_VectorAssembler_5791872009df:double,arr_hour_double_VectorAssembler_5791872009df:double,DepDelay_double_VectorAssembler_5791872009df:double,Distance:double,Diverted:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
	at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1.apply(LinearRegression.scala:321)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1.apply(LinearRegression.scala:319)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:319)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:176)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<UniqueCarrierVec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,OriginVec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,DestVec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,Year_double_VectorAssembler_5791872009df:double,Month_double_VectorAssembler_5791872009df:double,DayofMonth_double_VectorAssembler_5791872009df:double,DayOfWeek_double_VectorAssembler_5791872009df:double,dep_hour_double_VectorAssembler_5791872009df:double,arr_hour_double_VectorAssembler_5791872009df:double,DepDelay_double_VectorAssembler_5791872009df:double,Distance:double,Diverted:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 22 more


In [81]:
predictions.select("Year", "Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "dep_hour", "arr_hour", "DepDelay", \
                   "Origin", "Dest", "Distance", "Diverted", "ArrDelay", "prediction").show(20)

AnalysisException: "cannot resolve '`dep_hour`' given input columns: [DepDelay, UniqueCarrier, Origin, DestVec, OriginIdx, features, Distance, prediction, rawPrediction, DepTime, DestIdx, UniqueCarrierIdx, Cancelled, Diverted, probability, Month, Dest, DayOfWeek, NASDelay, TaxiIn, UniqueCarrierVec, AirTime, FlightNum, ArrTime, SecurityDelay, TaxiOut, CarrierDelay, ActualElapsedTime, WeatherDelay, Year, OriginVec, ArrDelay, LateAircraftDelay, DayofMonth];;\n'Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, UniqueCarrier#18, 'dep_hour, 'arr_hour, DepDelay#25, Origin#26, Dest#27, Distance#28, Diverted#33, ArrDelay#24, prediction#7666]\n+- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 10 more fields]\n   +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 9 more fields]\n      +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 8 more fields]\n         +- Except false\n            :- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 7 more fields]\n            :  +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 7 more fields]\n            :     +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 7 more fields]\n            :        +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 7 more fields]\n            :           +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 7 more fields]\n            :              +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 7 more fields]\n            :                 +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 6 more fields]\n            :                    +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 3 more fields]\n            :                       +- Filter UDF(Dest#27)\n            :                          +- Filter AtLeastNNulls(n, Dest#27)\n            :                             +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, ... 2 more fields]\n            :                                +- Filter UDF(Origin#26)\n            :                                   +- Filter AtLeastNNulls(n, Origin#26)\n            :                                      +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38, UDF(cast(UniqueCarrier#18 as string)) AS UniqueCarrierIdx#3571]\n            :                                         +- Filter UDF(UniqueCarrier#18)\n            :                                            +- Filter AtLeastNNulls(n, UniqueCarrier#18)\n            :                                               +- Filter AtLeastNNulls(n, TaxiIn#29)\n            :                                                  +- Filter (cast(Year#10 as int) >= 1995)\n            :                                                     +- Union\n            :                                                        :- Filter (cast(Year#10 as int) < 1995)\n            :                                                        :  +- Filter AtLeastNNulls(n, ArrTime#16,ActualElapsedTime#21,ActualElapsedTime#21,Distance#28)\n            :                                                        :     +- Filter (Cancelled#31 = cast(0.0 as double))\n            :                                                        :        +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38]\n            :                                                        :           +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, CancellationCode#32, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38]\n            :                                                        :              +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, TailNum#20, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, CancellationCode#32, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, ... 2 more fields]\n            :                                                        :                 +- Filter isnull(cast(Dest#27 as int))\n            :                                                        :                    +- Filter isnull(cast(Origin#26 as int))\n            :                                                        :                       +- Relation[Year#10,Month#11,DayofMonth#12,DayOfWeek#13,DepTime#14,CRSDepTime#15,ArrTime#16,CRSArrTime#17,UniqueCarrier#18,FlightNum#19,TailNum#20,ActualElapsedTime#21,CRSElapsedTime#22,AirTime#23,ArrDelay#24,DepDelay#25,Origin#26,Dest#27,Distance#28,TaxiIn#29,TaxiOut#30,Cancelled#31,CancellationCode#32,Diverted#33,... 5 more fields] csv\n            :                                                        :- Filter AtLeastNNulls(n, AirTime#23)\n            :                                                        :  +- Filter (cast(Year#10 as int) >= 1995)\n            :                                                        :     +- Filter AtLeastNNulls(n, ArrTime#16,ActualElapsedTime#21,ActualElapsedTime#21,Distance#28)\n            :                                                        :        +- Filter (Cancelled#31 = cast(0.0 as double))\n            :                                                        :           +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38]\n            :                                                        :              +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, CancellationCode#32, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38]\n            :                                                        :                 +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, TailNum#20, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, CancellationCode#32, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, ... 2 more fields]\n            :                                                        :                    +- Filter isnull(cast(Dest#27 as int))\n            :                                                        :                       +- Filter isnull(cast(Origin#26 as int))\n            :                                                        :                          +- Relation[Year#10,Month#11,DayofMonth#12,DayOfWeek#13,DepTime#14,CRSDepTime#15,ArrTime#16,CRSArrTime#17,UniqueCarrier#18,FlightNum#19,TailNum#20,ActualElapsedTime#21,CRSElapsedTime#22,AirTime#23,ArrDelay#24,DepDelay#25,Origin#26,Dest#27,Distance#28,TaxiIn#29,TaxiOut#30,Cancelled#31,CancellationCode#32,Diverted#33,... 5 more fields] csv\n            :                                                        +- Filter AtLeastNNulls(n, Distance#28)\n            :                                                           +- Filter isnull(DepTime#14)\n            :                                                              +- Filter isnull(AirTime#23)\n            :                                                                 +- Filter (Cancelled#31 = cast(1.0 as double))\n            :                                                                    +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38]\n            :                                                                       +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, CancellationCode#32, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, SecurityDelay#37, LateAircraftDelay#38]\n            :                                                                          +- Project [Year#10, Month#11, DayofMonth#12, DayOfWeek#13, DepTime#14, ArrTime#16, UniqueCarrier#18, FlightNum#19, TailNum#20, ActualElapsedTime#21, AirTime#23, ArrDelay#24, DepDelay#25, Origin#26, Dest#27, Distance#28, TaxiIn#29, TaxiOut#30, Cancelled#31, CancellationCode#32, Diverted#33, CarrierDelay#34, WeatherDelay#35, NASDelay#36, ... 2 more fields]\n            :                                                                             +- Filter isnull(cast(Dest#27 as int))\n            :                                                                                +- Filter isnull(cast(Origin#26 as int))\n            :                                                                                   +- Relation[Year#10,Month#11,DayofMonth#12,DayOfWeek#13,DepTime#14,CRSDepTime#15,ArrTime#16,CRSArrTime#17,UniqueCarrier#18,FlightNum#19,TailNum#20,ActualElapsedTime#21,CRSElapsedTime#22,AirTime#23,ArrDelay#24,DepDelay#25,Origin#26,Dest#27,Distance#28,TaxiIn#29,TaxiOut#30,Cancelled#31,CancellationCode#32,Diverted#33,... 5 more fields] csv\n            +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 7 more fields]\n               +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 7 more fields]\n                  +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 7 more fields]\n                     +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 7 more fields]\n                        +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 7 more fields]\n                           +- Union\n                              :- Sample 0.0, 0.02, false, 1\n                              :  +- Filter (Cancelled#4960 = cast(0 as double))\n                              :     +- Filter if (isnull(rand(1))) null else UDF(Cancelled#4960, rand(1))\n                              :        +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 7 more fields]\n                              :           +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 6 more fields]\n                              :              +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 3 more fields]\n                              :                 +- Filter UDF(Dest#4956)\n                              :                    +- Filter AtLeastNNulls(n, Dest#4956)\n                              :                       +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 2 more fields]\n                              :                          +- Filter UDF(Origin#4955)\n                              :                             +- Filter AtLeastNNulls(n, Origin#4955)\n                              :                                +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, UDF(cast(UniqueCarrier#4947 as string)) AS UniqueCarrierIdx#4763]\n                              :                                   +- Filter UDF(UniqueCarrier#4947)\n                              :                                      +- Filter AtLeastNNulls(n, UniqueCarrier#4947)\n                              :                                         +- Filter AtLeastNNulls(n, TaxiIn#4958)\n                              :                                            +- Filter (cast(Year#4939 as int) >= 1995)\n                              :                                               +- Union\n                              :                                                  :- Filter (cast(Year#4939 as int) < 1995)\n                              :                                                  :  +- Filter AtLeastNNulls(n, ArrTime#4945,ActualElapsedTime#4950,ActualElapsedTime#4950,Distance#4957)\n                              :                                                  :     +- Filter (Cancelled#4960 = cast(0.0 as double))\n                              :                                                  :        +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                              :                                                  :           +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                              :                                                  :              +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, TailNum#4949, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, ... 2 more fields]\n                              :                                                  :                 +- Filter isnull(cast(Dest#4956 as int))\n                              :                                                  :                    +- Filter isnull(cast(Origin#4955 as int))\n                              :                                                  :                       +- Relation[Year#4939,Month#4940,DayofMonth#4941,DayOfWeek#4942,DepTime#4943,CRSDepTime#4944,ArrTime#4945,CRSArrTime#4946,UniqueCarrier#4947,FlightNum#4948,TailNum#4949,ActualElapsedTime#4950,CRSElapsedTime#4951,AirTime#4952,ArrDelay#4953,DepDelay#4954,Origin#4955,Dest#4956,Distance#4957,TaxiIn#4958,TaxiOut#4959,Cancelled#4960,CancellationCode#4961,Diverted#4962,... 5 more fields] csv\n                              :                                                  :- Filter AtLeastNNulls(n, AirTime#4952)\n                              :                                                  :  +- Filter (cast(Year#4939 as int) >= 1995)\n                              :                                                  :     +- Filter AtLeastNNulls(n, ArrTime#4945,ActualElapsedTime#4950,ActualElapsedTime#4950,Distance#4957)\n                              :                                                  :        +- Filter (Cancelled#4960 = cast(0.0 as double))\n                              :                                                  :           +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                              :                                                  :              +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                              :                                                  :                 +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, TailNum#4949, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, ... 2 more fields]\n                              :                                                  :                    +- Filter isnull(cast(Dest#4956 as int))\n                              :                                                  :                       +- Filter isnull(cast(Origin#4955 as int))\n                              :                                                  :                          +- Relation[Year#4939,Month#4940,DayofMonth#4941,DayOfWeek#4942,DepTime#4943,CRSDepTime#4944,ArrTime#4945,CRSArrTime#4946,UniqueCarrier#4947,FlightNum#4948,TailNum#4949,ActualElapsedTime#4950,CRSElapsedTime#4951,AirTime#4952,ArrDelay#4953,DepDelay#4954,Origin#4955,Dest#4956,Distance#4957,TaxiIn#4958,TaxiOut#4959,Cancelled#4960,CancellationCode#4961,Diverted#4962,... 5 more fields] csv\n                              :                                                  +- Filter AtLeastNNulls(n, Distance#4957)\n                              :                                                     +- Filter isnull(DepTime#4943)\n                              :                                                        +- Filter isnull(AirTime#4952)\n                              :                                                           +- Filter (Cancelled#4960 = cast(1.0 as double))\n                              :                                                              +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                              :                                                                 +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                              :                                                                    +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, TailNum#4949, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, ... 2 more fields]\n                              :                                                                       +- Filter isnull(cast(Dest#4956 as int))\n                              :                                                                          +- Filter isnull(cast(Origin#4955 as int))\n                              :                                                                             +- Relation[Year#4939,Month#4940,DayofMonth#4941,DayOfWeek#4942,DepTime#4943,CRSDepTime#4944,ArrTime#4945,CRSArrTime#4946,UniqueCarrier#4947,FlightNum#4948,TailNum#4949,ActualElapsedTime#4950,CRSElapsedTime#4951,AirTime#4952,ArrDelay#4953,DepDelay#4954,Origin#4955,Dest#4956,Distance#4957,TaxiIn#4958,TaxiOut#4959,Cancelled#4960,CancellationCode#4961,Diverted#4962,... 5 more fields] csv\n                              +- Filter (Cancelled#4960 = cast(1 as double))\n                                 +- Filter if (isnull(rand(1))) null else UDF(Cancelled#4960, rand(1))\n                                    +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 7 more fields]\n                                       +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 6 more fields]\n                                          +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 3 more fields]\n                                             +- Filter UDF(Dest#4956)\n                                                +- Filter AtLeastNNulls(n, Dest#4956)\n                                                   +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, ... 2 more fields]\n                                                      +- Filter UDF(Origin#4955)\n                                                         +- Filter AtLeastNNulls(n, Origin#4955)\n                                                            +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967, UDF(cast(UniqueCarrier#4947 as string)) AS UniqueCarrierIdx#4763]\n                                                               +- Filter UDF(UniqueCarrier#4947)\n                                                                  +- Filter AtLeastNNulls(n, UniqueCarrier#4947)\n                                                                     +- Filter AtLeastNNulls(n, TaxiIn#4958)\n                                                                        +- Filter (cast(Year#4939 as int) >= 1995)\n                                                                           +- Union\n                                                                              :- Filter (cast(Year#4939 as int) < 1995)\n                                                                              :  +- Filter AtLeastNNulls(n, ArrTime#4945,ActualElapsedTime#4950,ActualElapsedTime#4950,Distance#4957)\n                                                                              :     +- Filter (Cancelled#4960 = cast(0.0 as double))\n                                                                              :        +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                                                                              :           +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                                                                              :              +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, TailNum#4949, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, ... 2 more fields]\n                                                                              :                 +- Filter isnull(cast(Dest#4956 as int))\n                                                                              :                    +- Filter isnull(cast(Origin#4955 as int))\n                                                                              :                       +- Relation[Year#4939,Month#4940,DayofMonth#4941,DayOfWeek#4942,DepTime#4943,CRSDepTime#4944,ArrTime#4945,CRSArrTime#4946,UniqueCarrier#4947,FlightNum#4948,TailNum#4949,ActualElapsedTime#4950,CRSElapsedTime#4951,AirTime#4952,ArrDelay#4953,DepDelay#4954,Origin#4955,Dest#4956,Distance#4957,TaxiIn#4958,TaxiOut#4959,Cancelled#4960,CancellationCode#4961,Diverted#4962,... 5 more fields] csv\n                                                                              :- Filter AtLeastNNulls(n, AirTime#4952)\n                                                                              :  +- Filter (cast(Year#4939 as int) >= 1995)\n                                                                              :     +- Filter AtLeastNNulls(n, ArrTime#4945,ActualElapsedTime#4950,ActualElapsedTime#4950,Distance#4957)\n                                                                              :        +- Filter (Cancelled#4960 = cast(0.0 as double))\n                                                                              :           +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                                                                              :              +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                                                                              :                 +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, TailNum#4949, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, ... 2 more fields]\n                                                                              :                    +- Filter isnull(cast(Dest#4956 as int))\n                                                                              :                       +- Filter isnull(cast(Origin#4955 as int))\n                                                                              :                          +- Relation[Year#4939,Month#4940,DayofMonth#4941,DayOfWeek#4942,DepTime#4943,CRSDepTime#4944,ArrTime#4945,CRSArrTime#4946,UniqueCarrier#4947,FlightNum#4948,TailNum#4949,ActualElapsedTime#4950,CRSElapsedTime#4951,AirTime#4952,ArrDelay#4953,DepDelay#4954,Origin#4955,Dest#4956,Distance#4957,TaxiIn#4958,TaxiOut#4959,Cancelled#4960,CancellationCode#4961,Diverted#4962,... 5 more fields] csv\n                                                                              +- Filter AtLeastNNulls(n, Distance#4957)\n                                                                                 +- Filter isnull(DepTime#4943)\n                                                                                    +- Filter isnull(AirTime#4952)\n                                                                                       +- Filter (Cancelled#4960 = cast(1.0 as double))\n                                                                                          +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                                                                                             +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, SecurityDelay#4966, LateAircraftDelay#4967]\n                                                                                                +- Project [Year#4939, Month#4940, DayofMonth#4941, DayOfWeek#4942, DepTime#4943, ArrTime#4945, UniqueCarrier#4947, FlightNum#4948, TailNum#4949, ActualElapsedTime#4950, AirTime#4952, ArrDelay#4953, DepDelay#4954, Origin#4955, Dest#4956, Distance#4957, TaxiIn#4958, TaxiOut#4959, Cancelled#4960, CancellationCode#4961, Diverted#4962, CarrierDelay#4963, WeatherDelay#4964, NASDelay#4965, ... 2 more fields]\n                                                                                                   +- Filter isnull(cast(Dest#4956 as int))\n                                                                                                      +- Filter isnull(cast(Origin#4955 as int))\n                                                                                                         +- Relation[Year#4939,Month#4940,DayofMonth#4941,DayOfWeek#4942,DepTime#4943,CRSDepTime#4944,ArrTime#4945,CRSArrTime#4946,UniqueCarrier#4947,FlightNum#4948,TailNum#4949,ActualElapsedTime#4950,CRSElapsedTime#4951,AirTime#4952,ArrDelay#4953,DepDelay#4954,Origin#4955,Dest#4956,Distance#4957,TaxiIn#4958,TaxiOut#4959,Cancelled#4960,CancellationCode#4961,Diverted#4962,... 5 more fields] csv\n"

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol = "ArrDelay", predictionCol = "prediction", metricName = "rmse")
rmse = eval.evaluate(predictions)
print("RMSE: %.3f" % rmse)
mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" % r2)

## Random Forest

In [None]:
%%time
from pyspark.ml.regression import RandomForestRegressor
rfr = RandomForestRegressor(maxDepth = 5, numTrees = 15, impurity = "gini", labelCol = "ArrDelay", predictionCol = "prediction")
rfrm = rfr.fit(train_df2004)
predictions = rfrm.transform(test_df2004)

In [None]:
predictions.select("Year", "Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "dep_hour", "arr_hour", "DepDelay", \
                   "Origin", "Dest", "Distance", "Diverted", "ArrDelay", "prediction").show(20)

In [None]:
rmse = eval.evaluate(predictions)
print("RMSE: %.3f" % rmse)
mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" % r2)

# Model Building - Delayed - 2004 (With Delay Reason)

In [None]:
df2004 = df2004.withColumn("isWeather", when(df2004.WeatherDelay <= 0, 0).otherwise(1))
df2004 = df2004.withColumn("isSecurity", when(df2004.SecurityDelay <= 0, 0).otherwise(1))
df2004 = df2004.withColumn("isNAS", when(df2004.NASDelay <= 0, 0).otherwise(1))
df2004 = df2004.withColumn("isLate", when(df2004.LateAircraftDelay <= 0, 0).otherwise(1))
df2004 = df2004.withColumn("isCarrier", when(df2004.CarrierDelay <= 0, 0).otherwise(1))

In [None]:
encodedData2004 = pipeline.fit(df2004).transform(df2004)
encodedData2004.show(5)

In [None]:
assembler = VectorAssembler(inputCols = ['UniqueCarrierVec', 'OriginVec', 'DestVec', 'Year', 'Month', 'DayofMonth', \
                                         'DayOfWeek', 'dep_hour', 'arr_hour', 'DepDelay', 'Distance', 'Diverted', \
                                         'isWeather', 'isSecurity', 'isNAS', 'isLate', 'isCarrier'], \
                           outputCol = 'features')
encodedData2004 = assembler.transform(encodedData2004)

In [None]:
encodedData2004 = encodedData2004.drop("_c0")
train_df2004, test_df2004 = encodedData2004.randomSplit([0.8, 0.2], seed = 1)
encodedData2004.show(1)

## Linear Regression

In [None]:
%%time
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol = 'ArrDelay', regParam = 0.3, elasticNetParam = 0.8, maxIter = 10)
lrm = lr.fit(train_df2004)
predictions = lrm.transform(test_df2004)

In [None]:
predictions.select("Year", "Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "dep_hour", "arr_hour", "DepDelay", \
                   "Origin", "Dest", "Distance", "Diverted", "isWeather", "isSecurity", "isNAS", "isLate", "isCarrier", \
                   "ArrDelay", "prediction").show(20)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol = "ArrDelay", predictionCol = "prediction", metricName = "rmse")
rmse = eval.evaluate(predictions)
print("RMSE: %.3f" % rmse)
mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" % r2)

## Random Forest

In [None]:
%%time
from pyspark.ml.regression import RandomForestRegressor
rfr = RandomForestRegressor(maxDepth = 5, numTrees = 15, impurity = "gini", labelCol = "ArrDelay", predictionCol = "prediction")
rfrm = rfr.fit(train_df2004)
predictions = rfrm.transform(test_df2004)

In [None]:
predictions.select("Year", "Month", "DayofMonth", "DayOfWeek", "UniqueCarrier", "dep_hour", "arr_hour", "DepDelay", \
                   "Origin", "Dest", "Distance", "Diverted", "isWeather", "isSecurity", "isNAS", "isLate", "isCarrier", \
                   "ArrDelay", "prediction").show(20)

In [None]:
rmse = eval.evaluate(predictions)
print("RMSE: %.3f" % rmse)
mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" % r2)