In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

# Input CSV glob. Set the FLIGHTS_CSV_GLOB environment variable to point elsewhere instead of editing the path.
DATA_GLOB = os.environ.get("FLIGHTS_CSV_GLOB", "/Code/spark/LAB4/data/20*.csv")

spark = SparkSession.builder.appName("theappname").getOrCreate()
# header=True takes column names from the first row.
# inferSchema=True makes Spark scan the data to guess column types.
# DROPMALFORMED silently discards records that fail to parse.
df = spark.read.csv(DATA_GLOB, mode="DROPMALFORMED", inferSchema=True, header=True)


from pyspark.sql.functions import  length, lpad
from pyspark.sql.types import IntegerType
# Delay columns: cast to int and null-filled with 0 below.
DELAY_COLUMNS = ['ArrDelay', 'DepDelay', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay',
                 'LateAircraftDelay']

# Keep rows with Diverted == 0, no CancellationCode, a non-null alphanumeric TailNum, and
# DepTime/ArrTime values of 3 or 4 characters (HMM or HHMM).
# All conditions are ANDed. The ArrTime test must be a single parenthesised unit: `&` binds
# tighter than `|`, so `... & (len == 4) | (len == 3)` would let every 3-character ArrTime
# row bypass all the other filters.
is_kept_flight = (
    (col('Diverted') == 0)
    & col('CancellationCode').isNull()
    & col('TailNum').isNotNull()
    & col('TailNum').rlike('^[A-Za-z0-9]+$')
    & length(col('DepTime')).isin(3, 4)
    & length(col('ArrTime')).isin(3, 4)
)

selection = (
    df.where(is_kept_flight)
    # The filter guarantees 3 or 4 characters, so left-padding to 4 with '0' turns HMM into
    # HHMM and leaves HHMM unchanged. The result is a string column.
    .withColumn('DepTime', lpad(col('DepTime'), 4, '0'))
    .withColumn('ArrTime', lpad(col('ArrTime'), 4, '0'))
)
for delay_column in DELAY_COLUMNS:
    selection = selection.withColumn(delay_column, col(delay_column).cast(IntegerType()))
selection = (
    selection
    .fillna(0, subset=DELAY_COLUMNS)
    # Zero-padded HHMM strings sort in clock order.
    .sort('Year', 'Month', 'DayofMonth', 'ArrTime')
)


from pyspark.sql.functions import lag
from pyspark.sql.types import BinaryType
from pyspark.sql import Window
# Lateness threshold, in minutes of ArrDelay, shared by both derived columns below.
LATE_THRESHOLD_MINUTES = 15
# How many preceding arrivals NbrOfPreviousLateFlights looks back over.
N_PREVIOUS_FLIGHTS = 3

# Arrivals are ordered by ArrTime within each calendar day. Partitioning by the ordering keys
# themselves (including ArrTime) would leave only same-minute flights in each partition, in
# arbitrary order.
# Partitioning per day, rather than using one global ordering, avoids moving every row into a
# single Spark partition. The first flights of each day therefore have no predecessors (lag default 0).
# TailNum breaks ties between flights sharing an ArrTime. This assumes (day, ArrTime, TailNum)
# is unique — TODO confirm.
w = Window.partitionBy("Year", "Month", "DayofMonth").orderBy("ArrTime", "TailNum")


def IsLate(frame):
    """Return a boolean Column, true where `frame`'s ArrDelay exceeds LATE_THRESHOLD_MINUTES."""
    return frame["ArrDelay"] > LATE_THRESHOLD_MINUTES


# Count how many of the N_PREVIOUS_FLIGHTS preceding arrivals (in window order) were late.
# A missing predecessor defaults to a delay of 0 and so counts as on time.
nbr_previous_late = lit(0)
for offset in range(1, N_PREVIOUS_FLIGHTS + 1):
    previous_delay = lag(selection['ArrDelay'], offset, 0).over(w)
    nbr_previous_late = nbr_previous_late + (previous_delay > LATE_THRESHOLD_MINUTES).cast(IntegerType())

base = (
    selection
    .withColumn("NbrOfPreviousLateFlights", nbr_previous_late)
    .withColumn("IsLate", IsLate(selection))
)

In [21]:
# Collect the Spark result to the driver as a pandas DataFrame (the whole result is held in memory).
superBasePanda = base.toPandas()
from pandas import notnull
# Keep only rows that have a TailNum. Boolean indexing removes the other rows. `DataFrame.where`
# with a row mask would instead keep them with every column set to NaN, which also upcasts
# int/bool columns.
superBasePanda = superBasePanda[superBasePanda['TailNum'].notna()]

In [29]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from pandas import notnull
# Columns taken from the Spark result: model features plus the IsLate label.
# NOTE(review): LateAircraftDelay, CarrierDelay, WeatherDelay, NASDelay and SecurityDelay look like
# delay-cause breakdowns. They may only be known after arrival, and possibly only filled in for late
# flights. If so, they leak the IsLate label into the features. Confirm against the data source.
MODEL_COLUMNS = ['Year', 'Month', 'DayofMonth', 'DepTime', 'Origin', 'Dest', 'TailNum', 'UniqueCarrier',
                 'TaxiOut', 'DepDelay', 'Distance', 'LateAircraftDelay', 'CarrierDelay', 'WeatherDelay',
                 'NASDelay', 'SecurityDelay', 'IsLate']
# Columns replaced by integer codes (<name>_code). The raw column is dropped afterwards.
LABEL_ENCODED_COLUMNS = ['Origin', 'Dest', 'UniqueCarrier', 'TailNum', 'DepTime']

lb_make = LabelEncoder()
# .copy() gives an independent frame, so the column assignments below are not chained-assignment writes.
basePanda = superBasePanda[MODEL_COLUMNS].dropna().copy()
for column in LABEL_ENCODED_COLUMNS:
    # Codes follow the sorted order of the distinct values. DepTime is expected to be a zero-padded
    # HHMM string from the Spark cell, in which case its codes follow clock order.
    basePanda[column + '_code'] = lb_make.fit_transform(basePanda[column])

# TaxiOut is a duration, so use its numeric value instead of a label code. Label-encoding a
# string-typed column would rank values lexicographically ("10" < "9").
# Rows whose value does not parse as a number get NaN and are dropped.
# The TaxiOut_code name is kept so the feature column layout is unchanged.
basePanda['TaxiOut_code'] = pd.to_numeric(basePanda['TaxiOut'], errors='coerce')
basePanda = basePanda.dropna(subset=['TaxiOut_code'])
basePanda = basePanda.drop(columns=['Origin', 'Dest', 'UniqueCarrier', 'TailNum', 'DepTime', 'TaxiOut'])

# Boolean target. astype(bool) also normalises an object-typed column of True/False values.
labels = basePanda['IsLate'].astype(bool).to_numpy()
features = basePanda.drop(columns='IsLate').to_numpy()

# Split the data into training and testing sets: 25% held out, with random_state fixing the split.
train_features, test_features, train_labels, test_labels = train_test_split(
    features, labels, test_size=0.25, random_state=42)


# Random-forest classifier predicting IsLate.
from sklearn.ensemble import RandomForestClassifier

# Number of trees in the forest.
N_ESTIMATORS = 16
# random_state fixes the forest's internal randomness so repeated fits give the same model.
rf = RandomForestClassifier(n_estimators=N_ESTIMATORS, random_state=42)
# Trailing ';' suppresses the estimator repr in the notebook output.
rf.fit(train_features, train_labels);



from time import gmtime, strftime
from sklearn.metrics import roc_auc_score
 
from time import perf_counter

# Time the prediction step. Bare timestamp expressions mid-cell are evaluated and discarded, so
# they cannot measure anything.
predict_start = perf_counter()
predictions = rf.predict(test_features)
predict_seconds = perf_counter() - predict_start

# ROC AUC is computed from predict_proba column 1, which corresponds to rf.classes_[1]
# (True for boolean labels).
rf_probs = rf.predict_proba(test_features)[:, 1]
roc_value = roc_auc_score(test_labels, rf_probs)

# Last expression is displayed by Jupyter.
{'predict_seconds': predict_seconds, 'roc_auc': roc_value}