In [50]:
# Load our Pkgs
# import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, PolynomialExpansion, StandardScaler
from pyspark.sql.functions import col


In [51]:
# Create a SparkSession
spark = SparkSession.builder.appName('pyspark').getOrCreate()

In [52]:
spark

In [53]:
# Read the CSV file
df = spark.read.csv('Combined_Flights_2022.csv', inferSchema=True, header=True)

In [54]:
# Number of rows
num_rows = df.count()

# Number of columns
num_cols = len(df.columns)

print("Number of rows: ", num_rows)
print("Number of columns: ", num_cols)


Number of rows:  4078318
Number of columns:  61


In [55]:
df.show(5)

+----------+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+-------------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|             Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|De

In [56]:
df.printSchema()

root
 |-- FlightDate: date (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |

In [57]:
from pyspark.sql.functions import col, sum as _sum

df_agg = df.agg(*[_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
df_agg.show()


+----------+-------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinutes|AirTime|

In [58]:
from pyspark.sql.functions import col, sum as _sum

# Calculate the percentage of null values in each column
df_agg = df.agg(*[((_sum(col(c).isNull().cast("int")) / df.count()) * 100).alias(c) for c in df.columns])

# Display the results in descending order
df_agg.orderBy([col(c).desc() for c in df.columns]).show(50)


+----------+-------+------+----+---------+--------+----------+------------------+------------------+------------------+-----------------+------------------+------------------+--------------+------------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+------------------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+------------------+--------------------+----------+------------------+------------------+-----------------+-----------------+----------+------------------+------------------+------------------+----------+-------------+------------------+
|

In [59]:
from pyspark.sql.functions import col, first

# Calculate the mode of the 'Tail_Number' column
mode = df.groupBy('Tail_Number').count().orderBy(col('count').desc()).select(first('Tail_Number')).collect()[0][0]

# Check if the mode is null
if mode is not None:
    # Fill null values with the mode
    df = df.fillna({'Tail_Number': mode})
else:
    # Fill null values with a default value or another valid value
    df = df.fillna({'Tail_Number': 'N475HA'})



In [60]:
from pyspark.sql.functions import col

cols = ["AirTime", "ArrivalDelayGroups", "ArrDel15", "ArrDelay", "ActualElapsedTime", "ArrDelayMinutes", "WheelsOn", "TaxiIn", 
"ArrTime", "TaxiOut", "WheelsOff", "DepDelay", "DepDelayMinutes", "DepartureDelayGroups", "DepDel15", "DepTime"]

for c in cols:
    median = df.approxQuantile(c, [0.5], 0.25)[0]
    df = df.fillna({c: median})


In [61]:
df_agg = df.agg(*[_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
df_agg.show()

+----------+-------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinutes|AirTime|

In [62]:
df.printSchema()

root
 |-- FlightDate: date (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = false)
 |-- DepDelayMinutes: double (nullable = false)
 |-- DepDelay: double (nullable = false)
 |-- ArrTime: double (nullable = false)
 |-- ArrDelayMinutes: double (nullable = false)
 |-- AirTime: double (nullable = false)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = false)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = t

In [None]:

from pyspark.sql import functions as F

df = df.withColumn(
    "DelayGroup",
    F.when(df["Cancelled"], "Cancelled")
    .otherwise(
        F.when(df["DepDelayMinutes"] == 0, "OnTime_Early")
        .otherwise(
            F.when((df["DepDelayMinutes"] > 0) & (df["DepDelayMinutes"] <= 15), "Small_Delay")
            .otherwise(
                F.when((df["DepDelayMinutes"] > 15) & (df["DepDelayMinutes"] <= 45), "Medium_Delay")
                .otherwise(
                    F.when(df["DepDelayMinutes"] > 45, "Large_Delay")
                )
            )
        )
    )
)


In [None]:
unique_values = df.select("DelayGroup").distinct()
unique_values.show()


In [63]:
from pyspark.sql.types import StringType
from pyspark.ml.feature import StringIndexer

# Select string columns
string_cols = [f.name for f in df.schema.fields if f.dataType == StringType()]

# Apply StringIndexer to each column
for col in string_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col+"_index")
    df = indexer.fit(df).transform(df)
    df = df.drop(col)
    df = df.withColumnRenamed(col+"_index", col)
    

In [64]:
from pyspark.sql.functions import when, col
from pyspark.sql.types import BooleanType, IntegerType

# Select boolean columns
bool_cols = [f.name for f in df.schema.fields if f.dataType == BooleanType()]

# Apply transformation to each column
for c in bool_cols:
    df = df.withColumn(c, when(col(c) == True, 1).otherwise(0).cast(IntegerType()))


In [65]:
df.printSchema()

root
 |-- FlightDate: date (nullable = true)
 |-- Cancelled: integer (nullable = false)
 |-- Diverted: integer (nullable = false)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = false)
 |-- DepDelayMinutes: double (nullable = false)
 |-- DepDelay: double (nullable = false)
 |-- ArrTime: double (nullable = false)
 |-- ArrDelayMinutes: double (nullable = false)
 |-- AirTime: double (nullable = false)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = false)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DOT_ID_Marketing_Airline: integer (nullable = true)
 |-- Flight_Number_Marketing_Airline: integer (nullable = true)
 |-- DOT_ID_Operating_Airline: integer (nullable = true)
 |-- Flight_Number_Operating_Airline: integer (nullable 

In [66]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Select numeric columns
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, DoubleType)]

# Calculate the correlation of 'DepDelay' with all other columns
correlations = {c: df.stat.corr('DepDelay', c) for c in numeric_cols if c != 'DepDelay'}

# Sort the results
sorted_correlations = sorted(correlations.items(), key=lambda x: x[1], reverse=True)

for col, corr in sorted_correlations:
    print(f"{col}: {corr}")


DepDelayMinutes: 0.9979355801214388
ArrDelayMinutes: 0.972251999294954
ArrDelay: 0.9635955887963481
DepartureDelayGroups: 0.8230212756224428
ArrivalDelayGroups: 0.7720932635831529
DepDel15: 0.54935752878461
ArrDel15: 0.5090313555168641
DepTime: 0.1254656182818311
WheelsOff: 0.1158623732377179
DepTimeBlk: 0.06658269835492825
TaxiOut: 0.045915624237954346
ActualElapsedTime: 0.030590810290249848
Tail_Number: 0.026821179963078805
Marketing_Airline_Network: 0.02646299530151832
IATA_Code_Marketing_Airline: 0.02646299530151832
ArrTimeBlk: 0.024761832096876355
AirTime: 0.024163465639409878
CRSElapsedTime: 0.023483846253405355
WheelsOn: 0.023268044320780983
Distance: 0.022458096761029306
ArrTime: 0.015969415970616508
TaxiIn: 0.013126364310944164
Operated_or_Branded_Code_Share_Partners: 0.006454744496213054
DestState: -0.004938433298400229
DestStateName: -0.004938433298400229
Dest: -0.010474595485545218
OriginState: -0.010499964918949018
OriginStateName: -0.010499964918949018
Airline: -0.0113079

In [67]:
# Select the features with correlation greater than 0.1 with 'DepDelay'
selected_features = [col for col, corr in correlations.items() if corr > 0.1]
selected_features

['DepTime',
 'DepDelayMinutes',
 'ArrDelayMinutes',
 'DepDel15',
 'DepartureDelayGroups',
 'WheelsOff',
 'ArrDelay',
 'ArrDel15',
 'ArrivalDelayGroups']

In [68]:
df = df.drop('FlightDate')


In [69]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# Assuming that your DataFrame is named 'df' and the feature columns are in a list 'selected_features'

# First, let's assemble the features into a single vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol='features')
df = assembler.transform(df)

# Now, let's split the data into training and testing sets
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)



In [70]:
# Create a Random Forest Regressor
rf = RandomForestRegressor(featuresCol='features', labelCol='DepDelay', numTrees=5, maxDepth=10, seed=0)

# Train the model
model = rf.fit(train_df)


In [71]:
from pyspark.ml.evaluation import RegressionEvaluator
from math import sqrt

# Use the model to make predictions
predictions = model.transform(test_df)

# Initialize evaluators
evaluator_mae = RegressionEvaluator(labelCol="DepDelay", predictionCol="prediction", metricName="mae")
evaluator_mse = RegressionEvaluator(labelCol="DepDelay", predictionCol="prediction", metricName="mse")

# Calculate the metrics
mae = evaluator_mae.evaluate(predictions)
mse = evaluator_mse.evaluate(predictions)
rmse = sqrt(mse)

print(f"Mean Absolute Error (MAE): {mae}")
print(f"Mean Squared Error (MSE): {mse}")
print(f"Root Mean Squared Error (RMSE): {rmse}")



Mean Absolute Error (MAE): 3.6517883338856367
Mean Squared Error (MSE): 522.2024432193721
Root Mean Squared Error (RMSE): 22.85174923762669


In [72]:
predictions.show(1)

+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+------------------------+-------------------------------+------------------------+-------------------------------+---------------+------------------+------------------+---------------+---------+-------------+----------------+----------------+-------------+-------+--------+--------------------+-------+---------+--------+------+----------+--------+--------+------------------+-------------+------------------+-------+------+----+-------------------------+---------------------------------------+---------------------------+-----------------+---------------------------+-----------+--------------+-----------+---------------+------------+---------+-------------+----------+----------+--------------------+------------------+
|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinutes|AirTime|CR

In [73]:
predictions.select("DepDelay").show(5)

+--------+
|DepDelay|
+--------+
|    -2.0|
|     0.0|
|     1.0|
|    -5.0|
|    -5.0|
+--------+
only showing top 5 rows



In [74]:
predictions.select("prediction").show(5)

+--------------------+
|          prediction|
+--------------------+
|  -4.466818022406024|
|-0.01661620249837...|
|  1.9664684200816087|
| -5.4402399729581346|
|  -5.762462274505031|
+--------------------+
only showing top 5 rows



In [75]:
train_df.show(1)

+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+------------------------+-------------------------------+------------------------+-------------------------------+---------------+------------------+------------------+---------------+---------+-------------+----------------+----------------+-------------+-------+--------+--------------------+-------+---------+--------+------+----------+--------+--------+------------------+-------------+------------------+-------+------+----+-------------------------+---------------------------------------+---------------------------+-----------------+---------------------------+-----------+--------------+-----------+---------------+------------+---------+-------------+----------+----------+--------------------+
|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinutes|AirTime|CRSElapsedTime|Actual

In [76]:
# Create a Random Forest Regressor
rf = RandomForestRegressor(featuresCol='features', labelCol='DepDelay', numTrees=25, maxDepth=10, seed=0)

# Train the model
model2 = rf.fit(train_df)

In [77]:
# Use the model to make predictions
predictions = model2.transform(test_df)

# Initialize evaluators
evaluator_mae = RegressionEvaluator(labelCol="DepDelay", predictionCol="prediction", metricName="mae")
evaluator_mse = RegressionEvaluator(labelCol="DepDelay", predictionCol="prediction", metricName="mse")

# Calculate the metrics
mae = evaluator_mae.evaluate(predictions)
mse = evaluator_mse.evaluate(predictions)
rmse = sqrt(mse)

print(f"Mean Absolute Error (MAE): {mae}")
print(f"Mean Squared Error (MSE): {mse}")
print(f"Root Mean Squared Error (RMSE): {rmse}")

Mean Absolute Error (MAE): 3.5274226194573437
Mean Squared Error (MSE): 527.264376280514
Root Mean Squared Error (RMSE): 22.962238050340694


In [78]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create a RandomForestRegressor
rf = RandomForestRegressor(featuresCol='features', labelCol='DepDelay')

# Define the parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [5, 10, 12])  # number of trees
             .addGrid(rf.maxDepth, [5, 10, 12])  # maximum depth
             .addGrid(rf.maxBins, [32, 64, 100])  # max number of bins
             .build())

# Define a regression evaluator
evaluator = RegressionEvaluator(labelCol="DepDelay", predictionCol="prediction", metricName="rmse")

# Create a 3-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

# Run cross-validation, and choose the best set of parameters. 
cvModel = cv.fit(train_df)



In [79]:
# from pyspark.ml.regression import RandomForestRegressor

# # Create a RandomForestRegressor
# rf1 = RandomForestRegressor(
#     numTrees=5,  # number of trees
#     maxDepth=10,  # maximum depth
#     minInstancesPerNode=2,  # min_samples_leaf equivalent in PySpark
#     minInfoGain=0.0,  # min_samples_split equivalent in PySpark
#     maxMemoryInMB=256,  # max_features equivalent in PySpark
#     seed=0  # for reproducibility
# )
# # Train the model
# model3 = rf1.fit(train_df)


In [80]:
# # Use the model to make predictions
# predictions = model3.transform(test_df)

# # Initialize evaluators
# evaluator_mae = RegressionEvaluator(labelCol="DepDelay", predictionCol="prediction", metricName="mae")
# evaluator_mse = RegressionEvaluator(labelCol="DepDelay", predictionCol="prediction", metricName="mse")

# # Calculate the metrics
# mae = evaluator_mae.evaluate(predictions)
# mse = evaluator_mse.evaluate(predictions)
# rmse = sqrt(mse)

# print(f"Mean Absolute Error (MAE): {mae}")
# print(f"Mean Squared Error (MSE): {mse}")
# print(f"Root Mean Squared Error (RMSE): {rmse}")

# Model Save

In [81]:
# Save the model
# model2.write().overwrite().save(r"D:\CDAC_DBDA\Final Project\Model_In_Pyspark")


In [82]:
# from pyspark.ml.regression import RandomForestRegressionModel

# # Load the model
# loaded_model = RandomForestRegressionModel.load("/path/to/save/model")


# XGB

In [83]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Define the parameters for the GBTRegressor model
gbt = GBTRegressor(featuresCol="features", labelCol="DepDelay", maxDepth=6)

# Train the model
model = gbt.fit(train_df)

# Make predictions on the test data
predictions = model.transform(test_df)

# Initialize evaluator
evaluator = RegressionEvaluator(labelCol="DepDelay", predictionCol="prediction", metricName="rmse")

# Calculate RMSE
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")



Root Mean Squared Error (RMSE) on test data: 22.284265394107724


In [85]:
spark.stop()
# 