In [1]:
from pyspark.ml import feature
from pyspark.ml import clustering
from pyspark.ml import Pipeline
from pyspark.sql import functions as fn
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml import feature, regression, evaluation, Pipeline
from pyspark.sql import functions as fn, Row
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.sql import functions as sf
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.feature import RegexTokenizer
import requests
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import concat, col, lit, lower
from pyspark.sql.functions import isnan, when, count, col, isnull
from pyspark.sql.functions import concat_ws
from  pyspark.sql.functions import abs
# seting master("local[*]") enables multicore processing on all available logical cores on your machine
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [2]:
# Do not delete or change this cell

import os

# Define a function to determine if we are running on data bricks
# Return true if running in the data bricks environment, false otherwise
def is_databricks():
    # get the databricks runtime version
    db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")
    
    # if running on data bricks
    if db_env != None:
        return True
    else:
        return False

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
def get_training_filename(data_file_name):    
    # if running on data bricks
    if is_databricks():
        # build the full path file name assuming data brick env
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

In [3]:
airlines_df = spark.read.csv(get_training_filename('airlines.csv'), header=True, inferSchema=True)
airports_df = spark.read.csv(get_training_filename('airports.csv'), header=True, inferSchema=True)
flights_df = spark.read.csv(get_training_filename('flights.csv'), header=True, inferSchema=True)

In [4]:
flights_df = flights_df.select('MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_DELAY', 'DISTANCE', 'SCHEDULED_ARRIVAL', 'ARRIVAL_DELAY', 'CANCELLED')

flights_df = flights_df.filter((fn.col('CANCELLED')==0))

flights_df = flights_df.withColumn("Flight_Delayed", fn.when(fn.col("DEPARTURE_DELAY")<15, 0).otherwise(1))

from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[ 0, 100, 1000, float('Inf') ],inputCol="DISTANCE", outputCol="Distance_Bucket")
flights_df = bucketizer.setHandleInvalid("keep").transform(flights_df)

from pyspark.sql.functions import udf
from pyspark.sql.types import *

t = {0.0:"Short", 1.0: "Medium", 2.0:"Long"}
udf_foo = udf(lambda x: t[x], StringType())
flights_df = flights_df.withColumn("Flight_Distance", udf_foo("Distance_Bucket"))

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="AIRLINE", outputCol="Airline_Numeric").fit(flights_df)
flights_df = indexer.transform(flights_df)

from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="Airline_Numeric", outputCol="Airline_OHE")
flights_df= encoder.transform(flights_df)

indexer = StringIndexer(inputCol="ORIGIN_AIRPORT", outputCol="OA_Numeric").fit(flights_df)
flights_df = indexer.transform(flights_df)

encoder = OneHotEncoder(inputCol="OA_Numeric", outputCol="Origin_Airport_OHE")
flights_df= encoder.transform(flights_df)

indexer = StringIndexer(inputCol="DESTINATION_AIRPORT", outputCol="DA_Numeric").fit(flights_df)
flights_df = indexer.transform(flights_df)

encoder = OneHotEncoder(inputCol="DA_Numeric", outputCol="Destination_Airport_OHE")
flights_df= encoder.transform(flights_df)

flights_df = flights_df.drop('ARRIVAL_DELAY')




In [5]:
training_df, testing_df = flights_df.randomSplit([0.9, 0.1], seed=5)
major_df = training_df.filter(col("Flight_Delayed") == 0)
minor_df = training_df.filter(col("Flight_Delayed") == 1)
ratio = major_df.count()/minor_df.count()
print(ratio)
sampled_majority_df = major_df.sample(False, 1.2/ratio, seed=5)
combined_df_2 = sampled_majority_df.unionAll(minor_df)
#combined_df_2.show()

In [6]:
sample = combined_df_2.sample(True, 0.2,seed=5)

In [7]:
flights_df = flights_df.drop('label')
training_df = training_df.drop('label')
testing_df = testing_df.drop('label')
sample = sample.drop('label')

In [8]:
features = sample.columns
sample = sample.select(col("Flight_Delayed").alias("label"), *features)

In [9]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler

features = flights_df.columns
flights_df = flights_df.select(col("Flight_Delayed").alias("label"), *features)

training_df, testing_df = flights_df.randomSplit([0.9, 0.1], seed=5)

vectorAssembler = VectorAssembler(inputCols=["MONTH", "DAY", "DAY_OF_WEEK", "Airline_OHE", "Origin_Airport_OHE", "Destination_Airport_OHE", "SCHEDULED_DEPARTURE", "Distance_Bucket", "SCHEDULED_ARRIVAL", "CANCELLED"], outputCol="unscaled_features")

standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

rf = RandomForestClassifier(featuresCol='features', labelCol='label')
rf_pipeline = Pipeline(stages=[vectorAssembler,standardScaler, rf])


paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3,4]) \
    .addGrid(rf.numTrees, [100,150]) \
    .addGrid(rf.impurity,["Gini"]) \
    .addGrid(rf.featureSubsetStrategy,["auto", "sqrt"]) \
    .addGrid(rf.maxBins,[100]) \
    .build()
crossval_rf = CrossValidator(estimator=rf_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

cvModel = crossval_rf.fit(sample)

In [10]:
#cvModel.fit()

In [11]:
prediction = cvModel.transform(testing_df)


In [12]:
bce = BinaryClassificationEvaluator(labelCol='label')
bce.evaluate(prediction, {bce.metricName: "areaUnderROC"} )

In [13]:
predictions_train = cvModel.transform(testing_df)
y_true = predictions_train.select(['Flight_Delayed']).collect()
y_pred = predictions_train.select(['prediction']).collect()

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))
confusion_matrix(y_true, y_pred)

In [14]:
mce = evaluation.MulticlassClassificationEvaluator(labelCol="Flight_Delayed", metricName="f1")
mce.evaluate(cvModel.transform(testing_df))



In [15]:
#(f1|weightedPrecision|weightedRecall|accuracy)
mce = evaluation.MulticlassClassificationEvaluator(labelCol="Flight_Delayed", metricName="weightedPrecision")
mce.evaluate(cvModel.transform(testing_df))

In [16]:
mce = evaluation.MulticlassClassificationEvaluator(labelCol="Flight_Delayed", metricName="weightedRecall")
mce.evaluate(cvModel.transform(testing_df))

In [17]:
mce = evaluation.MulticlassClassificationEvaluator(labelCol="Flight_Delayed", metricName="accuracy")
mce.evaluate(cvModel.transform(testing_df))

In [18]:
bestModel = cvModel.bestModel.stages[-1]
print('Best Param (regParam): ', bestModel._java_obj.getNumTrees())
print('Best Param (regParam): ', bestModel._java_obj.getMaxDepth())
print('Best Param (regParam): ', bestModel._java_obj.getMaxBins())


In [19]:
sample = combined_df_2.sample(True, 0.5,seed=5)
features = sample.columns
sample = sample.select(col("Flight_Delayed").alias("label"), *features)

In [20]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [4]) \
    .addGrid(rf.numTrees, [175,200]) \
    .addGrid(rf.impurity,["Gini"]) \
    .addGrid(rf.featureSubsetStrategy,["auto", "sqrt"]) \
    .addGrid(rf.maxBins,[150]) \
    .build()
crossval_rf = CrossValidator(estimator=rf_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

cvModel = crossval_rf.fit(sample)

In [21]:
prediction = cvModel.transform(testing_df)


In [22]:
bce = BinaryClassificationEvaluator(labelCol='label')
bce.evaluate(prediction, {bce.metricName: "areaUnderROC"} )

In [23]:

mce = evaluation.MulticlassClassificationEvaluator(labelCol="Flight_Delayed", metricName="weightedPrecision")
mce.evaluate(cvModel.transform(testing_df))

In [24]:
mce = evaluation.MulticlassClassificationEvaluator(labelCol="Flight_Delayed", metricName="weightedRecall")
mce.evaluate(cvModel.transform(testing_df))

In [25]:
mce = evaluation.MulticlassClassificationEvaluator(labelCol="Flight_Delayed", metricName="accuracy")
mce.evaluate(cvModel.transform(testing_df))

In [26]:
mce = evaluation.MulticlassClassificationEvaluator(labelCol="label", metricName="weightedRecall")
mce.evaluate(cvModel.transform(testing_df))

In [27]:
mce = evaluation.MulticlassClassificationEvaluator(labelCol="label", metricName="weightedPrecision")
mce.evaluate(cvModel.transform(testing_df))

In [28]:
mce = evaluation.MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
mce.evaluate(cvModel.transform(testing_df))