In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
from pyspark.sql import SparkSession

In [7]:
spark = SparkSession.builder \
   .appName("Neural Network Model") \
   .config("spark.executor.memory", "3gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [8]:
sc

**2. Data Understanding using SparkSQL**

In [9]:
! wget https://storage.googleapis.com/class25jan2022/share/2008.csv

--2022-02-11 14:36:46--  https://storage.googleapis.com/class25jan2022/share/2008.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.142.128, 74.125.195.128, 142.250.99.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.142.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 689413344 (657M) [text/csv]
Saving to: ‘2008.csv’


2022-02-11 14:36:49 (238 MB/s) - ‘2008.csv’ saved [689413344/689413344]



In [10]:
! wc -l ./2008.csv

7009729 ./2008.csv


In [11]:
! head -3 2008.csv

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,754,735,1002,1000,WN,3231,N772SW,128,145,113,2,19,IAD,TPA,810,5,10,0,,0,NA,NA,NA,NA,NA


In [12]:
raw_df = spark.read.format('csv').\
option('header','true').option('mode','DROPMALFORMED')\
.load('2008.csv')

In [13]:
raw_df.count()

7009728

In [19]:
raw_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

**3. Data Preparation using SparkSQL**

In [20]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [21]:
crunched_df = raw_df.\
withColumn('DepTime',col('DepTime').\
           cast(DoubleType())).\
withColumn('TaxiOut',col('TaxiOut').\
           cast(DoubleType())).\
withColumn('TaxiIn',col('TaxiIn').\
           cast(DoubleType())).\
withColumn('DepDelay',col('DepDelay').\
           cast(DoubleType())).\
withColumn('DayOfWeek',col('DayOfWeek').\
           cast(DoubleType())).\
withColumn('Distance',col('Distance').\
           cast(DoubleType())).\
withColumn('ArrDelay',col('ArrDelay').\
           cast(DoubleType()))

In [22]:
crunched_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: double (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

In [23]:
crunched_df\
.select(['DepTime','TaxiOut','TaxiIn',\
                     'DayOfWeek','Distance','ArrDelay'])\
                     .describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
DepTime,6873482,1333.8300461105448,478.06889486629836,1.0,2400.0
TaxiOut,6872670,16.453045177492882,11.332798654232155,0.0,429.0
TaxiIn,6858079,6.860851704974527,4.933649371300466,0.0,308.0
DayOfWeek,7009728,3.9241815088973495,1.9882589459851212,1.0,7.0
Distance,7009728,726.3870294253928,562.1018034840403,11.0,4962.0
ArrDelay,6855029,8.16845238729114,38.50193694882867,-519.0,2461.0


In [24]:
def t_timeperiod(origin):
    if origin is None:
        period = None
    elif origin > 0 and origin < 600:
        period = '00.01-05.59'
    elif origin >= 600 and origin <=1200:
        period = '06.00-11.59'
    elif origin >= 1200 and origin <= 1800:
        period = '12.00-17.59'
    elif origin >= 1800 and origin <= 2400:
        period = '18.00-24.00'
    else:
        period = 'NA'
    return period

In [25]:
timeperiod = udf(lambda x: t_timeperiod(x),StringType())

In [26]:
type(timeperiod)

function

In [27]:
discretized_df = crunched_df.\
withColumn('DepTime',timeperiod(crunched_df['DepTime']))

In [28]:
discretized_df.show()

+----+-----+----------+---------+-----------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|    DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-----------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|      4.0|18.00-24.00|      1955|   2211|      2225|       

In [29]:
discretized_df.select(['DepTime','TaxiOut','TaxiIn',\
                     'DayOfWeek','Distance','ArrDelay']).describe().show()

+-------+-----------+------------------+-----------------+------------------+-----------------+-----------------+
|summary|    DepTime|           TaxiOut|           TaxiIn|         DayOfWeek|         Distance|         ArrDelay|
+-------+-----------+------------------+-----------------+------------------+-----------------+-----------------+
|  count|    6873482|           6872670|          6858079|           7009728|          7009728|          6855029|
|   mean|       null|16.453045177492882|6.860851704974527|3.9241815088973495|726.3870294253928| 8.16845238729114|
| stddev|       null|11.332798654232155|4.933649371300466|1.9882589459851212|562.1018034840403|38.50193694882867|
|    min|00.01-05.59|               0.0|              0.0|               1.0|             11.0|           -519.0|
|    max|18.00-24.00|             429.0|            308.0|               7.0|           4962.0|           2461.0|
+-------+-----------+------------------+-----------------+------------------+-----------

In [30]:
from pyspark.sql.functions import *
max_distance = discretized_df.select(max('Distance')).collect()[0][0]
min_distance = discretized_df.select(min('Distance')).collect()[0][0]

In [31]:
max_ArrDelay = discretized_df.select(max('ArrDelay')).collect()[0][0]
min_ArrDelay = discretized_df.select(min('ArrDelay')).collect()[0][0]

In [32]:
def t_normalized_distance(origin):
    if origin is None:
        return None
    else:
        return ((origin-min_distance)/(max_distance-min_distance))

In [33]:
def t_normalized_ArrDelay(origin):
    if origin is None:
        return None
    else:
        return ((origin-min_ArrDelay)/(max_ArrDelay-min_ArrDelay))

In [34]:
normalized_distance = udf(lambda x: t_normalized_distance(x),DoubleType())

In [35]:
normalized_ArrDelay = udf(lambda x: t_normalized_ArrDelay(x),DoubleType())

In [36]:
normalized_df = discretized_df.\
withColumn('Distance', normalized_distance(discretized_df['Distance'])).\
withColumn('ArrDelay', normalized_ArrDelay(discretized_df['ArrDelay']))

In [37]:
normalized_df.show()

+----+-----+----------+---------+-----------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+-------------------+--------+------+----+--------------------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|    DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|           ArrDelay|DepDelay|Origin|Dest|            Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-----------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+-------------------+--------+------+----+--------------------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1| 

In [38]:
features_df = normalized_df.\
select(['UniqueCarrier','Origin','Dest',\
        'DepTime','TaxiOut','TaxiIn','DepDelay',\
        'DayOfWeek','Distance','ArrDelay'])

In [39]:
final_df = features_df.dropna()

In [40]:
final_df.count()

6855029

In [41]:
final_df.show()

+-------------+------+----+-----------+-------+------+--------+---------+--------------------+-------------------+
|UniqueCarrier|Origin|Dest|    DepTime|TaxiOut|TaxiIn|DepDelay|DayOfWeek|            Distance|           ArrDelay|
+-------------+------+----+-----------+-------+------+--------+---------+--------------------+-------------------+
|           WN|   IAD| TPA|18.00-24.00|    8.0|   4.0|     8.0|      4.0| 0.16138153908301353|0.16946308724832215|
|           WN|   IAD| TPA|06.00-11.59|   10.0|   5.0|    19.0|      4.0| 0.16138153908301353|0.17483221476510066|
|           WN|   IND| BWI|06.00-11.59|   17.0|   3.0|     8.0|      4.0|  0.1017976166431024|0.17885906040268457|
|           WN|   IND| BWI|06.00-11.59|    7.0|   3.0|    -4.0|      4.0|  0.1017976166431024|0.17214765100671142|
|           WN|   IND| BWI|18.00-24.00|   10.0|   3.0|    34.0|      4.0|  0.1017976166431024| 0.1855704697986577|
|           WN|   IND| JAX|18.00-24.00|   10.0|   4.0|    25.0|      4.0|  0.136

4. Modeling (Spark ML and Spark ML Pipeline)

In [42]:
training_df,test_df = final_df.\
randomSplit([0.80,0.20], seed = 12)

In [43]:
training_df.count()

5484163

In [44]:
training_df.printSchema()

root
 |-- UniqueCarrier: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- DayOfWeek: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- ArrDelay: double (nullable = true)



In [45]:
training_df.show()

+-------------+------+----+-----------+-------+------+--------+---------+-------------------+-------------------+
|UniqueCarrier|Origin|Dest|    DepTime|TaxiOut|TaxiIn|DepDelay|DayOfWeek|           Distance|           ArrDelay|
+-------------+------+----+-----------+-------+------+--------+---------+-------------------+-------------------+
|           9E|   ABE| DTW|00.01-05.59|   13.0|  13.0|    -9.0|      4.0|0.08341749141587558|0.16744966442953022|
|           9E|   ABE| DTW|00.01-05.59|   14.0|  16.0|    -7.0|      4.0|0.08341749141587558|0.17248322147651007|
|           9E|   ABE| DTW|00.01-05.59|   15.0|   9.0|    -8.0|      4.0|0.08341749141587558|0.16845637583892617|
|           9E|   ABE| DTW|00.01-05.59|   22.0|   9.0|    -6.0|      2.0|0.08341749141587558|0.17281879194630873|
|           9E|   ABE| DTW|06.00-11.59|   12.0|  12.0|    -5.0|      5.0|0.08341749141587558|0.16711409395973154|
|           9E|   ABE| DTW|06.00-11.59|   13.0|   9.0|    -1.0|      5.0|0.0834174914158

In [46]:
from pyspark.ml.feature import StringIndexer,OneHotEncoder

In [47]:
DepTimeIndexer = StringIndexer\
(inputCol='DepTime',outputCol='DepTimeIndexed')

In [48]:
DepTimeOneHotEncoder = OneHotEncoder\
(dropLast=False,inputCol='DepTimeIndexed', \
 outputCol='DepTimeVec')

In [49]:
UniqueCarrierIndexer = StringIndexer\
(inputCol='UniqueCarrier',\
 outputCol='UniqueCarrierIndexed')

In [50]:
UniqueCarrierOneHotEncoder = OneHotEncoder\
(dropLast=False,inputCol='UniqueCarrierIndexed',\
 outputCol='UniquecarrierVec')

In [51]:
OriginIndexer = StringIndexer(inputCol='Origin',\
                              outputCol='OriginIndexed')

In [52]:
OriginOneHotEncoder = OneHotEncoder\
(dropLast=False,inputCol='OriginIndexed',\
 outputCol='OriginVec')

In [53]:
DestIndexer = StringIndexer(inputCol='Dest',\
                            outputCol='DestIndexed')

In [54]:
DestOneHotEncoder = OneHotEncoder\
(dropLast=False,inputCol='DestIndexed',\
 outputCol='DestVec')

In [55]:
from pyspark.mllib.linalg import Vectors

In [56]:
from pyspark.ml.feature import VectorAssembler

In [57]:
from pyspark.ml import Pipeline

In [58]:
featureAssembler = VectorAssembler\
(inputCols=['UniqueCarrierIndexed',\
            'OriginVec',\
            #'DestVec',\
            'DepTimeVec',\
            'TaxiOut','TaxiIn',\
            'DepDelay',\
            'DayOfWeek',\
            'Distance'
           ], outputCol='***features')

In [59]:
#from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression

In [61]:
#dt = RandomForestRegressor\
#(labelCol='ArrDelay',featuresCol='***features')
dt = LinearRegression\
(labelCol = 'ArrDelay', featuresCol='***features')

In [62]:
pipeline_dt = Pipeline().\
setStages([UniqueCarrierIndexer,\
           UniqueCarrierOneHotEncoder,\
           DepTimeIndexer,\
           DepTimeOneHotEncoder,\
           OriginIndexer ,\
           OriginOneHotEncoder,\
           DestIndexer,\
           DestOneHotEncoder,\
           featureAssembler,dt])

In [63]:
dtModel = pipeline_dt.fit(training_df)

In [64]:
test_df.show()

+-------------+------+----+-----------+-------+------+--------+---------+-------------------+-------------------+
|UniqueCarrier|Origin|Dest|    DepTime|TaxiOut|TaxiIn|DepDelay|DayOfWeek|           Distance|           ArrDelay|
+-------------+------+----+-----------+-------+------+--------+---------+-------------------+-------------------+
|           9E|   ABE| DTW|06.00-11.59|   12.0|  13.0|   -15.0|      3.0|0.08341749141587558| 0.1644295302013423|
|           9E|   ABE| DTW|06.00-11.59|   14.0|  21.0|     3.0|      1.0|0.08341749141587558|0.17718120805369128|
|           9E|   ABE| DTW|06.00-11.59|   15.0|  22.0|     8.0|      2.0|0.08341749141587558|0.18053691275167785|
|           9E|   ABE| DTW|06.00-11.59|   17.0|  17.0|    79.0|      2.0|0.08341749141587558|0.20536912751677852|
|           9E|   ABE| DTW|06.00-11.59|   27.0|   7.0|    -3.0|      4.0|0.08341749141587558| 0.1761744966442953|
|           9E|   ABE| DTW|06.00-11.59|   29.0|  11.0|    -5.0|      5.0|0.0834174914158

5. Model Evalution

In [65]:
result_df = dtModel.transform(test_df.dropna())

In [66]:
result_df.select(['ArrDelay','Prediction']).show(10)

+-------------------+-------------------+
|           ArrDelay|         Prediction|
+-------------------+-------------------+
| 0.1644295302013423|0.16955115827178044|
|0.17718120805369128|0.17815278756265734|
|0.18053691275167785|0.18036333334300111|
|0.20536912751677852|0.20346226848384552|
| 0.1761744966442953|0.17678780397988875|
|0.17885906040268457|0.17770501809286662|
|0.18053691275167785|0.18377839606695134|
|0.16778523489932887|0.17121378792098482|
|  0.174496644295302|0.17579333895192012|
|0.16946308724832215|0.17059556235456266|
+-------------------+-------------------+
only showing top 10 rows



In [67]:
from pyspark.ml.evaluation import RegressionEvaluator

In [68]:
everesult_df = dtModel.transform(test_df.dropna())

In [69]:
lr_evaluator_r2 = RegressionEvaluator\
(predictionCol="prediction",labelCol="ArrDelay",metricName="r2")
print("R Squared (R2) on test data = %g" \
      % lr_evaluator_r2.evaluate(everesult_df))

R Squared (R2) on test data = 0.939758


In [70]:
lr_evaluator_rmse = RegressionEvaluator\
(predictionCol="prediction",labelCol="ArrDelay",metricName="rmse")
print("Root Mean Squared Error (RMSE) on test data = %g" \
      % lr_evaluator_rmse.evaluate(everesult_df))

Root Mean Squared Error (RMSE) on test data = 0.00317039
