In [1]:
import streamlit as st
import pandas as pd
import numpy as np
import plotly.graph_objs as go
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
from pyspark.sql import functions as F
from numerize.numerize import numerize
import functions as f
from functools import reduce
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import isnull, when, count, col
from st_pages import Page, show_pages, Section
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, GBTClassifier, LinearSVC

In [2]:
# Create a SparkSession with appropriate settings
spark = SparkSession.builder \
    .appName("AirlineDelays") \
    .config('spark.master', 'local[*]') \
    .config("spark.default.parallelism", "16") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Define the schema
schema_1991 = StructType([
    StructField('Month', IntegerType(), True),
    StructField('DayofMonth', IntegerType(), True),
    StructField('DayOfWeek', IntegerType(), True),
    StructField('DepTime', DoubleType(), True),
    StructField('UniqueCarrier', StringType(), True),
    StructField('FlightNum', IntegerType(), True),
    StructField('ActualElapsedTime', DoubleType(), True),
    StructField('ArrDelay', DoubleType(), True),
    StructField('DepDelay', DoubleType(), True),
    StructField('Origin', StringType(), True),
    StructField('Dest', StringType(), True),
    StructField('Distance', DoubleType(), True),
    StructField('Cancelled', IntegerType(), True),
    StructField('Diverted', IntegerType(), True),
])

schema_2001 = StructType([
    StructField('Month', IntegerType(), True),
    StructField('DayofMonth', IntegerType(), True),
    StructField('DayOfWeek', IntegerType(), True),
    StructField('DepTime', DoubleType(), True),
    StructField('UniqueCarrier', StringType(), True),
    StructField('FlightNum', IntegerType(), True),
    StructField('TailNum', StringType(), True),
    StructField('ActualElapsedTime', DoubleType(), True),
    StructField('AirTime', DoubleType(), True),
    StructField('ArrDelay', DoubleType(), True),
    StructField('DepDelay', DoubleType(), True),
    StructField('Origin', StringType(), True),
    StructField('Dest', StringType(), True),
    StructField('Distance', IntegerType(), True),
    StructField('TaxiIn', IntegerType(), True),
    StructField('TaxiOut', IntegerType(), True),
    StructField('Cancelled', IntegerType(), True),
    StructField('Diverted', IntegerType(), True),
])

def load_data():
    # Read the CSV files into PySpark DataFrames
    df1 = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("compression", "gzip") \
        .schema(schema_1991) \
        .load('1991_cleaned.csv.gz')

    df2 = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("compression", "gzip") \
        .schema(schema_2001) \
        .load('2001_cleaned.csv.gz')

    return df1, df2

# Load data
df1, df2 = load_data()

df1_agg = df1.withColumn('DELAYED', when(df1['ArrDelay'] <= 0, 0).otherwise(1))
df2_agg = df2.withColumn('DELAYED', when(df2['ArrDelay'] <= 0, 0).otherwise(1))

In [3]:
df1_agg.show(10)

+-----+----------+---------+-------+-------------+---------+-----------------+--------+--------+------+----+--------+---------+--------+-------+
|Month|DayofMonth|DayOfWeek|DepTime|UniqueCarrier|FlightNum|ActualElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|Diverted|DELAYED|
+-----+----------+---------+-------+-------------+---------+-----------------+--------+--------+------+----+--------+---------+--------+-------+
|    1|         1|        2| 1709.0|           US|      112|            155.0|     0.0|     4.0|   TPA| SYR|  1104.0|        0|       0|      0|
|    1|         2|        3| 1704.0|           US|      112|            162.0|     2.0|    -1.0|   TPA| SYR|  1104.0|        0|       0|      1|
|    1|         3|        4| 1705.0|           US|      112|            149.0|   -10.0|     0.0|   TPA| SYR|  1104.0|        0|       0|      0|
|    1|         4|        5| 1709.0|           US|      112|            162.0|     7.0|     4.0|   TPA| SYR|  1104.0|        0|   

In [4]:
def preprocess_data(df, categorical_cols, numerical_cols):
    # Remove null values
    all_columns = list()
    for col_name in df.columns:
        all_columns.append(col_name)
        df = df.filter(col(col_name).isNotNull())

    # Indexing and One-Hot Encoding for categorical columns
    indexers = [
        StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="keep")
        for col_name in categorical_cols
    ]
    # encoders = [
    #     OneHotEncoder(inputCol=col_name + "_index", outputCol=col_name + "_encoded")
    #     for col_name in categorical_cols
    # ]

    # Assemble all encoded categorical columns and numerical columns into a single feature vector
    assembler = VectorAssembler(
        inputCols=[col_name + "_index" for col_name in categorical_cols] + numerical_cols,
        outputCol="features"
    )

    # Assemble all stages into a pipeline
    stages = indexers + [assembler]
    pipeline = Pipeline(stages=stages)

    # Fit the pipeline to the DataFrame
    pipeline_model = pipeline.fit(df)

    # Transform the DataFrame using the fitted pipeline
    transformed_df = pipeline_model.transform(df)

    # transformed_df = transformed_df.drop(*[col_name + "_index" for col_name in categorical_cols] + [col_name + "_encoded" for col_name in categorical_cols])
    transformed_df = transformed_df.drop(*[col_name + "_index" for col_name in categorical_cols])
    
    return transformed_df

In [9]:
# Assuming df is your PySpark DataFrame and categorical_cols is a list of categorical column names
transformed_df = preprocess_data(df1_agg, categorical_cols=["UniqueCarrier", "Origin", "Dest"],
                                     numerical_cols=['Month','DayofMonth','DayOfWeek','DepTime','FlightNum','ActualElapsedTime','DepDelay','Distance','Cancelled','Diverted'])
transformed_df.show(10, truncate=False)

+-----+----------+---------+-------+-------------+---------+-----------------+--------+--------+------+----+--------+---------+--------+-------+-------------------------------------------------------------------+
|Month|DayofMonth|DayOfWeek|DepTime|UniqueCarrier|FlightNum|ActualElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|Diverted|DELAYED|features                                                           |
+-----+----------+---------+-------+-------------+---------+-----------------+--------+--------+------+----+--------+---------+--------+-------+-------------------------------------------------------------------+
|1    |1         |2        |1709.0 |US           |112      |155.0            |0.0     |4.0     |TPA   |SYR |1104.0  |0        |0       |0      |[0.0,30.0,58.0,1.0,1.0,2.0,1709.0,112.0,155.0,4.0,1104.0,0.0,0.0]  |
|1    |2         |3        |1704.0 |US           |112      |162.0            |2.0     |-1.0    |TPA   |SYR |1104.0  |0        |0       |1      |[0.0

In [6]:
sample_df1991 = transformed_df.sample(fraction=0.002, seed=42)

In [7]:
# Split data into training and testing sets
train_data, test_data = sample_df1991.randomSplit([0.8, 0.2], seed=42)

In [8]:
# Create a Gradient-Boosted Tree classifier with the label column set to "DELAY" and the features column set to "FEATURES"
GBT = GBTClassifier(labelCol="DELAYED", featuresCol="features")

# Define the pipeline
GBT_pipeline = Pipeline(stages=[GBT])

# Train the GBT model on the `train_data` DataFrame
GBT_model = GBT.fit(train_data)

IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 1 has 233 values. Consider removing this and other categorical features with a large number of values, or add more training examples.

In [None]:
# Extract feature importance
feature_importance = GBT_model.stages[-1].featureImportances

# Display feature importance
print("Feature Importance:")
for i, importance in enumerate(feature_importance.toArray()):
    print(f"Feature {i}: {importance}")

In [None]:
# Use the trained GBT model to make predictions on the `test_data` DataFrame
GBT_predictions = GBT_model.transform(test_data)

In [None]:
# Model evaluation for accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define the evaluator
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="DELAYED", predictionCol="prediction", metricName="accuracy")

# Calculate accuracy
accuracy = accuracy_evaluator.evaluate(GBT_predictions)

# Convert to percentage
percentage_accuracy = accuracy * 100

print(f"Accuracy: {percentage_accuracy:.2f}%")

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Assuming 'DELAYED' is the target variable
rf = RandomForestClassifier(featuresCol="features", labelCol="DELAYED", numTrees=100)

# Define the pipeline
# rf_pipeline = Pipeline(stages=[rf])

model = rf.fit(transformed_df)

In [None]:
feature_importances = model.stages[-1].featureImportances

In [None]:
# Get feature importances as Python floats
feature_importance = [float(importance) for importance in feature_importances]

# Create a DataFrame to store feature importances
feature_importance_df = spark.createDataFrame([(feature, importance) for feature, importance in zip(transformed_df.columns, feature_importance)],
                                               ["Feature", "Importance"])

# Display the feature importances
feature_importance_df.show(truncate=False)