<a href="https://colab.research.google.com/github/ChenxiLiu8/ResearcheraTeam2/blob/main/airport_delay_model_regression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Classification Model For Predicting Airline Delay and Cancellation**

## **Set up environment and load the data**

Set up Spark Environment


In [None]:
# install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set JAVA_HOME and SPARK_HOME so that findspark and Spark can locate the installations
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Load in Data

In [None]:
import pandas as pd 
from google.colab import drive 


In [None]:
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
df = spark.read.load('/content/gdrive/My Drive/2007.csv',
                     format="csv", sep=",", inferSchema="true", header="true")

Check the data's columns and types

In [None]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

Show sample data 

In [None]:
df.show(10)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|    1|         1|        1|   1232|      1225|   1341|      1340|           WN|     2891

In [None]:
df.count()

7453215

## **Data preprocessing for modeling**

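Column choice
* As model features we keep only information that is known before the flight departs: the schedule, carrier, flight and tail number, route and distance. `AirTime`, `DepDelay`, `ArrDelay` and `Cancelled` are kept only temporarily because the delay and cancellation columns are needed to build the label; none of them is used as a model feature.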

In [None]:
col_to_keep = ['Month','DayofMonth','DayOfWeek','CRSDepTime','CRSArrTime','UniqueCarrier','FlightNum','TailNum','CRSElapsedTime','AirTime','DepDelay','Origin','Dest','Distance','Cancelled','ArrDelay']
df = df.select([c for c in df.columns if c in col_to_keep])

In [None]:
df.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)



In [None]:
df.show(10)

+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+
|Month|DayofMonth|DayOfWeek|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|
+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+
|    1|         1|        1|      1225|      1340|           WN|     2891|   N351|            75|     54|       1|       7|   SMF| ONT|     389|        0|
|    1|         1|        1|      1905|      2035|           WN|      462|   N370|            90|     74|       8|      13|   SMF| PDX|     479|        0|
|    1|         1|        1|      2130|      2300|           WN|     1229|   N685|            90|     73|      34|      36|   SMF| PDX|     479|        0|
|    1|         1|        1|      1200|      1330|           WN|     1

Cast the numeric columns that were inferred as strings to integers

In [None]:
from pyspark.sql.types import IntegerType, StringType

# Cast the string-typed numeric columns to integers
for col_name in ["CRSElapsedTime", "AirTime", "DepDelay", "ArrDelay"]:
    df = df.withColumn(col_name, df[col_name].cast(IntegerType()))
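In the schema printed earlier, these four columns were inferred as strings, presumably because missing values appear as non-numeric text in the CSV (we assume the literal `NA`, which is not confirmed here). With Spark's default, non-ANSI settings, casting such a string to an integer produces null instead of raising an error, which is why nulls show up in these columns after the cast. The hypothetical helper below mimics that lenient cast in plain Python for illustration only; it is not Spark's implementation.

```python
from typing import Optional

def cast_to_int(value: Optional[str]) -> Optional[int]:
    """Hypothetical mimic of a lenient string-to-int cast:
    return None instead of raising when the text is not an integer."""
    if value is None:
        return None
    try:
        return int(value)
    except ValueError:
        return None

# Sample raw values as they might appear in the CSV (assumed, not taken from the data)
raw = ["75", "NA", "-3", None]
print([cast_to_int(v) for v in raw])  # [75, None, -3, None]
```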

In [None]:
df.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)



Remove rows with missing feature values

In [None]:
# Drop rows with a null in any schedule, carrier or route column.
# AirTime, DepDelay and ArrDelay are deliberately not filtered: they are null
# for cancelled flights, which must be kept to build the label.
df = df.dropna(subset=["Month", "DayofMonth", "DayOfWeek", "CRSDepTime",
                       "CRSArrTime", "UniqueCarrier", "FlightNum", "TailNum",
                       "CRSElapsedTime", "Origin", "Dest", "Distance",
                       "Cancelled"])

In [None]:
df.count()

7452200

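Build the label column: departure and arrival delays are summed and bucketed into three classes, and cancelled flights are put in the most severe class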

In [None]:
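def generate_label(depdelay, arrdelay, cancelled):
  # Label strings keep their original spelling ("minuates") because later
  # cells filter on them verbatim.
  # Cancelled flights go straight into the most severe class.
  if cancelled == 1:
    return "Over 120 minuates delay or cancelled"
  # With neither delay known there is nothing to bucket.
  if depdelay is None and arrdelay is None:
    return None
  # Sum whichever delays are present (None means missing).
  if depdelay is None:
    delay_sum = arrdelay
  elif arrdelay is None:
    delay_sum = depdelay
  else:
    delay_sum = depdelay + arrdelay
  # Bucket the combined delay; both bounds are inclusive.
  if delay_sum <= 30:
    return "Less than 30 minuates delay or on Time"
  elif delay_sum <= 120:
    return "Between 30 and 120 minuates delay"
  else:
    return "Over 120 minuates delay or cancelled"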
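The thresholds in `generate_label` are inclusive: a combined delay of exactly 30 minutes gets the "Less than 30" label and exactly 120 gets "Between 30 and 120". The sketch below expresses the same three buckets with `bisect` (an alternative formulation, not the notebook's code) so the boundaries can be checked directly; the short bucket names are hypothetical placeholders for the full label strings.

```python
from bisect import bisect_left

# Inclusive upper bounds of the first two buckets, matching generate_label
THRESHOLDS = [30, 120]
# Hypothetical short names standing in for the notebook's label strings
BUCKETS = ["<=30", "31-120", ">120"]

def bucket(delay_sum: int) -> str:
    # bisect_left returns the index of the first threshold >= delay_sum,
    # so a value equal to a threshold stays in the lower bucket
    return BUCKETS[bisect_left(THRESHOLDS, delay_sum)]

for minutes in (-5, 30, 31, 120, 121):
    print(minutes, bucket(minutes))
```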


In [None]:
import pyspark.sql.functions as F
udfgenerate = F.udf(generate_label, StringType())
df_with_label = df.withColumn("DelayLabel", udfgenerate("DepDelay", "ArrDelay", "Cancelled"))

In [None]:
df_with_label.count()

7452200

In [None]:
df_with_label.show(10)

+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+--------------------+
|Month|DayofMonth|DayOfWeek|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|          DelayLabel|
+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+--------------------+
|    1|         1|        1|      1225|      1340|           WN|     2891|   N351|            75|     54|       1|       7|   SMF| ONT|     389|        0|Less than 30 minu...|
|    1|         1|        1|      1905|      2035|           WN|      462|   N370|            90|     74|       8|      13|   SMF| PDX|     479|        0|Less than 30 minu...|
|    1|         1|        1|      2130|      2300|           WN|     1229|   N685|            90|     73|      34|      

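Drop the post-departure columns (`AirTime`, `ArrDelay`, `DepDelay`, `Cancelled`), which would leak the outcome into the features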

In [None]:
col_to_keep = ['Month','DayofMonth','DayOfWeek','CRSDepTime','CRSArrTime','UniqueCarrier','FlightNum','TailNum','CRSElapsedTime','Origin','Dest','Distance','DelayLabel']
df_with_label = df_with_label.select([c for c in df_with_label.columns if c in col_to_keep])

In [None]:
df_with_label.show(10)

+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+------+----+--------+--------------------+
|Month|DayofMonth|DayOfWeek|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|Origin|Dest|Distance|          DelayLabel|
+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+------+----+--------+--------------------+
|    1|         1|        1|      1225|      1340|           WN|     2891|   N351|            75|   SMF| ONT|     389|Less than 30 minu...|
|    1|         1|        1|      1905|      2035|           WN|      462|   N370|            90|   SMF| PDX|     479|Less than 30 minu...|
|    1|         1|        1|      2130|      2300|           WN|     1229|   N685|            90|   SMF| PDX|     479|Between 30 and 12...|
|    1|         1|        1|      1200|      1330|           WN|     1355|   N364|            90|   SMF| PDX|     479|Between 30 and 12...|
|    1|         1|  

In [None]:
df_under_30 = df_with_label.filter(df_with_label.DelayLabel == 'Less than 30 minuates delay or on Time')
df_under_120 = df_with_label.filter(df_with_label.DelayLabel == 'Between 30 and 120 minuates delay')
df_120_above = df_with_label.filter(df_with_label.DelayLabel == 'Over 120 minuates delay or cancelled')

In [None]:
df_with_label.count()

7452200

In [None]:
df_under_120.count()

1091674

In [None]:
df_120_above.count()

653063

In [None]:
df_under_30.count()

5707463

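The counts above show a clear class imbalance: the "less than 30 minutes delay or on time" class has 5,707,463 records, more than three times the other two classes combined (1,091,674 + 653,063 = 1,744,737). A model trained on this distribution would be biased toward the majority class, so we downsample that class to about one fifth and keep all records of the other two classes.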

In [None]:
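# Keep every record of the two minority classes...
final_df = df_120_above.union(df_under_120)
# ...and roughly one fifth of the majority class (no seed, so the sample changes between runs)
sampled_df_under_30 = df_under_30.sample(False, 1/5)
final_df = final_df.union(sampled_df_under_30)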

In [None]:
final_df.count()

2886707

In [None]:
final_df.filter(final_df.DelayLabel == 'Less than 30 minuates delay or on Time').count()

1141970
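`sample(False, 1/5)` does not return exactly one fifth of the rows: without replacement, each row is kept independently with probability 0.2 (Bernoulli sampling), so the result size is itself random. The arithmetic below, using the counts printed above, compares the expected size with the observed 1,141,970 in units of the binomial standard deviation.

```python
import math

def bernoulli_sample_stats(n: int, fraction: float, observed: int):
    """Expected size, standard deviation and z-score of a sample in which
    each of n rows is kept independently with probability `fraction`."""
    expected = n * fraction
    std = math.sqrt(n * fraction * (1 - fraction))
    z = (observed - expected) / std
    return expected, std, z

# df_under_30 had 5,707,463 rows; the sampled class has 1,141,970 (outputs above)
expected, std, z = bernoulli_sample_stats(5_707_463, 1 / 5, 1_141_970)
print(f"expected={expected:.1f}  std={std:.1f}  z={z:.2f}")
```

A z-score of about 0.5 is ordinary sampling noise. Passing a seed, for example `sample(False, 1/5, seed=42)`, would make the sample reproducible across runs.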

In [None]:
final_df.filter(final_df.DelayLabel == 'Between 30 and 120 minuates delay').count()

1091674

In [None]:
final_df.filter(final_df.DelayLabel == 'Over 120 minuates delay or cancelled').count()

653063
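Even after downsampling, the classes are not perfectly balanced, so accuracy is best read against the majority-class baseline: the score of a model that always predicts the largest class. Using the three counts printed above (and assuming the random 80/20 split keeps roughly the same class mix in the test set):

```python
# Per-class counts of final_df, copied from the outputs above
counts = {
    "<=30 min or on time": 1_141_970,
    "30-120 min": 1_091_674,
    ">120 min or cancelled": 653_063,
}
total = sum(counts.values())
shares = {label: n / total for label, n in counts.items()}
for label, share in shares.items():
    print(f"{label:>22}: {share:.3f}")

# Accuracy of a "model" that always predicts the most frequent class
baseline = max(shares.values())
print(f"majority-class baseline accuracy: {baseline:.3f}")
```

For comparison, the random forest below reaches 0.503414, while the multinomial logistic regression reaches 0.396136, essentially the same as always predicting the largest class.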

# **Model**

### **Random Forest**

In [None]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator


In [None]:
train,test = final_df.randomSplit([0.8,0.2], seed = 0)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import (RandomForestClassifier, LogisticRegression,
                                       GBTClassifier, DecisionTreeClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
indexer_1 = StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrier_IND").setHandleInvalid("keep")
indexer_2 = StringIndexer().setInputCol("TailNum").setOutputCol("TailNum_IND").setHandleInvalid("keep")
indexer_3 = StringIndexer().setInputCol("Origin").setOutputCol("Origin_IND").setHandleInvalid("keep")
indexer_4 = StringIndexer().setInputCol("Dest").setOutputCol("Dest_IND").setHandleInvalid("keep")
indexer_5 = StringIndexer().setInputCol("DelayLabel").setOutputCol("DelayLabel_IND").setHandleInvalid("keep")

In [None]:
indexer_1 = indexer_1.fit(train)
indexer_2 = indexer_2.fit(train)
indexer_3 = indexer_3.fit(train)
indexer_4 = indexer_4.fit(train)
indexer_5 = indexer_5.fit(train)
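Each `StringIndexer` learns a mapping from every distinct string to a numeric index when it is fit. With the default `stringOrderType='frequencyDesc'`, the most frequent value gets index 0. With `handleInvalid='keep'`, as set above, any value not seen during fitting (for example a tail number that appears only in the test split) is mapped to one extra index, equal to the number of known labels, instead of causing an error. Fitting on `train` only keeps test-set information out of the mapping. The pure-Python sketch below imitates this behavior for illustration; it is not Spark's implementation, it breaks frequency ties alphabetically, and the carrier codes are made up.

```python
from collections import Counter
from typing import Dict, List

def fit_string_indexer(values: List[str]) -> Dict[str, int]:
    """Map each distinct string to an index, most frequent first
    (ties broken alphabetically in this sketch)."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    return {label: i for i, label in enumerate(ordered)}

def transform_keep(mapping: Dict[str, int], values: List[str]) -> List[int]:
    """Like handleInvalid='keep': unseen values go to index len(mapping)."""
    unseen_index = len(mapping)
    return [mapping.get(v, unseen_index) for v in values]

# Hypothetical carrier codes standing in for the training split
train_carriers = ["WN", "AA", "WN", "UA", "WN", "AA"]
mapping = fit_string_indexer(train_carriers)
print(mapping)                                      # {'WN': 0, 'AA': 1, 'UA': 2}
print(transform_keep(mapping, ["AA", "XE", "WN"]))  # [1, 3, 0]
```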

In [None]:
final_df.show(10)

+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+------+----+--------+--------------------+
|Month|DayofMonth|DayOfWeek|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|Origin|Dest|Distance|          DelayLabel|
+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+------+----+--------+--------------------+
|    1|         1|        1|      2105|      2225|           WN|     2083|   N205|            80|   SMF| SAN|     480|Over 120 minuates...|
|    1|         1|        1|      2030|      2135|           WN|     2734|      0|            65|   SNA| LAS|     226|Over 120 minuates...|
|    1|         1|        1|      1815|      1920|           WN|     2782|   N781|            65|   SNA| LAS|     226|Over 120 minuates...|
|    1|         1|        1|      2040|      2155|           WN|     1243|   N458|            75|   SNA| SJC|     342|Over 120 minuates...|
|    1|         1|  

In [None]:
va = VectorAssembler(
    inputCols=["Month", "DayofMonth", "DayOfWeek", "CRSDepTime", "CRSArrTime" , "UniqueCarrier_IND", "FlightNum", "TailNum_IND", "CRSElapsedTime", "Origin_IND", "Dest_IND", "Distance"],
    outputCol="features")

In [None]:
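# Indexed string columns are treated as categorical features, and maxBins must be at least
# the number of distinct values of each one; the indexed TailNum column has thousands of values.
rf = RandomForestClassifier(labelCol="DelayLabel_IND", featuresCol="features", maxBins=6000)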

In [None]:
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=indexer_5.labels)

In [None]:
pipeline_rf = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4,indexer_5, va, rf, labelConverter])


In [None]:
model_rf = pipeline_rf.fit(train)

In [None]:
predictions_rf = model_rf.transform(test)

In [None]:
predictions_rf.select('DelayLabel', 'PredictedLabel', 'prediction', 'rawPrediction', 'features').show(5)

+--------------------+--------------------+----------+--------------------+--------------------+
|          DelayLabel|      PredictedLabel|prediction|       rawPrediction|            features|
+--------------------+--------------------+----------+--------------------+--------------------+
|Over 120 minuates...|Less than 30 minu...|       0.0|[11.1094485935947...|[1.0,1.0,1.0,600....|
|Over 120 minuates...|Over 120 minuates...|       2.0|[1.86535433093743...|[1.0,1.0,1.0,600....|
|Over 120 minuates...|Over 120 minuates...|       2.0|[1.13553893494800...|[1.0,1.0,1.0,605....|
|Over 120 minuates...|Over 120 minuates...|       2.0|[2.33315486880992...|[1.0,1.0,1.0,605....|
|Over 120 minuates...|Over 120 minuates...|       2.0|[1.37211221560833...|[1.0,1.0,1.0,605....|
+--------------------+--------------------+----------+--------------------+--------------------+
only showing top 5 rows



In [None]:
evaluator_rf = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_IND", predictionCol="prediction", metricName="accuracy")
accuracy_rf = evaluator_rf.evaluate(predictions_rf)
print("Accuracy = %g" % (accuracy_rf))

Accuracy = 0.503414


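Model tuning (a cross-validated grid search over `numTrees` and `maxDepth`; prepared but not run, so the cells are commented out)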

In [None]:
#import numpy as np
#paramGrid = ParamGridBuilder() \
    #.addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 10, stop = 50, num = 3)]) \
    #.addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]) \
    #.build()

In [None]:
#crossval = CrossValidator(estimator=pipeline_rf,
#                          estimatorParamMaps=paramGrid,
#                          evaluator=evaluator_rf,
#                          numFolds=2)
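The commented-out grid uses `np.linspace(start, stop, num=3)`, which returns evenly spaced values including both endpoints. The sketch below reproduces those values in plain Python (`linspace_ints` is a helper introduced here for illustration) and counts how many pipelines the search would train: every grid combination is fit once per fold, and `CrossValidator` then refits the best combination on the full training set.

```python
from itertools import product
from typing import List

def linspace_ints(start: float, stop: float, num: int) -> List[int]:
    """Evenly spaced values from start to stop inclusive, truncated to int,
    like [int(x) for x in np.linspace(start, stop, num)]."""
    step = (stop - start) / (num - 1)
    return [int(start + i * step) for i in range(num)]

num_trees = linspace_ints(10, 50, 3)   # [10, 30, 50]
max_depth = linspace_ints(5, 25, 3)    # [5, 15, 25]
grid = list(product(num_trees, max_depth))

num_folds = 2
fits = len(grid) * num_folds + 1       # +1 for the final refit on all of train
print(len(grid), fits)                 # 9 19
```

With 2.3 million training rows and `maxBins=6000`, each of those 19 random-forest fits is expensive, which helps explain why the search was left commented out.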

In [None]:
#cvModel = crossval.fit(train)

In [None]:
#predictions = cvModel.transform(test)

### **Multinomial Logistic Regression**

In [None]:
lr = LogisticRegression(labelCol="DelayLabel_IND", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8)
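Spark's `LogisticRegression` regularizes with an elastic-net penalty in which `regParam` plays the role of $\lambda$ and `elasticNetParam` the role of $\alpha$ (following the formulation in the Spark ML documentation). Plugging in the values used here:

$$
\lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
= 0.3\left(0.8\,\lVert w\rVert_1 + 0.1\,\lVert w\rVert_2^2\right)
= 0.24\,\lVert w\rVert_1 + 0.03\,\lVert w\rVert_2^2
$$

Most of the penalty sits on the L1 term, which can drive coefficients exactly to zero; if all of them are zeroed, the model predicts from its intercepts alone. That is one possible, unverified explanation for a weak logistic-regression score.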

In [None]:
pipeline_lr = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4,indexer_5, va, lr])

In [None]:
model_lr = pipeline_lr.fit(train)

In [None]:
predictions_lr = model_lr.transform(test)

In [None]:
evaluator_lr = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_IND", predictionCol="prediction", metricName="accuracy")
accuracy_lr = evaluator_lr.evaluate(predictions_lr)
print("Accuracy = %g" % (accuracy_lr))

Accuracy = 0.396136


### **Naive Bayes**

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import StandardScaler, MinMaxScaler

In [None]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
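Naive Bayes with `modelType='multinomial'` requires non-negative feature values, which is presumably why the assembled features are rescaled first. `MinMaxScaler` maps each feature to [0, 1] using the minimum and maximum seen when it is fit, and the Spark documentation states that a constant feature is mapped to the midpoint, 0.5. A plain-Python sketch for one feature column, using made-up `CRSDepTime` values:

```python
from typing import List

def min_max_scale(column: List[float], new_min: float = 0.0,
                  new_max: float = 1.0) -> List[float]:
    """Rescale one feature column linearly to [new_min, new_max]."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # Constant column: every value goes to the midpoint of the target range
        return [0.5 * (new_max + new_min)] * len(column)
    scale = (new_max - new_min) / (hi - lo)
    return [(x - lo) * scale + new_min for x in column]

# Hypothetical scheduled departure times (hhmm integers as floats)
dep_times = [600.0, 1225.0, 2130.0]
print(min_max_scale(dep_times))
```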

In [None]:
nb = NaiveBayes(labelCol="DelayLabel_IND", featuresCol="scaledFeatures",smoothing=1.0, modelType="multinomial")


In [None]:
pipeline_nb = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4,indexer_5, va, scaler, nb])

In [None]:
model_nb = pipeline_nb.fit(train)

In [None]:
predictions_nb = model_nb.transform(test)

In [None]:
evaluator_nb = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_IND", predictionCol="prediction", metricName="accuracy")
accuracy_nb = evaluator_nb.evaluate(predictions_nb)
print("Accuracy = %g" % (accuracy_nb))

Accuracy = 0.434463


### **Decision Tree**

In [None]:
dt = DecisionTreeClassifier(labelCol="DelayLabel_IND", featuresCol="features", maxBins=6000)

In [None]:
pipeline_dt = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4,indexer_5, va, dt])

In [None]:
model_dt = pipeline_dt.fit(train)

In [None]:
predictions_dt = model_dt.transform(test)

In [None]:
evaluator_dt = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_IND", predictionCol="prediction", metricName="accuracy")
accuracy_dt = evaluator_dt.evaluate(predictions_dt)
print("Accuracy = %g" % (accuracy_dt))

Accuracy = 0.500523


# **Binary Case**

In [None]:
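def generate_binary_label(depdelay, arrdelay):
  # 1 = "delayed", 0 = "on time" (see df_delayed / df_ontime below).
  # NOTE: as written, a flight that left late is 1, a flight that left on time
  # but arrived late is 0, and a flight with no delay at all falls through to
  # the else branch and is also 1. The last two cases look inverted, but the
  # counts and accuracies below were produced by this version.
  if depdelay is not None and depdelay > 0:
    return 1
  elif arrdelay is not None and arrdelay > 0:
    return 0
  else:
    return 1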
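To see how the combinations of departure and arrival delay are labeled, the sketch below re-implements the branch logic of `generate_binary_label` as written and compares it with a hypothetical alternative rule, "delayed (1) if either delay is positive, otherwise 0". The alternative is an assumption about the intended rule, not the notebook's method, and the sample delay pairs are made up.

```python
def label_as_written(depdelay, arrdelay):
    # Same branch logic as generate_binary_label
    if depdelay is not None and depdelay > 0:
        return 1
    elif arrdelay is not None and arrdelay > 0:
        return 0
    else:
        return 1

def label_either_late(depdelay, arrdelay):
    # Hypothetical alternative: 1 if either delay is positive, else 0
    return int((depdelay or 0) > 0 or (arrdelay or 0) > 0)

# (departure delay, arrival delay) pairs in minutes
for dep, arr in [(7, 1), (0, 7), (-1, -3), (5, -2)]:
    print(dep, arr, label_as_written(dep, arr), label_either_late(dep, arr))
```

Under the function as written, a flight with no delay at all (-1, -3) gets the same label as one that departed late, while a flight that departed on time but arrived late (0, 7) is labeled 0.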


In [None]:

bin_udfgenerate = F.udf(generate_binary_label, IntegerType())
# Keep only flights that have both delay values (this removes, for example, cancelled flights)
df_no_cancel = df.dropna(subset=["ArrDelay", "DepDelay"])
df_with_label_bin = df_no_cancel.withColumn("DelayLabel_BIN", bin_udfgenerate("DepDelay", "ArrDelay"))


In [None]:
df_with_label_bin.show(10)

+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+--------------+
|Month|DayofMonth|DayOfWeek|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|DelayLabel_BIN|
+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+--------------+
|    1|         1|        1|      1225|      1340|           WN|     2891|   N351|            75|     54|       1|       7|   SMF| ONT|     389|        0|             1|
|    1|         1|        1|      1905|      2035|           WN|      462|   N370|            90|     74|       8|      13|   SMF| PDX|     479|        0|             1|
|    1|         1|        1|      2130|      2300|           WN|     1229|   N685|            90|     73|      34|      36|   SMF| PDX|     479|      

In [None]:
df_delayed = df_with_label_bin.filter(df_with_label_bin.DelayLabel_BIN == 1)
df_ontime = df_with_label_bin.filter(df_with_label_bin.DelayLabel_BIN == 0)

In [None]:
df_delayed.count()

6259545

In [None]:
df_ontime.count()

1015743

In [None]:
# Keep every on-time record and roughly one sixth of the delayed ones (no seed, so not reproducible)
sampled_df_delayed = df_delayed.sample(False, 1/6)
final_df_bin = df_ontime.union(sampled_df_delayed)

In [None]:
final_df_bin.count()

2059079

In [None]:
final_df_bin.show(2)

+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+--------------+
|Month|DayofMonth|DayOfWeek|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|DelayLabel_BIN|
+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+--------------+
|    1|         1|        1|      1245|      1450|           WN|     1684|   N493|            65|     56|       7|       0|   SNA| PHX|     338|        0|             0|
|    1|         1|        1|      1440|      1605|           WN|      728| N707SA|            85|     70|       2|      -1|   SNA| SMF|     404|        0|             0|
+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+------

In [None]:
train,test = final_df_bin.randomSplit([0.8,0.2], seed = 0)

In [None]:
indexer_1 = StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrier_IND").setHandleInvalid("keep")
indexer_2 = StringIndexer().setInputCol("TailNum").setOutputCol("TailNum_IND").setHandleInvalid("keep")
indexer_3 = StringIndexer().setInputCol("Origin").setOutputCol("Origin_IND").setHandleInvalid("keep")
indexer_4 = StringIndexer().setInputCol("Dest").setOutputCol("Dest_IND").setHandleInvalid("keep")

Random Forest

In [None]:
va = VectorAssembler(
    inputCols=["Month", "DayofMonth", "DayOfWeek", "CRSDepTime", "CRSArrTime" , "UniqueCarrier_IND", "FlightNum", "TailNum_IND", "CRSElapsedTime", "Origin_IND", "Dest_IND", "Distance"],
    outputCol="features")

In [None]:
rf_bin = RandomForestClassifier(labelCol="DelayLabel_BIN", featuresCol="features", maxBins=6000)

In [None]:
pipeline_rf_bin = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4, va, rf_bin])


In [None]:
model_rf_bin = pipeline_rf_bin.fit(train)

In [None]:
predictions_rf_bin = model_rf_bin.transform(test)

In [None]:
evaluator_rf_bin = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_BIN", predictionCol="prediction", metricName="accuracy")
accuracy_rf_bin = evaluator_rf_bin.evaluate(predictions_rf_bin)
print("Accuracy = %g" % (accuracy_rf_bin))

Accuracy = 0.586159


Logistic Regression

In [None]:
lr_bin = LogisticRegression(labelCol="DelayLabel_BIN", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [None]:
pipeline_lr_bin = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4, va, lr_bin])

In [None]:
model_lr_bin = pipeline_lr_bin.fit(train)

In [None]:
predictions_lr_bin = model_lr_bin.transform(test)

In [None]:
evaluator_lr_bin = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_BIN", predictionCol="prediction", metricName="accuracy")
accuracy_lr_bin = evaluator_lr_bin.evaluate(predictions_lr_bin)
print("Accuracy = %g" % (accuracy_lr_bin))

Accuracy = 0.506676


Naive Bayes

In [None]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

In [None]:
nb_bin= NaiveBayes(labelCol="DelayLabel_BIN", featuresCol="scaledFeatures",smoothing=1.0, modelType="multinomial")


In [None]:
pipeline_nb_bin = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4, va, scaler, nb_bin])

In [None]:
model_nb_bin = pipeline_nb_bin.fit(train)

In [None]:
predictions_nb_bin = model_nb_bin.transform(test)

In [None]:
evaluator_nb_bin = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_BIN", predictionCol="prediction", metricName="accuracy")
accuracy_nb_bin = evaluator_nb_bin.evaluate(predictions_nb_bin)
print("Accuracy = %g" % (accuracy_nb_bin))

Accuracy = 0.533203


Decision Tree

In [None]:
dt_bin = DecisionTreeClassifier(labelCol="DelayLabel_BIN", featuresCol="features", maxBins=6000)

In [None]:
pipeline_dt_bin = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4, va, dt_bin])

In [None]:
model_dt_bin = pipeline_dt_bin.fit(train)

In [None]:
predictions_dt_bin = model_dt_bin.transform(test)

In [None]:
evaluator_dt_bin = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_BIN", predictionCol="prediction", metricName="accuracy")
accuracy_dt_bin = evaluator_dt_bin.evaluate(predictions_dt_bin)
print("Accuracy = %g" % (accuracy_dt_bin))

Accuracy = 0.580117


Gradient-Boosted Trees (GBT)

In [None]:
gbt = GBTClassifier(labelCol="DelayLabel_BIN", featuresCol="features", maxIter=30, maxBins=6000)

In [None]:
pipeline_gbt = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4, va, gbt])


In [None]:
model_gbt = pipeline_gbt.fit(train)

In [None]:
predictions_gbt = model_gbt.transform(test)

In [None]:
evaluator_gbt = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_BIN", predictionCol="prediction", metricName="accuracy")
accuracy_gbt = evaluator_gbt.evaluate(predictions_gbt)
print("Accuracy = %g" % (accuracy_gbt))

Accuracy = 0.602314


GBT with chi-squared feature selection

In [None]:
from pyspark.ml.feature import ChiSqSelector

In [None]:
selector = ChiSqSelector(numTopFeatures=8, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="DelayLabel_BIN")
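`ChiSqSelector` ranks features with a chi-squared independence test against the label and, with `numTopFeatures=8`, keeps 8 of the 12 assembled features. The test treats every distinct feature value as a category, which suits the indexed columns better than continuous ones such as `Distance`. For intuition, the Pearson statistic for a single feature-versus-label contingency table can be computed as below; larger values indicate stronger dependence. The 2x2 counts are invented for illustration.

```python
from typing import List

def chi_squared(table: List[List[int]]) -> float:
    """Pearson chi-squared statistic for a contingency table
    (rows = feature values, columns = label values)."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = sum(row_totals)
    stat = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            # Count expected in this cell if feature and label were independent
            expected = row_totals[i] * col_totals[j] / total
            stat += (observed - expected) ** 2 / expected
    return stat

# Hypothetical counts: feature value A/B (rows) vs. on time/delayed (columns)
table = [[10, 20],
         [20, 10]]
print(round(chi_squared(table), 3))  # 6.667
```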

In [None]:
gbt_select = GBTClassifier(labelCol="DelayLabel_BIN", featuresCol="selectedFeatures", maxIter=30, maxBins=6000)

In [None]:
pipeline_gbt_select = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4, va, selector, gbt_select])


In [None]:
model_gbt_select = pipeline_gbt_select.fit(train)

In [None]:
predictions_gbt_select = model_gbt_select.transform(test)

In [None]:
predictions_gbt_select.show(10)

+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+--------------+-----------------+-----------+----------+--------+--------------------+--------------------+--------------------+--------------------+----------+
|Month|DayofMonth|DayOfWeek|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|DelayLabel_BIN|UniqueCarrier_IND|TailNum_IND|Origin_IND|Dest_IND|            features|    selectedFeatures|       rawPrediction|         probability|prediction|
+-----+----------+---------+----------+----------+-------------+---------+-------+--------------+-------+--------+--------+------+----+--------+---------+--------------+-----------------+-----------+----------+--------+--------------------+--------------------+--------------------+--------------------+----------+
|    1|         1|        1|       600|       654|     

In [None]:
evaluator_gbt_select = MulticlassClassificationEvaluator(
    labelCol="DelayLabel_BIN", predictionCol="prediction", metricName="accuracy")
accuracy_gbt_select = evaluator_gbt_select.evaluate(predictions_gbt_select)
print("Accuracy = %g" % (accuracy_gbt_select))

Accuracy = 0.592618


MLPC (Multilayer Perceptron Classifier, a feed-forward neural network)


In [None]:
##from pyspark.ml.classification import MultilayerPerceptronClassifier

In [None]:
##layers = [10, 6, 4, 2]  # NOTE: the input layer must match the 8 features kept by ChiSqSelector
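For `MultilayerPerceptronClassifier`, the first entry of `layers` must equal the length of the input feature vector and the last must equal the number of classes. The `selectedFeatures` column produced by `ChiSqSelector(numTopFeatures=8)` has 8 entries, so the input layer of 10 in the cell above does not match; that mismatch is a likely, though unconfirmed, cause of the `Py4JJavaError` recorded below. A hypothetical pre-flight check, which also counts the weights and biases such a network learns:

```python
from typing import List

def check_layers(layers: List[int], num_features: int, num_classes: int) -> int:
    """Validate an MLP layer spec and return its number of weights plus biases."""
    if layers[0] != num_features:
        raise ValueError(f"input layer {layers[0]} != feature count {num_features}")
    if layers[-1] != num_classes:
        raise ValueError(f"output layer {layers[-1]} != class count {num_classes}")
    # Each consecutive pair of layers contributes a weight matrix and a bias vector
    return sum(a * b + b for a, b in zip(layers, layers[1:]))

print(check_layers([8, 6, 4, 2], num_features=8, num_classes=2))  # 92
try:
    check_layers([10, 6, 4, 2], num_features=8, num_classes=2)
except ValueError as err:
    print(err)  # input layer 10 != feature count 8
```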

In [None]:
##trainer_bin = MultilayerPerceptronClassifier(labelCol="DelayLabel_BIN", featuresCol="selectedFeatures",maxIter=100, layers=layers, blockSize=128, seed=1234)

In [None]:
##pipeline_MLPC_bin = Pipeline(stages=[indexer_1, indexer_2, indexer_3, indexer_4, va, selector, trainer_bin])


In [None]:
##model_MLPC_bin = pipeline_MLPC_bin.fit(train)

Py4JJavaError: ignored