In [0]:
# import libraries
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


from pyspark.sql import functions as f
from pyspark.sql import SQLContext
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import isnan, when, count, col, isnull, percent_rank, avg, mean
from pyspark.sql.functions import min
from pyspark.sql.functions import col, max
from pyspark.sql.functions import format_string
from pyspark.sql.functions import substring
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import concat
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import lit
from pyspark.sql.functions import to_utc_timestamp
from pyspark.sql.functions import expr
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import instr
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

from pyspark.ml.linalg import DenseVector, SparseVector, Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer,OneHotEncoder
from pyspark.ml.classification import MultilayerPerceptronClassifier

from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import GBTClassifier

from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [0]:
#Initializes blob storage credentials/location
blob_container = "w261-sec4-group2" # The name of your container created in https://portal.azure.com
storage_account = "kdevery" # The name of your Storage account created in https://portal.azure.com
secret_scope = "sec4-group2" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "w261-key" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

#Points to SAS token
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
display(dbutils.fs.ls(f"{blob_url}"))

path,name,size,modificationTime
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/feature_engineered_data/,feature_engineered_data/,0,1668924639000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/feature_engineered_data_test/,feature_engineered_data_test/,0,1668924670000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/feature_engineered_train_data/,feature_engineered_train_data/,0,1668559613000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/merged_cleaned_data/,merged_cleaned_data/,0,1669494945000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/merged_cleaned_data_test/,merged_cleaned_data_test/,0,1669495012000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/merged_cleaned_data_train/,merged_cleaned_data_train/,0,1669495000000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/merged_data/,merged_data/,0,1669494746000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/number_flights_and_delay_rate/,number_flights_and_delay_rate/,0,1669961275000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/pagerank_scores/,pagerank_scores/,0,1669963319000
wasbs://w261-sec4-group2@kdevery.blob.core.windows.net/predictions_GBT/,predictions_GBT/,0,1670103533000


In [0]:
#Read processed folds from the blob

d_processed = {}

d_processed['df1'] = spark.read.parquet(f"{blob_url}/processed_fold_1")
d_processed['df2'] = spark.read.parquet(f"{blob_url}/processed_fold_2")
d_processed['df3'] = spark.read.parquet(f"{blob_url}/processed_fold_3")
d_processed['df4'] = spark.read.parquet(f"{blob_url}/processed_fold_4")
d_processed['df5'] = spark.read.parquet(f"{blob_url}/processed_fold_5")

In [0]:
def undersample(data, verbose = False):
    """
    Under samples the majority class
    """

    delay_count = data.filter(f.col('label') == 1 ).count()
    non_delay_count = data.filter(f.col('label') == 0 ).count()
    #   print(f' total count : {data.count()}')
    #   print(f' delayed count : {delay_count}')
    #   print(f' non delayed count : {non_delay_count}')

    fraction_undersample = delay_count / non_delay_count
    #   print(f' delayed / non delayed: {fraction_undersample}')

    train_delayed = data.filter(f.col('label') == 1)
    #   print(f' non delayed count df : {train_delayed.count()}')

    train_non_delay_undersample = data.filter(f.col('label') == 0).sample(withReplacement=False, fraction=fraction_undersample, seed = 261)
    #   print(f' oversampled delayed count : {train_non_delay_undersample.count()}')

    data_undersampled = train_delayed.union(train_non_delay_undersample)
    #   print(f' train_df Oversampled : {train_undersampled.count()}')

    return data_undersampled

In [0]:
# apply undersampling function

d_processed_undersampled = {}

d_processed_undersampled['df1'] = undersample(d_processed['df1'])
d_processed_undersampled['df2'] = undersample(d_processed['df2'])
d_processed_undersampled['df3'] = undersample(d_processed['df3'])
d_processed_undersampled['df4'] = undersample(d_processed['df4'])
d_processed_undersampled['df5'] = undersample(d_processed['df5'])

In [0]:
d_processed['df1'].groupBy('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|  0.0|5802362|
|  1.0|1270968|
+-----+-------+



In [0]:
d_processed_undersampled['df1'].groupBy('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|  1.0|1270968|
|  0.0|1270611|
+-----+-------+



#Cross Validation and GridSearch for Logistic Regression

In [0]:
%run "/Shared/w261_Section4_Group2/Phase 3/custom_cv_module"

#### Logistic Regression

In [0]:
# set up grid search: estimator, set of params, and evaluator
logistic_model = LogisticRegression(labelCol="label", featuresCol="scaled_feature_vector")
grid = ParamGridBuilder()\
            .addGrid(logistic_model.threshold, [0.3,0.5,0.8])\
            .addGrid(logistic_model.regParam, [0.01,0.1,0.5,1.0, 2.0])\
            .addGrid(logistic_model.elasticNetParam, [0.0,0.25,0.50,0.75, 1.0])\
            .addGrid(logistic_model.maxIter, [1,5,10,20, 50])\
            .build() 

# Example using F0.5 score for evaluator
evaluator = MulticlassClassificationEvaluator(metricName='fMeasureByLabel', beta=0.5, metricLabel=1)

In [0]:
# run cross validation & return the crossvalidation F0.5 score for 'test' set
cv = CustomCrossValidator(estimator=logistic_model, estimatorParamMaps=grid, evaluator=evaluator,splitWord =('train','val'), cvCol = 'cv',parallelism=10)

In [0]:
cvModel = cv.fit(d_processed_undersampled)

fold 1 start...
fold 1 end
fold 2 start...
fold 2 end
fold 3 start...
fold 3 end
fold 4 start...
fold 4 end
fold 5 start...
fold 5 end
Best Model:  {Param(parent='LogisticRegression_863676ff3461', name='threshold', doc='Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match.e.g. if threshold is p, then thresholds must be equal to [1-p, p].'): 0.5, Param(parent='LogisticRegression_863676ff3461', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='LogisticRegression_863676ff3461', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0, Param(parent='LogisticRegression_863676ff3461', name='maxIter', doc='max number of iterations (>= 0).'): 5} Detailed Score [0.7459687483399707, 0.7427318336245307, 0.7317935091887819, 0.7148814282746508, 0.7236674426968376] Avg Score 0.7318085924249543


In [0]:
#for individual testing

#test_train = d_processed['df1'].filter(col('cv')=='train')
#test_val = d_processed['df1'].filter(col('cv')=='val')

#test_logistic_model = LogisticRegression(labelCol="label", featuresCol="scaled_feature_vector")
#evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
#lrModel = test_logistic_model.fit(test_train)
#predictions = lrModel.transform(test_val)
#evaluator.evaluate(predictions)

In [0]:
# make predictions
predictions = cvModel.transform(d_processed_undersampled['df1'])

display(predictions.groupby('label', 'prediction').count())

label,prediction,count
0.0,1.0,403397
0.0,0.0,867214
1.0,1.0,893815
1.0,0.0,377153


In [0]:
fbeta = cvModel.avgMetrics[0]
print(f"Logistic Regression F0.5 Score: {fbeta}")

Logistic Regression F0.5 Score: 0.6276628550510237


#Full Train and Test Sets

In [0]:
# Read in processed training and test data

processed_train_df = spark.read.parquet(f"{blob_url}/processed_train")
processed_test_df = spark.read.parquet(f"{blob_url}/processed_test")

In [0]:
processed_train_undersampled_df = undersample(processed_train_df)

In [0]:
print(processed_train_df.count())
print(processed_train_undersampled_df.count())

35366654
12069645


In [0]:
processed_test_df.count()

Out[11]: 5859306

In [0]:
processed_train_df.groupBy('label').count().show()

+-----+--------+
|label|   count|
+-----+--------+
|  0.0|29332294|
|  1.0| 6034360|
+-----+--------+



In [0]:
processed_train_undersampled_df.groupBy('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|  1.0|6034360|
|  0.0|6035285|
+-----+-------+



In [0]:
#regParam=0.001, elasticNetParam=1,

final_logistic_model = LogisticRegression(labelCol="label", featuresCol="scaled_feature_vector", threshold=0.5, regParam=0.01, elasticNetParam=1.0, maxIter=5)
evaluator = MulticlassClassificationEvaluator(metricName='fMeasureByLabel', beta=0.5, metricLabel=1)
lrModel = final_logistic_model.fit(processed_train_undersampled_df)

In [0]:
predictions = lrModel.transform(processed_test_df)

In [0]:
display(predictions)

label,scaled_feature_vector,index,rawPrediction,probability,prediction
0.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 9, 22, 30, 31, 33, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 1.1771165614071244, 1.1807240188784864, 2.001448286999518, 4.804541102597839, 0.5290014255667469, 2.5358333967474525, 0.3529775526800816, 2.2518971367325515, 0.2623600525700863, 2.453487744696793, 0.4321511055947388))",0,"Map(vectorType -> dense, length -> 2, values -> List(1.3346121533350817, -1.3346121533350817))","Map(vectorType -> dense, length -> 2, values -> List(0.7916025156879597, 0.2083974843120403))",0.0
0.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 9, 21, 30, 31, 33, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 0.8181980723075531, 1.518073738558054, 2.001448286999518, 5.254269120180973, 0.34977829198919363, 0.9689112552858259, 0.433041426600483, 2.3371183222456193, 0.27964920300085777, 2.453487744696793, 0.4321511055947388))",1,"Map(vectorType -> dense, length -> 2, values -> List(1.367628588583798, -1.367628588583798))","Map(vectorType -> dense, length -> 2, values -> List(0.7969967461751284, 0.2030032538248716))",0.0
0.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 9, 21, 30, 31, 33, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 0.8181980723075531, 1.1807240188784864, 2.001448286999518, 5.254269120180973, 0.4050167164495406, 0.9689112552858259, 0.433041426600483, 2.3371183222456193, 0.27964920300085777, 2.453487744696793, 0.4321511055947388))",2,"Map(vectorType -> dense, length -> 2, values -> List(1.3742340200620617, -1.3742340200620617))","Map(vectorType -> dense, length -> 2, values -> List(0.7980633619500359, 0.2019366380499641))",0.0
0.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 9, 21, 30, 31, 33, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 0.8181980723075531, 2.001448286999518, 5.254269120180973, 0.34688490538266537, 0.9689112552858259, 0.433041426600483, 2.3371183222456193, 0.27964920300085777, 2.453487744696793, 0.4321511055947388))",3,"Map(vectorType -> dense, length -> 2, values -> List(1.4751357129364457, -1.4751357129364457))","Map(vectorType -> dense, length -> 2, values -> List(0.8138367343668125, 0.18616326563318752))",0.0
0.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 9, 22, 30, 31, 33, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 0.722940084679231, 2.361448037756973, 2.001448286999518, 4.804541102597839, 0.38154845811321203, 0.7047198828462189, 0.5403663049134857, 1.8916360906620093, 0.4360534659923761, 2.453487744696793, 0.4321511055947388))",4,"Map(vectorType -> dense, length -> 2, values -> List(1.3699793380233263, -1.3699793380233263))","Map(vectorType -> dense, length -> 2, values -> List(0.797376815295182, 0.202623184704818))",0.0
0.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 9, 22, 30, 31, 33, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 0.722940084679231, 0.8433742991989189, 2.001448286999518, 4.804541102597839, 0.3154302374467945, 0.7047198828462189, 0.5403663049134857, 1.8916360906620093, 0.4360534659923761, 2.453487744696793, 0.4321511055947388))",5,"Map(vectorType -> dense, length -> 2, values -> List(1.4970429487577852, -1.4970429487577852))","Map(vectorType -> dense, length -> 2, values -> List(0.8171330282572575, 0.18286697174274247))",0.0
1.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 7, 9, 22, 30, 31, 33, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 0.722940084679231, 2.1927731779171893, 10.563083864358715, 2.001448286999518, 4.804541102597839, 0.608351639401759, 0.7047198828462189, 0.5403663049134857, 1.8916360906620093, 0.4360534659923761, 2.453487744696793, 0.4321511055947388))",6,"Map(vectorType -> dense, length -> 2, values -> List(0.9548873474708486, -0.9548873474708486))","Map(vectorType -> dense, length -> 2, values -> List(0.7220970001027153, 0.27790299989728473))",0.0
0.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 26, 30, 31, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 1.5003133051460746, 0.5060245795193513, 7.296613042424572, 1.3850953356442304, 1.5473254271491774, 2.506155999639365, 0.1854260324295884, 2.453487744696793, 0.4321511055947388))",7,"Map(vectorType -> dense, length -> 2, values -> List(1.147452183885107, -1.147452183885107))","Map(vectorType -> dense, length -> 2, values -> List(0.7590452402834549, 0.24095475971654512))",0.0
0.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 26, 30, 31, 35, 36, 37, 38), values -> List(0.6062867010278674, 0.4552178936348829, 1.9962981366261658, 1.6500044285620095, 1.0120491590387026, 7.296613042424572, 1.5574891757029543, 1.0216639197618846, 3.020166622012421, 0.35300129827896976, 2.453487744696793, 0.4321511055947388))",8,"Map(vectorType -> dense, length -> 2, values -> List(0.9737517770160948, -0.9737517770160948))","Map(vectorType -> dense, length -> 2, values -> List(0.7258666745901784, 0.2741333254098216))",0.0
1.0,"Map(vectorType -> sparse, length -> 39, indices -> List(0, 1, 2, 3, 4, 9, 21, 30, 31, 33, 35, 36, 37, 38), values -> List(1.5157167525696686, 1.251849207495928, 0.9981490683130829, 0.8181980723075531, 1.0120491590387026, 2.001448286999518, 5.254269120180973, 1.5143994442576951, 0.9689112552858259, 0.756889388763837, 2.374154468913133, 0.4757177694132144, 2.364497822755491, 0.2879023802938084))",9,"Map(vectorType -> dense, length -> 2, values -> List(1.0369960417982573, -1.0369960417982573))","Map(vectorType -> dense, length -> 2, values -> List(0.7382699745889942, 0.26173002541100576))",0.0


In [0]:
display(predictions.groupby('label', 'prediction').count())

label,prediction,count
1.0,1.0,464125
0.0,1.0,322717
1.0,0.0,554578
0.0,0.0,4517886


In [0]:
evaluator.evaluate(predictions)

Out[18]: 0.5570296329563275

In [0]:
# feature_cols = ['MONTH','DAY_OF_MONTH','DAY_OF_WEEK','DISTANCE','HourlyWindSpeed','Rain','Blowing','Snow','Thunder','CloudySkyCondition','carrier_vec',         'holiday_period','mean_carrier_delay','Pagerank_Score','PREV_FLIGHT_DELAYED','origin_percent_delayed','dest_percent_delayed','ORIGIN_Prophet_trend','ORIGIN_Prophet_pred','DEST_Prophet_trend','DEST_Prophet_pred']

lrModel.coefficients


Out[19]: SparseVector(39, {3: 0.0413, 4: 0.0702, 5: 0.0505, 7: 0.0338, 8: 0.074, 9: 0.0401, 30: 0.3093, 32: 0.755, 33: 0.0065, 34: 0.0243, 35: 0.153, 37: 0.0206})

In [0]:
# predictions.write.mode("overwrite").parquet(f"{blob_url}/predictions_LR_undersampled")