In [0]:
import math 
import pyspark
from pyspark import SparkContext
from pyspark.sql.functions import max, col, count, when, isnan
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml.feature import (Imputer, 
                                MinMaxScaler, 
                                VectorAssembler,
                                StandardScaler,
                                StringIndexer, 
                                OneHotEncoder,
                                PCA)
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression


### Setup

In [0]:
secret_scope = "w261"
secret_key   = "w261-scope-key"    
blob_container  = "w261-team-7-1"
storage_account = "w261team71"
team_blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"

spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
display(dbutils.fs.ls(f"{team_blob_url}/otpw_12m/pca_data/train/"))

path,name,size,modificationTime
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/_SUCCESS,_SUCCESS,0,1700415370000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/_committed_3477442606972160090,_committed_3477442606972160090,924,1700415369000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/_started_3477442606972160090,_started_3477442606972160090,0,1700415351000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/part-00000-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-566-1-c000.snappy.parquet,part-00000-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-566-1-c000.snappy.parquet,185855230,1700415367000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/part-00001-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-567-1-c000.snappy.parquet,part-00001-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-567-1-c000.snappy.parquet,195186115,1700415363000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/part-00002-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-568-1-c000.snappy.parquet,part-00002-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-568-1-c000.snappy.parquet,193991930,1700415368000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/part-00003-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-569-1-c000.snappy.parquet,part-00003-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-569-1-c000.snappy.parquet,193473042,1700415362000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/part-00004-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-570-1-c000.snappy.parquet,part-00004-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-570-1-c000.snappy.parquet,198872152,1700415368000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/part-00005-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-571-1-c000.snappy.parquet,part-00005-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-571-1-c000.snappy.parquet,199701817,1700415363000
wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/part-00006-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-572-1-c000.snappy.parquet,part-00006-tid-3477442606972160090-d676cf0b-212b-457c-ba0b-0b42add3ba75-572-1-c000.snappy.parquet,198516483,1700415368000


In [0]:
def read_data(dataset="otpw_12m", data_type="pca", split="train"):
    path = f"{team_blob_url}/{dataset}/{data_type}_data/{split}/"
    print(f"\nData path: {path}\n")
    data = spark.read.parquet(path).withColumnRenamed("DEP_DEL15", "label")
    return data

In [0]:
def crossval_baseline_model(data):

    model = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "pca_features")

    pca = PCA(k=100, inputCol="feature_vec", outputCol="pca_features")

    pipeline = Pipeline(stages=[pca, model])

    # Build the parameter grid for model tuning
    paramGrid = ParamGridBuilder() \
                .addGrid(model.regParam, [0.1, 0.01]) \
                .addGrid(pca.k, [50,100,200]) \
                .build()

    # Execute CrossValidator for model tuning
    crossval = CrossValidator(estimator=pipeline,
                                        estimatorParamMaps=paramGrid,
                                        evaluator=BinaryClassificationEvaluator(),
                                        numFolds=5) 
    

    # Train the tuned model and establish our best model
    cvModel = crossval.fit(data)
    best_model = cvModel.bestModel

    return best_model



Data path: wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/full_data/train/



In [0]:
train_data = read_data()
test_data = read_data(split='test')
# Create a Linear Regression model
lr = LinearRegression(featuresCol="pca_features", labelCol="label")

# Fit the model
lr_model = lr.fit(train_data)

# Evaluate the model
predictions = lr_model.transform(test_data)


Data path: wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/train/


Data path: wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/pca_data/test/



In [0]:
train_data = read_data(data_type="full")
train_data


Data path: wasbs://w261-team-7-1@w261team71.blob.core.windows.net/otpw_12m/full_data/train/



DataFrame[DEP_TIME: int, MONTH: int, FL_DATE: date, DAY_OF_MONTH: int, DAY_OF_WEEK: int, OP_UNIQUE_CARRIER: string, ORIGIN: string, TAXI_OUT: int, DISTANCE: int, HourlyDewPointTemperature: int, HourlyPrecipitation: float, HourlyPressureChange: float, HourlyRelativeHumidity: int, HourlySeaLevelPressure: float, HourlyVisibility: float, HourlyWindDirection: int, HourlyWindGustSpeed: int, HourlyWindSpeed: int, label: int, vec_features: vector, scaled_features: vector, OP_UNIQUE_CARRIER_index: double, OP_UNIQUE_CARRIER_vec: vector, ORIGIN_index: double, ORIGIN_vec: vector, feature_vec: vector, pca_features: vector]

In [0]:
cv_model = crossval_model(data=train_data)

cv_model.summary

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-2798664673115701>, line 3[0m
[1;32m      1[0m cv_model [38;5;241m=[39m crossval_model(data[38;5;241m=[39mtrain_data)
[0;32m----> 3[0m cv_model[38;5;241m.[39msummary

[0;31mAttributeError[0m: 'PipelineModel' object has no attribute 'summary'

In [0]:
training_summary = cv_model.stages[1].summary
accuracy = training_summary.accuracy
precision = training_summary.precisionByLabel
recall = training_summary.recallByLabel
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)

Accuracy:  0.7991993702710468
Precision:  [0.7996412621187322, 0.6334158973204234]
Recall:  [0.9987795420358934, 0.008356243938158448]


In [0]:
test_preds = cv_model.transform(test_data)

test_preds.columns

['pca_features', 'label', 'rawPrediction', 'probability', 'prediction']