# W261 Final Project - Experiment, Fine-Tune, Select the Optimal Pipeline (Coding Notebook - Data Preprocessing + EDA + Dimensionality Reduction - 5 Year)
## Section 02, Team 1: Aimee, Dylan, Jo, Vicky

## Data Load for the Project Phase 3

OTPW Data (five year dataset): This is our joined data (We joined Airlines and Weather), the main dataset for the project. Location `dbfs:/mnt/mids-w261/OTPW_60M/OTPW_60M/`.

In [0]:
# check the folder path to load data
display(dbutils.fs.ls("dbfs:/mnt/mids-w261/OTPW_60M/OTPW_60M/"))

In [0]:
# read datasets
section = "02"
number = "01"
folder_path = f"dbfs:/student-groups/Group_{section}_{number}"

df_otpw_60m = spark.read.parquet(f"{folder_path}/otpw_60m.parquet")

In [0]:
len(df_otpw_60m.columns)


In [0]:
print(df_otpw_60m.count())

## EDA on OTPW

In [0]:
# import necessary libraries
import pandas as pd
import pyspark.sql.functions as F
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.functions import col

pd.set_option('display.max_columns', None)

df_otpw_60m.limit(10).toPandas().head()

In [0]:
df_otpw_60m.printSchema()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# sort by station and date and select the column names that contain "Hourly"
display(df_otpw_60m.sort(col("STATION"), col("DATE")).select(['DATE', 'STATION']+[col for col in df_otpw_60m.columns if "Hourly" in col]))

# Ensure DATE is a timestamp
df_otpw_60m = df_otpw_60m.withColumn("DATE", F.col("DATE").cast("timestamp"))

# Define window partitioned by STATION and ordered by DATE
window_spec = Window.partitionBy("STATION").orderBy("DATE")

for col_name in [col for col in df_otpw_60m.columns if "Hourly" in col]:
    # Get the previous timestamp and value within each station
    prev_time = F.lag("DATE").over(window_spec)
    prev_value = F.lag(col_name).over(window_spec)

    # Calculate time difference in hours
    time_diff = (F.unix_timestamp("DATE") - F.unix_timestamp(prev_time)) / 3600

    # Forward fill only if the previous timestamp is within 6 hours
    df_otpw_60m = df_otpw_60m.withColumn(
        col_name,
        F.when(time_diff <= 6, F.coalesce(F.col(col_name), prev_value)).otherwise(F.col(col_name))
    )

# Show the updated DataFrame
display(df_otpw_60m.sort(F.col("STATION"), F.col("DATE")).select(['DATE', 'STATION']+[col for col in df_otpw_60m.columns if "Hourly" in col]))


In [0]:
df_otpw_60m = df_otpw_60m.withColumn("HourlyPrecipitation", F.when(F.col("HourlyPrecipitation") == "T", "0.001").otherwise(F.col("HourlyPrecipitation")))

# cast HourlyPrecipitation to double
df_otpw_60m = df_otpw_60m.withColumn("HourlyPrecipitation", F.col("HourlyPrecipitation").cast("double"))

print("HourlyPrecipitation null counts",df_otpw_60m.filter(F.col("HourlyPrecipitation").isNull()).count())

In [0]:
# For the HourlyPressureChange Column, remove the "+" and cast as a double
df_otpw_60m = df_otpw_60m.withColumn("HourlyPressureChange", F.regexp_replace("HourlyPressureChange", "\+", "").cast("double"))


# get unique values in the HourlyPressureChange column s
display(df_otpw_60m.select("HourlyPressureChange").distinct())

df_otpw_60m.filter(F.col("HourlyPressureChange").isNull()).count()

In [0]:
# Define a window partitioned by ORIGIN_CITY_MARKET_ID over historical data
window_spec = Window.partitionBy("ORIGIN_CITY_MARKET_ID")

hourly_cols_to_fill = ["HourlyPrecipitation", "HourlySeaLevelPressure", "HourlyWindGustSpeed", "HourlyPressureChange", "HourlyDewPointTemperature", "HourlyDryBulbTemperature", "HourlyStationPressure", "HourlyVisibility",'HourlyWetBulbTemperature','HourlyWindSpeed']


for column in hourly_cols_to_fill:#['HourlyPrecipitation','HourlySeaLevelPressure','HourlyWindGustSpeed','HourlyPressureChange']:
    df_otpw_60m = df_otpw_60m.withColumn(column, F.col(column).cast("double"))

    # Compute the average HourlyPrecipitation for each ORIGIN based on data before 2019
    df_otpw_60m = df_otpw_60m.withColumn(
        "avg_" + column,
        F.avg(F.when(F.col("DATE") < "2019-01-01 00:00:00", F.col(column)))
        .over(window_spec)
    )

    # Fill null values in the later dataset (DATE >= "2019-01-01") with the computed average
    df_otpw_60m = df_otpw_60m.withColumn(
        column,
        F.when(
            (F.col("DATE") >= "2019-01-01 00:00:00") & F.col(column).isNull(),
            F.col("avg_" + column)
        ).otherwise(F.col(column))
    )

    # Drop the helper column
    df_otpw_60m = df_otpw_60m.drop("avg_" + column)

# Calculate the average values for the columns
avg_hourly_precipitation = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyPrecipitation")).first()[0]
avg_hourly_sea_level_pressure = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlySeaLevelPressure")).first()[0]
avg_hourly_wind_gust_speed = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyWindGustSpeed")).first()[0]
avg_hourly_pressure_change = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyPressureChange")).first()[0]
avg_hourly_dew_point_temp = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyDewPointTemperature")).first()[0]
avg_hourly_dry_bulb_temp = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyDryBulbTemperature")).first()[0]
avg_hourly_station_pressure = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyStationPressure")).first()[0]
avg_hourly_visibility = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyVisibility")).first()[0]
avg_hourly_wet_bulb_temp = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyWetBulbTemperature")).first()[0]
avg_hourly_wind_speed = df_otpw_60m.filter(col("DATE") < "2019-01-01 00:00:00").select(F.avg("HourlyWindSpeed")).first()[0]



# Fill in null values with the calculated averages
df_otpw_60m = df_otpw_60m.fillna(
    {
        'HourlyPrecipitation': avg_hourly_precipitation,
        'HourlySeaLevelPressure': avg_hourly_sea_level_pressure,
        'HourlyWindGustSpeed': avg_hourly_wind_gust_speed,
        'HourlyPressureChange': avg_hourly_pressure_change,
        'HourlyDewPointTemperature': avg_hourly_dew_point_temp,
        'HourlyDryBulbTemperature': avg_hourly_dry_bulb_temp,
        'HourlyStationPressure': avg_hourly_station_pressure,
        'HourlyVisibility': avg_hourly_visibility,
        'HourlyWetBulbTemperature': avg_hourly_wet_bulb_temp,
        'HourlyWindSpeed': avg_hourly_wind_speed
        
    }
)


# print the number of null values in the HourlyPrecipitation Column
print("HourlyPrecipitation null counts",df_otpw_60m.filter(F.col("HourlyPrecipitation").isNull()).count())
print("HourlySeaLevelPressure null counts",df_otpw_60m.filter(F.col("HourlySeaLevelPressure").isNull()).count())
print("HourlyWindGustSpeed null counts",df_otpw_60m.filter(F.col("HourlyWindGustSpeed").isNull()).count())
print("HourlyPressureChange null counts",df_otpw_60m.filter(F.col("HourlyPressureChange").isNull()).count())
print("HourlyDewPointTemperature null counts",df_otpw_60m.filter(F.col("HourlyDewPointTemperature").isNull()).count())
print("HourlyDryBulbTemperature null counts",df_otpw_60m.filter(F.col("HourlyDryBulbTemperature").isNull()).count())
print("HourlyStationPressure null counts",df_otpw_60m.filter(F.col("HourlyStationPressure").isNull()).count())
print("HourlyVisibility null counts",df_otpw_60m.filter(F.col("HourlyVisibility").isNull()).count())
print("HourlyWetBulbTemperature null counts",df_otpw_60m.filter(F.col("HourlyWetBulbTemperature").isNull()).count())
print("HourlyWindSpeed null counts",df_otpw_60m.filter(F.col("HourlyWindSpeed").isNull()).count())


In [0]:
# get unique values in the HourlyPressureTendency column
df_otpw_60m.select("HourlyPressureTendency").distinct().show()

# count null values in the HourlyPressureTendency column
df_otpw_60m.filter(F.col("HourlyPressureTendency").isNull()).select("HourlyPressureTendency").count()

# fill null values with -1 as an indicator that the value is not available
df_otpw_60m = df_otpw_60m.withColumn("HourlyPressureTendency", F.when(F.col("HourlyPressureTendency").isNull(), -1).otherwise(F.col("HourlyPressureTendency")))

In [0]:
# Data description 2.- Data size and source.
print(f"There are {df_otpw_60m.count()} rows and {len(df_otpw_60m.columns)} columns in our dataset")

# data source https://www.transtats.bts.gov/Fields.asp?gnoyr_VQ=FGJ

In [0]:
# count null values per column - DS
null_counts = {column: df_otpw_60m.filter(F.col(column).isNull()).select(column).count() for column in df_otpw_60m.columns}
null_counts_df = pd.DataFrame(list(null_counts.items()), columns=['column_name', 'null_count'])
null_counts_df = null_counts_df.sort_values(by=['null_count'],ascending=False)

In [0]:
fig,ax = plt.subplots(2,1,figsize=(15,10))

threshold = 0.5 * df_otpw_60m.count()
ax[0].bar(null_counts_df['column_name'], null_counts_df['null_count'])
ax[0].axhline(y=round(threshold,0), color='r', linestyle='--', label=f'Threshold = {round(threshold,0)}')
ax[0].set_xlabel('Column')
ax[0].set_ylabel('Null Count')
ax[0].set_title('Null Counts per Column (60m)')
ax[0].set_xticks([])  # Remove x-axis ticks
ax[0].legend()
ax[0].ticklabel_format(style='plain', axis='y')

# Remove columns with more than 80% null values
cols_to_drop = [c for c in df_otpw_60m.columns if df_otpw_60m.filter(F.col(c).isNull()).count() > threshold]
df_otpw_60m = df_otpw_60m.drop(*cols_to_drop)

# recalc null counts
null_counts = {column: df_otpw_60m.filter(F.col(column).isNull()).select(column).count() for column in df_otpw_60m.columns}
null_counts_df = pd.DataFrame(list(null_counts.items()), columns=['column_name', 'null_count'])
null_counts_df = null_counts_df.sort_values(by=['null_count'],ascending=False)

ax[1].bar(null_counts_df['column_name'], null_counts_df['null_count'])
ax[1].axhline(y=round(threshold,0), color='r', linestyle='--', label=f'Threshold = {round(threshold,0)}')
ax[1].set_xlabel('Column')
ax[1].set_ylabel('Null Count')
ax[1].set_title('Null Counts per Column after Dropping Columns (60m)')
ax[1].set_xticks([])  # Remove x-axis ticks
ax[1].legend()
ax[1].ticklabel_format(style='plain', axis='y')

plt.show()

In [0]:
# DS Data Cleaning and Preprocessing
from pyspark.sql import functions as F
from pyspark.sql import types

# List of columns to cast to double
cast_cols_to_double = ["DEP_DELAY", "DEP_DELAY_NEW", "DEP_DELAY_GROUP", "DISTANCE", "AIR_TIME",
                   "origin_station_lat","origin_station_lon","origin_airport_lat","origin_airport_lon","origin_station_dis",
                   "dest_station_lat","dest_station_lon","dest_airport_lat","dest_airport_lon","dest_station_dis",
                   "LATITUDE","LONGITUDE","ELEVATION","HourlyAltimeterSetting","HourlyDewPointTemperature","HourlyDryBulbTemperature","HourlyPrecipitation",
                   "HourlyPressureTendency","HourlyRelativeHumidity","HourlySeaLevelPressure","HourlyStationPressure","HourlyVisibility",	"HourlyWetBulbTemperature","HourlyWindDirection","HourlyWindSpeed","HourlyWindGustSpeed"]

# Cast columns to double
for col in cast_cols_to_double:
    if col in df_otpw_60m.columns:
        df_otpw_60m = df_otpw_60m.withColumn(col, F.col(col).cast("double"))

# Cast other columns to appropriate data types

cast_cols_to_timestamp = ["FL_DATE","sched_depart_date_time","sched_depart_date_time_UTC",
                           "four_hours_prior_depart_UTC","two_hours_prior_depart_UTC","DATE","WindEquipmentChangeDate"]

# Cast columns to timestamp
for col in cast_cols_to_timestamp:
    if col in df_otpw_60m.columns:
        df_otpw_60m = df_otpw_60m.withColumn(col, F.col(col).cast("timestamp"))

# Cast columns to int
cast_cols_to_int = ["TAXI_OUT", "TAXI_IN", "DEP_DEL15",'FLIGHTS']

for col_name in cast_cols_to_int:
    if col in df_otpw_60m.columns:
        df_otpw_60m = df_otpw_60m.withColumn(col_name, F.col(col_name).cast("int"))

# Fill null values because null means there was not a delay. In other words, 0 minutes delay
# df_otpw_12m = df_otpw_12m.fillna(0, subset=['CARRIER_DELAY','WEATHER_DELAY','NAS_DELAY','SECURITY_DELAY','LATE_AIRCRAFT_DELAY'])

# NOTE TO SELF, CIRLCE BACK TO CLEAN HourlyPressureChange. NEED TO DEAL WITH '+' '-'
# HourlySkyConditions,REM MAYBE FOR TEXT PROCESSING?

In [0]:
print("HourlyPrecipitation",df_otpw_60m.filter(F.col("HourlyPrecipitation").isNull()).count())
print("HourlySeaLevelPressure",df_otpw_60m.filter(F.col("HourlySeaLevelPressure").isNull()).count())
print("HourlyWindGustSpeed",df_otpw_60m.filter(F.col("HourlyWindGustSpeed").isNull()).count())
print("HourlyPressureTendency",df_otpw_60m.filter(F.col("HourlyPressureTendency").isNull()).count())
print("HourlyDewPointTemperature",df_otpw_60m.filter(F.col("HourlyDewPointTemperature").isNull()).count())
print("HourlyDryBulbTemperature",df_otpw_60m.filter(F.col("HourlyDryBulbTemperature").isNull()).count())
print("HourlyStationPressure",df_otpw_60m.filter(F.col("HourlyStationPressure").isNull()).count())
print("HourlyVisibility",df_otpw_60m.filter(F.col("HourlyVisibility").isNull()).count())
print("HourlyWetBulbTemperature",df_otpw_60m.filter(F.col("HourlyWetBulbTemperature").isNull()).count())
print("HourlyWindSpeed",df_otpw_60m.filter(F.col("HourlyWindSpeed").isNull()).count())

In [0]:
# write dictionary with list of redundant columns, keys are the columns we will be keeping
redundant_columns = {
    "OP_CARRIER_AIRLINE_ID" : ["OP_CARRIER","OP_UNIQUE_CARRIER"], # these are duplicate columns, relationship is 1:1 between ID and carrier
    "ORIGIN_AIRPORT_ID" : ["ORIGIN_AIRPORT_SEQ_ID","origin_airport_name","origin_station_name","origin_station_id","origin_region","NAME","ORIGIN_CITY_NAME"],  # ORIGIN_AIRPORT_SEQ_ID is the same with a marker for point in time
    "ORIGIN_CITY_MARKET_ID" : [""], # including to group airports that service the same market
    "ORIGIN" : ["origin_iata_code","origin_icao"], # these are duplicate columns, i.e. LGA = LGA
    "ORIGIN_STATE_ABR" : ["ORIGIN_STATE_NM"], # one is just the abbreviation
    "DEST_AIRPORT_ID" : ["DEST_AIRPORT_SEQ_ID","dest_airport_name","dest_station_name","dest_station_id","dest_region","DEST_CITY_NAME"],  # DEST_AIRPORT_SEQ_ID is the same with a marker for point in time
    "DEST_CITY_MARKET_ID" : [], # including to group airports that service the same market
    "DEST_STATE_ABR" : ["DEST_STATE_NM"], # these are duplicate columns, i.e. LGA = LGA
    "DEP_DELAY" : ["CRS_DEP_TIME","DEP_TIME","DEP_TIME_BLK","DEP_DELAY_GROUP",
                   "TAXI_OUT","WHEELS_OFF","WHEELS_ON","TAXI_IN","CRS_ARR_TIME",
                   "ARR_TIME","ARR_DELAY","ARR_DELAY_NEW","ARR_DEL15","ARR_DELAY_GROUP","ARR_TIME_BLK","CRS_ELAPSED_TIME","ACTUAL_ELAPSED_TIME",
                   "CARRIER_DELAY","WEATHER_DELAY","NAS_DELAY","SECURITY_DELAY","LATE_AIRCRAFT_DELAY"], # dep delay and related columns that are not relevant such as arrival time variables and taxi/wheels information we wouldn't have 2 hours prior
    "DEL_DELAY_NEW": [], # keeping DEP_DELAY_NEW for potential target variable
    "DEST" : ["dest_iata_code","dest_icao"], # these are duplicate columns, i.e. LGA = LGA
    "DEST_STATE_ABR" : ["DEST_STATE_NM"], # one is just the abbreviation,
    "DATE": ["QUARTER","DAY_OF_MONTH","DAY_OF_WEEK","FL_DATE","YEAR"], # keep "MONTH"
    "MISCELLANEOUS": ["REPORT_TYPE","SOURCE","REM","BackupDirection","BackupDistance","BackupDistanceUnit","BackupElements","BackupElevation","BackupEquipment",	"BackupLatitude","BackupLongitude","BackupName","WindEquipmentChangeDate","_row_desc"]
}

In [0]:
import itertools
cols_to_drop = list(itertools.chain.from_iterable(redundant_columns.values()))
df_otpw_60m = df_otpw_60m.drop(*cols_to_drop)

In [0]:
from pyspark.sql.functions import col, countDistinct

# Filter out the missing values in the target variable
df_otpw_60m = df_otpw_60m.filter(
    (col("DEP_DEL15").isNotNull()) & 
    (col("CANCELLED").cast("int") != 1)
)

# Find columns with only 0 or 1 unique values
cols_to_drop_60m = [
    c for c in df_otpw_60m.columns 
    if df_otpw_60m.select(col(c)).distinct().count() <= 1
]

# Drop these columns
df_otpw_60m = df_otpw_60m.drop(*cols_to_drop_60m)

# Show dropped columns
print("Dropped columns in 60-month dataset:", cols_to_drop_60m)

In [0]:
df_otpw_60m.write.mode("overwrite").parquet(f"{folder_path}/otpw_60m_preprocess.parquet")

## Feature Engineering

###Feature 1: Previous Flight Delay Indicator

In [0]:
section = "02"
number = "01"
folder_path = f"dbfs:/student-groups/Group_{section}_{number}"

df_otpw_60m = spark.read.parquet(f"{folder_path}/otpw_60m_preprocess.parquet")

In [0]:
df_otpw_60m.count()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, when, to_timestamp, unix_timestamp

#converting sched_depart_date_time_UTC to timestamp
df_otpw_60m = df_otpw_60m.withColumn("sched_depart_date_time_UTC", to_timestamp(F.col("sched_depart_date_time_UTC"), "yyyy-MM-dd'T'HH:mm:ss"))

#creating window for previous flight delay by grouping by tail number
window = Window.partitionBy("TAIL_NUM").orderBy(F.col("sched_depart_date_time_UTC"))

#getting arrival delay of previous flight as well as previous flight time
df_otpw_60m = df_otpw_60m.withColumn("prev_flight_delay_raw", lag(F.col("DEP_DELAY_NEW"), 1).over(window))
df_otpw_60m = df_otpw_60m.withColumn("prev_flight_time", lag(F.col("sched_depart_date_time_UTC"), 1).over(window))

#calculate time difference so we only take previous flight delay if the flight is two hours before
df_otpw_60m = df_otpw_60m.withColumn(
    "time_diff_seconds",
    unix_timestamp(F.col("sched_depart_date_time_UTC")) - unix_timestamp(F.col("prev_flight_time"))
    )

#filtering out flights that are not two hours before
df_otpw_60m = df_otpw_60m.withColumn(
    "prev_flight_delay",
    when(F.col("time_diff_seconds") >= 7200, F.col("prev_flight_delay_raw"))
    .otherwise(0)
)

df_otpw_60m = df_otpw_60m.withColumn(
    "prev_flight_delay_ind",
    when(F.col("prev_flight_delay") > 0, 1)
    .otherwise(0)
)

df_otpw_60m = df_otpw_60m.drop("prev_flight_delay_raw", "prev_flight_delay", "prev_flight_time", "time_diff_seconds")

df_otpw_60m.display(10)

In [0]:
#verifying prev_flight_delay results
df_otpw_60m.filter(df_otpw_60m.TAIL_NUM == "N102UW").select("sched_depart_date_time_UTC", "DEP_DELAY_NEW", "prev_flight_delay_ind").orderBy("sched_depart_date_time_UTC").display(10)

In [0]:
# EDA on Feature 1
# Plot distribution of prev_flight_delay_ind
prev_delay_count = df_otpw_60m.groupBy("prev_flight_delay_ind").count().toPandas()

# Plot
sns.barplot(data=prev_delay_count, x="prev_flight_delay_ind", y="count", palette="Set2")
plt.title("Distribution of Previous Flight Delay Indicator (5 Years)")
plt.xlabel("Previous Flight Delay Indicator")
plt.ylabel("Number of Flights")
plt.xticks([0, 1], ["No Previous Delay", "Previous Delay"])
plt.tight_layout()
plt.show()


### Feature 2: Holiday Indicator

In [0]:
from pyspark.sql.functions import to_date, col, lit, when
from datetime import datetime, timedelta

# Step 1: Create a Python set of buffered holiday dates (±2 days)
us_holidays = {
    2015: ["2015-01-01", "2015-01-19", "2015-02-16", "2015-05-25", "2015-07-04",
           "2015-09-07", "2015-10-12", "2015-11-11", "2015-11-26", "2015-12-25"],
    2016: ["2016-01-01", "2016-01-18", "2016-02-15", "2016-05-30", "2016-07-04",
           "2016-09-05", "2016-10-10", "2016-11-11", "2016-11-24", "2016-12-25"],
    2017: ["2017-01-01", "2017-01-16", "2017-02-20", "2017-05-29", "2017-07-04",
           "2017-09-04", "2017-10-09", "2017-11-11", "2017-11-23", "2017-12-25"],
    2018: ["2018-01-01", "2018-01-15", "2018-02-19", "2018-05-28", "2018-07-04",
           "2018-09-03", "2018-10-08", "2018-11-11", "2018-11-22", "2018-12-25"],
    2019: ["2019-01-01", "2019-01-21", "2019-02-18", "2019-05-27", "2019-07-04",
           "2019-09-02", "2019-10-14", "2019-11-11", "2019-11-28", "2019-12-25"]
}

# Step 2: Generate buffered holiday dates as strings
holiday_buffer_dates = set()
for year, holidays in us_holidays.items():
    for h in holidays:
        h_date = datetime.strptime(h, "%Y-%m-%d")
        for offset in range(-2, 3):  # from -2 to +2 days
            date_str = (h_date + timedelta(days=offset)).strftime("%Y-%m-%d")
            holiday_buffer_dates.add(date_str)

# Step 3: Convert sched_depart_date_time_UTC to date (if not already)
df_otpw_60m = df_otpw_60m.withColumn(
    "sched_depart_date", 
    to_date(col("sched_depart_date_time_UTC"))
)

# Step 4: Create is_holiday column by checking membership in buffered date list
df_otpw_60m = df_otpw_60m.withColumn(
    "is_holiday",
    when(col("sched_depart_date").cast("string").isin(list(holiday_buffer_dates)), lit(1)).otherwise(lit(0))
)

# Optional: Drop helper column if not needed
df_otpw_60m = df_otpw_60m.drop("sched_depart_date")
df_otpw_60m.select("sched_depart_date_time_UTC", "is_holiday").show(5)


In [0]:
# Verifying is_holiday results
df_otpw_60m.select("sched_depart_date_time_UTC", "DEP_DELAY_NEW", "is_holiday") \
    .orderBy("sched_depart_date_time_UTC") \
    .display(10)

In [0]:
# EDA on Feature 2
from pyspark.sql.functions import month

# Holiday vs. Non-Holiday Flight Counts
df_otpw_60m.groupBy("is_holiday").count().orderBy("is_holiday").display()

# Average Delay times by Holiday Indicator
df_otpw_60m.groupBy("is_holiday") \
    .agg(F.avg("DEP_DELAY_NEW").alias("avg_dep_delay"),
         F.stddev("DEP_DELAY_NEW").alias("stddev_dep_delay")) \
    .orderBy("is_holiday") \
    .display()

# Proportion of Delayed Flights Near Holidays
df_otpw_60m.groupBy("is_holiday") \
    .agg(F.avg("DEP_DEL15").alias("prop_delayed_15min")) \
    .orderBy("is_holiday") \
    .display()

# Monthly Distribution of Holiday Flights
df_otpw_60m.withColumn("month", month("sched_depart_date_time_UTC")) \
    .groupBy("month", "is_holiday") \
    .count() \
    .orderBy("month", "is_holiday") \
    .display()   


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Holiday vs Delay Rate (DEP_DEL15)
df_otpw_60m.groupBy("is_holiday").agg(
    F.count("*").alias("total_flights"),
    F.sum("DEP_DEL15").alias("num_delayed"),
    (F.sum("DEP_DEL15") / F.count("*")).alias("delay_rate")
).show()


# Plot Delay Rates by Holiday Indicator
# Convert to pandas for visualization
holiday_delay_pd = df_otpw_60m.groupBy("is_holiday").agg(
    F.count("*").alias("total_flights"),
    F.sum("DEP_DEL15").alias("num_delayed"),
    (F.sum("DEP_DEL15") / F.count("*")).alias("delay_rate")
).toPandas()

plt.figure(figsize=(6, 4))
sns.barplot(data=holiday_delay_pd, x="is_holiday", y="delay_rate", palette="Set2")
plt.xticks([0, 1], ["Non-Holiday", "Holiday (±2 Days)"])
plt.ylabel("Delay Rate")
plt.title("Flight Delay Rate by Holiday Indicator (5 Year)")
plt.tight_layout()
plt.show()


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F

# Group by holiday indicator and calculate average DEP_DELAY_NEW (delay time difference)
holiday_delay_diff = df_otpw_60m.groupBy("is_holiday").agg(
    F.avg("DEP_DELAY_NEW").alias("avg_delay_time_diff")
).toPandas()

# Map for better labels
holiday_delay_diff["is_holiday"] = holiday_delay_diff["is_holiday"].map({0: "Non-Holiday", 1: "Holiday"})

# Plot average delay time difference (DEP_DELAY_NEW)
plt.figure(figsize=(6, 4))
ax = sns.barplot(data=holiday_delay_diff, x="is_holiday", y="avg_delay_time_diff", palette="Set2")

# Add annotations for the average delay times
for p in ax.patches:
    height = p.get_height()
    ax.annotate(f'{height:.2f}',  # Format as two decimal places
                (p.get_x() + p.get_width() / 2., height), 
                ha='center', va='center', fontsize=12, color='black', fontweight='bold')

plt.ylabel("Average Delay Time In Minutes")
plt.xlabel("Holiday Indicator")
plt.title("Average Delay Time by Holiday Indicator (5 Year)")
plt.xticks([0, 1], ["Non-Holiday", "Holiday (±2 Days)"])
plt.tight_layout()
plt.show()


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F

# Group by holiday indicator and calculate average DEP_DELAY_NEW (delay time difference)
holiday_delay_diff = df_otpw_60m.groupBy("is_holiday").agg(
    F.avg("DEP_DELAY_NEW").alias("avg_delay_time_diff")
).toPandas()

# Map for better labels
holiday_delay_diff["is_holiday"] = holiday_delay_diff["is_holiday"].map({0: "Non-Holiday", 1: "Holiday"})

# Group by holiday indicator and calculate delay rates
holiday_delay_pd = df_otpw_60m.groupBy("is_holiday").agg(
    F.count("*").alias("total_flights"),
    F.sum("DEP_DEL15").alias("num_delayed"),
    (F.sum("DEP_DEL15") / F.count("*")).alias("delay_rate")
).toPandas()

# Set up the figure for side-by-side plots
fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# Plot 1: Average Delay Time by Holiday Indicator
ax1 = axes[0]
sns.barplot(data=holiday_delay_diff, x="is_holiday", y="avg_delay_time_diff", palette="Set2", ax=ax1)

# Add annotations for the average delay times on the first plot
for p in ax1.patches:
    height = p.get_height()
    ax1.annotate(f'{height:.2f}', 
                 (p.get_x() + p.get_width() / 2., height), 
                 ha='center', va='center', fontsize=12, color='black', fontweight='bold')

ax1.set_ylabel("Average Delay Time In Minutes")
ax1.set_xlabel("Holiday Indicator")
ax1.set_title("Average Delay Time by Holiday Indicator (5 Year)")
ax1.set_xticklabels(["Non-Holiday", "Holiday (±2 Days)"])

# Plot 2: Delay Rate by Holiday Indicator
ax2 = axes[1]
sns.barplot(data=holiday_delay_pd, x="is_holiday", y="delay_rate", palette="Set2", ax=ax2)

# Add annotations for the delay rates on the second plot
for p in ax2.patches:
    height = p.get_height()
    ax2.annotate(f'{height * 100:.2f}%', 
                 (p.get_x() + p.get_width() / 2., height), 
                 ha='center', va='center', fontsize=12, color='black', fontweight='bold')

ax2.set_ylabel("Delay Rate")
ax2.set_xlabel("Holiday Indicator")
ax2.set_title("Flight Delay Rate by Holiday Indicator (5 Year)")
ax2.set_xticklabels(["Non-Holiday", "Holiday (±2 Days)"])

# Tighten layout
plt.tight_layout()
plt.show()


In [0]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Convert to pandas
holiday_counts = df_otpw_60m.groupBy("is_holiday").count().toPandas()

# Plot
sns.barplot(data=holiday_counts, x="is_holiday", y="count", palette="Set2")
plt.title("Flight Count: Holiday vs. Non-Holiday (5 Years)")
plt.xlabel("Is Holiday (±2 days)")
plt.ylabel("Number of Flights")
plt.xticks([0, 1], ["Non-Holiday", "Holiday"])
plt.tight_layout()
plt.show()

In [0]:
from pyspark.sql.functions import year
# Flight Counts by Holiday Indicator and Year
# Add year column to Spark DataFrame
df_otpw_60m = df_otpw_60m.withColumn("year", year("sched_depart_date_time_UTC"))

# Group by year and is_holiday
holiday_counts_by_year = df_otpw_60m.groupBy("year", "is_holiday").count().orderBy("year", "is_holiday").toPandas()

# Map for better labels
holiday_counts_by_year["is_holiday"] = holiday_counts_by_year["is_holiday"].map({0: "Non-Holiday", 1: "Holiday"})

# Plot
plt.figure(figsize=(10, 6))
sns.barplot(data=holiday_counts_by_year, x="year", y="count", hue="is_holiday", palette="Set2")
plt.title("Flight Counts by Year: Holiday vs. Non-Holiday (5 Years)")
plt.xlabel("Year")
plt.ylabel("Number of Flights")
plt.legend(title="Is Holiday")
plt.tight_layout()
plt.show()


In [0]:
from pyspark.sql.functions import year
import matplotlib.pyplot as plt
import seaborn as sns

# Step 1: Add year column to Spark DataFrame
df_otpw_60m = df_otpw_60m.withColumn("year", year("sched_depart_date_time_UTC"))

# Step 2: Group by year and is_holiday to get counts
holiday_counts_by_year = df_otpw_60m.groupBy("year", "is_holiday").count().toPandas()

# Step 3: Compute total flights per year
total_per_year = holiday_counts_by_year.groupby("year")["count"].transform("sum")
holiday_counts_by_year["percentage"] = holiday_counts_by_year["count"] / total_per_year

# Step 4: Map is_holiday values to labels for better readability
holiday_counts_by_year["is_holiday"] = holiday_counts_by_year["is_holiday"].map({0: "Non-Holiday", 1: "Holiday"})

# Step 5: Plot
plt.figure(figsize=(10, 6))
ax = sns.barplot(
    data=holiday_counts_by_year, 
    x="year", y="percentage", hue="is_holiday", palette="Set2"
)

# Add annotations to show percentages on the bars
for p in ax.patches:
    height = p.get_height()
    ax.annotate(f'{height * 100:.1f}%', 
                (p.get_x() + p.get_width() / 2., height), 
                ha='center', va='bottom', fontsize=10)

# Final touches
plt.title("Percentage of Flights by Year: Holiday vs. Non-Holiday (2015–2019)")
plt.ylabel("Percentage of Flights")
plt.xlabel("Year")
plt.ylim(0, 1)
plt.legend(title="Is Holiday")
plt.tight_layout()
plt.show()


In [0]:
from pyspark.sql.functions import month
# Monthly Flight Volume by Holiday Indicator
from pyspark.sql.functions import month
import matplotlib.pyplot as plt
import seaborn as sns

# Monthly Flight Volume by Holiday Indicator
monthly = df_otpw_60m.withColumn("month", month("sched_depart_date_time_UTC")) \
    .groupBy("month", "is_holiday").count() \
    .orderBy("month", "is_holiday") \
    .toPandas()

# Convert is_holiday to string for better legend labels
monthly["is_holiday"] = monthly["is_holiday"].map({0: "Non-Holiday", 1: "Holiday"})

# Plot
plt.figure(figsize=(10, 6))
sns.barplot(data=monthly, x="month", y="count", hue="is_holiday", palette="Set2")
plt.title("Monthly Flight Counts by Holiday Indicator (5 Years)")
plt.xlabel("Month")
plt.ylabel("Flight Count")
plt.legend(title="Is Holiday")  # Let Seaborn/Matplotlib auto-handle colors
plt.tight_layout()
plt.show()



In [0]:
df_otpw_60m = df_otpw_60m.drop("year")

### Feature 3: Weekend Indicator

In [0]:
from pyspark.sql.functions import col, dayofweek, lit, when, hour

df_otpw_60m = df_otpw_60m.withColumn(
    "is_weekend",
    when(dayofweek(col("sched_depart_date_time_UTC")) == 1, lit(1))  # Sunday
    .when(dayofweek(col("sched_depart_date_time_UTC")) == 7, lit(1))  # Saturday
    .otherwise(lit(0)))


In [0]:
import seaborn as sns
import matplotlib.pyplot as plt

# Group by both 'is_weekend' and 'DEP_DEL15', then count the occurrences
delay_counts = df_otpw_60m.groupBy("is_weekend", "DEP_DEL15") \
    .agg(F.count("*").alias("flight_count")) \
    .toPandas()

# Plotting
sns.barplot(data=delay_counts, x="is_weekend", y="flight_count", hue="DEP_DEL15")

# Customizing the plot
plt.xticks([0, 1], ["Weekday", "Weekend"])
plt.ylabel("Count of Flights")
plt.title("Delayed vs Non-Delayed Flights: Weekend vs Weekday (5 Year)")
handles, labels = plt.gca().get_legend_handles_labels()
plt.legend(handles=handles, labels=["Not Delayed", "Delayed"], title="Delay Status")
plt.show()

In [0]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Pivot data to get counts for pie chart
pivot_df = delay_counts.pivot(index='DEP_DEL15', columns='is_weekend', values='flight_count').fillna(0)

fig, axes = plt.subplots(1, 2, figsize=(12, 6))

# Weekday pie
axes[0].pie(pivot_df[0], labels=['Not Delayed', 'Delayed'], autopct='%1.1f%%', colors=sns.color_palette("pastel"))
axes[0].set_title('Weekday Flights')

# Weekend pie
axes[1].pie(pivot_df[1], labels=['Not Delayed', 'Delayed'], autopct='%1.1f%%', colors=sns.color_palette("pastel"))
axes[1].set_title('Weekend Flights')

plt.suptitle('Delay vs Non-Delay Flight %: Weekday vs Weekend (5 Year)')
plt.tight_layout()
plt.show()

In [0]:
from pyspark.sql import functions as F
import seaborn as sns
import matplotlib.pyplot as plt

# Group by 'is_weekend' and 'DEP_DEL15', calculate the average delay for each group
delay_avg = df_otpw_60m.groupBy("is_weekend") \
    .agg(F.avg("DEP_DELAY_NEW").alias("avg_delay")) \
    .toPandas()

# Plotting the average delay
plt.figure(figsize=(6, 4))
ax = sns.barplot(data=delay_avg, x="is_weekend", y="avg_delay")

# Adding the value annotations for each bar
for p in ax.patches:
    height = p.get_height()
    ax.annotate(f'{height:.2f}',  # Format as two decimal places
                (p.get_x() + p.get_width() / 2., height), 
                ha='center', va='center', fontsize=12, color='black', fontweight='bold')

# Customizing the plot
plt.xticks([0, 1], ["Weekday", "Weekend"])
plt.ylabel("Average Delay (Minutes)")
plt.title("Average Flight Delay: Weekend vs Weekday (5 Year)")
plt.tight_layout()
plt.show()


### Feature 4: Airports Network (Page Rank and Degree Centrality)


In [0]:
df_otpw_60m.dtypes

In [0]:
from graphframes import GraphFrame

# find the edges of the graph (airport routes) and use the count as the weight
edges = df_otpw_60m.groupBy('ORIGIN', 'DEST') \
    .agg(F.count('*').alias('weight'))

edges.cache()

display(edges)

# Compute total weight per ORIGIN (src)
total_flights = edges.groupBy('ORIGIN').agg(F.sum('weight').alias('total_weight'))

# Join and calculate normalized prob
edges_with_prob = edges.join(total_flights, on='ORIGIN') \
    .withColumn('prob', F.col('weight') / F.col('total_weight'))

edges_with_prob.cache()

# get the vertices of the graph (airports)
vertices = edges.select('ORIGIN').union(edges.select('DEST')) \
    .distinct().withColumnRenamed('ORIGIN', 'id')

vertices.cache()

# create the edges for the graph
edges_for_graph = edges_with_prob.selectExpr('ORIGIN as src', 'DEST as dst', 'prob')

edges_for_graph.cache()

# create the graph
g = GraphFrame(vertices, edges_for_graph)

# run the pagerank of the graph
results = g.pageRank(resetProbability=0.15, maxIter=20)
results.vertices.select('id', 'pagerank').show()

In [0]:
import matplotlib.pyplot as plt

# Get the degree centrality from GraphFrame
total_degree_df = g.degrees.toPandas()

# Sort the airports by degree centrality
total_degree_df = total_degree_df.sort_values(by='degree', ascending=False)

# Calculate normalized degree centrality
N = total_degree_df.shape[0]  # Total number of airports
total_degree_df['normalized_degree_centrality'] = total_degree_df['degree'] / (N - 1)


# Plotting the bar chart
plt.figure(figsize=(20, 10))
plt.bar(total_degree_df['id'], total_degree_df['degree'], color='skyblue', edgecolor='black')

# Adding labels and title
plt.xlabel('Airport', fontsize=12)
plt.ylabel('Degree (Number of Connections)', fontsize=12)
plt.title('Degree Centrality of Airports (5 Years)', fontsize=16)

# Rotate the x-axis labels for better readability
# Only display every 5-th airport name on the x-axis for readability
n = 5  
plt.xticks(total_degree_df['id'][::n], rotation=90)  # Show every n-th airport name

# Display the plot
plt.show()


In [0]:
display(total_degree_df)

In [0]:
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# In-degree: Count the number of incoming edges (flights arriving at the airport)
in_degree_df = g.inDegrees.toPandas()

# Out-degree: Count the number of outgoing edges (flights departing from the airport)
out_degree_df = g.outDegrees.toPandas()

# Merge in-degree and out-degree data
degree_df = pd.merge(in_degree_df, out_degree_df, on='id', how='outer')
degree_df = degree_df.rename(columns={'inDegree': 'in_degree', 'outDegree': 'out_degree'})

# Sort the airports by in-degree centrality
degree_df = degree_df.sort_values(by='out_degree', ascending=False)

# Create subplots for In-degree and Out-degree
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(20, 15), sharex=True)

# Plotting the In-degree Centrality
ax1.bar(degree_df['id'], degree_df['in_degree'], color='skyblue', edgecolor='black', alpha=0.6)
ax1.set_title('In-degree Centrality of Airports (5 Years)', fontsize=16)
ax1.set_ylabel('In-degree (Number of Incoming Flights)', fontsize=12)

# Plotting the Out-degree Centrality
ax2.bar(degree_df['id'], degree_df['out_degree'], color='green', edgecolor='black', alpha=0.6)
ax2.set_title('Out-degree Centrality of Airports (5 Years)', fontsize=16)
ax2.set_xlabel('Airport', fontsize=12)
ax2.set_ylabel('Out-degree (Number of Departing Flights)', fontsize=12)

# Rotate the x-axis labels for better readability
n = 8  # Adjust this value to show every n-th airport
ax2.set_xticks(degree_df['id'][::n])  # Show every n-th airport name
ax2.set_xticklabels(degree_df['id'][::n], rotation=90)

# Display the plot
plt.tight_layout()
plt.show()


In [0]:
# Normalize the degree centralities
N = degree_df.shape[0]  # Total number of airports
degree_df['normalized_in_degree'] = degree_df['in_degree'] / (N - 1)
degree_df['normalized_out_degree'] = degree_df['out_degree'] / (N - 1)
display(degree_df)

In [0]:
# First, convert degree_df (Pandas) to a Spark DataFrame
degree_sdf = spark.createDataFrame(degree_df)

# Rename columns to avoid ambiguity during join
origin_degree = degree_sdf.select(
    F.col("id").alias("ORIGIN"),
    F.col("normalized_out_degree").alias("origin_normalized_out_degree")
)

dest_degree = degree_sdf.select(
    F.col("id").alias("DEST"),
    F.col("normalized_in_degree").alias("dest_normalized_in_degree")
)

# Join on ORIGIN
df_otpw_60m = df_otpw_60m.join(origin_degree, on="ORIGIN", how="left")

# Then join on DEST
df_otpw_60m = df_otpw_60m.join(dest_degree, on="DEST", how="left")

# Review the results
df_otpw_60m.select(
    "ORIGIN", "DEST",
    "origin_normalized_out_degree",
    "dest_normalized_in_degree"
).show(5)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

# Add a PageRank score to the original 5 Years DataFrame

pagerank_df = results.vertices

# join the pagerank table using broadcast for both origin and destination airport
df_otpw_60m = df_otpw_60m \
    .join(broadcast(pagerank_df), df_otpw_60m.ORIGIN == pagerank_df.id, "left") \
    .withColumnRenamed("pagerank", "origin_pagerank") \
    .drop("id") \
    .join(broadcast(pagerank_df), df_otpw_60m.DEST == pagerank_df.id, "left") \
    .withColumnRenamed("pagerank", "dest_pagerank") \
    .drop("id")

### Check Point Data Set After Pre-Processing and Feature Engineering

In [0]:
# Checkpoint the data after data pre-processing
df_otpw_60m.write.mode("overwrite").parquet(f"{folder_path}/otpw_60m_cleaned.parquet")

## EDA Post Data Cleaning

In [0]:
section = "02"
number = "01"
folder_path = f"dbfs:/student-groups/Group_{section}_{number}"

df_otpw_60m = spark.read.parquet(f"{folder_path}/otpw_60m_cleaned.parquet")

In [0]:
df_otpw_60m.dtypes

In [0]:
df_otpw_60m.limit(10).toPandas().head(20)

In [0]:
# Use the 60 month data to plot the Geo plot
# create a geo plot for all airports and the selected US weather stations (closest to each airport)
import plotly.graph_objects as go

# create a table for destination airport and latitude and longitude
dest = df_otpw_60m.dropDuplicates(['DEST']).select('DEST', 'dest_airport_lat', 'dest_airport_lon')

display(dest)
dests = dest.toPandas()

# create a table for weather station and latitude and longitude
station = df_otpw_60m.dropDuplicates(['STATION']).select('STATION', 'LATITUDE', 'LONGITUDE')

display(station)

# Set up plotting tables
stations = station.toPandas()
# stations = stations_us.toPandas()

# Set up map
fig = go.Figure(data=go.Scattergeo(
         lat = dests['dest_airport_lat'],
         lon = dests['dest_airport_lon'],
         text = dests['DEST'],
         name = 'Airport',
         marker_symbol = 'circle-dot',
         marker_color = 'red',))
fig.add_trace(go.Scattergeo(
         lat = stations['LATITUDE'],
         lon = stations['LONGITUDE'],
         marker_color = 'green',
         name = 'Weather Stations',
         marker_symbol = 'x-dot',
         marker_size = 3,
         opacity = 0.5))
fig.update_layout(
         title = 'US Airports & Weather Stations (5 Years)',
         geo_scope='usa',
         autosize=False,
         width=1000,
         height=700,)
fig.show()

In [0]:
from pyspark.sql.functions import avg, col
from pyspark.sql.functions import count
import plotly.graph_objects as go
#Geo plot of Origin airports with avg delay on color (60 month)
origin_delays = df_otpw_60m.groupBy("ORIGIN", "ORIGIN_AIRPORT_LAT", "ORIGIN_AIRPORT_LON").agg(avg("DEP_DELAY").alias("AVG_DEP_DELAY"))

#create table with only destination airport, latitude and longitude, and average departure delay
origin_delays.orderBy(col("AVG_DEP_DELAY").desc()).limit(10).display(10)
origin_delays_pd = origin_delays.toPandas()

# Set up map
fig = go.Figure(data=go.Scattergeo(
         lat = origin_delays_pd['ORIGIN_AIRPORT_LAT'],
         lon = origin_delays_pd['ORIGIN_AIRPORT_LON'],
         text=origin_delays_pd['ORIGIN'] + "<br>Avg Delay: " + origin_delays_pd['AVG_DEP_DELAY'].round(3).astype(str),
         marker=dict(
            size= 10, 
            color=origin_delays_pd['AVG_DEP_DELAY'],
            colorscale="RdBu",
            cmin=-max(abs(origin_delays_pd['AVG_DEP_DELAY'])),
            cmax=max(origin_delays_pd['AVG_DEP_DELAY']),
            colorbar=dict(title="Avg Departure Delay (minutes)")
        )))
fig.update_layout(
         title = 'Average Departure Delay for Each Origin Airport (5 Years)',
         geo_scope='usa',
         autosize=False,
         width=1000,
         height=700,)
fig.show()


In [0]:
from pyspark.sql.functions import avg, col, count, sum as spark_sum
import plotly.graph_objects as go

# create a geo plot to show the percentage of delayed flights for each airport

# Aggregate: count total flights & delayed flights per airport
origin_delay_rate = df_otpw_60m.groupBy("ORIGIN", "ORIGIN_AIRPORT_LAT", "ORIGIN_AIRPORT_LON") \
    .agg(
        count("*").alias("TOTAL_FLIGHTS"),
        spark_sum("DEP_DEL15").alias("DELAYED_FLIGHTS")
    ) \
    .withColumn("PCT_DELAY", (col("DELAYED_FLIGHTS") / col("TOTAL_FLIGHTS")) * 100)

# View top 10 worst airports
origin_delay_rate.orderBy(col("PCT_DELAY").desc()).limit(10).display()

# Convert to Pandas for plotting
origin_delay_rate_pd = origin_delay_rate.toPandas()

# Plotting
fig = go.Figure(data=go.Scattergeo(
         lat = origin_delay_rate_pd['ORIGIN_AIRPORT_LAT'],
         lon = origin_delay_rate_pd['ORIGIN_AIRPORT_LON'],
         text=origin_delay_rate_pd['ORIGIN'] + "<br>% Delay: " + origin_delay_rate_pd['PCT_DELAY'].round(2).astype(str) + "%",
         marker=dict(
            size= 10, 
            color=origin_delay_rate_pd['PCT_DELAY'],
            colorscale="Reds",
            cmin=0,
            cmax=origin_delay_rate_pd['PCT_DELAY'].max(),
            colorbar=dict(title="% of Delayed Flights")
        )))
fig.update_layout(
         title = 'Percentage of Delayed Departures for Each Origin Airport (5 Years)',
         geo_scope='usa',
         autosize=False,
         width=1000,
         height=700)
fig.show()


In [0]:
# EDA on flight delayed time between origin and destination - 60 month data

from pyspark.sql.functions import col, avg, expr
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import seaborn as sns

# Compute average departure delay for each origin-destination pair
avg_delay_df = df_otpw_60m.groupBy("ORIGIN", "DEST").agg(avg("DEP_DELAY").alias("AVG_DEP_DELAY"))

# Sort by delay
avg_delay_df = avg_delay_df.orderBy(col("AVG_DEP_DELAY").desc())
avg_delay_df.display(10)

# plot the top 50 routes with the highest delays
avg_delay_df50 = avg_delay_df.withColumn("ORIGIN-DEST", expr("ORIGIN || '-' || DEST")).limit(50)
avg_delay_df_pd = avg_delay_df50.toPandas()

plt.figure(figsize=(12, 6))
sns.barplot(data=avg_delay_df_pd, x="ORIGIN-DEST", y="AVG_DEP_DELAY", color = sns.color_palette("Blues")[2])

plt.xticks(rotation=90)
plt.xlabel("Origin - Destination")
plt.ylabel("Average Departure Delay (minutes)")
plt.title("Average Departure Delay Between Origin and Destination (5 Years)")
plt.show()

In [0]:
# EDA on Airport Degree Features

plot_df = df_otpw_60m.select(
    "origin_normalized_out_degree", "dest_normalized_in_degree", "DEP_DEL15"
).dropna().toPandas()

# Boxplots of Degree Centrality vs. DEP_DEL15
plt.figure(figsize=(12, 6))
sns.boxplot(data=plot_df, x="DEP_DEL15", y="origin_normalized_out_degree", palette="Blues")
plt.title("Origin Out-Degree vs Delay (5 Years)")
plt.xlabel("DEP_DEL15 (0 = No Delay, 1 = Delayed)")
plt.ylabel("Normalized Origin Out-Degree")
plt.show()

plt.figure(figsize=(12, 6))
sns.boxplot(data=plot_df, x="DEP_DEL15", y="dest_normalized_in_degree", palette="Reds")
plt.title("Destination In-Degree vs Delay (5 Years)")
plt.xlabel("DEP_DEL15 (0 = No Delay, 1 = Delayed)")
plt.ylabel("Normalized Destination In-Degree")
plt.show()


In [0]:
# EDA on PageRank Results

from pyspark.sql import functions as F

# create a plot for pagerank vs delay rate
df_group = df_otpw_60m.groupBy('ORIGIN').agg(
    F.avg('origin_pagerank').alias('avg_origin_pagerank'),
    F.sum('DEP_DEL15').alias('total_delays'),
    F.count('*').alias('total_flights')
).withColumn(
    'delay_rate', F.col('total_delays') / F.col('total_flights')
)

# convert the table to Pandas
df_pd = df_group.toPandas()

# create the plot
plt.figure(figsize=(10,6))
sns.scatterplot(
    data=df_pd, 
    x='avg_origin_pagerank', 
    y='delay_rate', 
    size='total_flights', 
    hue='delay_rate', 
    palette='Reds', 
    alpha=0.9
)
plt.title('Pagerank vs Delay Rate by Airport')
plt.xlabel('Pagerank')
plt.ylabel('Delay Rate')
plt.show()

In [0]:
# EDA - PageRank and Origin Airports
df_group = df_otpw_60m.groupBy('ORIGIN').agg(
    F.avg('origin_pagerank').alias('avg_origin_pagerank')
).orderBy(F.desc('avg_origin_pagerank'))

df_pd = df_group.toPandas()

plt.figure(figsize=(12, 6))
ax2 = sns.barplot(data=df_pd, x='ORIGIN', y='avg_origin_pagerank',palette='Blues_r')

# Rotate the x-axis labels for better readability
n = 8  # Adjust this value to show every n-th airport
ax2.set_xticks(range(0, len(df_pd), n))  # Show every n-th airport name
ax2.set_xticklabels(df_pd['ORIGIN'][::n], rotation=90)

plt.title('Average Pagerank by Origin Airport (5 Years)')
plt.ylabel('Average Pagerank')
plt.xlabel('Origin Airport')
plt.show()

In [0]:
from pyspark.sql.functions import col, avg, expr
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import seaborn as sns
# Box Plot: Previous Flight Delay Indicator vs Current Departure Delay - 60 months
df_sample = df_otpw_60m.select("prev_flight_delay_ind", "DEP_DELAY_NEW","DEP_DEL15").toPandas()

# Bar plot: % Delayed by previous delay indicator
delay_rate = df_sample.groupby("prev_flight_delay_ind")["DEP_DEL15"].mean().reset_index()

plt.figure(figsize=(8, 6))
sns.barplot(data=delay_rate, x="prev_flight_delay_ind", y="DEP_DEL15", palette="Set2")
plt.title("Proportion of Delays by Previous Flight Delay Indicator (5 Years)")
plt.xlabel("Previous Flight Delayed (0 = No, 1 = Yes)")
plt.ylabel("Proportion of Delayed Flights (DEP_DEL15)")
plt.ylim(0, 1)
plt.show()


In [0]:
# Count Plot of Delay Status by Previous Delay Indicator (60m)

plt.figure(figsize=(10, 6))
sns.countplot(data=df_sample, x="prev_flight_delay_ind", hue="DEP_DEL15", palette="coolwarm")
plt.title("Count of Delayed vs. Not Delayed Flights by Previous Delay Indicator (5 Years)")
plt.xlabel("Previous Flight Delayed (0 = No, 1 = Yes)")
plt.ylabel("Flight Count")
plt.legend(title="DEP_DEL15", labels=["On-Time", "Delayed"])
plt.show()


In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# Calculate the delay counts with percentage
delay_counts = df_sample.groupby("prev_flight_delay_ind")["DEP_DEL15"].value_counts(normalize=True).unstack()

# Show percentage table
print("Percentage Table:")
print(delay_counts * 100)  # Multiplying by 100 to get the percentages

# Create stacked bar chart
ax = delay_counts.plot(kind="bar", stacked=True, figsize=(8, 6))

# Annotate percentages on the bars
for p in ax.patches:
    # Get the width and height of each bar segment
    height = p.get_height()
    width = p.get_width()
    x, y = p.get_xy()  # Get the x and y position of the bar segment

    # Calculate the percentage of each segment
    percentage = height * 100  # Since the heights represent the percentages

    # Annotate percentage on each bar segment
    ax.annotate(f'{percentage:.1f}%', (x + width / 2, y + height / 2), ha='center', va='center', fontsize=10)

# Set the plot title and labels
plt.title("Percentage of Delays by Previous Flight Delay Indicator (5 Years)")
plt.xlabel("Previous Flight Delayed (0 = No, 1 = Yes)")
plt.ylabel("Percentage")
plt.legend(title="DEP_DEL15", labels=["On-Time", "Delayed"])
plt.ylim(0, 1)
plt.show()


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Holiday vs Delay Rate (DEP_DEL15)
holiday_delay_pd = df_otpw_60m.groupBy("is_holiday").agg(
    F.count("*").alias("total_flights"),
    F.sum("DEP_DEL15").alias("num_delayed"),
    (F.sum("DEP_DEL15") / F.count("*")).alias("delay_rate")
).toPandas()

# Plot Delay Rates by Holiday Indicator
plt.figure(figsize=(6, 4))
ax = sns.barplot(data=holiday_delay_pd, x="is_holiday", y="delay_rate", palette="Set2")

# Add percentage labels on the bars
for p in ax.patches:
    height = p.get_height()
    ax.annotate(f'{height * 100:.2f}%', 
                (p.get_x() + p.get_width() / 2., height), 
                ha='center', va='center', fontsize=12, color='black', fontweight='bold')

plt.xticks([0, 1], ["Non-Holiday", "Holiday (±2 Days)"])
plt.ylabel("Delay Rate")
plt.title("Flight Delay Rate by Holiday Indicator (5 Year)")
plt.tight_layout()
plt.show()


#### Correlation Analysis

In [0]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Convert Spark DataFrame to Pandas
df_sample_60m = df_otpw_60m.select([
     'AIR_TIME', 'DISTANCE', 
     'origin_station_lat', 'origin_station_lon', 'origin_airport_lat', 'origin_airport_lon', 
     'origin_station_dis', 'dest_station_lat', 'dest_station_lon', 'dest_airport_lat', 'dest_airport_lon', 
     'dest_station_dis', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 
     'HourlyAltimeterSetting', 'HourlyDewPointTemperature', 'HourlyDryBulbTemperature', 
     'HourlyPrecipitation', 'HourlyPressureTendency', 'HourlyRelativeHumidity', 
     'HourlySeaLevelPressure', 'HourlyStationPressure', 'HourlyVisibility', 'HourlyWetBulbTemperature', 
     'HourlyWindDirection', 'HourlyWindGustSpeed', 'HourlyWindSpeed', 
     'origin_normalized_out_degree', 'dest_normalized_in_degree', 
     'origin_pagerank', 'dest_pagerank'
]).toPandas()

# Compute Spearman correlation
corr_matrix_spearman = df_sample_60m.corr(method="spearman")

# Plot heatmap
plt.figure(figsize=(14,10))
sns.heatmap(corr_matrix_spearman, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5, annot_kws={"size": 8})
plt.title("Spearman Correlation Heatmap of Flight Delay & All Continuous Features (5 Years)")
plt.show()


In [0]:
# # Plot histograms for numerical variables 60 month data
axes = df_sample_60m.hist(figsize=(15,12), bins=30, edgecolor="black")
plt.suptitle("Numerical Feature Distributions (60m)", fontsize=16)

for ax in axes.flatten():
    if ax.get_title(): 
        ax.set_title(ax.get_title(), fontsize=10) 

plt.subplots_adjust(wspace=0.8, hspace=0.8)
plt.show()

## Dimensionality Reduction

In [0]:
import itertools
# Drop Highly Correlated & Redundent Features
drop_col = ["DEP_DELAY_DOUBLE", "DEP_DELAY_NEW_DOUBLE", "DEP_DELAY", "DISTANCE", "ELEVATION", "HourlyAltimeterSetting", "HourlyDewPointTemperature", "HourlyWetBulbTemperature", "origin_station_lat", "origin_station_lon", "origin_airport_lat", "origin_airport_lon", "dest_station_lat", "dest_station_lon", "dest_normalized_in_degree", "origin_normalized_out_degree"]

# Drop Highly Correlated Features for 60 months data
df_otpw_60m_cleaned = df_otpw_60m.drop(*drop_col)

In [0]:
# Read new 12 month data
df_otpw_60m_cleaned.columns

In [0]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# # Assess multicollinearity after dropping correlated features for 60 month data
df_numeric_60m = df_otpw_60m_cleaned.select([
     'origin_station_dis', 'dest_airport_lat', 'dest_airport_lon', 'dest_station_dis', 'LATITUDE', 'LONGITUDE', 'HourlyPressureTendency', 'HourlyDryBulbTemperature', 'HourlyPrecipitation', 'HourlyRelativeHumidity', 'HourlySeaLevelPressure', 'HourlyStationPressure', 'HourlyVisibility', 'HourlyWindDirection', 'HourlyWindGustSpeed', 'HourlyWindSpeed', 'origin_pagerank', 'dest_pagerank'
]).toPandas()

# # Compute correlation matrix
corr_matrix = df_numeric_60m.corr(method="spearman")

# # Plot heatmap
plt.figure(figsize=(14,10))
sns.heatmap(corr_matrix, annot=True, fmt=".2f", cmap="coolwarm", linewidths=0.5)
plt.title("Spearman Correlation Heatmap of Continuous Features (5 Years)")
plt.show()

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# LASSO Regularization - 60 month data

# List of numerical features
num_col = [c for c, t in df_otpw_60m_cleaned.dtypes if t in ('double') and c != "DEP_DELAY_NEW"]

# Filter out rows with null values in the target column
df_otpw_60m_cleaned = df_otpw_60m_cleaned.filter(df_otpw_60m_cleaned["DEP_DEL15"].isNotNull())

# create an assembler for numerical features
assembler = VectorAssembler(
    inputCols=num_col,
    outputCol="features",
    handleInvalid="skip"
)

# Lasso Regression (L1 Regularization)
logReg_lasso = LogisticRegression(featuresCol="features", labelCol="DEP_DEL15", elasticNetParam=1.0)

# Create a pipeline with indexers, encoders, assembler, and model
pipeline = Pipeline(stages=[assembler, logReg_lasso])

# Fit the model
model = pipeline.fit(df_otpw_60m_cleaned)

# Extract the coefficients of the model
coef = model.stages[-1].coefficients
intercept = model.stages[-1].intercept

# Get the feature names from the assembler
feature_names = assembler.getInputCols()

# Find dropped and selected features - drop features with coefficients less than 0.001
dropped_features = [(feature_names[i], coef[i]) for i in range(len(coef)) if abs(coef[i]) < 0.001]
selected_features = [(feature_names[i], coef[i]) for i in range(len(coef)) if abs(coef[i]) >= 0.001]

print("Dropped Features:", dropped_features)
print("Selected Features:", selected_features)

In [0]:
from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
import numpy as np

# Define numerical features for PCA (using the same LASSO-selected features)
feature_columns_60m = [f[0] for f in selected_features]  # Use the same selected features

# Assemble the feature columns into a feature vector
assembler_60m = VectorAssembler(inputCols=feature_columns_60m, outputCol="features", handleInvalid="keep")

# Standardize the features
scaler_60m = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

# Apply PCA with 18 components 
k_60m = min(len(feature_columns_60m), 18)  
pca_60m = PCA(k=k_60m, inputCol="scaled_features", outputCol="pca_features")

# Create a pipeline
pca_pipeline_60m = Pipeline(stages=[assembler_60m, scaler_60m, pca_60m])

# Fit the PCA model
pca_model_60m = pca_pipeline_60m.fit(df_otpw_60m_cleaned.dropna(subset=feature_columns_60m))

# Extract explained variance
explained_variance_60m = pca_model_60m.stages[-1].explainedVariance.toArray()

# Cumulative explained variance
cumulative_variance_60m = np.cumsum(explained_variance_60m)

# Scree Plot for 60-month data
plt.figure(figsize=(8, 5))
plt.plot(range(1, k_60m + 1), explained_variance_60m, marker='o', linestyle='--', label="Explained Variance")
plt.plot(range(1, k_60m + 1), cumulative_variance_60m, marker='s', linestyle='-', label="Cumulative Variance", color='r')
plt.axhline(y=0.85, color='g', linestyle='--', label="85% Variance Threshold") 
plt.xlabel("Principal Components")
plt.ylabel("Explained Variance")
plt.title("Scree Plot - PCA (5 Years)")
plt.legend()
plt.grid()
plt.show()

# Print explained variance
print("Explained Variance by Principal Components (5 Years):", explained_variance_60m)
print("Cumulative Explained Variance (5 Years):", cumulative_variance_60m)

In [0]:
# drop the overlapping columns (show dropped in 60m data) result from the LASSO regularization
# Dropped Features: [('AIR_TIME', 0.0007176901231048505), ('origin_station_dis', 9.369368015092871e-06), ('dest_station_dis', -8.989046179898635e-05), ('HourlyWindDirection', 0.00032700957757733114)]

drop_col_num = ['AIR_TIME', 'origin_station_dis', 'dest_station_dis', 'HourlyWindDirection']

# Drop these columns
df_otpw_60m_cleaned = df_otpw_60m_cleaned.drop(*drop_col_num)

In [0]:
from pyspark.sql.types import StringType

# Get all string-type columns for 60m data
string_columns = [field.name for field in df_otpw_60m_cleaned.schema.fields if isinstance(field.dataType, StringType)]

# Compute unique value counts for string-type columns
unique_counts = {col_name: df_otpw_60m_cleaned.select(col_name).distinct().count() for col_name in string_columns}

unique_counts

In [0]:
# drop categorical columns that are either duplicates or have too many unique values (high cardinality)
drop_col_cat = ['ORIGIN_STATE_FIPS','ORIGIN_WAC',"DEST_STATE_FIPS","DEST_WAC",'ORIGIN_AIRPORT_ID','DEST_AIRPORT_ID', 
                'OP_CARRIER_FL_NUM', 'TAIL_NUM','STATION','HourlySkyConditions']

# Drop these columns
df_otpw_60m_cleaned = df_otpw_60m_cleaned.drop(*drop_col_cat)


In [0]:
string_columns = [field.name for field in df_otpw_60m_cleaned.schema.fields if isinstance(field.dataType, StringType)]
string_columns

# Compute unique value counts for string-type columns
unique_counts = {col_name: df_otpw_60m_cleaned.select(col_name).distinct().count() for col_name in string_columns}

unique_counts

In [0]:
df_otpw_60m_cleaned.limit(10).toPandas().head()

In [0]:
# final removed columns
drop_col_final = ['DATE', 'two_hours_prior_depart_UTC', 'four_hours_prior_depart_UTC','DEP_DELAY_NEW','LATITUDE','LONGITUDE','ORIGIN_CITY_MARKET_ID', 'DEST_CITY_MARKET_ID']

# Drop these columns
df_otpw_60m_cleaned = df_otpw_60m_cleaned.drop(*drop_col_final)

In [0]:
df_otpw_60m_cleaned.dtypes

In [0]:
# Final Review
df_otpw_60m_cleaned.limit(10).toPandas().head()

In [0]:
# Checkpoint the data after dimensionality reduction
df_otpw_60m_cleaned.write.mode("overwrite").parquet(f"{folder_path}/otpw_60m_proceed.parquet")

# Train-Test Split on 5 Years Data

In [0]:
# read datasets
section = "02"
number = "01"
folder_path = f"dbfs:/student-groups/Group_{section}_{number}"

df_otpw_60m = spark.read.parquet(f"{folder_path}/otpw_60m_proceed.parquet")
print("Size of df_otpw_60m:", df_otpw_60m.count())

In [0]:
from pyspark.sql import functions as F

print(df_otpw_60m.filter(F.col("HourlyPrecipitation").isNull()).count())
print(df_otpw_60m.filter(F.col("HourlySeaLevelPressure").isNull()).count())
print(df_otpw_60m.filter(F.col("HourlyWindGustSpeed").isNull()).count())

In [0]:
# drop duplicates in df_otpw_60m dataset
df_otpw_60m = df_otpw_60m.dropDuplicates()

# print the size of df_otpw_60m dataset
print("Size of df_otpw_60m:", df_otpw_60m.count())

In [0]:
from pyspark.sql.functions import rand,col,when,concat,substring,lit,udf,lower,sum as ps_sum,count as ps_count,row_number

# Remove null values in the target variable
df_otpw_60m = df_otpw_60m.filter(F.col("DEP_DEL15").isNotNull())

# Train-Test Split for 60-Month Dataset
df_train = df_otpw_60m.filter(col("sched_depart_date_time_UTC") < "2019-01-01 00:00:00")  # 2015-2018
df_test = df_otpw_60m.filter(col("sched_depart_date_time_UTC") >= "2019-01-01 00:00:00")  # 2019 (blind test)

print("Training Set Size (2015–2018):", df_train.count())
print("Test Set Size (2019):", df_test.count())

In [0]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import pandas as pd

# Convert to Pandas DataFrame for visualization
df_train_pandas = df_train.toPandas()
df_test_pandas = df_test.toPandas()

# Create a new column for the dataset split (train or test) in both dataframes
df_train_pandas['Dataset'] = 'Train'
df_test_pandas['Dataset'] = 'Test'

# Concatenate the two datasets to combine them for plotting
df_combined = pd.concat([df_train_pandas[['DEP_DEL15', 'Dataset']], df_test_pandas[['DEP_DEL15', 'Dataset']]])

# Replace 0 with 'On Time' and 1 with 'Delayed' for better labeling
df_combined['DEP_DEL15'] = df_combined['DEP_DEL15'].replace({0: 'On Time', 1: 'Delayed'})

# Create a stacked bar plot to compare the distribution of On Time vs Delayed in both sets
plt.figure(figsize=(8, 6))
sns.countplot(x='DEP_DEL15', hue='Dataset', data=df_combined, palette='Set2')

# Set the title and labels
plt.title("Comparison of On Time vs Delayed Flights in Train and Test Sets (5 Years)")
plt.xlabel("Flight Status")
plt.ylabel("Count")
plt.legend(title='Dataset', loc='upper right')

plt.tight_layout()
plt.show()


## Downsampling

In [0]:
#downsampling 
def downsample(train_df,verbose=False):
  '''Downsamples train_df to balance classes'''
  #balance classes in train
  delay_count = train_df.filter(F.col("DEP_DEL15") == 1).count()
  non_delay_count = train_df.filter(F.col("DEP_DEL15") == 0).count()
 
  total = delay_count + non_delay_count
  keep_percent = delay_count / non_delay_count
  
  train_delay = train_df.filter(F.col('DEP_DEL15') == 1)
  train_non_delay = train_df.filter(F.col('DEP_DEL15') == 0).sample(withReplacement=False,fraction=keep_percent,seed=42)
  train_downsampled = train_delay.union(train_non_delay)
  return train_downsampled  

In [0]:
from pyspark.sql import functions as F
# Downsample the training data from the 60-month dataset
df_train_downsampled = downsample(df_train)
df_train_downsampled.groupBy("DEP_DEL15").count().show()

# Convert to Pandas DataFrame for visualization
df_train_downsampled_pandas = df_train_downsampled.toPandas()
df_test_pandas = df_test.toPandas()

# Create a new column for the dataset split (train or test) in both dataframes
df_train_downsampled_pandas['Dataset'] = 'Train'
df_test_pandas['Dataset'] = 'Test'

# Concatenate the two datasets to combine them for plotting
df_combined_60m_downsampled = pd.concat([df_train_downsampled_pandas[['DEP_DEL15', 'Dataset']], df_test_pandas[['DEP_DEL15', 'Dataset']]])

# Replace 0 with 'On Time' and 1 with 'Delayed' for better labeling
df_combined_60m_downsampled['DEP_DEL15'] = df_combined_60m_downsampled['DEP_DEL15'].replace({0: 'On Time', 1: 'Delayed'})

# Create a stacked bar plot to compare the distribution of On Time vs Delayed in both sets after downsampling
plt.figure(figsize=(8, 6))
sns.countplot(x='DEP_DEL15', hue='Dataset', data=df_combined_60m_downsampled, palette='Set2', order=['On Time', 'Delayed'])

# Set the title and labels
plt.title("Comparison of On Time vs Delayed Flights in Train and Test Sets (5 Years Data) After Downsampling")
plt.xlabel("Flight Status")
plt.ylabel("Count")
plt.legend(title='Dataset', loc='upper right')

plt.tight_layout()
plt.show()

# Count the occurrences of 0 and 1 for both train and test sets after downsampling
count_table_downsampled = df_combined_60m_downsampled.groupby(['Dataset', 'DEP_DEL15']).size().unstack(fill_value=0)

# Display the count table for the downsampled dataset
print("Count of On Time and Delayed flights in Train and Test sets (After Downsampling):")
print(count_table_downsampled)

## Check Point Train and Test Datasets

In [0]:
# Save the original train set (after downsampling) and test set as parquet files
df_train.write.mode("overwrite").parquet(f"{folder_path}/df_train.parquet")
df_train_downsampled.write.mode("overwrite").parquet(f"{folder_path}/df_train_downsampled.parquet")
df_test.write.mode("overwrite").parquet(f"{folder_path}/df_test.parquet")