**Flight Delay Prediction** by *FlightLogix*
@author - ibanerjee32

In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline 
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.stat import ChiSquareTest
from pyspark.sql.types import *

In [2]:
# Create a Spark session (getOrCreate returns the active session if one exists).
# Every cell below reads and writes data through this `spark` handle.
spark = SparkSession.builder.appName("FlightDelayPrediction").getOrCreate()

In [4]:
# This method is used for loading CSV files from input path
def load_data(data_path, qtr):
    """Read a CSV file with a header row into a Spark DataFrame.

    The row count, the column count and the list of column names are printed
    as a quick sanity check.

    Args:
        data_path: Path of the CSV file passed to ``spark.read``.
        qtr: Accepted for call compatibility; the body does not use it.

    Returns:
        The DataFrame exactly as read (no filtering is applied here).
    """
    reader = spark.read.option("header", True)
    frame = reader.csv(data_path)

    # Materialise the column names once and reuse them for both messages.
    column_names = list(frame.columns)
    row_total = frame.count()

    print("Total Number of datapoints :{} and the total number of attributes :{}".format(row_total, len(column_names)))
    print("attributes are", column_names)
    return frame

In [5]:
def clean_data(df, isPredict):
    '''
    Select the modelling attributes, neutralise placeholder values and derive
    the time-based features used downstream.

    input:
        df: a dataframe containing at least the columns selected below
        isPredict: when falsy, rows holding any null value are dropped; when
            truthy all rows are kept
    output: df a dataframe with a filtered and cleaned subset of attributes,
        extended with ArrivalHour, DepartureHour, the Arrtimeofday_* /
        Deptimeofday_* 0/1 flags and WeekOfMonth (0-based: days 1-7 -> 0,
        8-14 -> 1, 15-21 -> 2, 22-28 -> 3, 29-31 -> 4)
    '''
    # Namespaced import keeps the Spark helpers apart from Python builtins and
    # from the loop variables used in the comprehensions below.
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    filtered_df = df.select(['Year', 'Month', 'Quarter', 'DayofMonth', 'DayOfWeek', 'Origin', 'Dest',
                             'ScheduledDepartureTime', 'ScheduledArrivalTime', 'ArrDelayMinutes', 'Cancelled',
                             'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay',
                             'LateAircraftDelay', 'AirlineCommonName'])

    columns = filtered_df.columns

    # Show the number of null values per column.
    null_counts = [F.count(F.when(F.col(name).isNull(), name)).alias(name) for name in columns]
    filtered_df.select(null_counts).show()

    # Count, per column, the values that are NaN or equal to the quoted
    # placeholder string.
    nan_pattern = '"""NaN"""'
    missing_values = filtered_df.select([
        F.count(F.when(F.isnan(F.col(name)) | (F.trim(F.col(name)) == F.lit(nan_pattern)), name)).alias(name)
        for name in columns
    ])

    # Collect the single result row once instead of launching one Spark job
    # per column.
    missing_counts = missing_values.first().asDict()
    columns_with_missing_values = [name for name in columns if missing_counts[name] > 0]

    if columns_with_missing_values:
        print("Columns with missing values (NaN or 'NaN'):")
        for col_name in columns_with_missing_values:
            print(col_name)
            # NOTE: only the quoted placeholder is rewritten to 0; values that
            # were counted because isnan() matched are left unchanged.
            filtered_df = filtered_df.withColumn(
                col_name,
                F.when(F.trim(F.col(col_name)) == F.lit(nan_pattern), 0).otherwise(F.col(col_name)))
    else:
        print("No missing values (NaN or 'NaN') found in any column.")

    # Drop rows with any null values (training data only).
    if not isPredict:
        filtered_df = filtered_df.na.drop()

    # Left-pad the scheduled times to four characters so the hour is always
    # the first two characters.
    filtered_df = filtered_df.withColumn("ScheduledArrivalTime", F.expr("LPAD(ScheduledArrivalTime, 4, '0')"))\
                             .withColumn("ScheduledDepartureTime", F.expr("LPAD(ScheduledDepartureTime, 4, '0')"))

    filtered_df = filtered_df.withColumn("ArrivalHour", F.substring("ScheduledArrivalTime", 1, 2))\
                             .withColumn("DepartureHour", F.substring("ScheduledDepartureTime", 1, 2))

    # Inclusive hour ranges and the label of each time-of-day flag.
    time_ranges = [(0, 11, "morning"), (12, 17, "afternoon"), (18, 23, "evening")]

    arrival_hour = F.col("ArrivalHour").cast("int")
    departure_hour = F.col("DepartureHour").cast("int")

    # One 0/1 column per time-of-day category, for arrival and departure.
    for start, end, label in time_ranges:
        filtered_df = filtered_df.withColumn(
            "Arrtimeofday_" + label,
            F.when((arrival_hour >= start) & (arrival_hour <= end), 1).otherwise(0))

        filtered_df = filtered_df.withColumn(
            "Deptimeofday_" + label,
            F.when((departure_hour >= start) & (departure_hour <= end), 1).otherwise(0))

    # Week of the month from the day of the month. Subtracting 1 first makes
    # every bucket a full 7-day block starting on day 1 (plain `day / 7` put
    # day 7 into the second bucket and left the first one with six days).
    filtered_df = filtered_df.withColumn("WeekOfMonth", ((F.col("DayofMonth") - 1) / 7).cast("int"))

    # Columns converted to integers; Origin, Dest and AirlineCommonName stay
    # as they are.
    column_types = {
        "Year": IntegerType(),
        "Month": IntegerType(),
        "Quarter": IntegerType(),
        "DayofMonth": IntegerType(),
        "WeekOfMonth": IntegerType(),
        "DayOfWeek": IntegerType(),
        "ScheduledDepartureTime": IntegerType(),
        "ScheduledArrivalTime": IntegerType(),
        "ArrDelayMinutes": IntegerType(),
        "Cancelled": IntegerType(),
        "CarrierDelay": IntegerType(),
        "WeatherDelay": IntegerType(),
        "NASDelay": IntegerType(),
        "SecurityDelay": IntegerType(),
        "LateAircraftDelay": IntegerType(),
    }

    for column_name, data_type in column_types.items():
        filtered_df = filtered_df.withColumn(column_name, F.col(column_name).cast(data_type))

    return filtered_df

In [6]:
def apply_encoding(df):
    """Index and one-hot encode the categorical columns, then build ``features``.

    Each categorical column ``X`` gains ``X_index`` (StringIndexer) and
    ``X_index_encoded`` (OneHotEncoder). All index columns are added first,
    then all encoded columns, and finally the assembled ``features`` vector.

    Args:
        df: DataFrame containing every categorical column listed below.

    Returns:
        The input DataFrame extended with the index, encoded and ``features``
        columns.
    """
    day_parts = ("morning", "afternoon", "evening")
    calendar_cols = ["Month", "DayOfWeek", "WeekOfMonth"]
    route_cols = ["Origin", "Dest", "AirlineCommonName"]
    departure_flags = ["Deptimeofday_" + part for part in day_parts]
    arrival_flags = ["Arrtimeofday_" + part for part in day_parts]

    # Order in which the columns are indexed / encoded (add Quarter if needed).
    encode_order = calendar_cols + route_cols + departure_flags + arrival_flags

    stages = []
    for name in encode_order:
        stages.append(StringIndexer(inputCol=name, outputCol=name + "_index"))
    for name in encode_order:
        stages.append(OneHotEncoder(inputCol=name + "_index", outputCol=name + "_index_encoded"))

    # Fit every stage on the frame produced by the previous one.
    encoded_df = df
    for stage in stages:
        encoded_df = stage.fit(encoded_df).transform(encoded_df)

    # Order of the blocks inside the feature vector (differs from encode_order).
    vector_order = calendar_cols + departure_flags + arrival_flags + route_cols
    feature_columns = [name + "_index_encoded" for name in vector_order]

    vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    return vector_assembler.transform(encoded_df)

In [7]:
def analyzeData(df):
    """Print a per-month delay/cancellation summary and overall delay shares.

    Two reports are printed:
      1. A table grouped by (Month, Year) with total delay minutes, number of
         delayed arrivals, cancellations, total arrivals and the derived
         delay / cancellation percentages.
      2. The share of rows flagged ``ArrivalDelayed`` (relative to all rows)
         and, relative to the delayed rows, the share flagged by each cause
         column (CarrierDelayed, WeatherDelayed, NASDelayed, SecurityDelayed,
         LateAircraftDelayed).

    Args:
        df: DataFrame holding Month, Year, ArrDelayMinutes, Cancelled and the
            0/1 flag columns named above.

    Returns:
        None. A percentage whose denominator is zero is printed as ``nan``.
    """
    from pyspark.sql import functions as F

    def _matches(column_name, value):
        # 1 where the column equals `value`, 0 otherwise, as a Column.
        return (F.col(column_name) == value).cast('int')

    def _percentage(part, whole):
        # Guard against empty input / no delayed rows instead of raising.
        return (part / whole) * 100 if whole else float("nan")

    result_df = df.groupBy('Month', 'Year')\
    .agg(
        F.sum(F.col('ArrDelayMinutes')).alias('Total_Delay_In_Mins'),
        F.sum(F.col('ArrivalDelayed')).alias('Total_Delay'),
        F.sum(_matches('Cancelled', 1)).alias('Total_Cancelled'),
        F.sum(_matches('ArrivalDelayed', 1)).alias('Sum_ArrivalDelayed_1s'),
        F.sum(_matches('ArrivalDelayed', 0)).alias('Count_ArrivalDelayed_0s'),
        F.sum(_matches('ArrivalDelayed', 1) + _matches('ArrivalDelayed', 0)).alias('Total_Arrivals')
    )\
    .orderBy(F.col('Month').desc(), F.col('Year').desc(), F.col('Total_Delay').desc())

    # Delay percentage among non-cancelled arrivals, and cancellation percentage.
    result_df = result_df.withColumn('Delay_Percentage_NC', (F.col('Total_Delay') / (F.col('Total_Arrivals') - F.col('Total_Cancelled'))) * 100)
    result_df = result_df.withColumn('Cancelled_Percentage', (F.col('Total_Cancelled') / F.col('Total_Arrivals')) * 100)

    result_df = result_df.select('Month', 'Year', 'Total_Delay_In_Mins', 'Total_Delay', 'Total_Cancelled', 'Total_Arrivals', 'Delay_Percentage_NC', 'Cancelled_Percentage')

    result_df.show(25)

    # The late-aircraft share uses the 0/1 flag `LateAircraftDelayed`, like the
    # other causes, not the raw minutes column `LateAircraftDelay`.
    cause_flags = ["CarrierDelayed", "WeatherDelayed", "NASDelayed", "SecurityDelayed", "LateAircraftDelayed"]

    # A single aggregation replaces one full count() job per flag.
    totals = df.agg(
        F.count(F.lit(1)).alias("total_rows"),
        *[F.sum(_matches(name, 1)).alias(name) for name in ["ArrivalDelayed"] + cause_flags]
    ).first()

    total_count = totals["total_rows"]
    # sum() yields null when there is nothing to add up; treat that as 0.
    arrival_delayed_count = totals["ArrivalDelayed"] or 0

    arrival_delayed_percentage = _percentage(arrival_delayed_count, total_count)
    print(f"Percentage of 'ArrivalDelayed' instances: {arrival_delayed_percentage:.2f}%")

    # Cause shares are expressed relative to the delayed arrivals.
    for name in cause_flags:
        cause_percentage = _percentage(totals[name] or 0, arrival_delayed_count)
        print(f"Percentage of '{name}' instances: {cause_percentage:.2f}%")

    return

In [8]:
def performRegression(train_data, test_data, regressor, isPredict):
    """Fit ``regressor`` on the training data and optionally report test metrics.

    Args:
        train_data: Data passed to ``regressor.fit``.
        test_data: Data scored for evaluation; only used when ``isPredict``
            is falsy.
        regressor: Object exposing ``fit``; the fitted result must expose
            ``transform`` when evaluation runs.
        isPredict: When truthy, evaluation is skipped entirely.

    Returns:
        The fitted model.
    """
    reg_model = regressor.fit(train_data)

    # Prediction mode: nothing to evaluate, hand the model straight back.
    if isPredict:
        return reg_model

    reg_predictions = reg_model.transform(test_data)

    def _score(metric):
        # Evaluate the scored test data against the ArrDelayMinutes label.
        scorer = RegressionEvaluator(labelCol="ArrDelayMinutes", predictionCol="prediction", metricName=metric)
        return scorer.evaluate(reg_predictions)

    mae = _score("mae")
    print("Regressor - Mean Absolute Error (MAE):", mae)

    r_squared = _score("r2")
    print(' R-squared Score:', r_squared)

    return reg_model


In [9]:
def performClassification(train_data, test_data, classifier, resp):
    """Fit ``classifier``, score the test data and print evaluation metrics.

    Printed metrics: area under ROC, area under PR, accuracy and precision
    (for the positive class ``1``). Accuracy and precision are printed as
    ``nan`` when their denominator is zero (no test rows, or no positive
    predictions) instead of raising ``ZeroDivisionError``.

    Args:
        train_data: Data passed to ``classifier.fit``.
        test_data: Data scored with the fitted model.
        classifier: Estimator producing ``rawPrediction`` and ``prediction``.
        resp: Name of the 0/1 label column.

    Returns:
        The fitted model.
    """
    # Fit the model and score the held-out data.
    model = classifier.fit(train_data)
    predictions = model.transform(test_data)

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol=resp)
    area_under_roc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    area_under_pr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})

    print("Area Under ROC: {:.2f}".format(area_under_roc))
    print("Area Under PR: {:.2f}".format(area_under_pr))

    # Accuracy: rows whose prediction equals the label, over all scored rows.
    correct_predictions = predictions.filter(expr('prediction = ' + resp))
    total_count = predictions.count()
    correct_count = correct_predictions.count()
    accuracy = correct_count / total_count if total_count else float("nan")
    print("Accuracy: {:.2f}".format(accuracy))

    # Precision: each count is computed once (every count() is a Spark job).
    true_positive_count = correct_predictions.filter(expr(resp + ' = 1')).count()
    false_positive_count = predictions.filter(expr('(prediction = 1) AND (' + resp + ' = 0)')).count()
    predicted_positive_count = true_positive_count + false_positive_count
    # A model that never predicts 1 has no defined precision.
    precision = true_positive_count / predicted_positive_count if predicted_positive_count else float("nan")
    print("Precision: {:.2f}".format(precision))

    return model

In [10]:
def performRandomForestClassification(train_data, test_data):
    """Train a random forest on ``ArrivalDelayed`` and print evaluation metrics.

    The forest uses 100 trees of depth 6 on the ``features`` column. Printed
    metrics: area under ROC, area under PR, accuracy and precision for the
    positive class. Accuracy and precision are printed as ``nan`` when their
    denominator is zero instead of raising ``ZeroDivisionError``.

    Args:
        train_data: Data used to fit the pipeline.
        test_data: Data scored with the fitted pipeline.

    Returns:
        The fitted pipeline model.
    """
    rf = RandomForestClassifier(labelCol="ArrivalDelayed", featuresCol="features", numTrees=100, maxDepth=6)
    pipeline = Pipeline(stages=[rf])

    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="ArrivalDelayed")
    area_under_roc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    area_under_pr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
    print("Random Forest - Area Under ROC: {:.2f}".format(area_under_roc))
    print("Random Forest - Area Under PR: {:.2f}".format(area_under_pr))

    # Accuracy: rows whose prediction equals the label, over all scored rows.
    correct_predictions = predictions.filter(expr('prediction = ArrivalDelayed'))
    total_count = predictions.count()
    correct_count = correct_predictions.count()
    accuracy = correct_count / total_count if total_count else float("nan")
    print("Accuracy: {:.2f}".format(accuracy))

    # Precision: each count is computed once (every count() is a Spark job).
    true_positive_count = correct_predictions.filter(expr('ArrivalDelayed = 1')).count()
    false_positive_count = predictions.filter(expr('(prediction = 1) AND (ArrivalDelayed = 0)')).count()
    predicted_positive_count = true_positive_count + false_positive_count
    # A model that never predicts 1 has no defined precision.
    precision = true_positive_count / predicted_positive_count if predicted_positive_count else float("nan")
    print("Precision: {:.2f}".format(precision))

    return model


In [11]:
def performGBClassification(train_data, test_data, resp, isPredict):
    """Train a gradient-boosted tree classifier and optionally evaluate it.

    The classifier (10 iterations, depth 5, step size 0.1) is fitted on the
    ``features`` column with ``resp`` as label. When ``isPredict`` is falsy
    the test data is scored and area under ROC, area under PR, accuracy and
    precision are printed; accuracy and precision are printed as ``nan`` when
    their denominator is zero instead of raising ``ZeroDivisionError``.

    Args:
        train_data: Data used to fit the pipeline.
        test_data: Data scored for evaluation; unused when ``isPredict`` is
            truthy.
        resp: Name of the 0/1 label column.
        isPredict: When truthy, evaluation is skipped.

    Returns:
        The fitted pipeline model.
    """
    from pyspark.ml.classification import GBTClassifier

    gbt = GBTClassifier(labelCol=resp, featuresCol="features", maxIter=10, maxDepth=5, stepSize=0.1)
    pipeline = Pipeline(stages=[gbt])

    model = pipeline.fit(train_data)

    if not isPredict:
        predictions = model.transform(test_data)

        evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol=resp)
        area_under_roc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
        area_under_pr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
        print("Gradient Boosting - Area Under ROC: {:.2f}".format(area_under_roc))
        print("Gradient Boosting - Area Under PR: {:.2f}".format(area_under_pr))

        # Accuracy: rows whose prediction equals the label, over all scored rows.
        correct_predictions = predictions.filter(expr('prediction = ' + resp))
        total_count = predictions.count()
        correct_count = correct_predictions.count()
        accuracy = correct_count / total_count if total_count else float("nan")
        print("Accuracy: {:.2f}".format(accuracy))

        # Precision: each count is computed once (every count() is a Spark job).
        true_positive_count = correct_predictions.filter(expr(resp + ' = 1')).count()
        false_positive_count = predictions.filter(expr('(prediction = 1) AND (' + resp + ' = 0)')).count()
        predicted_positive_count = true_positive_count + false_positive_count
        # A model that never predicts 1 has no defined precision.
        precision = true_positive_count / predicted_positive_count if predicted_positive_count else float("nan")
        print("Precision: {:.2f}".format(precision))

    return model

In [12]:
# --- Labelled (historical) data: load, then clean with null rows dropped ---
data_path = "../Data/data_by_qtr/Q1.csv"
Qtr = 1
isPredict = False
source_df = clean_data(load_data(data_path, Qtr), isPredict)

# --- Unseen template data: load, then clean while keeping every row ---
test_data_path = "../Data/data_by_qtr/template-2024_Q1.csv"
Qtr = 1
isPredict = True
unseen_df = clean_data(load_data(test_data_path, Qtr), isPredict)

# Tag each row with its origin so both sets can be encoded together (sharing
# one set of fitted indexers/encoders) and separated again afterwards.
source_df = source_df.withColumn("Dataset", lit("source"))
unseen_df = unseen_df.withColumn("Dataset", lit("unseen"))

combined_df = apply_encoding(source_df.union(unseen_df))

source_df = combined_df.filter(col("Dataset") == "source").drop("Dataset")
unseen_df = combined_df.filter(col("Dataset") == "unseen").drop("Dataset")

# Binary targets on the labelled data: 1 when the matching minutes column is
# greater than 0, otherwise 0.
for flag_name, minutes_col in [
    ("ArrivalDelayed", "ArrDelayMinutes"),
    ("CarrierDelayed", "CarrierDelay"),
    ("WeatherDelayed", "WeatherDelay"),
    ("NASDelayed", "NASDelay"),
    ("SecurityDelayed", "SecurityDelay"),
    ("LateAircraftDelayed", "LateAircraftDelay"),
]:
    source_df = source_df.withColumn(flag_name, when(col(minutes_col) > 0, 1).otherwise(0))

Total Number of datapoints :7838605 and the total number of attributes :53
attributes are ['Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'Marketing_Airline_Network', 'DOT_ID_Marketing_Airline', 'Flight_Number_Marketing_Airline', 'Operating_Airline', 'Tail_Number', 'OriginAirportID', 'OriginCityMarketID', 'Origin', 'OriginCityName', 'DestAirportID', 'DestCityMarketID', 'Dest', 'DestCityName', 'ScheduledDepartureTime', 'ActualDepartureTime', 'DepDelay', 'DepartureDelayGroups', 'TaxiOut', 'WheelsOff', 'WheelsOn', 'TaxiIn', 'ScheduledArrivalTime', 'ActualArrivalTime', 'ArrDelay', 'ArrDelayMinutes', 'ArrivalDelayGroups', 'Cancelled', 'CancellationCode', 'Diverted', 'ScheduledElapseTime', 'ActualElapsedTime', 'AirTime', 'Distance', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay', 'FirstDepTime', 'TotalAddGTime', 'LongestAddGTime', 'DivAirportLandings', 'DivReachedDest', 'DivActualElapsedTime', 'DivArrDelay', 'DivDistance', 'Duplicate', 'AirlineCommon

In [13]:
def performTheilUTest(df):
    """Print Theil's U (uncertainty coefficient) for every ordered feature pair.

    For a pair (Feature1, Feature2) the reported value is

        U(Feature2 | Feature1) = (H(Feature2) - H(Feature2 | Feature1)) / H(Feature2)

    where H is the Shannon entropy estimated from the joint value counts in
    ``df``. The value lies in [0, 1]: 0 means Feature1 carries no information
    about Feature2, 1 means Feature1 determines Feature2 completely. When
    Feature2 has a single value (H(Feature2) == 0) the value is reported as 1.

    Args:
        df: DataFrame containing the encoded feature columns listed below.

    Returns:
        None. The result table (Feature1, Feature2, Theil's U) is shown.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    # "DayOfWeek_index_encoded", "WeekOfMonth_index_encoded",\
    # NOTE(review): these are the one-hot encoded columns; confirm that
    # grouping on them behaves as intended, or switch to the matching
    # "*_index" columns.
    categorical_features = ["Month_index_encoded",
                            "Deptimeofday_morning_index_encoded", "Deptimeofday_afternoon_index_encoded", "Deptimeofday_evening_index_encoded",
                            "Arrtimeofday_morning_index_encoded", "Arrtimeofday_afternoon_index_encoded", "Arrtimeofday_evening_index_encoded",
                            "Origin_index_encoded", "Dest_index_encoded", "AirlineCommonName_index_encoded"]

    schema = StructType([
        StructField("Feature1", StringType(), False),
        StructField("Feature2", StringType(), False),
        StructField("Theil's U", DoubleType(), False)
    ])

    total_rows = df.count()
    rows = []

    for feature1 in categorical_features:
        for feature2 in categorical_features:
            # Skip self-pairs; with no rows there is nothing to estimate.
            if feature1 == feature2 or total_rows == 0:
                continue

            # Joint counts n_xy, with the marginal counts n_x and n_y
            # attached to every joint cell.
            joint = df.groupBy(feature1, feature2).agg(F.count(F.lit(1)).alias("n_xy"))
            joint = joint.withColumn("n_x", F.sum("n_xy").over(Window.partitionBy(feature1)))\
                         .withColumn("n_y", F.sum("n_xy").over(Window.partitionBy(feature2)))

            # p(x, y) for each joint cell.
            p_xy = F.col("n_xy") / F.lit(total_rows)

            # H(Y)   = -sum_{x,y} p(x,y) * log p(y)      (sums to the marginal entropy)
            # H(Y|X) = -sum_{x,y} p(x,y) * log p(y|x)
            entropies = joint.agg(
                (-F.sum(p_xy * F.log(F.col("n_y") / F.lit(total_rows)))).alias("h_y"),
                (-F.sum(p_xy * F.log(F.col("n_xy") / F.col("n_x")))).alias("h_y_given_x"),
            ).first()

            h_y = entropies["h_y"]
            h_y_given_x = entropies["h_y_given_x"]

            # A constant Feature2 has zero entropy and is trivially determined.
            theil_u = 1.0 if not h_y else (h_y - h_y_given_x) / h_y
            rows.append((feature1, feature2, float(theil_u)))

    # Build the result table once instead of growing it with repeated unions.
    results_df = spark.createDataFrame(rows, schema)
    results_df.show()

    return

In [14]:
def performChiSquaredTest(df):
    """Run a chi-square independence test of each feature against ArrivalDelayed.

    Args:
        df: DataFrame containing the encoded feature columns listed below and
            the ``ArrivalDelayed`` label column.

    Returns:
        None. One result table per feature is shown.
    """
    # Define feature columns, #"Quarter_index_encoded",
    feature_columns = ["Month_index_encoded",
                       "DayOfWeek_index_encoded", "WeekOfMonth_index_encoded",
                       "Deptimeofday_morning_index_encoded", "Deptimeofday_afternoon_index_encoded", "Deptimeofday_evening_index_encoded",
                       "Arrtimeofday_morning_index_encoded", "Arrtimeofday_afternoon_index_encoded", "Arrtimeofday_evening_index_encoded",
                       "Origin_index_encoded", "Dest_index_encoded", "AirlineCommonName_index_encoded"]

    # Test the DataFrame that was passed in (the original referenced an
    # undefined name `df1` here and raised NameError).
    chi_square_results = [
        (feature, ChiSquareTest.test(df, feature, "ArrivalDelayed"))
        for feature in feature_columns
    ]

    # Print the results
    for feature, result in chi_square_results:
        print(f"Chi-Square Test for Feature: {feature}")
        result.show(truncate=False)

    return

In [15]:
# Print the per-month summary table and the delay-cause percentages for the
# labelled data (before any of the filters in the next cell are applied).
analyzeData(source_df)

In [16]:
# The "Step N" labels below refer to the numbered prediction steps listed in
# the prediction cell further down (Step 1 = Cancelled ... Step 6 = ArrDelayMinutes).

# For Qtr 1 and Step 1, uncomment the following (excludes March 2020):
#source_df = source_df.filter(~((col("Year") == 2020) & (col("Month") == 3)))
# For Qtr 2 and Step 1, uncomment the following (excludes April 2020):
#source_df = source_df.filter(~((col("Year") == 2020) & (col("Month") == 4)))
#analyzeData(source_df)
# Keep only non-cancelled flights. Comment this line out for Step 1.
source_df = source_df.filter(col("Cancelled") == 0)

# Needed for Step 3 onwards: keep only delayed arrivals so the delay factors
# are modelled on delayed flights. Comment this line out otherwise.
source_df = source_df.filter(col("ArrivalDelayed") == 1)

#source_df.show(1)

# Uncomment to check correlation between the features and ArrivalDelayed:
#performChiSquaredTest(source_df)

# Split the data into training and test sets (70% training, 30% test).
# The fixed seed is passed so the split is requested the same way on every run.
train_data, test_data = source_df.randomSplit([0.7, 0.3], seed=42)



In [17]:
# To try a regressor other than gradient boosting, move the wanted lines out
# of the string literal below (it is a no-op expression, i.e. disabled code).
# Gradient boosting performed slightly better.
'''
isPredict=False
from pyspark.ml.regression import LinearRegression
# Create and train a model (e.g., Linear Regression)
lr = LinearRegression(featuresCol="features", labelCol="ArrDelayMinutes")
performRegression(train_data, test_data,lr, isPredict)

from pyspark.ml.regression import DecisionTreeRegressor
# Create a Decision Tree Regressor
dt = DecisionTreeRegressor(featuresCol="features", labelCol="ArrDelayMinutes")
performRegression(train_data, test_data,dt, isPredict)

from pyspark.ml.regression import RandomForestRegressor
# Create a Random Forest Regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="ArrDelayMinutes")
performRegression(train_data, test_data,rf, isPredict)

'''
from pyspark.ml.regression import GBTRegressor
# Gradient-boosted tree regressor for ArrDelayMinutes (10 boosting iterations).
# It is only configured here; fitting happens in the prediction cell (Step 6).
# Possible alternative to consider: LightGBM.
gbt = GBTRegressor(featuresCol="features", labelCol="ArrDelayMinutes", maxIter=10)




In [18]:
# Uncomment below to run a specific model other than gradient boosting; gradient boosting performed slightly better
#from pyspark.ml.classification import LogisticRegression

# Define the logistic regression model
#logistic_regression = LogisticRegression(featuresCol="features", labelCol="ArrivalDelayed")
#performClassification(train_data, test_data,logistic_regression, 'ArrivalDelayed')

In [19]:
#from pyspark.ml.classification import DecisionTreeClassifier

# Define the decision tree (DT) model
#dtClassifier = DecisionTreeClassifier(featuresCol="features", labelCol="ArrivalDelayed")
#performClassification(train_data, test_data,dtClassifier, 'ArrivalDelayed')

In [20]:
#performRandomForestClassification(train_data, test_data)

In [21]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def extract_prob(v):
    """Return element 1 of ``v`` as a float, or ``None`` if it cannot be read.

    Args:
        v: Indexable value; expected to be a length-2 probability vector.

    Returns:
        ``float(v[1])``, or ``None`` when ``v`` is ``None``, is too short, or
        holds a value that cannot be converted to a float.
    """
    try:
        return float(v[1])  # VectorUDT is of length 2
    except (ValueError, IndexError, TypeError):
        # IndexError: fewer than two elements; TypeError: v (or v[1]) is None
        # or not indexable/convertible; ValueError: non-numeric text.
        return None

#get_cancelled_prob = udf(extract_prob, DoubleType())

In [22]:
# Prediction cell: train exactly one model (enable a single "Step" line),
# score the unseen data with it and write the result as CSV.
# isPredict=True makes the training helpers skip their evaluation part.
isPredict=True
#Step1 - Predict Cancellation
#model=performGBClassification(train_data, test_data,'Cancelled',isPredict )
#Step2 - Predict ArrivalDelayed
#model=performGBClassification(train_data, test_data,'ArrivalDelayed',isPredict )
#Step3 - Predict CarrierDelayed
#model=performGBClassification(train_data, test_data,'CarrierDelayed',isPredict )
#Step4 - Predict WeatherDelayed
#model=performGBClassification(train_data, test_data,'WeatherDelayed',isPredict )
#Step5 - Predict NASDelayed
#model=performGBClassification(train_data, test_data,'NASDelayed',isPredict )
#Step6 - Predict ArrDelayMinutes
model=performRegression(train_data, test_data,gbt, isPredict)

# Drop the reference to the labelled data now that training is done.
source_df=None




# Function to extract the probability of class 1
#get_cancelled_prob = udf(lambda probability: str(probability), StringType())

# Score the unseen (template) data with the model trained above
predictions = model.transform(unseen_df)

# Preview two scored rows without truncating column values
predictions.show(2, truncate=False)

# Drop the reference to the unseen data; `predictions` is used from here on.
unseen_df=None
# Extract the probability of class 1 and assign it to 'Cancelled' column
#predictions = predictions.withColumn('Cancelled_Prob', get_cancelled_prob(col('probability')))

# For Steps 1 - 5, enable the two statements inside the string literal below
# (keeps the probability column and casts it to a string for CSV output).
'''
predictions=predictions.select(['Year', 'Month', 'Quarter', 'DayofMonth', 'DayOfWeek', 'Origin', 'Dest', \
                                'ScheduledDepartureTime', 'ScheduledArrivalTime', 'ArrDelayMinutes',  \
                                'probability','prediction'])
predictions = predictions.withColumn('probability', col('probability').cast(StringType()))
'''

#uncomment for Step 1
#predictions.coalesce(1).write.csv('Q1_cancel_pred_output', header=True)

#uncomment for Step 2
#predictions.coalesce(1).write.csv('Q1_arrival_delay_pred_output', header=True)

#uncomment for Step 3
#predictions.coalesce(1).write.csv('Q1_carrier_delay_pred_output', header=True)

#uncomment for Step 4
#predictions.coalesce(1).write.csv('Q1_weather_delay_pred_output', header=True)

#uncomment for Step 5
#predictions.coalesce(1).write.csv('Q1_nas_delay_pred_output', header=True)


# Step 6 (active): keep the identifying columns plus the predicted delay in
# minutes and write them out; coalesce(1) reduces the frame to one partition
# before writing.

predictions=predictions.select(['Year', 'Month', 'Quarter', 'DayofMonth', 'DayOfWeek', 'Origin', 'Dest', \
                                'ScheduledDepartureTime', 'ScheduledArrivalTime', 'ArrDelayMinutes',  \
                                'prediction'])
predictions.coalesce(1).write.csv('Q1_arr_delay_in_mins_pred_output', header=True)



+----+-----+-------+----------+---------+------+----+----------------------+--------------------+---------------+---------+------------+------------+--------+-------------+-----------------+-----------------+-----------+-------------+--------------------+--------------------+----------------------+----------------------+--------------------+--------------------+-----------+-----------+---------------+-----------------+------------+----------+-----------------------+--------------------------+----------------------------+--------------------------+--------------------------+----------------------------+--------------------------+-------------------+-----------------------+-------------------------+--------------------+------------------+-------------------------------+----------------------------------+------------------------------------+----------------------------------+----------------------------------+------------------------------------+----------------------------------+--------

In [23]:
# Stop the Spark session; run this last, since every cell above uses `spark`.
spark.stop()