### 1. Business Understanding

Problem Statement:  Prediction of Arrival Delay

Project Objective: (1) Regression Model (2) Model Deployment

### 2. Data Understanding

#### Data collection - ดึงไฟล์ 2008.csv จาก s3.amazonaws.com มาไว้ใน Linux File System

In [1]:
! wget https://s3.amazonaws.com/imcbucket/data/flights/2008.csv

--2021-10-10 14:53:27--  https://s3.amazonaws.com/imcbucket/data/flights/2008.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.173.192
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.173.192|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 689413344 (657M) [binary/octet-stream]
Saving to: ‘2008.csv.1’


2021-10-10 14:53:40 (52.8 MB/s) - ‘2008.csv.1’ saved [689413344/689413344]



#### นับจำนวนบรรทัดในไฟล์ 2008.csv

In [2]:
! wc -l ./2008.csv

7009729 ./2008.csv


#### ดู Content ในไฟล์ 2008.csv

In [3]:
! head -3 2008.csv

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,754,735,1002,1000,WN,3231,N772SW,128,145,113,2,19,IAD,TPA,810,5,10,0,,0,NA,NA,NA,NA,NA


#### ลบ Directory และไฟล์ 2008.csv ที่มีอยู่เดิมใน HDFS

In [4]:
! hdfs dfs -rm -f /user/cloudera/input/2008.csv

Deleted /user/cloudera/input/2008.csv


#### สร้าง Directory "input" ใน HDFS

In [5]:
! hdfs dfs -mkdir -p /user/cloudera/input

#### Copy ไฟล์ 2008.csv จาก Linux File System ไปไว้ใน Directory "input" ของ HDFS

In [6]:
! hadoop fs -put 2008.csv /user/cloudera/input

#### แสดงและตรวจดูให้แน่ใจว่ามีไฟล์ 2008.csv อยู่ใน "input"

In [7]:
! hdfs dfs -ls /user/cloudera/input

Found 1 items
-rw-r--r--   2 root hadoop  689413344 2021-10-10 14:53 /user/cloudera/input/2008.csv


#### Spark อ่านไฟล์ 2008.csv จาก HDFS มาเป็น DataFrame

In [8]:
airline_df = spark.read.format('csv').\
option('header','true').option('mode','DROPMALFORMED')\
.load('/user/cloudera/input/2008.csv')

                                                                                

#### Spark นับจำนวน tuple ใน DataFrame

In [9]:
airline_df.count()

                                                                                

7009728

#### Spark: Assign ค่าของตัวแปรเก่า ให้กับตัวแปรใหม่

In [10]:
airline_row_df = airline_df

#### Spark แสดง Schema ของข้อมูลใน DataFrame

In [11]:
airline_row_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

### 3. Data Preparation

#### Spark เรียกใช้ Data Types และ Functions ต่างๆ สำหรับจัดการข้อมูลใน DataFrame

In [12]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf

In [13]:
crunched_df = airline_row_df.\
withColumn('DepTime',airline_row_df['DepTime'].\
           cast(DoubleType())).\
withColumn('TaxiOut',airline_row_df['TaxiOut'].\
           cast(DoubleType())).\
withColumn('TaxiIn',airline_row_df['TaxiIn'].\
           cast(DoubleType())).\
withColumn('DepDelay',airline_row_df['DepDelay'].\
           cast(DoubleType())).\
withColumn('DayOfWeek',airline_row_df['DayOfWeek'].\
           cast(DoubleType())).\
withColumn('Distance',airline_row_df['Distance'].\
           cast(DoubleType())).\
withColumn('ArrDelay',airline_row_df['ArrDelay'].\
           cast(DoubleType()))

#### Spark แสดง Schema ของข้อมูลใน DataFrame หลังจาก cast type แล้ว

In [14]:
crunched_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: double (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

#### Python ติดตั้ง Module "pandas"

In [15]:
! pip install pandas



#### Spark ทำ Data Exploratory โดยใช้สถิติเบื้องต้นกับข้อมูลใน DataFrame

In [16]:
import pandas as pd
pd.DataFrame(crunched_df.\
             select(['DepTime','TaxiOut','TaxiIn',\
                     'DayOfWeek','Distance','ArrDelay']).\
             describe().take(6),\
             columns=crunched_df.\
             select(['DepTime','TaxiOut','TaxiIn',\
                     'DayOfWeek','Distance','ArrDelay']).\
             describe().columns).transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
DepTime,6873482,1333.8300461105448,478.0688948662984,1.0,2400.0
TaxiOut,6872670,16.453045177492882,11.332798654232155,0.0,429.0
TaxiIn,6858079,6.860851704974527,4.933649371300466,0.0,308.0
DayOfWeek,7009728,3.9241815088973495,1.9882589459851208,1.0,7.0
Distance,7009728,726.3870294253928,562.10180348404,11.0,4962.0
ArrDelay,6855029,8.16845238729114,38.50193694882867,-519.0,2461.0


#### Spark ทำ Data Transformation โดยใช้ Data Discretization กับ "DepTime" ใน DataFrame

In [17]:
def t_timeperiod(origin):
    if origin is None:
        period = None
    elif origin > 0 and origin < 600:
        period = '00.01-05.59'
    elif origin >= 600 and origin <=1200:
        period = '06.00-11.59'
    elif origin >= 1200 and origin <= 1800:
        period = '12.00-17.59'
    elif origin >= 1800 and origin <= 2400:
        period = '18.00-24.00'
    else:
        period = 'NA'
    return period

In [18]:
timeperiod = udf(lambda x: t_timeperiod(x),StringType())

In [19]:
discretized_df = crunched_df.\
withColumn('DepTime',timeperiod(crunched_df['DepTime']))

#### Spark ทำ Data Transformation โดยใช้ Data Normalization กับ "Distance" และ "ArrDelay" ใน DataFrame

In [20]:
from pyspark.sql.functions import *
max_distance = discretized_df.select(max('Distance')).collect()[0][0]
min_distance = discretized_df.select(min('Distance')).collect()[0][0]

                                                                                

In [21]:
max_ArrDelay = discretized_df.select(max('ArrDelay')).collect()[0][0]
min_ArrDelay = discretized_df.select(min('ArrDelay')).collect()[0][0]

                                                                                

In [22]:
def t_normalized_distance(origin):
    if origin is None:
        return None
    else:
        return ((origin-min_distance)/(max_distance-min_distance))

In [23]:
def t_normalized_ArrDelay(origin):
    if origin is None:
        return None
    else:
        return ((origin-min_ArrDelay)/(max_ArrDelay-min_ArrDelay))

In [24]:
normalized_distance = udf(lambda x: t_normalized_distance(x),DoubleType())

In [25]:
normalized_ArrDelay = udf(lambda x: t_normalized_ArrDelay(x),DoubleType())

In [26]:
normalized_df = discretized_df.\
withColumn('Distance', normalized_distance(discretized_df['Distance'])).\
withColumn('ArrDelay', normalized_ArrDelay(discretized_df['ArrDelay']))

#### Spark ทำ Feature Selection ด้วยการเลือกเฉพาะบาง Attributes มาเป็น Features

In [27]:
features_df = normalized_df.\
select(['UniqueCarrier','Origin','Dest',\
        'DepTime','TaxiOut','TaxiIn','DepDelay',\
        'DayOfWeek','Distance','ArrDelay'])

#### Spark กำจัดค่า Null ด้วยการลบทั้ง Tuple (Record) เมื่อพบว่ามี Attribute ใดมีค่า Null

In [28]:
final_df = features_df.dropna()

#### Spark นับจำนวน tuple ใน DataFrame

In [29]:
final_df.count()

                                                                                

6855029

In [30]:
final_df.show()

+-------------+------+----+-----------+-------+------+--------+---------+--------------------+-------------------+
|UniqueCarrier|Origin|Dest|    DepTime|TaxiOut|TaxiIn|DepDelay|DayOfWeek|            Distance|           ArrDelay|
+-------------+------+----+-----------+-------+------+--------+---------+--------------------+-------------------+
|           WN|   IAD| TPA|18.00-24.00|    8.0|   4.0|     8.0|      4.0| 0.16138153908301353|0.16946308724832215|
|           WN|   IAD| TPA|06.00-11.59|   10.0|   5.0|    19.0|      4.0| 0.16138153908301353|0.17483221476510066|
|           WN|   IND| BWI|06.00-11.59|   17.0|   3.0|     8.0|      4.0|  0.1017976166431024|0.17885906040268457|
|           WN|   IND| BWI|06.00-11.59|    7.0|   3.0|    -4.0|      4.0|  0.1017976166431024|0.17214765100671142|
|           WN|   IND| BWI|18.00-24.00|   10.0|   3.0|    34.0|      4.0|  0.1017976166431024| 0.1855704697986577|
|           WN|   IND| JAX|18.00-24.00|   10.0|   4.0|    25.0|      4.0|  0.136

In [31]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder,\
VectorIndexer, QuantileDiscretizer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, DecisionTreeRegressor
from pyspark.ml.classification import LogisticRegression, GBTClassifier, NaiveBayes, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.clustering import *
from pyspark.ml.feature import Bucketizer
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.evaluation import RegressionEvaluator

In [32]:
def get_regressionModel(trainingSet,testingSet,algo,\
                   categoricalCols,continuousCols,labelCol):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col


    indexers = [ StringIndexer(handleInvalid='keep',\
                               inputCol=c, outputCol="{0}_indexed".format(c))
                 for c in categoricalCols ]

    # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
                 outputCol="{0}_encoded".format(indexer.getOutputCol()))
                 for indexer in indexers ]

    
    
    
    
    
    featureCols = ['features']
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, \
                                outputCol='features')
    
    
    #var_name = algo
    ml_algorithm = algo(featuresCol='features', labelCol=labelCol)
    


    pipeline = Pipeline(stages=indexers + encoders + \
                        [assembler] + [ml_algorithm])
    
    

    model=pipeline.fit(trainingSet)

    return model

In [33]:
selected_algo = [RandomForestRegressor, DecisionTreeRegressor,LinearRegression]

In [34]:
label_Column = 'ArrDelay'

In [35]:
catcols = []
num_cols = []

In [36]:
final_df.dtypes

[('UniqueCarrier', 'string'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('DepTime', 'string'),
 ('TaxiOut', 'double'),
 ('TaxiIn', 'double'),
 ('DepDelay', 'double'),
 ('DayOfWeek', 'double'),
 ('Distance', 'double'),
 ('ArrDelay', 'double')]

In [37]:
for item in final_df.dtypes:
    if item[1] == 'string':
        catcols.append(item[0])
    else:
        num_cols.append(item[0])

In [38]:
catcols

['UniqueCarrier', 'Origin', 'Dest', 'DepTime']

In [39]:
num_cols.pop(5)

'ArrDelay'

In [40]:
num_cols

['TaxiOut', 'TaxiIn', 'DepDelay', 'DayOfWeek', 'Distance']

In [41]:
training_df, testing_df = final_df.randomSplit(weights = [0.90, 0.10], seed = 13)

In [42]:
from pyspark.sql import functions as F

In [None]:
for a in selected_algo:
    model = get_regressionModel(training_df,testing_df,a,catcols,num_cols,label_Column)
    prediction_df = model.transform(testing_df)
    
    print(model," with Features:", num_cols, catcols)
    
    
    prediction_df.select(label_Column,'prediction')\
    .withColumn('diff',F.col(label_Column)-F.col('prediction'))\
    .withColumn('diffPerc',((F.col(label_Column)-F.col('prediction'))/(F.col(label_Column)))*100).show(1000)
    
    lr_evaluator_r2 = RegressionEvaluator\
    (predictionCol="prediction",labelCol=label_Column,metricName="r2")
    print(model, ": R Squared (R2) on test data = %g" \
          % lr_evaluator_r2.evaluate(prediction_df))
    
    lr_evaluator_rmse = RegressionEvaluator\
    (predictionCol="prediction",labelCol=label_Column,metricName="rmse")
    print(model, ": Root Mean Squared Error (RMSE) on test data = %g" \
          % lr_evaluator_rmse.evaluate(prediction_df))
    
    analysis_df = prediction_df.select(label_Column,'prediction')\
    .withColumn('diff',F.col(label_Column)-F.col('prediction'))\
    .withColumn('diffPerc',((F.col(label_Column)-F.col('prediction'))/(F.col(label_Column)))*100)
    
    analysis_df.agg(F.avg(F.col('diffPerc')).alias('Average_diffPerc')\
                ,F.max(F.col('diffPerc')).alias('Max_diffPerc')\
               ,F.min(F.col('diffPerc')).alias('Min_diffPerc')).show()

                                                                                

PipelineModel_6c27f160851f  with Features: ['TaxiOut', 'TaxiIn', 'DepDelay', 'DayOfWeek', 'Distance'] ['UniqueCarrier', 'Origin', 'Dest', 'DepTime']


                                                                                

+-------------------+-------------------+--------------------+--------------------+
|           ArrDelay|         prediction|                diff|            diffPerc|
+-------------------+-------------------+--------------------+--------------------+
|0.17248322147651007|0.17273530547790386|-2.52084001393787...| -0.1461498685123515|
|0.17885906040268457|0.17393086222109078| 0.00492819818159379|  2.7553528294839578|
| 0.1761744966442953| 0.1735054808261563|0.002669015818138998|  1.5149842167722312|
|0.18053691275167785|0.17426723430910074|0.006269678442577115|   3.472795865962788|
|0.18053691275167785| 0.1785669330869111|0.001969979664766...|   1.091178327324335|
| 0.1738255033557047|0.17266511176474517|0.001160391590959542|  0.6675611855327095|
|0.17114093959731544| 0.1724723379367624|-0.00133139833944...| -0.7779543238337105|
|0.17516778523489934|0.17417713926395806| 9.90645970941284E-4|  0.5655411864760587|
| 0.1691275167785235| 0.1724723379367624|-0.00334482115823...| -1.9776918753

                                                                                

PipelineModel_6c27f160851f : R Squared (R2) on test data = 0.718647


                                                                                

PipelineModel_6c27f160851f : Root Mean Squared Error (RMSE) on test data = 0.00686019


                                                                                

+--------------------+-----------------+------------------+
|    Average_diffPerc|     Max_diffPerc|      Min_diffPerc|
+--------------------+-----------------+------------------+
|-0.13532617662323354|66.50104343105025|-30.77758543819079|
+--------------------+-----------------+------------------+



                                                                                

PipelineModel_cdd7a2984e96  with Features: ['TaxiOut', 'TaxiIn', 'DepDelay', 'DayOfWeek', 'Distance'] ['UniqueCarrier', 'Origin', 'Dest', 'DepTime']


                                                                                

+-------------------+-------------------+--------------------+--------------------+
|           ArrDelay|         prediction|                diff|            diffPerc|
+-------------------+-------------------+--------------------+--------------------+
|0.17248322147651007|0.17122296349456634|0.001260257981943...|  0.7306554058739945|
|0.17885906040268457| 0.1735428993101276|0.005316161092556976|   2.972262674637859|
| 0.1761744966442953| 0.1735428993101276|0.002631597334167707|  1.4937447725370983|
|0.18053691275167785| 0.1735428993101276|0.006994013441550262|   3.874007445319662|
|0.18053691275167785|0.17795519378676597|0.002581718964911...|   1.430022772386136|
| 0.1738255033557047|0.17449795704362756|-6.72453687922847...|-0.38685559652704327|
|0.17114093959731544|0.17122296349456634|-8.20238972508990...| -0.0479276889818979|
|0.17516778523489934|0.17122296349456634|0.003944821740333004|  2.2520246716843584|
| 0.1691275167785235|0.17122296349456634|-0.00209544671604...| -1.2389744471

                                                                                

PipelineModel_cdd7a2984e96 : R Squared (R2) on test data = 0.76765


                                                                                

PipelineModel_cdd7a2984e96 : Root Mean Squared Error (RMSE) on test data = 0.00623421


                                                                                

+--------------------+-----------------+-------------------+
|    Average_diffPerc|     Max_diffPerc|       Min_diffPerc|
+--------------------+-----------------+-------------------+
|-0.08034889109339728|65.34120252890038|-30.256877017444637|
+--------------------+-----------------+-------------------+



21/10/10 15:19:43 WARN org.apache.spark.ml.util.Instrumentation: [13bd81de] regParam is zero, which might cause numerical instability and overfitting.
21/10/10 15:20:36 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/10/10 15:20:36 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
21/10/10 15:20:36 WARN com.github.fommil.netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
21/10/10 15:20:36 WARN com.github.fommil.netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
21/10/10 15:20:36 WARN org.apache.spark.ml.util.Instrumentation: [13bd81de] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

PipelineModel_6d7b85f0056c  with Features: ['TaxiOut', 'TaxiIn', 'DepDelay', 'DayOfWeek', 'Distance'] ['UniqueCarrier', 'Origin', 'Dest', 'DepTime']


                                                                                

+-------------------+-------------------+--------------------+--------------------+
|           ArrDelay|         prediction|                diff|            diffPerc|
+-------------------+-------------------+--------------------+--------------------+
|0.17248322147651007|0.17217781626247064|3.054052140394336...|  0.1770637233146911|
|0.17885906040268457|0.17689748788283371|0.001961572519850...|  1.0967140917740221|
| 0.1761744966442953|0.17479441797986403|0.001380078664431...|  0.7833589371438429|
|0.18053691275167785|0.17806608910684515|0.002470823644832698|  1.3685974835690409|
|0.18053691275167785|  0.178972669228093|0.001564243523584...|   0.866439721242173|
| 0.1738255033557047|0.17373013331184214|9.537004386256376E-5| 0.05486539202904247|
|0.17114093959731544|0.16997733952671906|0.001163600070596...|   0.679907492230824|
|0.17516778523489934|0.17417826950348694|9.895157314123992E-4|  0.5648959539480746|
| 0.1691275167785235|0.17033401170445148|-0.00120649492592...|  -0.713364063

                                                                                

PipelineModel_6d7b85f0056c : R Squared (R2) on test data = 0.944798


                                                                                

PipelineModel_6d7b85f0056c : Root Mean Squared Error (RMSE) on test data = 0.0030387




+--------------------+-----------------+-------------------+
|    Average_diffPerc|     Max_diffPerc|       Min_diffPerc|
+--------------------+-----------------+-------------------+
|-0.02747537592686251|45.24445540946218|-30.120013943471474|
+--------------------+-----------------+-------------------+



                                                                                