# Download Data

In [1]:
!wget  https://s3.amazonaws.com/imcbucket/data/flights/2008.csv

--2020-04-13 06:23:26--  https://s3.amazonaws.com/imcbucket/data/flights/2008.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.136.13
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.136.13|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 689413344 (657M) [binary/octet-stream]
Saving to: ‘2008.csv.5’


2020-04-13 06:23:36 (62.5 MB/s) - ‘2008.csv.5’ saved [689413344/689413344]



# Upload to HDFS

In [2]:
!hdfs dfs -mkdir -p /user/williampoch42/input

In [3]:
!hadoop fs -put 2008.csv /user/williampoch42/input

put: `/user/williampoch42/input/2008.csv': File exists


In [4]:
!hdfs dfs -ls /user/williampoch42/input

Found 1 items
-rw-r--r--   2 root hadoop  689413344 2020-04-13 03:23 /user/williampoch42/input/2008.csv


# Load the CSV to a DataFrame

In [5]:
airline_df = spark.read.format('csv').\
option('header','true').option('mode','DROPMALFORMED')\
.load('/user/williampoch42/input/2008.csv')

In [6]:
airline_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

# Modeling (and making some data transformation)
## Convert some string fields to double

In [7]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf

airline_df = airline_df.\
withColumn('DepTime',airline_df['DepTime'].\
           cast(DoubleType())).\
withColumn('TaxiOut',airline_df['TaxiOut'].\
           cast(DoubleType())).\
withColumn('TaxiIn',airline_df['TaxiIn'].\
           cast(DoubleType())).\
withColumn('DepDelay',airline_df['DepDelay'].\
           cast(DoubleType())).\
withColumn('DayOfWeek',airline_df['DayOfWeek'].\
           cast(DoubleType())).\
withColumn('Distance',airline_df['Distance'].\
           cast(DoubleType())).\
withColumn('ArrDelay',airline_df['ArrDelay'].\
           cast(DoubleType()))

In [8]:
airline_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: double (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

In [9]:
airline_df.take(5)

[Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime=2003.0, CRSDepTime='1955', ArrTime='2211', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay=-14.0, DepDelay=8.0, Origin='IAD', Dest='TPA', Distance=810.0, TaxiIn=4.0, TaxiOut=8.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime=754.0, CRSDepTime='735', ArrTime='1002', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay=2.0, DepDelay=19.0, Origin='IAD', Dest='TPA', Distance=810.0, TaxiIn=5.0, TaxiOut=10.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='20

## Discretize DepTime from scalar (double) into 1 of the size possible value.

In [10]:
def t_timeperiod(origin):
    if origin is None:
        period = None
    elif origin > 0 and origin < 600:
        period = '00.01-05.59'
    elif origin >= 600 and origin <=1200:
        period = '06.00-11.59'
    elif origin >= 1200 and origin <= 1800:
        period = '12.00-17.59'
    elif origin >= 1800 and origin <= 2400:
        period = '18.00-24.00'
    else:
        period = 'NA'
    return period

In [11]:
timeperiod = udf(lambda x: t_timeperiod(x),StringType())

In [12]:
airline_df2 = airline_df.withColumn('DepTime',timeperiod(airline_df['DepTime']))

In [13]:
airline_df2.take(5)

[Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='18.00-24.00', CRSDepTime='1955', ArrTime='2211', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay=-14.0, DepDelay=8.0, Origin='IAD', Dest='TPA', Distance=810.0, TaxiIn=4.0, TaxiOut=8.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='06.00-11.59', CRSDepTime='735', ArrTime='1002', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay=2.0, DepDelay=19.0, Origin='IAD', Dest='TPA', Distance=810.0, TaxiIn=5.0, TaxiOut=10.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA')

## Normalization
### Distance is a scalar value. Its range is unpredictable. We will normalize its value into 0..1.

In [14]:
from pyspark.sql.functions import *
max_distance = airline_df2.select(max('Distance')).collect()[0][0]
min_distance = airline_df2.select(min('Distance')).collect()[0][0]

In [15]:
max_distance,min_distance

(4962.0, 11.0)

In [16]:
def t_normalized_distance(origin):
    if origin is None:
        return None
    else:
        return ((origin-min_distance)/(max_distance-min_distance))

In [17]:
normalized_distance = udf(lambda x: t_normalized_distance(x),DoubleType())

In [18]:
normalized_df = airline_df2.withColumn('Distance', normalized_distance(airline_df2['Distance']))

In [19]:
normalized_df.take(5)

[Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='18.00-24.00', CRSDepTime='1955', ArrTime='2211', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay=-14.0, DepDelay=8.0, Origin='IAD', Dest='TPA', Distance=0.16138153908301353, TaxiIn=4.0, TaxiOut=8.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='06.00-11.59', CRSDepTime='735', ArrTime='1002', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay=2.0, DepDelay=19.0, Origin='IAD', Dest='TPA', Distance=0.16138153908301353, TaxiIn=5.0, TaxiOut=10.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='

### Normalize ArrDelay

In [20]:
max_ArrDelay = airline_df2.select(max('ArrDelay')).collect()[0][0]
min_ArrDelay = airline_df2.select(min('ArrDelay')).collect()[0][0]

In [21]:
max_ArrDelay,min_ArrDelay

(2461.0, -519.0)

In [22]:
def t_normalized_ArrDelay(origin):
    if origin is None:
        return None
    else:
        return ((origin-min_ArrDelay)/(max_ArrDelay-min_ArrDelay))

In [23]:
normalized_ArrDelay = udf(lambda x: t_normalized_ArrDelay(x),DoubleType())

In [24]:
normalized_df = normalized_df.withColumn('ArrDelay', normalized_ArrDelay(normalized_df['ArrDelay']))

In [25]:
normalized_df.take(5)

[Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='18.00-24.00', CRSDepTime='1955', ArrTime='2211', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay=0.16946308724832215, DepDelay=8.0, Origin='IAD', Dest='TPA', Distance=0.16138153908301353, TaxiIn=4.0, TaxiOut=8.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='06.00-11.59', CRSDepTime='735', ArrTime='1002', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay=0.17483221476510066, DepDelay=19.0, Origin='IAD', Dest='TPA', Distance=0.16138153908301353, TaxiIn=5.0, TaxiOut=10.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', 

## Choose only intereesting fields

In [26]:
features_df = normalized_df.select(['UniqueCarrier','Origin','Dest',\
        'DepTime','TaxiOut','TaxiIn','DepDelay',\
        'DayOfWeek','Distance','ArrDelay'])

## Get rid of records with null

In [27]:
final_df = features_df.dropna()

In [28]:
final_df.count()

6855029

In [29]:
final_df.show()

+-------------+------+----+-----------+-------+------+--------+---------+--------------------+-------------------+
|UniqueCarrier|Origin|Dest|    DepTime|TaxiOut|TaxiIn|DepDelay|DayOfWeek|            Distance|           ArrDelay|
+-------------+------+----+-----------+-------+------+--------+---------+--------------------+-------------------+
|           WN|   IAD| TPA|18.00-24.00|    8.0|   4.0|     8.0|      4.0| 0.16138153908301353|0.16946308724832215|
|           WN|   IAD| TPA|06.00-11.59|   10.0|   5.0|    19.0|      4.0| 0.16138153908301353|0.17483221476510066|
|           WN|   IND| BWI|06.00-11.59|   17.0|   3.0|     8.0|      4.0|  0.1017976166431024|0.17885906040268457|
|           WN|   IND| BWI|06.00-11.59|    7.0|   3.0|    -4.0|      4.0|  0.1017976166431024|0.17214765100671142|
|           WN|   IND| BWI|18.00-24.00|   10.0|   3.0|    34.0|      4.0|  0.1017976166431024| 0.1855704697986577|
|           WN|   IND| JAX|18.00-24.00|   10.0|   4.0|    25.0|      4.0|  0.136

## Divide dataset into training and test sets

In [30]:
training_df,test_df = final_df.randomSplit([0.8,0.2],seed=12)

In [31]:
training_df.count()

5484163

## Display schema of the data frame

In [32]:
type(training_df)

pyspark.sql.dataframe.DataFrame

In [33]:
training_df.printSchema()

root
 |-- UniqueCarrier: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- DayOfWeek: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- ArrDelay: double (nullable = true)



## Transformation categorical to numerical 

In [34]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [35]:
DepTimeIndexer = StringIndexer\
(inputCol='DepTime', outputCol='DepTimeIndexed')

In [36]:
DepTimeOneHotEncoder = OneHotEncoder\
(dropLast=False,inputCol='DepTimeIndexed',\
outputCol='DepTimeVec')

In [37]:
UniqueCarrierIndexer = StringIndexer\
(inputCol='UniqueCarrier', outputCol='UniqueCarrierIndexed')

In [38]:
UniqueCarrierOneHotEncoder = OneHotEncoder\
(dropLast=False, inputCol='UniqueCarrierIndexed', outputCol='UniqueCarrierVec')

In [39]:
OriginIndexer = StringIndexer(inputCol='Origin', outputCol='OriginIndexed')

In [40]:
OriginOneHotEncoder = OneHotEncoder(dropLast=False, inputCol='OriginIndexed', outputCol='OriginVec')

In [41]:
DestIndexer = StringIndexer(inputCol='Dest', outputCol='DestIndexed')

In [42]:
DestOneHotEncoder = OneHotEncoder(dropLast=False, inputCol='DestIndexed', outputCol='DestVec')

## Combines selected columns into a single vector column

In [43]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

In [44]:
featureAssembler = VectorAssembler\
(inputCols=['UniqueCarrierIndexed',\
           'OriginVec',\
           #'DestVec',\
           'DepTimeVec',\
           'TaxiOut',\
           'TaxiIn',\
           'DepDelay',\
           'DayOfWeek',\
           'Distance'], outputCol='***features')

## Define an algorithm

In [45]:
from pyspark.ml.regression import RandomForestRegressor

In [46]:
dt = RandomForestRegressor(labelCol='ArrDelay', featuresCol='***features')

## Pipeline

In [47]:
pipeline_dt = Pipeline().\
setStages([UniqueCarrierIndexer, UniqueCarrierOneHotEncoder, DepTimeIndexer, DepTimeOneHotEncoder,\
          OriginIndexer,OriginOneHotEncoder, DestIndexer, DestOneHotEncoder, featureAssembler, dt])

## Launch pipeline and get model

In [48]:
dtModel = pipeline_dt.fit(training_df)

## Print out model structure

In [50]:
tree = dtModel.stages[9]

In [51]:
tree.toDebugString

'RandomForestRegressionModel (uid=RandomForestRegressor_4d36ab0b2755ec0956cd) with 20 trees\n  Tree 0 (weight 1.0):\n    If (feature 310 <= 58.5)\n     If (feature 0 in {0.0,2.0,4.0,12.0,14.0,16.0,18.0,19.0})\n      If (feature 310 <= 14.5)\n       If (feature 41 in {0.0})\n        If (feature 309 <= 17.5)\n         Predict: 0.17243384400194783\n        Else (feature 309 > 17.5)\n         Predict: 0.17647507994776762\n       Else (feature 41 not in {0.0})\n        If (feature 310 <= 3.5)\n         Predict: 0.17296886082800608\n        Else (feature 310 > 3.5)\n         Predict: 0.17595276627477113\n      Else (feature 310 > 14.5)\n       If (feature 0 in {0.0,19.0})\n        If (feature 312 <= 0.0986669359725308)\n         Predict: 0.1832960042270056\n        Else (feature 312 > 0.0986669359725308)\n         Predict: 0.1818415007077769\n       Else (feature 0 not in {0.0,19.0})\n        If (feature 309 <= 16.5)\n         Predict: 0.18396214261185873\n        Else (feature 309 > 16.5)\n

# Evaluation

## Model prediction using test dataset without nulls.

In [52]:
result_df = dtModel.transform(test_df)

## Display comparison results between *Actual* Arival delay VS *Predicted* Arrival delay

In [53]:
result_df.select(['ArrDelay', 'Prediction']).show()b

SyntaxError: invalid syntax (<ipython-input-53-33f3cfd26e9e>, line 1)

In [54]:
from pyspark.ml.evaluation import RegressionEvaluator

### Filter some tuples (records

In [55]:
new_test_df1 = test_df.filter(test_df.Origin != 'TUP')\
.filter(test_df.Origin != 'PIR')\
.filter(test_df.Dest != 'TUP')\
.filter(test_df.Dest != 'PIR')\
.filter(test_df.Origin != 'PUB')\
.filter(test_df.DepTime != '425')\
.filter(test_df.DepTime != '328')\
.filter(test_df.DepTime != '437')\
.filter(test_df.DepTime != '400')\
.filter(test_df.DepTime != '414')\
.filter(test_df.DepTime != '424')\
.filter(test_df.DepTime != '439')\
.filter(test_df.DepTime != '338')\
.filter(test_df.DepTime != '359')\
.filter(test_df.DepTime != '412')\
.filter(test_df.DepTime != '430')

### Perform prediction using test dataset without nulls

In [58]:
everesult_df = dtModel.transform(new_test_df1)

In [59]:
lr_evaluator_r2 = RegressionEvaluator\
(predictionCol="prediction",labelCol="ArrDelay",metricName="r2")
print("R Squared (R2) on test data = %g" \
     % lr_evaluator_r2.evaluate(everesult_df))

R Squared (R2) on test data = 0.73922


In [61]:
lr_evaluator_rmse = RegressionEvaluator\
(predictionCol="prediction",labelCol="ArrDelay",metricName="rmse")
print("Root Mean Squared Error (RMSE) on test data = %g" \
     % lr_evaluator_rmse.evaluate(everesult_df))

Root Mean Squared Error (RMSE) on test data = 0.00659631
