In [1]:
# All packages required for the data cleaning and model for the predicting of the flight delays

#Packages required for the cleaning data
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()
spark = SparkSession.builder.appName('missing').getOrCreate() # for cleaning data
spark = SparkSession.builder.appName('linear_regression_adv').getOrCreate()
spark = SparkSession.builder.appName('linear_regression_docs').getOrCreate()
from pyspark.ml.regression import LinearRegression
#Packages import for data exploration and cleaning
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
from pyspark.sql.functions import isnan
from pyspark.sql.functions import col, sum

#Some of the packages need for model
#from pyspark.sql import *
#spark = SparkSession.builder.appName('logistics_regression_')

In [2]:
# Importing dataset of the flight delays in 2009, dataset contains its own headings
df = spark.read.csv('DelayedFlight.csv', header=True, inferSchema=True)


In [3]:
# As shown in the exploration of this dataset, it contains a lot of null values including the target variables got null values
df.show(1) #just show five rows
# Number of data points in the data set
print("Total data points:", df.count())

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+------------+------------+--------+-------------+-----------------+
| ID|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+------------+------------+--------+-------------+-----------------+
|  0|2009|    1|         3|        4|   2003|      1955|   2211|      2225|           WN|      335| N712SW|  

In [4]:
#This code shows the summary of all the missing values in each field
from pyspark.sql.functions import lit
rows = df.count() 
summary = df.describe().filter(col("summary") == "count") 
summary.select(*((lit(rows)-col(c)).alias(c) for c in df.columns)).show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+------------+------------+--------+-------------+-----------------+
| ID|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+------------+------------+--------+-------------+-----------------+
|0.0| 0.0|  0.0|       0.0|      0.0|    0.0|       0.0|  612.0|       0.0|          0.0|      0.0|    0.0|  

## Data Cleaning

In [5]:
# 'na' stands for Not Available. Using na, we can then use drop.  
df = df.na.drop()

# Let's see how many rows of data we have now. 
print("Total data points:", df.count())


Total data points: 120786


In [6]:
df.filter(df.ArrDelay.isNull()).count()#count of null values in the target variable "ArrDelay"

0

In [7]:
df.filter(df.ArrDelay.isNotNull()).count()

120786

In [8]:
# To show the summary statistics of target attribute is "ArrDelay" and other variables correlated attributes.
df.describe('ArrDelay', 'DepDelay', 'CarrierDelay', 'LateAircraftDelay', 'NASDelay', 'WeatherDelay').show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+
|summary|          ArrDelay|         DepDelay|      CarrierDelay| LateAircraftDelay|          NASDelay|      WeatherDelay|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+
|  count|            120786|           120786|            120786|            120786|            120786|            120786|
|   mean| 63.04713294587121|59.16849634891461|19.354469888894407|25.652600466941532|14.407298859139305|3.5300531518553475|
| stddev|59.142071303111976|57.85968189457667|42.468740186721114| 41.65849285132601| 33.92247559511985| 21.53771286273278|
|    min|                15|                6|                 0|                 0|                 0|                 0|
|    max|              1525|             1355|              1158|               897|              1357|              1049|
+-------+-------

In [9]:
#show missing values
missing_field_df = df.na.drop(subset="ArrDelay")
missing_field_df.show()
print("Total data points:", missing_field_df.count())

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+------------+------------+--------+-------------+-----------------+
| ID|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+------------+------------+--------+-------------+-----------------+
|  4|2009|    1|         3|        4|   1829|      1755|   1959|      1925|           WN|     3920| N464WN|  

In [10]:
df.show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+------------+------------+--------+-------------+-----------------+
| ID|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+------------+------------+--------+-------------+-----------------+
|  4|2009|    1|         3|        4|   1829|      1755|   1959|      1925|           WN|     3920| N464WN|  

In [11]:
df.drop('Year', 'DayOfWeek').columns

['ID',
 'Month',
 'DayofMonth',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay']

In [12]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','DepDelay') 

0.9505318931721426

In [13]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','CRSDepTime')

-0.002257089952414316

In [14]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','ArrTime')

-0.05534385970617289

In [15]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','CRSArrTime')

0.002199466922143343

In [16]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','Distance')

-0.007810004884905126

In [17]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','TaxiIn')

0.11141265968747364

In [18]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','TaxiOut')

0.2156991358388225

In [19]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','LateAircraftDelay')

0.4707952617620207

In [20]:
#Check the correlation of the target variable to other attribute before drop columns
df.corr('ArrDelay','CarrierDelay')

0.4800987528907703

In [21]:
df.corr('ArrDelay','SecurityDelay')
#Check the correlation of the target variable to other attribute before drop columns

-0.018459380613213956

In [22]:
df.corr('ArrDelay','NASDelay')

0.3947852811208922

In [23]:
df.corr('ArrDelay','WeatherDelay')

0.2683846406249515

In [24]:
df.corr('ArrDelay','AirTime')

0.005878478774603247

In [25]:
df.count()#count number of rows

120786

In [26]:
len(df.columns), df.columns#count number of columns

(29,
 ['ID',
  'Year',
  'Month',
  'DayofMonth',
  'DayOfWeek',
  'DepTime',
  'CRSDepTime',
  'ArrTime',
  'CRSArrTime',
  'UniqueCarrier',
  'FlightNum',
  'TailNum',
  'ActualElapsedTime',
  'CRSElapsedTime',
  'AirTime',
  'ArrDelay',
  'DepDelay',
  'Origin',
  'Dest',
  'Distance',
  'TaxiIn',
  'TaxiOut',
  'Cancelled',
  'CancellationCode',
  'CarrierDelay',
  'WeatherDelay',
  'NASDelay',
  'SecurityDelay',
  'LateAircraftDelay'])

In [27]:
df= df.drop('ID','Year', 'Month', 'DayofMonth','DayOfWeek', 'CRSArrTime',
          'UniqueCarrier','TailNum','ActualElapsedTime','CRSElapsedTime',
            'CancellationCode','Cancelled', 'Origin','Dest').columns

In [28]:
# 'na' stands for Not Available. Using na, we can then use drop.  
df.na.drop()

# Let's see how many rows of data we have now. 
print("Total data points:", df.count())

AttributeError: 'list' object has no attribute 'na'

In [None]:
df = len(df.columns), df.columns

### Data Mining Model

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
import pandas as pd
df.describe().toPandas().transpose()

In [None]:
from pyspark.ml.feature import VectorAssembler
vector_assembler = VectorAssembler(inputCols= ['ArrDelay','DepDelay', 'NASDelay', 'LateAircraftDelay', 
                                               'CarrierDelay','TaxiOut','WeatherDelay', 'TaxiIn', 'Distance', 
                                               'SecurityDelay', 'ArrTime'], outputCol = 'features')
vector_output = vector_assembler.transform(df)
#vector_output.printSchema()

In [None]:
vector_output = vector_output.select('features', 'ArrDelay')
#print(vector_output.head())
vector_output.printSchema()

In [None]:
df = vector_output.show(10)

In [None]:
train_data,test_Data = vector_output.randomSplit([0.7,0.3])


In [None]:
train_data.describe().show()

In [None]:
test_Data.describe().show()

In [None]:

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='features', labelCol='ArrDelay')
lr_model = lr.fit(train_data)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept) + "\n")
training_summary = lr_model.summary
print("RMSE: " + str(training_summary.rootMeanSquaredError))
print("R2: " + str(training_summary.r2))

In [None]:
train_data.describe().show()

In [None]:
# Let's evaluate the model against the test data.
test_results = lr_model.evaluate(test_Data)

# And print the RMSE/R2. As expected, our RMSE and R2 are slightly worse when applying the testing set.
print("RMSE on test data: " + str(test_results.rootMeanSquaredError))
print("R2 on test data: " + str(test_results.r2))