#  Set up 

In [47]:
#read in file as dataframe 
# import pyspark modules
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *       # for datatype conversion
from pyspark.sql.functions import *   # for col() function
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import pandas as pd
import os
from pyspark.ml.feature import HashingTF, Tokenizer
import pyspark.sql.types as typ
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import OneHotEncoder, StringIndexer, Bucketizer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import pyspark.mllib.regression as reg
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.linalg import SparseVector, DenseVector
import functools 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
from pyspark.sql import SparkSession 
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("app") \
    .config("spark.executor.memory", '2g') \
    .config('spark.executor.cores', '2') \
    .config('spark.cores.max', '2') \
    .config("spark.driver.memory",'4g') \
    .getOrCreate()

sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

In [3]:
path_to_data = os.path.join("/home/jovyan/FlightDelay/clean_data_no_hot")

In [4]:
df = spark.read.format("csv") \
    .option("header", "true").option("inferschema","true").load(path_to_data)

# One Hot Encoder

In [5]:
# for each level, count freq. val=0 for most freq, then 1, ...

stringIndexer = StringIndexer(inputCol="AIRLINE", outputCol="AIRLINE_Index")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="AIRLINE_Index", outputCol="AIRLINE_Vec")
encoded = encoder.transform(indexed)


In [6]:
# for each level, count freq. val=0 for most freq, then 1, ...

stringIndexer2 = StringIndexer(inputCol="ORIGIN_AIRPORT", outputCol="ORIGIN_AIRPORT_Index")
model2 = stringIndexer2.fit(encoded)
indexed2 = model2.transform(encoded)

encoder2 = OneHotEncoder(inputCol="ORIGIN_AIRPORT_Index", outputCol="ORIGIN_AIRPORT_Vec")
encoded2 = encoder2.transform(indexed2)



In [7]:
# for each level, count freq. val=0 for most freq, then 1, ...

stringIndexer3 = StringIndexer(inputCol="DESTINATION_AIRPORT", outputCol="DESTINATION_AIRPORT_Index")
model3 = stringIndexer3.fit(encoded2)
indexed3 = model3.transform(encoded2)

encoder3 = OneHotEncoder(inputCol="DESTINATION_AIRPORT_Index", outputCol="DESTINATION_AIRPORT_Vec")
encoded3 = encoder3.transform(indexed3)
encoded3.select('DESTINATION_AIRPORT','DESTINATION_AIRPORT_Index', "DESTINATION_AIRPORT_Vec").show()
#encoded3.cache()b

+-------------------+-------------------------+-----------------------+
|DESTINATION_AIRPORT|DESTINATION_AIRPORT_Index|DESTINATION_AIRPORT_Vec|
+-------------------+-------------------------+-----------------------+
|                DFW|                      2.0|        (585,[2],[1.0])|
|                CLT|                     14.0|       (585,[14],[1.0])|
|                ATL|                      0.0|        (585,[0],[1.0])|
|                LAX|                      4.0|        (585,[4],[1.0])|
|                IAH|                      5.0|        (585,[5],[1.0])|
|                DFW|                      2.0|        (585,[2],[1.0])|
|                DFW|                      2.0|        (585,[2],[1.0])|
|                DFW|                      2.0|        (585,[2],[1.0])|
|                FLL|                     21.0|       (585,[21],[1.0])|
|                DFW|                      2.0|        (585,[2],[1.0])|
|                MCO|                     10.0|       (585,[10],

In [8]:
new_cols_to_drop = ['AIRLINE_Index', 'AIRLINE', 'ORIGIN_AIRPORT_Index', 
                    'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT_Index',
                    'DESTINATION_AIRPORT', 'FLIGHT_NUMBER']

final_encoded = encoded3.drop(*new_cols_to_drop)

In [9]:
final_encoded.columns

['_c0',
 'YEAR',
 'MONTH',
 'DAY',
 'DAY_OF_WEEK',
 'SCHEDULED_DEPARTURE',
 'DEPARTURE_TIME',
 'DEPARTURE_DELAY',
 'SCHEDULED_TIME',
 'ELAPSED_TIME',
 'DISTANCE',
 'SCHEDULED_ARRIVAL',
 'ARRIVAL_TIME',
 'ARRIVAL_DELAY',
 'DIVERTED',
 'CANCELLED',
 'B_SCHEDULED_ARRIVAL',
 'B_ARRIVAL_TIME',
 'B_SCHEDULED_DEPARTURE',
 'B_DEPARTURE_TIME',
 'AIRLINE_Vec',
 'ORIGIN_AIRPORT_Vec',
 'DESTINATION_AIRPORT_Vec']

In [10]:
final_df = final_encoded.drop('_c0', 'YEAR', 'DEPARTURE_DELAY', 'ARRIVAL_DELAY', 'ARRIVAL_TIME', 
                              'B_ARRIVAL_TIME', 'ELAPSED_TIME', 'B_ARRIVAL_TIME', 
                              'B_DEPARTURE_TIME', 'DEPARTURE_TIME')

# Scale data

In [11]:
#convery final_encoded to rdd 
# we cannpot scale bucketized or vec columns, so we omit those form the scaling process
input_data = final_df.rdd.map(lambda x: (x[8], DenseVector(x[1:8])))

In [12]:
df2 = sqlCtx.createDataFrame(input_data, ["label","features2"])

In [13]:
SS = StandardScaler(inputCol = "features2", outputCol = "features_scaled")

scaler = SS.fit(df2)

In [14]:
#transform the data in df2 with our scaler 
scaled_df = scaler.transform(df2)
#join scalable feature with columns  'AIRLINE_Vec', 'ORIGIN_AIRPORT_Vec', 
#'DESTINATION_AIRPORT_Vec', 'B_SCHEDULED_ARRIVAL', 'B_ARRIVAL_TIME', 
#'B_SCHEDULED_DEPARTURE','B_DEPARTURE_TIME'


In [15]:
scaled_df.show(4)

+-----+--------------------+--------------------+
|label|           features2|     features_scaled|
+-----+--------------------+--------------------+
|    1|[1.0,4.0,200.0,12...|[0.11374658158447...|
|    0|[1.0,4.0,220.0,22...|[0.11374658158447...|
|    0|[1.0,4.0,545.0,12...|[0.11374658158447...|
|    1|[1.0,4.0,545.0,66...|[0.11374658158447...|
+-----+--------------------+--------------------+
only showing top 4 rows



In [16]:
# since there is no common column between these two dataframes add row_index so that it can be joined
scaled_df = scaled_df.withColumn('row_index', F.monotonically_increasing_id())
final_df = final_df.withColumn('row_index', F.monotonically_increasing_id())

In [17]:
# combine scaled df and final_df
total_df = scaled_df.join(final_df, scaled_df.row_index == final_df.row_index)

In [18]:
total_df.count()

111166

In [19]:
#drop columns of already scaled predictors 
total_df = total_df.drop(*['MONTH', 'DAY', 'DAY_OF_WEEK', 'SCHEDULED_DEPARTURE', 'SCHEDULED_TIME', 'DISTANCE',
                           'SCHEDULED_ARRIVAL', 'DIVERTED', 
                           'CANCELLED2', 'row_index']) 

In [20]:
#final scaled dataframe for predicting Cancelled flights 
total_df.columns

['label',
 'features2',
 'features_scaled',
 'CANCELLED',
 'B_SCHEDULED_ARRIVAL',
 'B_SCHEDULED_DEPARTURE',
 'AIRLINE_Vec',
 'ORIGIN_AIRPORT_Vec',
 'DESTINATION_AIRPORT_Vec']

# SPlit data

In [21]:
def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs)

In [22]:
train_data_scaled, test_data_scaled = total_df.randomSplit([0.8, 0.2], seed=12)

In [23]:
c = train_data_scaled.where(total_df.label == 1)

In [24]:
## How to filter out large portion of non-canclelled flights in training data to make data more even 
not_c = train_data_scaled.where(total_df.label == 0).sample(False, .018, 454) 

In [25]:
not_c.count()

1618

In [26]:
c.count()

1468

In [28]:
train_data_scaled = unionAll([c, not_c])

In [29]:
train_data_scaled.columns

['label',
 'features2',
 'features_scaled',
 'CANCELLED',
 'B_SCHEDULED_ARRIVAL',
 'B_SCHEDULED_DEPARTURE',
 'AIRLINE_Vec',
 'ORIGIN_AIRPORT_Vec',
 'DESTINATION_AIRPORT_Vec']

# Baseline Regression 

only using AIRLINE_Vec to predict cancellelation 

In [63]:
train_data_base = train_data_scaled.select(["AIRLINE_Vec", "CANCELLED"])
test_data_base =  test_data_scaled.select(["AIRLINE_Vec", "CANCELLED"])
lr = LogisticRegression(featuresCol = 'AIRLINE_Vec', labelCol='CANCELLED', maxIter=10, regParam=0.3, elasticNetParam=0.8)
linearModel = lr.fit(train_data_base)
predicted = linearModel.transform(test_data_base)

In [62]:
predicted.show(4)

+--------------+--------------------+--------------------+----------+
|   AIRLINE_Vec|       rawPrediction|         probability|prediction|
+--------------+--------------------+--------------------+----------+
|(13,[2],[1.0])|[0.09728988844397...|[0.52430330524951...|       0.0|
|(13,[4],[1.0])|[0.09728988844397...|[0.52430330524951...|       0.0|
|(13,[0],[1.0])|[0.09728988844397...|[0.52430330524951...|       0.0|
|(13,[9],[1.0])|[0.09728988844397...|[0.52430330524951...|       0.0|
+--------------+--------------------+--------------------+----------+
only showing top 4 rows



# Evaluate the baseline model

Test Error 

In [55]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="CANCELLED", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predicted)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0150497


Area under the curve:

In [56]:
print("evaluation (area under ROC): %f" % accuracy)

evaluation (area under ROC): 0.984950


Precision = TP / (TP+FP)

Recall = TP / (TP+FN)


In [57]:
# tp = df[(df.target_index == 1) & (df.prediction == 1)].count()
tp = predicted[(predicted.CANCELLED == 1) & (predicted.prediction == 1.0)].count()
tn = predicted[(predicted.CANCELLED == 0) & (predicted.prediction == 0.0)].count()
fp = predicted[(predicted.CANCELLED == 0) & (predicted.prediction == 1.0)].count()
fn = predicted[(predicted.CANCELLED == 1) & (predicted.prediction == 0.0)].count()

Confusion matrix

In [58]:
# Confusion matrix 
print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)
print("Total", df.count())

True Positives: 0
True Negatives: 21990
False Positives: 0
False Negatives: 336
Total 111166


In [None]:
Precision and Recall

In [59]:
# Calculate Precision and recall: 
precision = tp / (tp + fp)
recall = tp / (tp + fn) 

ZeroDivisionError: division by zero

In [None]:
print("Precision = %g" % (precision))

In [None]:
print("Recall = %g" % (recall))

# Logistic Regression 

In [65]:
train_data_scaled.columns

['label',
 'features2',
 'features_scaled',
 'CANCELLED',
 'B_SCHEDULED_ARRIVAL',
 'B_SCHEDULED_DEPARTURE',
 'AIRLINE_Vec',
 'ORIGIN_AIRPORT_Vec',
 'DESTINATION_AIRPORT_Vec']

In [70]:
# select variables excluding label, feature2 and CANCELLED
vars_to_keep = [
 'features_scaled',
 'B_SCHEDULED_ARRIVAL',
 'B_SCHEDULED_DEPARTURE',
 'AIRLINE_Vec',
 'ORIGIN_AIRPORT_Vec',
 'DESTINATION_AIRPORT_Vec'
]

In [81]:
assembler = VectorAssembler(
                            inputCols=[c for c in vars_to_keep],
                            outputCol='features')
label = 'CANCELLED'
positive_label = 1
negative_label = 0
train = assembler.transform(training_data_scaled).select(["label", "features"])
test = assembler.transform(test_data_scaled).select(["label", "features"])

In [82]:
train

DataFrame[label: bigint, features: vector]

In [83]:
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml import Pipeline
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="features", outputCol="features_token")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [84]:
# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

In [85]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

In [86]:
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

IllegalArgumentException: 'requirement failed: Input type must be string type but got struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'

In [None]:
# Make predictions on test documents. cvModel uses the best model found (lrModel).
predicted = cvModel.transform(test)

In [None]:
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

In [None]:
# combine`predictions` and `labels` 
predictionAndLabel = predictions.zip(labels)
#predictions = predictions.withColumn('row_index', F.monotonically_increasing_id())
#labels = labels.withColumn('row_index', F.monotonically_increasing_id())
#predictionAndLabel = predictions.join(labels, predictions.row_index == labels.row_index)

# Evaluate the logistic model

In [None]:
MSE = predictionAndLabel \
    .map(lambda vp: (vp[0] - vp[1])**2) \
    .reduce(lambda x, y: x + y) / predictionAndLabel.count()
MSE

In [None]:
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictionAndLabel)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)