# W261 Team 4_1 - Phase 3

## Team

- Maegan Kornexl: mkornexl@berkeley.edu
- I-Hsiu Kao: ihsiukao@berkeley.edu
- Nathan Arias: nathanarias@berkeley.edu
- Thomas Dolan: tdolan@berkeley.edu

## Phase Leader Plan 

| Phase   | Leader |
|--------|-------|
| Phase I - Project Plan: Describe Datasets, Tasks, and Metrics      | Thomas Dolan    |
| Phase II EDA, Baseline Pipeline On All Data    | Nathan Arias     |
| Phase III - Advanced Model Architectures and Loss Functions | I-Hsiu Kao    |
| Phase IV - Final Project | Maegan Kornexl    |


## Credit Assignment 

| Responsibility   | Team Member(s) | Time Commitment (Hrs)| 
|--------|-------|-------|
| Four Tables and OTPW EDA| I-Hsiu Kao, Nathan Arias        | 10, 5|
| Model Building                                   |  Nathan Arias, I-Hsiu Kao, Maegan Kornexl  |  4, 5, 7|
| Abstract, Project Description                                             | Maegan Kornexl| 3|
| Map out pipelines for cleaning, preprocessing, modeling                                              | Thomas Dolan| 7 |
| Presentation Preparation                    | All    |  5|
| Blob Storage and Team Permissions| Thomas Dolan | 2|

#Project Description

## Abstract



In [0]:
import numpy as np
import seaborn as sns
import pandas as pd 
import plotly.express as px 
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, DoubleType, FloatType, IntegerType
import re
from pyspark.sql.functions import array_contains
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
## Place this cell in any team notebook that needs access to the team cloud storage.
# The following blob storage is accessible to team members only (read and write)
# access key is valid til TTL
# after that you will need to create a new SAS key and authenticate access again via DataBrick command line
blob_container  = "blobstorage261"       # The name of your container created in https://portal.azure.com
storage_account = "thdolan"  # The name of your Storage account created in https://portal.azure.com
secret_scope = "group_4_1_scope"          # The name of the scope created in your local computer using the Databricks CLI
secret_key   = "group_4_1_key"             # The name of the secret key created in your local computer using the Databricks CLI
team_blob_url   = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  #points to the root of your team storage bucket
# the 261 course blob storage is mounted here.
mids261_mount_path = "/mnt/mids-w261"
# SAS Token: Grant the team limited access to Azure Storage resources
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
df_otpw_1yr = spark.read.parquet(f"{team_blob_url}/2015_1yr_train")

otpw_null_dataframe_1yr =  df_otpw_1yr.select([F.count(F.when(F.col(c).isNull(),c)).alias(c) for c in df_otpw_1yr.columns])
row_count_otpw_1yr = df_otpw_1yr.count()
otpw_null_count_pandas_1yr = otpw_null_dataframe_1yr.toPandas().transpose().reset_index().rename(columns = {0:'nullCount'})
otpw_null_count_pandas_1yr['percentNull'] = np.round(otpw_null_count_pandas_1yr['nullCount'] * 100 / row_count_otpw_1yr , 2)

drop_df_1yr = otpw_null_count_pandas_1yr[otpw_null_count_pandas_1yr['percentNull'] >= 90]
df_otpw_1yr = df_otpw_1yr.drop(*drop_df_1yr['index'].tolist())

drop_list_1yr = [i for i in df_otpw_1yr.columns if 'Backup' in i]
drop_list_1yr.append('WindEquipmentChangeDate')
drop_list_1yr.append('_row_desc')
df_otpw_1yr = df_otpw_1yr.drop(*drop_list_1yr)

#Feature Selection

###PageRank (weighted graph based on traffic)

####function to calculate pagerank and output dictionary (takes a pyspark dataframe as input)

In [0]:
import networkx as nx

def getPageRankDict(df, alpha_val = 0.85):
    df_station = spark.read.parquet(f"dbfs:/mnt/mids-w261/datasets_final_project_2022/stations_data/stations_with_neighbors.parquet/")
    station_list = df_station.select('neighbor_id').distinct().toPandas()['neighbor_id'].tolist()
    traffic_df = df.groupBy("origin_station_id", "dest_station_id").count().toPandas()
    traffic_dict = dict(zip(tuple(zip(traffic_df.origin_station_id, traffic_df.dest_station_id)), traffic_df['count']))
    for i in station_list:
        for j in station_list:
            if i == j:
                continue
            if traffic_dict.get((i, j), None) is None:
                traffic_dict[(i, j)] = 0

    station_graph = nx.DiGraph()

    for key, value in traffic_dict.items():
        station_graph.add_edge(key[0], key[1], weight = value)

    pagerank_dict = nx.pagerank(station_graph, alpha = alpha_val, personalization=None, weight='weight')

    return pagerank_dict

####usage example - add a new column to the dataframe called pagerank based on original station id

In [0]:
pagerank_dict = getPageRankDict(df_otpw_1yr, 0.85)

def parse_pagerank(value):
    if value is None or value == "":
        return None
    if pagerank_dict.get(value, None) is None:
        return None
    else:
        return pagerank_dict[value]
        

parse_pagerank_udf = udf(parse_pagerank, DoubleType())

df_otpw_1yr = df_otpw_1yr.withColumn("pagerank", parse_pagerank_udf(df_otpw_1yr.origin_station_id))
display(df_otpw_1yr.select('pagerank').limit(10))

pagerank
0.0076692440395774
0.0152277394425883
0.0152277394425883
0.0238784092588762
0.0238784092588762
0.0238784092588762
0.0014683392707141
0.0014683392707141
0.0017661098521892
0.0017661098521892


In [0]:
#1 year sampling at 0.01 - 1 percent
df_otpw_1yr_sample = df_otpw_1yr.sample(0.01, 0).toPandas()

zcale_cols = ["CRS_ELAPSED_TIME", "DISTANCE", "ELEVATION", "HourlyAltimeterSetting","HourlyDewPointTemperature", "HourlyDryBulbTemperature","HourlyPrecipitation", "HourlyPressureChange", "HourlyRelativeHumidity", "HourlySeaLevelPressure", "HourlyStationPressure", "HourlyVisibility", "HourlyWetBulbTemperature", "HourlyWindSpeed", "DEP_DEL15"]

df = df_otpw_1yr_sample[df_otpw_1yr_sample.columns.intersection(zcale_cols)]

for col in df.columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[col] = pd.to_numeric(df[col], errors='coerce')


#### Numeric Feature Exploratory Data Analysis 
**Goal**: Identify meaningful differences between delayed and not delayed flights within weather numeric feature distributions\
**Approach**: Sample 1% of data within 1 year OTPW dataset. Determine frequency of various unique numeric features, we analyze the following:

|             ||
|------------------------------| ------------------------------|
| CRS_ELAPSED_TIME             |HourlyPressureChange        |
| DISTANCE                    | HourlyRelativeHumidity      |
| ELEVATION                   | HourlyRelativeHumidity      |
| HourlyAltimeterSetting      | HourlySeaLevelPressure      |
| HourlyDewPointTemperature   | HourlyStationPressure       |
| HourlyDryBulbTemperature    | HourlyStationPressure       |
| HourlyPrecipitation         | HourlyVisibility            |
| HourlyPressureChange        | HourlyWetBulbTemperature    |
| HourlyRelativeHumidity      | HourlyWindSpeed             |

**Observations:**
Subtle difference in distributions, difficult to tell if they will contribute largely to our classification. **HourlyStationPressure** seems to have a shifted right distribution. Ultimately, we ran ridge regression then pick only top performing numeric variables related to the outcome variable of delays.

In [0]:
def create_histogram(dataframe, group_by_column):
    temp = dataframe.groupby([group_by_column, 'DEP_DEL15']).count().reset_index().iloc[:, :3]
    temp = temp.rename(columns={temp.columns[-1]: 'Count'})
    temp['DEP_DEL15'] = temp['DEP_DEL15'].astype('category')

    fig = px.histogram(temp, x=group_by_column, y='Count', color='DEP_DEL15', marginal="box", barmode = 'overlay' ,  histnorm='percent',
                        labels={'DEP_DEL15': 'Delayed'})
    fig.update_traces(marker_line_width=1, marker_line_color="white")
    fig.update_layout(height=400, title = f'<b>{group_by_column}</b> Feature: No Delay/Delay', title_x = 0.5)
    
    
    fig.show()



numeric_cols = ["CRS_ELAPSED_TIME", "DISTANCE", "ELEVATION", "HourlyAltimeterSetting","HourlyDewPointTemperature", "HourlyDryBulbTemperature","HourlyPrecipitation", "HourlyPressureChange", "HourlyRelativeHumidity", "HourlySeaLevelPressure", "HourlyStationPressure", "HourlyVisibility", "HourlyWetBulbTemperature", "HourlyWindSpeed"]

for col in numeric_cols:
    create_histogram(df,col)


## Airports and Time Relationships

`Goal:`Analyze different categorical features and see their contribution toward delayed flights\
`Approach:` First we analyze the overall flight counts for the 5 busiest stations in 2005, which are the following: 

- Hartsfield-Jackson Atlanta International Airport (ATL)
- Chicago O'Hare International Airport (ORD)
- Dallas Fort Worth International Airport (DFW)
- Los Angeles International Airport (LAX)
- Denver International Airport (DEN)

### Day of the Week

From experience, the day of the week of a flight seems to play an important role for overall airport traffic. Our tem noticed **periodic M Shaped patterns** in 7 day intervals illustrate weekly airport traffic. Monday and Fridays contitute the weekly highs in airport traffic. **DAY_OF_WEEK** is a categorical variable we can one hot encode within our model.

Interacting with Visualization: Display different airports by toggling the legend traces. Zoom into different regions of the plot by dragging across the region of interest.

In [0]:
top5_stations = ['ATL','ORD','DFW','LAX','DEN']


colors = ['tomato','dodgerblue','darkseagreen','mediumslateblue','grey']

import plotly.graph_objects as go
def transform_df(df):
    df['counts'] = 1
    df['FL_DATE'] = pd.to_datetime(df['FL_DATE'])
    return df.set_index('FL_DATE').resample('D').agg({'counts':'sum'})

pop_stations = []  
for i, station in enumerate(top5_stations):
    temp = transform_df(df_otpw_1yr.where(df_otpw_1yr['ORIGIN'] == station).select("FL_DATE").toPandas())
    pop_stations.append(
        go.Scatter(
            x=temp.index,
            y=temp['counts'],
            name=station,
            marker=dict(color=colors[i % len(colors)]),
        visible=True if i == 0 else 'legendonly' ))
    

fig = go.Figure()  
for trace in pop_stations:
    fig.add_trace(trace)

fig.update_layout(
    title='Flight Counts by Date - OTPW Training Set',
    xaxis_title='Date',
    yaxis_title='Flight Count',
    legend_title='Airport',
    title_x = 0.5,
)
    

### Month

Any frequent flyer can attest to seasonal effects on airport traffic. To observe this phenomena we plotted the number of delays over 15 minuts over the course of a year. 

By clicking on the legend and selecting a specific airport trace, we can observe **month-wise** trends, indicating that seasonality affects the the number of flights that are entering or leaving a specific airport. We shall one hot encode the **MONTH** feature within our dataset

In [0]:
top5_stations = ['ATL','ORD','DFW','LAX','DEN']


def transform_df(df):
    df['DEP_DEL15'] = pd.to_numeric(df['DEP_DEL15'], errors='coerce')
    df['DEP_DEL15'] = df['DEP_DEL15'].fillna(0)
    df['FL_DATE'] = pd.to_datetime(df['FL_DATE'])
    df = df.set_index('FL_DATE')
    
    return df.resample('D').agg({'DEP_DEL15': 'sum'}).rename(columns={'DEP_DEL15': 'delays'})

pop_stations = []


for i, station in enumerate(top5_stations):
    temp_df = df_otpw_1yr.select("FL_DATE", "DEP_DEL15").where(df_otpw_1yr['ORIGIN'] == station).toPandas()
    temp = transform_df(temp_df)
    
    scatter = go.Scatter(
        x=temp.index,
        y=temp['delays'],
        name=station,
        marker=dict(color=colors[i % len(colors)]),
        visible=True if i == 0 else 'legendonly' )
    
    pop_stations.append(scatter)

fig = go.Figure()    
fig = go.Figure(data=pop_stations)


fig.update_layout(title='Daily Flight Delays by Station',
                  xaxis_title='Date',
                  yaxis_title='Number of Delays over 15 Minutes',
                  legend_title='Airport',
                  title_x = 0.5,
                  )



###Holiday Window

Anecdotally, airport traffic seems to increase during major Federal Holidays. Our team investigated delays occuring on those days and found that on the actual holidays, **airport traffic lowers**. However, we did notice a trend that the days surrounding a federal holday have higher than usual traffic. This lead us create a new feature based off of a 2 day window surrounding holidays. Displayed below is an example with ATL airport, as we can see many of the days with high delay count land within 2 day windows.

In [0]:
import pandas as pd
import plotly.graph_objects as go

top5_stations = ['ATL', 'ORD', 'DFW', 'LAX', 'DEN']

def transform_df(df):
    df['DEP_DEL15'] = pd.to_numeric(df['DEP_DEL15'], errors='coerce')
    df['DEP_DEL15'] = df['DEP_DEL15'].fillna(0)
    df['FL_DATE'] = pd.to_datetime(df['FL_DATE'])
    df = df.set_index('FL_DATE')
    return df.resample('D').agg({'DEP_DEL15': 'sum'}).rename(columns={'DEP_DEL15': 'delays'})

pop_stations = []


for i, station in enumerate(top5_stations):
    temp_df = df_otpw_1yr.select("FL_DATE", "DEP_DEL15").where(df_otpw_1yr['ORIGIN'] == station).toPandas()
    temp = transform_df(temp_df)
    
    scatter = go.Scatter(
        x=temp.index,
        y=temp['delays'],
        name=station,
        mode = 'markers+lines',
        marker=dict(color='black', opacity = 0.4),
        line = dict(color = colors[i % len(colors)]),
        visible=True if i == 0 else 'legendonly' )
    
    pop_stations.append(scatter)

fig = go.Figure(data=pop_stations)

# Dummy traces for legend
fig.add_trace(go.Scatter(x=[None], y=[None], mode='markers',
                         name='-2 days', marker=dict(color='blue', size=10)))
fig.add_trace(go.Scatter(x=[None], y=[None], mode='markers',
                         name='+2 days', marker=dict(color='red', size=10)))

federal_holidays = [
    '2015-07-04', '2015-04-12',
    '2015-06-19', '2015-09-06', '2015-01-19',
    '2015-05-25', '2015-01-01',
    '2015-02-14', '2015-02-16', '2015-04-05'
]

# Shade regions around each federal holiday
for date_str in federal_holidays:
    date = pd.to_datetime(date_str)
    two_days_before = date - pd.Timedelta(days=2)
    two_days_after = date + pd.Timedelta(days=2)

    # Shade the region before the holiday in blue
    fig.add_vrect(
        x0=two_days_before, x1=date,
        fillcolor="blue", opacity=0.2,
        line_width=0)

    # Shade the region after the holiday in red
    fig.add_vrect(
        x0=date, x1=two_days_after,
        fillcolor="red", opacity=0.2,
        line_width=0)

fig.update_layout(title='Daily Flight Delays by Station - Holiday 2 Day Window',
                  xaxis_title='Date',
                  yaxis_title='Number of Delays over 15 Minutes',
                  legend_title='Legend',
                  title_x=0.5,
                )
                  


### Time of Day Analysis 
Our team wanted to inspect variable related to the time a **flight is scheduled to depart** and the occurances of delays. Below are two different analysis with varying levels of granularity.

####By Time block (Hourly)
Our team

In [0]:
import pandas as pd
import plotly.express as px

# Group by the hour and delay status
arr_time_blk_delays = df_otpw_1yr_sample.groupby(['DEP_TIME_BLK', 'DEP_DEL15']).size().reset_index(name='Count')

# Normalize the counts by the total flights in each hour block
total_flights_per_block = df_otpw_1yr_sample.groupby('DEP_TIME_BLK').size().reset_index(name='Total')
arr_time_blk_delays = pd.merge(arr_time_blk_delays, total_flights_per_block, on='DEP_TIME_BLK')
arr_time_blk_delays['Normalized Count'] = arr_time_blk_delays['Count'] / arr_time_blk_delays['Total']

# Plotting the data
fig = px.bar(arr_time_blk_delays, x='DEP_TIME_BLK', y='Normalized Count', color='DEP_DEL15',
             labels={'Normalized Count': 'Proportion of Flights', 'DEP_DEL15': 'Delayed 15 Min'},
             title='Proportion of Flights Delayed by Departure Time Block')
fig.update_layout(title_x=0.5)
fig.update_xaxes(title='Departure Time Block', tickformat='%H:%M')
fig.show()

####Morning, Afternoon, Evening

In [0]:
time_bins = { '0001-0559':'Morning',
             '0600-0659':'Morning',
             '0700-0759':'Morning',
             '0800-0859':'Morning',
             '0900-0959':'Morning',
             '1000-1059':'Morning',
             '1100-1159':'Morning',
             '1200-1259':'Afternoon',
             '1300-1359':'Afternoon',
             '1400-1459':'Afternoon',
             '1500-1559':'Afternoon',
             '1700-1759':'Evening',
             '1800-1859':'Evening',
             '1900-1959':'Evening',
             '2000-2059':'Evening',
             '2100-2159':'Evening',
             '2200-2259':'Evening',
             '2300-2359':'Evening',
             }
             
df_otpw_1yr_sample['Time_of_Day'] = df_otpw_1yr_sample['DEP_TIME_BLK'].map(time_bins)


# Group by the hour and delay status
arr_time_blk_delays = df_otpw_1yr_sample.groupby(['Time_of_Day', 'DEP_DEL15']).size().reset_index(name='Count')

# Normalize the counts by the total flights in each hour block
total_flights_per_block = df_otpw_1yr_sample.groupby('Time_of_Day').size().reset_index(name='Total')
arr_time_blk_delays = pd.merge(arr_time_blk_delays, total_flights_per_block, on='Time_of_Day')
arr_time_blk_delays['Normalized Count'] = arr_time_blk_delays['Count'] / arr_time_blk_delays['Total']

# Plotting the data
fig = px.bar(arr_time_blk_delays, x='Time_of_Day', y='Normalized Count', color='DEP_DEL15',
             labels={'Normalized Count': 'Proportion of Flights', 'DEP_DEL15': 'Delayed 15 Min'},
             title='Proportion of Flights Delayed by Departure Day Time Period', text_auto=True)
fig.update_layout(title_x=0.5)
fig.update_xaxes(title='Departure Time Block', tickformat='%H:%M')
#fig.update_traces(hoverinfo='label+percent+name', selector=dict(type='bar'))
fig.show()



#Pipeline

To ensure the best possible iteration during model creation, the ATP and GSOD datasets were joined into one (the OTPW set) and then split into subsets spanning three months, one year, three years, and five years. The training and testing datasets were split at the beginning of the pipeline to prevent data leakage, and training sets always occurred before test sets in the split without time overlap. For example, the 1-year dataset contains the first three quarters of 2015 as training data and the last quarter of the year serves as test data. The data split was followed by an exploratory data analysis (EDA) and cleaning stage, which ensured any missing or duplicate values were dropped and that text columns were parsed into categorical features. Any new features were also created during this phase.

The next step, feature selection, was performed iteratively on the train and test data, and various possible features were kept or dropped based on the EDA, Lasso, and Ridge Regression results. With both datasets consisting of relevant features, we then balanced via naive undersampling, and z-scaled the training data (as Logistic Regression assumes linear data). Z-scaling was also performed on the numeric test data, though applied so that the scaling parameters were derived from the training data. All categorical variables for all datasets were then one-hot encoded, vectorized, and saved for later use.

Finally, a five-fold cross-validation grid search was used during the hyperparameter tuning phase to find and optimize the best model. The grid itself used Spark's Binary Classification Evaluator to assess model efficacy based on the model's ROC curves. The model with the best hyperparameter values was then selected and used to evaluate the withheld and cleaned test data. Final results were reported using two metrics: ROC curves and F-beta scores. The thresholding of the ROC curves provided a more holistic view of a model's performance than most other metrics, which made it optimal for assessing model performance, and the F-beta score weighted precision more highly than recall, thus enabling finer control of false-positives. This particular implementation was chosen because a false-positive in detecting a flight delay could cause passengers to miss their flight altogether, and would likely result in more customer dissatisfaction than a regular delay would have normally caused.

###Load Data for Pipeline

####3 Month Data

In [0]:
df_otpw_quarter = spark.read.parquet(f"{team_blob_url}/2015_3m_train")
df_otpw_quarter_test = spark.read.parquet(f"{team_blob_url}/2015_3m_test")

####1 Year Data

In [0]:
df_otpw_1yr = spark.read.parquet(f"{team_blob_url}/2015_1yr_train")
df_otpw_1yr_test = spark.read.parquet(f"{team_blob_url}/2015_1yr_test")

####3 Years Data

In [0]:
df_otpw_3yr = spark.read.parquet(f"{team_blob_url}/OTPW_3yr_train")
df_otpw_3yr_test = spark.read.parquet(f"{team_blob_url}/OTPW_3yr_test")

####5 Years Data

In [0]:
df_otpw_5yr = spark.read.parquet(f"{team_blob_url}/OTPW_5yr_train")
df_otpw_5yr_test = spark.read.parquet(f"{team_blob_url}/OTPW_5yr_test")

#### Data Handling Summary Visualization

In [0]:
import plotly.express as px
import pandas as pd

stages = ['Total', 'Train Test Split', 'Null Records Dropped', 'Duplicates Dropped', 'Balanced Data']

df_t = pd.DataFrame(dict(number=[5811854, 0, 0, 0, 0], stage=stages))

df_t['Dataset'] = 'Original'

df_mtl = pd.DataFrame(dict(number=[0, 4380612, 1273852, 1273852, 492710], stage=stages))

df_mtl['Dataset'] = 'Train'

df_toronto = pd.DataFrame(dict(number=[0, 1431242, 398545, 398545, 398545], stage=stages))

df_toronto['Dataset'] = 'Test'

df_d = pd.DataFrame(dict(number=[0, 0, 4139457, 4139457, 4920599], stage=stages))

df_d['Dataset'] = 'Dropped Records'

df = pd.concat([df_t, df_mtl, df_toronto, df_d], axis=0)


fig = px.funnel(df, y='number', x='stage', color='Dataset', color_discrete_map = {'Original': 'darkseagreen', 
                                                                                  'Train': 'dodgerblue', 
                                                                                  'Test':'tomato', 
                                                                                  'Dropped Records':'grey'},
               orientation = 'v')

fig.update_layout(
    autosize=False,
    width=1200,
    height=500,
    title = '')



fig.update_layout(legend=dict(
    orientation="h",
    yanchor="bottom",
    y=1.02,
    xanchor="right",
    x=0.7,
    title = ''
))

fig.update_yaxes(title = '')
fig.update_xaxes(title = '')

fig

###Pre-process Data

In [0]:
import networkx as nx

def getPageRankDict(df, alpha_val = 0.85):
    df_station = spark.read.parquet(f"dbfs:/mnt/mids-w261/datasets_final_project_2022/stations_data/stations_with_neighbors.parquet/")
    station_list = df_station.select('neighbor_id').distinct().toPandas()['neighbor_id'].tolist()
    traffic_df = df.groupBy("origin_station_id", "dest_station_id").count().toPandas()
    traffic_dict = dict(zip(tuple(zip(traffic_df.origin_station_id, traffic_df.dest_station_id)), traffic_df['count']))
    for i in station_list:
        for j in station_list:
            if i == j:
                continue
            if traffic_dict.get((i, j), None) is None:
                traffic_dict[(i, j)] = 0

    station_graph = nx.DiGraph()

    for key, value in traffic_dict.items():
        station_graph.add_edge(key[0], key[1], weight = value)

    pagerank_dict = nx.pagerank(station_graph, alpha = alpha_val, personalization=None, weight='weight')

    return pagerank_dict

In [0]:
def Pre_Process_DF(df, train_set_flag, pagerank_dict):    
    # Create clean pipeline
    print('initial size: ' + str(df.count()))
    '''
    # Quantify missing Values
    otpw_null_dataframe =  df.select([F.count(F.when(F.col(c).isNull(),c)).alias(c) for c in df.columns])
    row_count_otpw = df.count()
    otpw_null_count_pandas = otpw_null_dataframe.toPandas().transpose().reset_index().rename(columns = {0:'nullCount'})
    otpw_null_count_pandas['percentNull'] = np.round(otpw_null_count_pandas['nullCount'] * 100 / row_count_otpw , 2)
    nonnull_cols_otpw = otpw_null_count_pandas[otpw_null_count_pandas['percentNull'] < 4]['index'].to_list()

    otpw_null_count_pandas.rename(columns={'index': 'Column Name', 'nullCount': 'Number of Null Values', 'percentNull': 'Percentages'}, inplace=True)

    # Drop Columns with Null Percentage >=90%
    drop_df = otpw_null_count_pandas[otpw_null_count_pandas['Percentages'] >= 90]
    df = df.drop(*drop_df['Column Name'].tolist())
    '''
    drop_list = [i for i in df.columns if 'Backup' in i]
    drop_list.append('WindEquipmentChangeDate')
    drop_list.append('_row_desc')
    df = df.drop(*drop_list)

    # Parse and clean values
    df = df.withColumn("FL_DATE", F.to_timestamp(df.FL_DATE))
    #df = df.withColumn("sched_depart_date_time", F.to_timestamp(df.sched_depart_date_time))
    df = df.withColumn("sched_depart_date_time_UTC", F.to_timestamp(df.sched_depart_date_time_UTC))
    df = df.withColumn("four_hours_prior_depart_UTC", F.to_timestamp(df.four_hours_prior_depart_UTC))
    df = df.withColumn("two_hours_prior_depart_UTC", F.to_timestamp(df.two_hours_prior_depart_UTC))
    df = df.withColumn("DATE", F.to_timestamp(df.DATE))

    # Parsing Steps

    # Hourly Sky Conditions Parsing
    # Helper function to parse the hourlyskyconditions
    def parse_hourlyskyconditions(s):
        # First, check if the string contains the pattern (letters followed by a colon and numbers)
        if re.search(r'\b[A-Z]+:\d+', str(s)):
            # If the pattern is found, extract numbers that are preceded by letters and a colon
            return re.findall(r'\b[A-Z]+:(\d+)', str(s))
        else:
            # If the pattern is not found, the string might be a standalone number
            # Extract the standalone number
            standalone_number = re.findall(r'\b\d+\b', str(s))
            return standalone_number if standalone_number else []

    parse_hourlyskyconditions_udf = udf(parse_hourlyskyconditions, ArrayType(StringType()))

    df = df.withColumn("HourlySkyConditions_Parsed", parse_hourlyskyconditions_udf(df.HourlySkyConditions))

    # Hourly Present Weater Type Parsing
    def parse_hourlyPresentWeatherType(s):
        # This pattern matches sequences of letters that may be followed by a colon and numbers
        pattern = re.compile(r'(?:[+-])?([A-Z]+)')
        if s is None:
            return []
        # Find all matches in the string, and filter out empty strings if any
        return [match for match in pattern.findall(str(s)) if match]

    parse_hourlyPresentWeatherType_udf = udf(parse_hourlyPresentWeatherType, ArrayType(StringType()))

    df = df.withColumn("HourlyPresentWeatherType_Parsed", parse_hourlyPresentWeatherType_udf(df.HourlyPresentWeatherType))

    # Hourly Pressure Tendency Parsing
    def parse_PressureTendency(value):
        if value is None or value == "":
            return None
        # Remove any non-numeric characters except the decimal point
        numeric_value = ''.join(char for char in str(value) if char.isdigit() or char == '.' or char == '-')
        if numeric_value == '9':
            return None
        return numeric_value

    parse_PressureTendency_udf = udf(parse_PressureTendency, StringType())

    df = df.withColumn("HourlyPressureTendency", parse_PressureTendency_udf(df.HourlyPressureTendency))

    # Hourly Wind Direction Parsing
    def parse_doubleToString(value):
        if value is None or value == "":
            return NotImplementedError
        if str(value) == "VRB":
            return "VRB"
        # Remove any non-numeric characters except the decimal point
        numeric_value = ''.join(char for char in str(value) if char.isdigit() or char == '.' or char == '-')
        return numeric_value

    parse_doubleToString_udf = udf(parse_doubleToString, StringType())

    df = df.withColumn("HourlyWindDirection", parse_doubleToString_udf(df.HourlyWindDirection))

    # Parse all other numeric weather columns to double
    def parse_doubleToDouble(value):
        if value is None or value == "":
            return None
        # Remove any non-numeric characters except the decimal point
        numeric_value = ''.join(char for char in str(value) if char.isdigit() or char == '.' or char == '-')
        if len(numeric_value) == 0:
            return None
        # Convert the cleaned string to a float
        try:
            float(numeric_value)
            return float(numeric_value)
        except ValueError:
            return None
            

    parse_doubleToDouble_udf = udf(parse_doubleToDouble, DoubleType())


################################################################################################
    #creating holiday window features (+/- 1 or 2 days after a federal holdiay)
    federal_holidays_str = ['2015-07-04', '2016-07-04', '2017-07-04', '2018-07-04', '2019-07-04', '2020-07-04', '2015-12-25', '2016-12-25', '2017-12-25', '2018-12-25', '2019-12-25', '2020-12-25', '2015-12-24', '2016-12-24', '2017-12-24', '2018-12-24', '2019-12-24', '2020-12-24', '2018-10-08', '2017-10-09', '2016-10-10', '2015-10-12', '2020-10-12', '2019-10-14', '2018-04-08', '2015-04-12', '2017-04-16', '2020-04-19', '2019-04-28', '2016-05-01', '2020-06-19', '2015-06-19', '2016-06-19', '2017-06-19', '2018-06-19', '2019-06-19', '2019-09-02', '2018-09-03', '2017-09-04', '2016-09-05', '2015-09-07', '2020-09-07', '2019-08-31', '2019-09-01', '2018-09-01', '2018-09-02', '2017-09-02', '2017-09-03', '2016-09-03', '2016-09-04', '2015-09-05', '2020-09-05', '2015-09-06', '2020-09-06', '2018-01-15', '2017-01-16', '2016-01-18', '2015-01-19', '2020-01-20', '2019-01-21', '2015-05-25', '2020-05-25', '2019-05-27', '2018-05-28', '2017-05-29', '2016-05-30', '2016-01-01', '2017-01-01', '2018-01-01', '2019-01-01', '2020-01-01', '2021-01-01', '2015-12-31', '2016-12-31', '2017-12-31', '2018-12-31', '2019-12-31', '2020-12-31', '2018-11-22', '2017-11-23', '2016-11-24', '2015-11-26', '2020-11-26', '2019-11-28', '2018-11-21', '2017-11-22', '2016-11-23', '2015-11-25', '2020-11-25', '2019-11-27', '2015-02-14', '2016-02-14', '2017-02-14', '2018-02-14', '2019-02-14', '2020-02-14', '2015-11-11', '2016-11-11', '2017-11-11', '2018-11-11', '2019-11-11', '2020-11-11', '2016-02-15', '2015-02-16', '2020-02-17', '2019-02-18', '2018-02-19', '2017-02-20', '2016-03-27', '2018-04-01', '2015-04-05', '2020-04-12', '2017-04-16', '2019-04-21']

    #finding days +/- 1 or 2 days after federal holiday
    federal_holidays = []
    for date_str in federal_holidays_str:
        date = pd.to_datetime(date_str)
        for i in range(2):
            before = (date - pd.Timedelta(days=i)).date()
            after = (date + pd.Timedelta(days=i)).date()
            federal_holidays.append(str(before))
            federal_holidays.append(str(after))
    
    #make sure its unique days
    federal_holidays = list(set(federal_holidays))
    def in_holiday_window(date):
        return 1 if date in federal_holidays else 0
    
    in_holiday_window_udf = udf(in_holiday_window, IntegerType())

    df = df.withColumn('in_holiday_window', in_holiday_window_udf(df.FL_DATE))

    ############################################################################################


    ### binning scheduled depature time into morning: 1, afternoon:2 and evening:3
    def time_mapper(scheduled_departing_time):
        time_bins = { '0001-0559': 0,
             '0600-0659':0,
             '0700-0759':0,
             '0800-0859':0,
             '0900-0959':0,
             '1000-1059':0,
             '1100-1159':0,
             '1200-1259':1,
             '1300-1359':1,
             '1400-1459':1,
             '1500-1559':1,
             '1600-1659':1,
             '1700-1759':1,
             '1800-1859':2,
             '1900-1959':2,
             '2000-2059':2,
             '2100-2159':2,
             '2200-2259':2,
             '2300-2359':2,
             }
        
        return time_bins[str(scheduled_departing_time)]

    time_mapper_udf = udf(time_mapper, IntegerType())

    df = df.withColumn('time_of_day_category', time_mapper_udf(df.DEP_TIME_BLK))

    ############################################################################################
    #doublecasting with udf function
    df = df.withColumn("LATITUDE", parse_doubleToDouble_udf(df.LATITUDE))
    df = df.withColumn("LONGITUDE", parse_doubleToDouble_udf(df.LONGITUDE))
    df = df.withColumn("ELEVATION", parse_doubleToDouble_udf(df.ELEVATION))
    df = df.withColumn("HourlyAltimeterSetting", parse_doubleToDouble_udf(df.HourlyAltimeterSetting))
    df = df.withColumn("HourlyDewPointTemperature", parse_doubleToDouble_udf(df.HourlyDewPointTemperature))
    df = df.withColumn("HourlyDryBulbTemperature", parse_doubleToDouble_udf(df.HourlyDryBulbTemperature))
    df = df.withColumn("HourlyPrecipitation", parse_doubleToDouble_udf(df.HourlyPrecipitation))
    df = df.withColumn("HourlyPressureChange", parse_doubleToDouble_udf(df.HourlyPressureChange))
    df = df.withColumn("HourlyRelativeHumidity", parse_doubleToDouble_udf(df.HourlyRelativeHumidity))
    df = df.withColumn("HourlySeaLevelPressure", parse_doubleToDouble_udf(df.HourlySeaLevelPressure))
    df = df.withColumn("HourlyStationPressure", parse_doubleToDouble_udf(df.HourlyStationPressure))
    df = df.withColumn("HourlyVisibility", parse_doubleToDouble_udf(df.HourlyVisibility))
    df = df.withColumn("HourlyWetBulbTemperature", parse_doubleToDouble_udf(df.HourlyWetBulbTemperature))
    df = df.withColumn("HourlyWindGustSpeed", parse_doubleToDouble_udf(df.HourlyWindGustSpeed))
    df = df.withColumn("HourlyWindSpeed", parse_doubleToDouble_udf(df.HourlyWindSpeed))

    ###############################################################################################

    # DEP_DEL15 Parsing
    # Helper function to parse binary columns
    def parse_binary_double(value):
        if value is None:
            return float(1)
        else:
            return float(value)

    parse_binary_double_udf = udf(parse_binary_double, DoubleType())

    def parse_dep_del15(value):
        if value is None:
            return int(1)
        else:
            if value == '0.0':
                return int(0)
            else:
                return int(1)

    parse_dep_del15_udf = udf(parse_dep_del15, IntegerType())

    df = df.withColumn("DEP_DEL15", parse_dep_del15_udf(df.DEP_DEL15))

    # Flight Data Numeric feature Parsing
    df = df.withColumn("DEP_DELAY", parse_doubleToDouble_udf(df.DEP_DELAY))
    df = df.withColumn("DEP_DELAY_NEW", parse_doubleToDouble_udf(df.DEP_DELAY_NEW))
    df = df.withColumn("TAXI_OUT", parse_doubleToDouble_udf(df.TAXI_OUT))
    df = df.withColumn("TAXI_IN", parse_doubleToDouble_udf(df.TAXI_IN))
    df = df.withColumn("ARR_DELAY", parse_doubleToDouble_udf(df.ARR_DELAY))
    df = df.withColumn("ARR_DELAY_NEW", parse_doubleToDouble_udf(df.ARR_DELAY_NEW))
    df = df.withColumn("CRS_ELAPSED_TIME", parse_doubleToDouble_udf(df.CRS_ELAPSED_TIME))
    df = df.withColumn("ACTUAL_ELAPSED_TIME", parse_doubleToDouble_udf(df.ACTUAL_ELAPSED_TIME))
    df = df.withColumn("AIR_TIME", parse_doubleToDouble_udf(df.AIR_TIME))
    df = df.withColumn("FLIGHTS", parse_doubleToDouble_udf(df.FLIGHTS))
    df = df.withColumn("DISTANCE", parse_doubleToDouble_udf(df.DISTANCE))
    df = df.withColumn("CARRIER_DELAY", parse_doubleToDouble_udf(df.CARRIER_DELAY))
    df = df.withColumn("WEATHER_DELAY", parse_doubleToDouble_udf(df.WEATHER_DELAY))
    df = df.withColumn("NAS_DELAY", parse_doubleToDouble_udf(df.NAS_DELAY))
    df = df.withColumn("SECURITY_DELAY", parse_doubleToDouble_udf(df.SECURITY_DELAY))
    df = df.withColumn("LATE_AIRCRAFT_DELAY", parse_doubleToDouble_udf(df.LATE_AIRCRAFT_DELAY))

    ## Feature Selection

    # Weather Feature Selection
    columns_to_zscale = ["CRS_ELAPSED_TIME", "DISTANCE", "ELEVATION", "HourlyAltimeterSetting","HourlyDewPointTemperature", "HourlyDryBulbTemperature","HourlyPrecipitation", "HourlyPressureChange", "HourlyRelativeHumidity", "HourlySeaLevelPressure", "HourlyStationPressure", "HourlyVisibility", "HourlyWetBulbTemperature", "HourlyWindSpeed"]

    df_feature_select = df.dropna(subset = columns_to_zscale)
    print("size after dropping null records: " + str(df_feature_select.count()))

    # Check for duplicates
    dupes_dropped = df_feature_select.dropDuplicates()

    if (df_feature_select.count(), len(df_feature_select.columns)) == (dupes_dropped.count(), len(dupes_dropped.columns)):
        print('No Duplicates found')
    else:
        print(f'{df_feature_select.count() - dupes_dropped.count()} duplicates dropped')
        df_feature_select = dupes_dropped

    print("size after dropping duplicates: " + str(df_feature_select.count()))

    if train_set_flag == 1:
        # Balance the data
        major_df = df_feature_select.filter(F.col('DEP_DEL15') == 0)
        minor_df = df_feature_select.filter(F.col('DEP_DEL15') > 0)
        ratio = float(major_df.count() / minor_df.count())
        #print("Unbalanced ratio of No Delays to Delays: {}".format(ratio))
        

        sampled_majority_df = major_df.sample(False, 1/ratio)
        df_feature_select = sampled_majority_df.unionAll(minor_df)
        print("size after balancing: " + str(df_feature_select.count()))
    
    def parse_pagerank(value):
        if value is None or value == "":
            return None
        if pagerank_dict.get(value, None) is None:
            return None
        else:
            return pagerank_dict[value]
        
    parse_pagerank_udf = udf(parse_pagerank, DoubleType())

    df_feature_select = df_feature_select.withColumn("pagerank", parse_pagerank_udf(df_feature_select.origin_station_id))

    hourlySkyConditions_list = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
    hourlyWeatherType_list = ['FG', 'TS', 'PL', 'GR', 'GL', 'DU', 'HZ', 'BLSN', 'FC', 'WIND', 'BLPY', 'BR', 'DZ', 'FZDZ', 'RA', 'FZRA', 'SN', 'UP', "MIFG", 'FZFG']

    for category in hourlySkyConditions_list:
        df_feature_select = df_feature_select.withColumn(f"skyConditions_{category}", array_contains(F.col("HourlySkyConditions_Parsed"), category).cast("int"))

    for category in hourlyWeatherType_list:
        df_feature_select = df_feature_select.withColumn(f"weatherType_{category}", array_contains(F.col("HourlyPresentWeatherType_Parsed"), category).cast("int"))

    df_feature_select = df_feature_select.drop(*['HourlySkyConditions_Parsed', 'HourlyPresentWeatherType_Parsed'])

    return df_feature_select

####3 Month Data

In [0]:
print("3 Months Train Data:")
pagerank_dict_quarter = getPageRankDict(df_otpw_quarter, 0.85)
train_quarter = Pre_Process_DF(df_otpw_quarter, 1, pagerank_dict_quarter)
print("----------------------------------")
print("3 Months Test Data:")
test_quarter = Pre_Process_DF(df_otpw_quarter_test, 0, pagerank_dict_quarter)

3 Months Train Data:
initial size: 897997
size after dropping null records: 243657
No Duplicates found
size after dropping duplicates: 243657
size after balancing: 109277
----------------------------------
3 Months Test Data:
initial size: 503366
size after dropping null records: 144051
No Duplicates found
size after dropping duplicates: 144051


####5 Years Data

In [0]:
print("5 Years Train Data:")
pagerank_dict_5yr = getPageRankDict(df_otpw_5yr, 0.85)
train_5yr = Pre_Process_DF(df_otpw_5yr, 1, pagerank_dict_5yr)
print("----------------------------------")
print("5 Years Test Data:")
test_5yr = Pre_Process_DF(df_otpw_5yr_test, 0, pagerank_dict_5yr)  

5 Years Train Data:
initial size: 24279321
size after dropping null records: 6961697
No Duplicates found
size after dropping duplicates: 6961697
size after balancing: 2535509
----------------------------------
5 Years Test Data:
initial size: 7393798
size after dropping null records: 2096849
No Duplicates found
size after dropping duplicates: 2096849


###Create Pipeline

In [0]:
def create_pipeline(train_df):
    categorical_features_names = ['STATION', 'HourlyPressureTendency', 'HourlyWindDirection', 'DAY_OF_WEEK','MONTH', 'origin_icao','ORIGIN_STATE_ABR','dest_icao','DEST_STATE_ABR', 'time_of_day_category']

    string_indexers = [StringIndexer(inputCol = column, outputCol = f"{column}_Index", handleInvalid = 'keep') for column in categorical_features_names]
    one_hot_encoders = [OneHotEncoder(inputCol = indexer.getOutputCol(), outputCol = f"{column}_Vec") for indexer, column in zip(string_indexers,categorical_features_names)]

    columns_to_zscale = ["CRS_ELAPSED_TIME", "DISTANCE", "ELEVATION", "HourlyAltimeterSetting","HourlyDewPointTemperature", "HourlyDryBulbTemperature","HourlyPrecipitation", "HourlyPressureChange", "HourlyRelativeHumidity", "HourlySeaLevelPressure", "HourlyStationPressure", "HourlyVisibility", "HourlyWetBulbTemperature", "HourlyWindSpeed"]

    scaler_assembler = VectorAssembler(inputCols = columns_to_zscale, outputCol = "features_to_zscale", handleInvalid = "keep")
    scaler = StandardScaler(withMean=True, inputCol = scaler_assembler.getOutputCol(), outputCol = "scaled_numeric_features")

    hourlySkyConditions_list = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
    hourlyWeatherType_list = ['FG', 'TS', 'PL', 'GR', 'GL', 'DU', 'HZ', 'BLSN', 'FC', 'WIND', 'BLPY', 'BR', 'DZ', 'FZDZ', 'RA', 'FZRA', 'SN', 'UP', "MIFG", 'FZFG']

    final_features_list = [f"{name}_Vec" for name in categorical_features_names]
    final_features_list.append("scaled_numeric_features")
    final_features_list.append('in_holiday_window')
    final_features_list.append('pagerank')

    for i in hourlySkyConditions_list:
        final_features_list.append(f"skyConditions_{i}")
    for i in hourlyWeatherType_list:
        final_features_list.append(f"weatherType_{i}")

    final_assembler = VectorAssembler(inputCols = final_features_list, outputCol="final_features")

    stages = string_indexers + one_hot_encoders
    stages.append(scaler_assembler)
    stages.append(scaler)
    stages.append(final_assembler)

    regression_pipeline = Pipeline(stages = stages)

    pipeline_model = regression_pipeline.fit(train_df)
    # transform the data
    transformed_train= pipeline_model.transform(train_df)
    
    return transformed_train, pipeline_model

In [0]:
def create_pipeline_w_feature_select(train_df):
    categorical_features_names = ['HourlyPressureTendency', 'HourlyWindDirection', 'DAY_OF_WEEK','MONTH', 'origin_icao','ORIGIN_STATE_ABR','dest_icao','DEST_STATE_ABR', 'time_of_day_category']

    string_indexers = [StringIndexer(inputCol = column, outputCol = f"{column}_Index", handleInvalid = 'keep') for column in categorical_features_names]
    one_hot_encoders = [OneHotEncoder(inputCol = indexer.getOutputCol(), outputCol = f"{column}_Vec") for indexer, column in zip(string_indexers,categorical_features_names)]

    columns_to_zscale = ['HourlyRelativeHumidity', 'HourlyDryBulbTemperature', 'HourlyWindSpeed', 'HourlyWetBulbTemperature', 'HourlyAltimeterSetting']

    scaler_assembler = VectorAssembler(inputCols = columns_to_zscale, outputCol = "features_to_zscale", handleInvalid = "keep")
    scaler = StandardScaler(withMean=True, inputCol = scaler_assembler.getOutputCol(), outputCol = "scaled_numeric_features")

    hourlySkyConditions_list = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
    hourlyWeatherType_list = ['FG', 'TS', 'PL', 'GR', 'GL', 'DU', 'HZ', 'BLSN', 'FC', 'WIND', 'BLPY', 'BR', 'DZ', 'FZDZ', 'RA', 'FZRA', 'SN', 'UP', "MIFG", 'FZFG']

    final_features_list = [f"{name}_Vec" for name in categorical_features_names]
    final_features_list.append("scaled_numeric_features")
    final_features_list.append('in_holiday_window')
    final_features_list.append('pagerank')

    
    for i in hourlySkyConditions_list:
        final_features_list.append(f"skyConditions_{i}")
    for i in hourlyWeatherType_list:
        final_features_list.append(f"weatherType_{i}")
    
    final_assembler = VectorAssembler(inputCols = final_features_list, outputCol="final_features")

    stages = string_indexers + one_hot_encoders
    stages.append(scaler_assembler)
    stages.append(scaler)
    stages.append(final_assembler)

    regression_pipeline = Pipeline(stages = stages)

    pipeline_model = regression_pipeline.fit(train_df)
    # transform the data
    transformed_train= pipeline_model.transform(train_df)
    
    return transformed_train, pipeline_model

####3 Month Data

In [0]:
print("3 Month Data")
transformed_train_quarter, transformed_pipeline = create_pipeline(train_quarter)
transformed_test_quarter = transformed_pipeline.transform(test_quarter)

3 Month Data


Downloading artifacts:   0%|          | 0/153 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

####5 Years Data with Feature Selection

In [0]:
print("5 Years Data with Feature Selection")
transformed_train_5yr_selected, transformed_pipeline_5yr_selected = create_pipeline_w_feature_select(train_5yr)
transformed_test_5yr_selected = transformed_pipeline_5yr_selected.transform(test_5yr)

5 Years Data with Feature Selection


Downloading artifacts:   0%|          | 0/139 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

#Modeling And Results

###Create Logistic Regression Model

In [0]:
def createLRModel(df, iterations= 10):
    '''
    takes the traindf and produces a logistic regression model
    '''
    lr = LogisticRegression(featuresCol = 'final_features' , labelCol = 'DEP_DEL15', maxIter=iterations)
    lrModel = lr.fit(df)

    # Print the coefficients and intercept for logistic regression
    print("Coefficients: " + str(lrModel.coefficients))
    print("Intercept: " + str(lrModel.intercept))

    return lrModel

####3 Month Data

In [0]:
print("3 Month Data")
train_quarter_balance_lrmodel = createLRModel(transformed_train_quarter)
trainingSummary_balance = train_quarter_balance_lrmodel.summary

3 Month Data


Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Coefficients: [-0.0616311798265122,-0.10308837851659149,0.10335708937646568,0.05559858905430236,0.11677824495581114,0.23267083300127359,0.10216267181401632,0.2962959288848246,0.18084839513397172,0.10971408456903554,0.041769512037168735,-0.09286616994108299,-0.0622856545544734,-0.026870713262601237,-0.14591718830585534,-0.06148044093945529,-0.3095254206050996,0.29130576314962153,-0.16322985813987217,-0.18146268205800145,0.08385506120498218,-0.05636205341352939,0.264381998209632,-0.4082785156445402,0.060745805317556446,0.4972062343163322,-0.0763328261188359,-0.130311511657655,0.15545489079322172,-0.10901393578231709,-0.13456366710250203,-0.101993605086944,0.16159989475467657,0.007839267189934704,0.09259343489004662,0.0023958959955063053,-0.2604559778068632,0.5132618563974654,-0.09710998915660915,0.021935171531669427,0.2114303049524007,-0.06363656248417045,-0.24948577447525963,0.2782123598946224,-0.15654525194777752,-0.3897628743071228,0.04459759130303905,-0.16690447774082678,-0.142243112

####3 Month Data with Feature Selection

In [0]:
print("3 Month Data with Feature Selection")
train_quarter_selected_lrmodel = createLRModel(transformed_train_quarter_selected)
trainingSummary_selected = train_quarter_selected_lrmodel.summary

3 Month Data with Feature Selection


Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Coefficients: [-0.11288649738518008,0.04677432865071366,-0.052883295601613325,0.11010968391500896,0.18294170118855316,-0.10871306374270233,0.002236376073739537,0.03738768274517956,-0.04655141281035854,0.04428133455836615,0.16984050088453617,0.11691375162321568,0.007952419670665319,0.17591360308395276,0.11832025965675876,0.09451044505930165,0.014975781151300167,-0.04241043598051555,0.179140381349855,-0.16201677502506368,0.11284529429937591,0.03805010145390786,-0.08405671262465639,-0.03748958615253454,-0.0177517544042697,0.05947442676924928,-0.18326514132479965,-0.13541558841518805,-0.11168412595872097,0.04575875005308182,-0.03443674490076374,-0.11311399542848946,-0.10819322709292488,-0.06844027641859758,0.0036664457653203812,-0.03341924844412984,-0.13019540621035305,0.008390977946548696,-0.19330325185182923,-0.0773427326425514,-0.0830661475497612,-0.08461559322209788,-0.11267188171048266,-0.11600524603621158,0.06717969087124073,-0.04984491437059559,-0.15795883063706778,-0.10429088283778

####5 Years Data with Feature Selection

In [0]:
print("5 Years Train Data with Feature Selection:")
train_5yr_balance_lrmodel_selected = createLRModel(transformed_train_5yr_selected)
trainingSummary_balance_5yr_selected = train_5yr_balance_lrmodel_selected.summary

5 Years Train Data with Feature Selection:


Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Coefficients: [0.043190879599749626,-0.017654382468304555,-0.060174578146844196,0.015956702949858004,-0.03378947639685919,0.06927397871010063,-0.09464169806773823,0.01921710231976219,-0.09817158821253667,0.03750122610983623,0.02995509912201618,-0.11099316396779355,-0.07518311976149719,0.07447013394830435,-0.04478207451335791,0.06900030458901556,-0.041893194162030876,0.08284390545172775,0.07005660524532535,-0.0916901556183549,0.032798180642458825,-0.04200152043738857,0.06678708118248597,-0.023187488129683027,0.09862747594571947,0.08651282960781301,0.0011249146400483655,-0.01746286308322509,-0.07477166572072122,0.07851462215326432,-0.039453242789548,-0.038441260615210956,0.06877870257695311,0.07398472798614666,-0.014526804798093191,-0.05790452636898517,0.06926358600122881,-0.07429724023883004,-0.08495864599189193,-0.07514074877841416,-0.05038813234130112,0.031538107558999315,0.006055234311740168,-0.03521375055264964,-0.08849658055155378,-0.024458761256095703,-0.07342689442014065,-1.23271

###Logistic Regression Train Results

In [0]:
def evaluate_model(model_prediction, beta=1.0):
    # Convert DataFrame to RDD
    prediction_and_label = model_prediction.select(["prediction", "DEP_DEL15"]).withColumn('DEP_DEL15', col('DEP_DEL15').cast(FloatType())).orderBy('DEP_DEL15')

    # Create MulticlassMetrics object
    metrics = MulticlassMetrics(prediction_and_label.rdd.map(tuple))

    return metrics.accuracy, metrics.precision(1.0), metrics.recall(1.0), metrics.fMeasure(1.0), metrics.fMeasure(1.0, beta=beta)

####3 Month Data

In [0]:
lr_train_predictions_balance = train_quarter_balance_lrmodel.transform(transformed_train_quarter)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(lr_train_predictions_balance, 1.0)

print("3 Month Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")



3 Month Data Train Results
Precision: 0.6277
Recall: 0.6159
F-1: 0.6218
F-1 Beta: 0.6218


####3 Month Data with Feature Selection

In [0]:
lr_train_predictions_selected = train_quarter_selected_lrmodel.transform(transformed_train_quarter_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(lr_train_predictions_selected, 1.0)

print("3 Month Data with Feature Selection Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")


Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.



3 Month Data with Feature Selection Train Results
Precision: 0.6271
Recall: 0.6083
F-1: 0.6176
F-1 Beta: 0.6176


####5 Years Data with Feature Selection

In [0]:
lr_train_predictions_balance_5yr_selected = train_5yr_balance_lrmodel_selected.transform(transformed_train_5yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(lr_train_predictions_balance_5yr_selected, 0.5)

print("5 Years Data with Feature Selection Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")



5 Years Data with Feature Selection Train Results
Precision: 0.6188
Recall: 0.6398
F-1: 0.6291
F-1 Beta: 0.6229


###Logistic Regression Test Results

####3 Month Data

In [0]:
lr_predictions_balance = train_quarter_balance_lrmodel.transform(transformed_test_quarter)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(lr_predictions_balance, 1.0)

print("3 Month Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

3 Month Data Test Results
Precision: 0.2966
Recall: 0.3603
F-1: 0.3253
F-1 Beta: 0.3253


####3 Month Data with Feature Selection

In [0]:
lr_test_predictions_selected = train_quarter_selected_lrmodel.transform(transformed_test_quarter_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(lr_test_predictions_selected, 1.0)

print("3 Month Data with Feature Selection Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

3 Month Data with Feature Selection Test Results
Precision: 0.2957
Recall: 0.3602
F-1: 0.3248
F-1 Beta: 0.3248


####5 Years Data with Feature Selection

In [0]:
lr_predictions_balance_5yr_selected = train_5yr_balance_lrmodel_selected.transform(transformed_test_5yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(lr_predictions_balance_5yr_selected, 0.5)

print("5 Years Data with Feature Selection Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")



5 Years Data with Feature Selection Test Results
Precision: 0.2715
Recall: 0.6308
F-1: 0.3796
F-1 Beta: 0.3064


###Cross-Validation

####Fit Cross Validation Models

In [0]:
lr = LogisticRegression(featuresCol = 'final_features' , labelCol = 'DEP_DEL15', maxIter=10)

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [5, 10, 20, 50])
             .build())

crossval = CrossValidator(estimator = lr,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="DEP_DEL15"),
                          numFolds = 5)

# Run cross-validation, and choose the best set of parameters.
cv_balanced = crossval.fit(transformed_train_1yr)

Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
lr = LogisticRegression(featuresCol = 'final_features' , labelCol = 'DEP_DEL15', maxIter=10)

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [5, 10, 20, 50])
             .build())

crossval = CrossValidator(estimator = lr,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="DEP_DEL15"),
                          numFolds = 5)

# Run cross-validation, and choose the best set of parameters.
cv_balanced_selected = crossval.fit(transformed_train_1yr_selected)

Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
lr = LogisticRegression(featuresCol = 'final_features' , labelCol = 'DEP_DEL15', maxIter=10)

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [5, 10, 20, 50])
             .build())

crossval = CrossValidator(estimator = lr,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="DEP_DEL15"),
                          numFolds = 5)

# Run cross-validation, and choose the best set of parameters.
cv_unbalanced = crossval.fit(transformed_train_1yr_unbalanced)

Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
lr = LogisticRegression(featuresCol = 'final_features' , labelCol = 'DEP_DEL15', maxIter=10)

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [5, 10, 20, 50])
             .build())

crossval = CrossValidator(estimator = lr,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="DEP_DEL15"),
                          numFolds = 5)

# Run cross-validation, and choose the best set of parameters.
cv_unbalanced_selected = crossval.fit(transformed_train_1yr_unbalanced_selected)

Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

####Cross-Validation Train Results

In [0]:
cross_valid_train_prediction = cv_balanced.transform(transformed_train_1yr)

cross_acc, cross_pre, cross_recall, cross_f1, cross_f1_b = evaluate_model(cross_valid_train_prediction, 1.0)

print("1 Year Data Cross Validation Train Results")
#print(f"Accuracy: {cross_acc:.4f}")
print(f"Precision: {cross_pre:.4f}")
print(f"Recall: {cross_recall:.4f}")
print(f"F-1: {cross_f1:.4f}")
print(f"F-1 Beta: {cross_f1_b:.4f}")


Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.



1 Year Data Cross Validation Train Results
Precision: 0.5969
Recall: 0.6070
F-1: 0.6019
F-1 Beta: 0.6019


In [0]:
cross_valid_train_prediction_selected = cv_balanced_selected.transform(transformed_train_1yr_selected)

cross_acc, cross_pre, cross_recall, cross_f1, cross_f1_b = evaluate_model(cross_valid_train_prediction_selected, 1.0)

print("1 Year Data with Feature Selection Cross Validation Train Results")
#print(f"Accuracy: {cross_acc:.4f}")
print(f"Precision: {cross_pre:.4f}")
print(f"Recall: {cross_recall:.4f}")
print(f"F-1: {cross_f1:.4f}")
print(f"F-1 Beta: {cross_f1_b:.4f}")


Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.



1 Year Data with Feature Selection Cross Validation Train Results
Precision: 0.5512
Recall: 0.4665
F-1: 0.5053
F-1 Beta: 0.5053


####Cross-Validation Test Results

In [0]:
cross_valid_test_prediction = cv_balanced.transform(transformed_test_1yr)

cross_acc, cross_pre, cross_recall, cross_f1, cross_f1_b = evaluate_model(cross_valid_test_prediction, 1.0)

print("1 Year Data Cross Validation Test Results")
#print(f"Accuracy: {cross_acc:.4f}")
print(f"Precision: {cross_pre:.4f}")
print(f"Recall: {cross_recall:.4f}")
print(f"F-1: {cross_f1:.4f}")
print(f"F-1 Beta: {cross_f1_b:.4f}")

1 Year Data Cross Validation Test Results
Precision: 0.1944
Recall: 0.6806
F-1: 0.3024
F-1 Beta: 0.3024


In [0]:
cross_valid_test_prediction_selected = cv_balanced_selected.transform(transformed_test_1yr_selected)

cross_acc, cross_pre, cross_recall, cross_f1, cross_f1_b = evaluate_model(cross_valid_test_prediction_selected, 1.0)

print("1 Year Data with Feature Selection Cross Validation Test Results")
#print(f"Accuracy: {cross_acc:.4f}")
print(f"Precision: {cross_pre:.4f}")
print(f"Recall: {cross_recall:.4f}")
print(f"F-1: {cross_f1:.4f}")
print(f"F-1 Beta: {cross_f1_b:.4f}")

1 Year Data with Feature Selection Cross Validation Test Results
Precision: 0.1862
Recall: 0.5706
F-1: 0.2808
F-1 Beta: 0.2808


##Baseline Model Summary Table

### Training Set Results 

<table>
<tr><th>Unbalanced Dataset </th><th>Balanced Dataset</th></tr>
<tr><td>
    
    
|Without Feature Selection| Result|
|--|--|
|Precision| 0.6049|
|Recall| 0.0099|
|F-1| 0.0194|
|F-Beta| 0.0194|
    

|With Feature Selection| Result|
|--|--|
|Precision| 0.6078|
|Recall| 0.0099|
|F-1| 0.0194|
|F-Beta| 0.0194|

</td><td>


|Without Feature Selection| Result|
|--|--|
|Precision| 0.5972|
|Recall| 0.6059|
|F-1| 0.6015|
|F-Beta| 0.6015|
    

|With Feature Selection| Result|
|--|--|
|Precision| 0.5961|
|Recall| 0.6085|
|F-1| 0.6022|
|F-Beta| 0.6022|

</td></tr> </table>


### Test Set Results 

<table>
<tr><th>Unbalanced Dataset </th><th>Balanced Dataset</th></tr>
<tr><td>
    
    
|Without Feature Selection| Result|
|--|--|
|Precision| 0.5645|
|Recall| 0.0104|
|F-1| 0.0204|
|F-Beta| 0.0204|
    

|With Feature Selection| Result|
|--|--|
|Precision| 0.5803|
|Recall| 0.0105|
|F-1| 0.0207|
|F-Beta| 0.0207|

</td><td>


|Without Feature Selection| Result|
|--|--|
|Precision| 0.1938|
|Recall| 0.6878|
|F-1| 0.3024|
|F-Beta| 0.3024|
    

|With Feature Selection| Result|
|--|--|
|Precision| 0.5654|
|Recall| 0.0104|
|F-1| 0.0204|
|F-Beta| 0.0204|

</td></tr> </table>

#Conclusions and Future Directions

Final analysis of the regularized datasets indicated that the Logistic Regression (LR) model worked best with a balanced dataset and with feature selection- the unbalanced dataset's results initially indicated higher precision, but resulted in the model just guessing which samples were delayed or not in line with the 3:1 ratio of undelayed/delayed present in the dataset itself. The LR model with balancing performed well overall, but the model with feature selection performed well enough that our target metric (F-beta, weighted towards precision) scored ~52-56% on average, with the model's ROC curves confirming this finding.   


Future directions for the model development include investigating graph network and support vector machine usage, as it's hypothesized that the data is multidimensional; implementing BlockSplit cross-validation since the initial dataset is composed of timeseries data; and adjusting the F-beta calculations to weight more for precision, since false positives are considered the worst-case scenario for this project's particular implementations.

#More Advanced Model Testing (Support Vector Machine)

In [0]:
from pyspark.ml.classification import LinearSVC

# Load training data

transformed_train_quarter_selected, transformed_pipeline_quarter_selected = create_pipeline_w_feature_select(train_quarter)
transformed_test_quarter_selected = transformed_pipeline_quarter_selected.transform(test_quarter)

def createSVModel(df, iterations= 10):
    '''
    takes the traindf and produces a logistic regression model
    '''
    lr = LinearSVC(featuresCol = 'final_features' , labelCol = 'DEP_DEL15', maxIter=iterations)
    lrModel = lr.fit(df)

    # Print the coefficients and intercept for logistic regression
    print("Coefficients: " + str(lrModel.coefficients))
    print("Intercept: " + str(lrModel.intercept))

    return lrModel

print("3 Month Data")
train_quarter_balance_svmodel = createSVModel(transformed_train_quarter)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:429)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1225)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:958)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:573)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:669)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:687)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
	at com.databricks.logging.Usag

In [0]:
svmtrainingSummary_balance = train_quarter_balance_svmodel.summary
display(svmtrainingSummary_balance)

<bound method LinearSVCModel.summary of LinearSVCModel: uid=LinearSVC_63ee85efe9a5, numClasses=2, numFeatures=1039>

In [0]:
#SVM training and results

svm_train_predictions_balance = train_quarter_balance_svmodel.transform(transformed_train_quarter)

svm_acc, svm_pre, svm_recall, svm_f1, svm_f1_b = evaluate_model(svm_train_predictions_balance, 1.0)

print("3 Month Data with Feature Selection Cross Validation Train Results")
print(f"Accuracy: {svm_acc:.4f}")
print(f"Precision: {svm_pre:.4f}")
print(f"Recall: {svm_recall:.4f}")
print(f"F-1: {svm_f1:.4f}")
print(f"F-1 Beta: {svm_f1_b:.4f}")


Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.



3 Month Data with Feature Selection Cross Validation Train Results
Accuracy: 0.6348
Precision: 0.6371
Recall: 0.6249
F-1: 0.6310
F-1 Beta: 0.6310


In [0]:
#SVM testing and results

svm_train_predictions_balance = train_quarter_balance_svmodel.transform(transformed_test_quarter)

svm_acc, svm_pre, svm_recall, svm_f1, svm_f1_b = evaluate_model(svm_train_predictions_balance, 1.0)

print("3 Month Data with Feature Selection SVM Test Results")
#print(f"Accuracy: {svm_acc:.4f}")
print(f"Precision: {svm_pre:.4f}")
print(f"Recall: {svm_recall:.4f}")
print(f"F-1: {svm_f1:.4f}")
print(f"F-1 Beta: {svm_f1_b:.4f}")

3 Month Data with Feature Selection SVM Test Results
Precision: 0.2989
Recall: 0.4544
F-1: 0.3606
F-1 Beta: 0.3606


#More Advanced Model Testing (Multilayer Perceptron)

In [0]:
#variables reference for model runs
#transformed_test_quarter_selected is BALANCED, 3 MOS, WITH FEATURE SELECTION

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.types import IntegerType 

def trainMLPModel(df, iterations= 10):
    '''
    takes the traindf and produces a multilayered perceptron model
    '''
    feature_len = len(df.select('final_features').limit(1).collect()[0][0])
    print("feature length: " + str(feature_len))

    layers = [feature_len, 14, 20, 14, 2]
    mlp = MultilayerPerceptronClassifier(featuresCol = 'final_features' , labelCol = 'DEP_DEL15', maxIter=100, layers=layers, seed=1234)
    mlpModel = mlp.fit(df)

    return mlpModel

###3 Month

In [0]:
print("MLP 3 Month Data")
train_quarter_balance_mlpmodel = trainMLPModel(transformed_train_quarter_selected)

train_result = train_quarter_balance_mlpmodel.transform(transformed_train_quarter_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(train_result, 1.0)

print("3 Month Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")



3 Month Data Train Results
Precision: 0.6553
Recall: 0.6206
F-1: 0.6375
F-1 Beta: 0.6375


In [0]:
test_result = train_quarter_balance_mlpmodel.transform(transformed_test_quarter_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(test_result, 1.0)

print("3 Month Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

3 Month Data Test Results
Precision: 0.2884
Recall: 0.5419
F-1: 0.3765
F-1 Beta: 0.3765


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

metrics = ['f1', 'weightedPrecision', 'weightedRecall', 'accuracy']

for metric in metrics:
        evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', predictionCol = 'prediction', metricName = metric, beta = 1.0)
        print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_result)))
        print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_result)))

Train f1 = 0.6456604313862144
Test f1 = 0.6741053176051017
Train weightedPrecision = 0.646046588823221
Test weightedPrecision = 0.740978667797093
Train weightedRecall = 0.6458007303077725
Test weightedRecall = 0.6410993328751623
Train accuracy = 0.6458007303077725
Test accuracy = 0.6410993328751623


###1 Year

In [0]:
transformed_train_1yr_selected, transformed_pipeline_1yr_selected = create_pipeline_w_feature_select(train_1yr)
transformed_test_1yr_selected = transformed_pipeline_1yr_selected.transform(test_1yr)

transformed_train_1yr_selected = transformed_train_1yr_selected.select(*keep_list)
transformed_test_1yr_selected = transformed_test_1yr_selected.select(*keep_list)

Downloading artifacts:   0%|          | 0/139 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
print("MLP 1 Year Data")
train_1yr_balance_mlpmodel = trainMLPModel(transformed_train_1yr_selected)

train_1yr_result = train_1yr_balance_mlpmodel.transform(transformed_train_1yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(train_1yr_result, 1.0)

print("1 Year Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")



1 Year Data Train Results
Precision: 0.6337
Recall: 0.6217
F-1: 0.6276
F-1 Beta: 0.6276


In [0]:
test_1yr_result = train_1yr_balance_mlpmodel.transform(transformed_test_1yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(test_1yr_result, 1.0)

print("1 Year Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

1 Year Data Test Results
Precision: 0.2243
Recall: 0.5865
F-1: 0.3245
F-1 Beta: 0.3245


In [0]:
print("1 Year Data Results")
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', predictionCol = 'prediction', metricName = metric, beta = 1.0)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_1yr_result)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_1yr_result)))

1 Year Data Results
Train f1 = 0.6306081878998775
Test f1 = 0.6652447096436592
Train weightedPrecision = 0.6306931654036256
Test weightedPrecision = 0.7832152911173159
Train weightedRecall = 0.6306366801100354
Test weightedRecall = 0.6135191760027099
Train accuracy = 0.6306366801100355
Test accuracy = 0.6135191760027099


###3 Years

In [0]:
transformed_train_3yr_selected, transformed_pipeline_3yr_selected = create_pipeline_w_feature_select(train_3yr)
transformed_test_3yr_selected = transformed_pipeline_3yr_selected.transform(test_3yr)

transformed_train_3yr_selected = transformed_train_3yr_selected.select(*keep_list)
transformed_test_3yr_selected = transformed_test_3yr_selected.select(*keep_list)



Downloading artifacts:   0%|          | 0/139 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
#TRAIN
print("MLP 3 Years Data")
train_3yr_balance_mlpmodel = trainMLPModel(transformed_train_3yr_selected)

train_3yr_result = train_3yr_balance_mlpmodel.transform(transformed_train_3yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(train_3yr_result, 1.0)

print("3 Years Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")



3 Years Data Train Results
Precision: 0.6350
Recall: 0.6047
F-1: 0.6195
F-1 Beta: 0.6195


In [0]:
#TEST
test_3yr_result = train_3yr_balance_mlpmodel.transform(transformed_test_3yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(test_3yr_result, 1.0)

print("3 Years Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

3 Years Data Test Results
Precision: 0.2739
Recall: 0.5896
F-1: 0.3740
F-1 Beta: 0.3740


In [0]:
print("3 Years Data Results")
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', predictionCol = 'prediction', metricName = metric, beta = 1.0)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_3yr_result)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_3yr_result)))

3 Years Data Results
Train f1 = 0.6285544762620634
Test f1 = 0.6793489792656717
Train weightedPrecision = 0.6290566646168612
Test weightedPrecision = 0.7668621243124075
Train weightedRecall = 0.6287715776694722
Test weightedRecall = 0.6400912189284763
Train accuracy = 0.6287715776694723
Test accuracy = 0.6400912189284763


###5 Years

In [0]:
print("MLP 5 Years Data")
keep_list = ['final_features', 'DEP_DEL15']
train_5yr_balance_mlpmodel = trainMLPModel(transformed_train_5yr_selected)

train_5yr_result = train_5yr_balance_mlpmodel.transform(transformed_train_5yr_selected)

MLP 5 Years Data
feature length: 1026


In [0]:
balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(train_5yr_result, 0.5)

print("5 Years Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")



5 Years Data Train Results
Precision: 0.6315
Recall: 0.6034
F-1: 0.6171
F-1 Beta: 0.6257


In [0]:
train_5yr_result.summary()

DataFrame[summary: string, QUARTER: string, DAY_OF_MONTH: string, DAY_OF_WEEK: string, OP_UNIQUE_CARRIER: string, OP_CARRIER_AIRLINE_ID: string, OP_CARRIER: string, TAIL_NUM: string, OP_CARRIER_FL_NUM: string, ORIGIN_AIRPORT_ID: string, ORIGIN_AIRPORT_SEQ_ID: string, ORIGIN_CITY_MARKET_ID: string, ORIGIN: string, ORIGIN_CITY_NAME: string, ORIGIN_STATE_ABR: string, ORIGIN_STATE_FIPS: string, ORIGIN_STATE_NM: string, ORIGIN_WAC: string, DEST_AIRPORT_ID: string, DEST_AIRPORT_SEQ_ID: string, DEST_CITY_MARKET_ID: string, DEST: string, DEST_CITY_NAME: string, DEST_STATE_ABR: string, DEST_STATE_FIPS: string, DEST_STATE_NM: string, DEST_WAC: string, CRS_DEP_TIME: string, DEP_TIME: string, DEP_DELAY: string, DEP_DELAY_NEW: string, DEP_DEL15: string, DEP_DELAY_GROUP: string, DEP_TIME_BLK: string, TAXI_OUT: string, WHEELS_OFF: string, WHEELS_ON: string, TAXI_IN: string, CRS_ARR_TIME: string, ARR_TIME: string, ARR_DELAY: string, ARR_DELAY_NEW: string, ARR_DEL15: string, ARR_DELAY_GROUP: string, AR

In [0]:
test_5yr_result = train_5yr_balance_mlpmodel.transform(transformed_test_5yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(test_5yr_result, 0.5)

print("5 Years Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

5 Years Data Test Results
Precision: 0.2818
Recall: 0.5918
F-1: 0.3818
F-1 Beta: 0.3148


In [0]:
print("5 Years Data Results")
metrics = ['f1', 'weightedPrecision', 'weightedRecall', 'accuracy']

for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', predictionCol = 'prediction', metricName = metric, beta = 0.5)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_5yr_result)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_5yr_result)))

5 Years Data Results
Train f1 = 0.6254824905236205
Test f1 = 0.6731938906167972
Train weightedPrecision = 0.6259163647769693
Test weightedPrecision = 0.7587891763188005
Train weightedRecall = 0.6256688499232304
Test weightedRecall = 0.6355498178457295
Train accuracy = 0.6256688499232304
Test accuracy = 0.6355498178457295


#MLP ARCH 2 TESTING [NUM, NUM*2, NUM, 2]

In [0]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.types import IntegerType 

def trainMLPModel2(df, iterations= 10):
    '''
    takes the traindf and produces a multilayered perceptron model
    '''
    feature_len = len(df.select('final_features').limit(1).collect()[0][0])
    print("feature length: " + str(feature_len))

    layers = [feature_len, feature_len*2, feature_len, 2]
    mlp = MultilayerPerceptronClassifier(featuresCol = 'final_features' , labelCol = 'DEP_DEL15', maxIter=100, layers=layers, seed=1234)
    mlpModel = mlp.fit(df)

    return mlpModel

###3 MONTH

In [0]:
keep_list = ['final_features', 'DEP_DEL15']
transformed_train_quarter_selected, transformed_pipeline_quarter_selected = create_pipeline_w_feature_select(train_quarter)
transformed_test_quarter_selected = transformed_pipeline_quarter_selected.transform(test_quarter)

transformed_train_quarter_selected = transformed_train_quarter_selected.select(*keep_list)
transformed_test_quarter_selected = transformed_test_quarter_selected.select(*keep_list)

In [0]:
#TRAIN
print("MLP 3 Month Data")
train_quarter_balance_mlpmodel = trainMLPModel(transformed_train_quarter_selected)

train_result = train_quarter_balance_mlpmodel.transform(transformed_train_quarter_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(train_result, 1.0)

print("3 Month Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

In [0]:
#TEST
test_result = train_quarter_balance_mlpmodel.transform(transformed_test_quarter_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(test_result, 1.0)

print("3 Month Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

metrics = ['f1', 'weightedPrecision', 'weightedRecall', 'accuracy']

for metric in metrics:
        evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', predictionCol = 'prediction', metricName = metric, beta = 1.0)
        print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_result)))
        print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_result)))

### 1 YR

In [0]:
transformed_train_1yr_selected, transformed_pipeline_1yr_selected = create_pipeline_w_feature_select(train_1yr)
transformed_test_1yr_selected = transformed_pipeline_1yr_selected.transform(test_1yr)

transformed_train_1yr_selected = transformed_train_1yr_selected.select(*keep_list)
transformed_test_1yr_selected = transformed_test_1yr_selected.select(*keep_list)

In [0]:
#TRAIN
print("MLP 1 Year Data")
train_1yr_balance_mlpmodel = trainMLPModel(transformed_train_1yr_selected)

train_1yr_result = train_1yr_balance_mlpmodel.transform(transformed_train_1yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(train_1yr_result, 1.0)

print("1 Year Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

In [0]:
#TEST
test_1yr_result = train_1yr_balance_mlpmodel.transform(transformed_test_1yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(test_1yr_result, 1.0)

print("1 Year Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

In [0]:
print("1 Year Data Results")
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', predictionCol = 'prediction', metricName = metric, beta = 1.0)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_1yr_result)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_1yr_result)))

### 3 YEARS

In [0]:
transformed_train_3yr_selected, transformed_pipeline_3yr_selected = create_pipeline_w_feature_select(train_3yr)
transformed_test_3yr_selected = transformed_pipeline_3yr_selected.transform(test_3yr)

transformed_train_3yr_selected = transformed_train_3yr_selected.select(*keep_list)
transformed_test_3yr_selected = transformed_test_3yr_selected.select(*keep_list)

In [0]:
#TRAIN
print("MLP 3 Years Data")
train_3yr_balance_mlpmodel = trainMLPModel(transformed_train_3yr_selected)

train_3yr_result = train_3yr_balance_mlpmodel.transform(transformed_train_3yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(train_3yr_result, 1.0)

print("3 Years Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

In [0]:
#TEST
test_3yr_result = train_3yr_balance_mlpmodel.transform(transformed_test_3yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(test_3yr_result, 1.0)

print("3 Years Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

In [0]:
#TOTAL RESULTS
print("3 Years Data Results")
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol = 'DEP_DEL15', predictionCol = 'prediction', metricName = metric, beta = 1.0)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_3yr_result)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_3yr_result)))

### 5 YEARS

In [0]:
transformed_train_5yr_selected, transformed_pipeline_5yr_selected = create_pipeline_w_feature_select(train_5yr)
transformed_test_5yr_selected = transformed_pipeline_5yr_selected.transform(test_5yr)

transformed_train_5yr_selected = transformed_train_5yr_selected.select(*keep_list)
transformed_test_5yr_selected = transformed_test_5yr_selected.select(*keep_list)

In [0]:
#TRAIN
print("MLP 5 Years Data")
train_5yr_balance_mlpmodel = trainMLPModel(transformed_train_5yr_selected)

train_5yr_result = train_5yr_balance_mlpmodel.transform(transformed_train_5yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(train_5yr_result, 0.5)

print("5 Years Data Train Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

MLP 5 Years Data
feature length: 1024




5 Years Data Train Results
Precision: 0.6302
Recall: 0.6193
F-1: 0.6247
F-1 Beta: 0.6280


In [0]:
#TEST
test_5yr_result = train_5yr_balance_mlpmodel.transform(transformed_test_5yr_selected)

balance_acc, balance_pre, balance_recall, balance_f1, balance_f1_b = evaluate_model(test_5yr_result, 0.5)

print("5 Years Data Test Results")
#print(f"Accuracy: {balance_acc:.4f}")
print(f"Precision: {balance_pre:.4f}")
print(f"Recall: {balance_recall:.4f}")
print(f"F-1: {balance_f1:.4f}")
print(f"F-1 Beta: {balance_f1_b:.4f}")

5 Years Data Test Results
Precision: 0.2808
Recall: 0.6033
F-1: 0.3832
F-1 Beta: 0.3144
