In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("BigDataProject")
    .getOrCreate()
)

# Read the raw flights CSV: the first line is the header and column types are
# inferred by Spark from the data.
all_flights = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("all_flights.csv")
)

In [2]:
# Size of the raw data set before any cleaning (triggers a full scan of the CSV).
row_count = all_flights.count()
print("Number of rows:", row_count)

Number of rows: 61556964


In [3]:
# Discard the 'Unnamed: 27' column; nothing later in the notebook reads it.
all_flights = all_flights.drop(
    'Unnamed: 27'
)

In [4]:
# Partition the raw data on the CANCELLED flag: rows with 1 go to
# canceled_flights, rows with 0 go to normal_flights.
cancelled_flag = all_flights["CANCELLED"]
canceled_flights = all_flights.where(cancelled_flag == 1)
normal_flights = all_flights.where(cancelled_flag == 0)

# Report the size of each partition.
canceled_flights_row_count = canceled_flights.count()
print(f"Number of canceled flights rows: {canceled_flights_row_count}")

normal_flights_row_count = normal_flights.count()
print(f"Number of normal flights rows: {normal_flights_row_count}")

Number of canceled flights rows: 973209
Number of normal flights rows: 60583755


In [5]:
# Arrival-side columns that get the sentinel -1 for canceled flights
columns_to_fill = [
    "WHEELS_ON",
    "TAXI_IN",
    "ARR_TIME",
    "ARR_DELAY",
    "ACTUAL_ELAPSED_TIME",
    "AIR_TIME",
]

# Replace nulls in exactly those columns with -1 (other columns are untouched)
canceled_flights = canceled_flights.fillna(dict.fromkeys(columns_to_fill, -1))


In [6]:
from pyspark.sql.functions import col

# Split canceled flights on whether a departure time was recorded.
# A recorded DEP_TIME means the plane left the gate (maybe even moved) but
# never took off fully; those are the "weird" cancellations.
departure_recorded = col("DEP_TIME").isNotNull()
weird_canceled_flights = canceled_flights.where(departure_recorded)
other_canceled_flights = canceled_flights.where(~departure_recorded)

In [7]:
# Split operated flights by the sign of the departure delay.
# NOTE(review): a NULL DEP_DELAY satisfies neither comparison, so such rows end
# up in neither frame and are silently left out of everything downstream.
departure_delay = col("DEP_DELAY")

# Flights delayed at departure (DEP_DELAY > 0)
delayed_flights = normal_flights.where(departure_delay > 0)

# Flights not delayed (DEP_DELAY <= 0)
non_delayed_flights = normal_flights.where(departure_delay <= 0)

In [8]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = weird_canceled_flights.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in weird_canceled_flights.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in weird_canceled_flights:", row_count)
print("-----------------------------------------")
for column, null_count in zip(weird_canceled_flights.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in weird_canceled_flights: 37486
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 0 nulls
DEP_TIME: 0 nulls
DEP_DELAY: 208 nulls
TAXI_OUT: 28178 nulls
WHEELS_OFF: 28173 nulls
WHEELS_ON: 0 nulls
TAXI_IN: 0 nulls
CRS_ARR_TIME: 0 nulls
ARR_TIME: 0 nulls
ARR_DELAY: 0 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 0 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 26 nulls
ACTUAL_ELAPSED_TIME: 0 nulls
AIR_TIME: 0 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 37486 nulls
WEATHER_DELAY: 37486 nulls
NAS_DELAY: 37486 nulls
SECURITY_DELAY: 37486 nulls
LATE_AIRCRAFT_DELAY: 37486 nulls


In [9]:
# handling dep delay missing values in weird canceled flights

from pyspark.sql.functions import col, floor, when

# CRS_DEP_TIME and DEP_TIME are handled as HHMM numbers: convert each to
# minutes since midnight (hours * 60 + minutes). Plain column expressions are
# used, so no temporary helper columns have to be added and dropped.
crs_dep_total_min = floor(col("CRS_DEP_TIME") / 100) * 60 + col("CRS_DEP_TIME") % 100
dep_total_min = floor(col("DEP_TIME") / 100) * 60 + col("DEP_TIME") % 100

# Raw clock difference. On its own this is wrong whenever the scheduled and the
# actual departure fall on different sides of midnight (scheduled 23:50,
# actual 00:10 would give -1420 instead of 20).
raw_dep_delay = dep_total_min - crs_dep_total_min

# Fold the difference into the +/- 12 hour window around the scheduled time.
# NOTE(review): assumes the real delay is within 12 hours either way, since only
# clock times (no actual departure date) are used here - confirm.
wrapped_dep_delay = (
    when(raw_dep_delay < -720, raw_dep_delay + 1440)
    .when(raw_dep_delay > 720, raw_dep_delay - 1440)
    .otherwise(raw_dep_delay)
)

# Only fill DEP_DELAY where it is missing; recorded values are kept as they are.
weird_canceled_flights = weird_canceled_flights.withColumn(
    "DEP_DELAY",
    when(col("DEP_DELAY").isNull(), wrapped_dep_delay).otherwise(col("DEP_DELAY"))
)

In [10]:
# handling wheels off missing values in weird canceled flights
# A missing WHEELS_OFF is replaced by the sentinel -1.
weird_canceled_flights = weird_canceled_flights.na.fill({"WHEELS_OFF": -1})

In [11]:
# handling taxi out missing values in weird canceled flights
from pyspark.sql.functions import floor, col, when

# DEP_TIME and WHEELS_OFF are handled as HHMM numbers: convert each to minutes
# since midnight. Plain column expressions replace the temporary helper columns.
dep_total_min = floor(col("DEP_TIME") / 100) * 60 + col("DEP_TIME") % 100
wheels_off_total_min = floor(col("WHEELS_OFF") / 100) * 60 + col("WHEELS_OFF") % 100

# Taxi-out is the time from leaving the gate to wheels-off. A negative clock
# difference means wheels-off happened after midnight, so add one day (1440
# minutes); previously such rows received a large negative taxi time.
raw_taxi_out = wheels_off_total_min - dep_total_min
computed_taxi_out = when(raw_taxi_out < 0, raw_taxi_out + 1440).otherwise(raw_taxi_out)

# Rows without a usable wheels-off time (-1 is the sentinel written by the
# WHEELS_OFF fill earlier in the notebook) or without DEP_TIME cannot be
# computed and get the sentinel -1.
not_computable = (
    (col("WHEELS_OFF") == -1) | (col("DEP_TIME").isNull()) | (col("WHEELS_OFF").isNull())
)

# Only fill TAXI_OUT where it is missing; recorded values are kept as they are.
weird_canceled_flights = weird_canceled_flights.withColumn(
    "TAXI_OUT",
    when(
        col("TAXI_OUT").isNull(),
        when(not_computable, -1).otherwise(computed_taxi_out)
    ).otherwise(
        col("TAXI_OUT")
    )
)


In [12]:
# handling crs elapsed time missing values in weird canceled flights
from pyspark.sql.functions import floor, col, when

# Scheduled departure/arrival are handled as HHMM numbers: turn each into
# minutes since midnight, as expressions rather than temporary columns.
sched_dep_min = floor(col("CRS_DEP_TIME") / 100) * 60 + col("CRS_DEP_TIME") % 100
sched_arr_min = floor(col("CRS_ARR_TIME") / 100) * 60 + col("CRS_ARR_TIME") % 100

# An arrival clock time earlier than the departure clock time is treated as a
# next-day arrival (+1440 minutes).
# NOTE(review): this is a pure clock difference; if the two times are local
# times in different time zones the result is off by the offset - confirm.
sched_elapsed = when(
    sched_arr_min >= sched_dep_min,
    sched_arr_min - sched_dep_min
).otherwise(
    (sched_arr_min + 1440) - sched_dep_min
)

# Fill CRS_ELAPSED_TIME only where it is missing.
weird_canceled_flights = weird_canceled_flights.withColumn(
    "CRS_ELAPSED_TIME",
    when(col("CRS_ELAPSED_TIME").isNull(), sched_elapsed).otherwise(col("CRS_ELAPSED_TIME"))
)

In [13]:
# Delay-cause columns: a missing value is replaced by 0 minutes.
columns_to_fill = [
    "LATE_AIRCRAFT_DELAY",
    "SECURITY_DELAY",
    "NAS_DELAY",
    "WEATHER_DELAY",
    "CARRIER_DELAY",
]

zero_fill = dict.fromkeys(columns_to_fill, 0)
weird_canceled_flights = weird_canceled_flights.fillna(zero_fill)

In [14]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = weird_canceled_flights.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in weird_canceled_flights.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in weird_canceled_flights:", row_count)
print("-----------------------------------------")
for column, null_count in zip(weird_canceled_flights.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in weird_canceled_flights: 37486
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 0 nulls
DEP_TIME: 0 nulls
DEP_DELAY: 0 nulls
TAXI_OUT: 0 nulls
WHEELS_OFF: 0 nulls
WHEELS_ON: 0 nulls
TAXI_IN: 0 nulls
CRS_ARR_TIME: 0 nulls
ARR_TIME: 0 nulls
ARR_DELAY: 0 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 0 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 0 nulls
ACTUAL_ELAPSED_TIME: 0 nulls
AIR_TIME: 0 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 0 nulls
WEATHER_DELAY: 0 nulls
NAS_DELAY: 0 nulls
SECURITY_DELAY: 0 nulls
LATE_AIRCRAFT_DELAY: 0 nulls


In [15]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = other_canceled_flights.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in other_canceled_flights.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in other_canceled_flights:", row_count)
print("-----------------------------------------")
for column, null_count in zip(other_canceled_flights.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in other_canceled_flights: 935723
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 0 nulls
DEP_TIME: 935723 nulls
DEP_DELAY: 935723 nulls
TAXI_OUT: 935723 nulls
WHEELS_OFF: 935723 nulls
WHEELS_ON: 0 nulls
TAXI_IN: 0 nulls
CRS_ARR_TIME: 0 nulls
ARR_TIME: 0 nulls
ARR_DELAY: 0 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 0 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 11 nulls
ACTUAL_ELAPSED_TIME: 0 nulls
AIR_TIME: 0 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 935723 nulls
WEATHER_DELAY: 935723 nulls
NAS_DELAY: 935723 nulls
SECURITY_DELAY: 935723 nulls
LATE_AIRCRAFT_DELAY: 935723 nulls


In [16]:
# Delay-cause columns: a missing value becomes 0.
columns_to_fill_with_0 = [
    "LATE_AIRCRAFT_DELAY",
    "SECURITY_DELAY",
    "NAS_DELAY",
    "WEATHER_DELAY",
    "CARRIER_DELAY",
]

# Departure-side columns: a missing value becomes the sentinel -1.
columns_to_fill_with_minus_1 = [
    "DEP_TIME",
    "DEP_DELAY",
    "TAXI_OUT",
    "WHEELS_OFF",
]

# The two column groups do not overlap, so both fills are applied in one call.
fill_values = {
    **dict.fromkeys(columns_to_fill_with_0, 0),
    **dict.fromkeys(columns_to_fill_with_minus_1, -1),
}
other_canceled_flights = other_canceled_flights.fillna(fill_values)

In [17]:
# handling crs elapsed time missing values in other canceled flights
from pyspark.sql.functions import floor, col, when

# Scheduled departure/arrival are handled as HHMM numbers: turn each into
# minutes since midnight, as expressions rather than temporary columns.
sched_dep_min = floor(col("CRS_DEP_TIME") / 100) * 60 + col("CRS_DEP_TIME") % 100
sched_arr_min = floor(col("CRS_ARR_TIME") / 100) * 60 + col("CRS_ARR_TIME") % 100

# An arrival clock time earlier than the departure clock time is treated as a
# next-day arrival (+1440 minutes).
# NOTE(review): this is a pure clock difference; if the two times are local
# times in different time zones the result is off by the offset - confirm.
sched_elapsed = when(
    sched_arr_min >= sched_dep_min,
    sched_arr_min - sched_dep_min
).otherwise(
    (sched_arr_min + 1440) - sched_dep_min
)

# Fill CRS_ELAPSED_TIME only where it is missing.
other_canceled_flights = other_canceled_flights.withColumn(
    "CRS_ELAPSED_TIME",
    when(col("CRS_ELAPSED_TIME").isNull(), sched_elapsed).otherwise(col("CRS_ELAPSED_TIME"))
)

In [18]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = other_canceled_flights.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in other_canceled_flights.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in other_canceled_flights:", row_count)
print("-----------------------------------------")
for column, null_count in zip(other_canceled_flights.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in other_canceled_flights: 935723
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 0 nulls
DEP_TIME: 0 nulls
DEP_DELAY: 0 nulls
TAXI_OUT: 0 nulls
WHEELS_OFF: 0 nulls
WHEELS_ON: 0 nulls
TAXI_IN: 0 nulls
CRS_ARR_TIME: 0 nulls
ARR_TIME: 0 nulls
ARR_DELAY: 0 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 0 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 0 nulls
ACTUAL_ELAPSED_TIME: 0 nulls
AIR_TIME: 0 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 0 nulls
WEATHER_DELAY: 0 nulls
NAS_DELAY: 0 nulls
SECURITY_DELAY: 0 nulls
LATE_AIRCRAFT_DELAY: 0 nulls


In [19]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = delayed_flights.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in delayed_flights.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in delayed_flights:", row_count)
print("-----------------------------------------")
for column, null_count in zip(delayed_flights.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in delayed_flights: 22280599
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 1 nulls
DEP_TIME: 0 nulls
DEP_DELAY: 0 nulls
TAXI_OUT: 0 nulls
WHEELS_OFF: 0 nulls
WHEELS_ON: 15262 nulls
TAXI_IN: 15261 nulls
CRS_ARR_TIME: 2 nulls
ARR_TIME: 15261 nulls
ARR_DELAY: 75794 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 22280599 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 11 nulls
ACTUAL_ELAPSED_TIME: 75077 nulls
AIR_TIME: 75076 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 12198707 nulls
WEATHER_DELAY: 12198707 nulls
NAS_DELAY: 12198707 nulls
SECURITY_DELAY: 12198707 nulls
LATE_AIRCRAFT_DELAY: 12198707 nulls


In [20]:
# handling crs dep time null value
from pyspark.sql.functions import col, floor, when

# DEP_TIME is handled as an HHMM number: convert it to minutes since midnight.
dep_total_min = floor(col("DEP_TIME") / 100) * 60 + (col("DEP_TIME") % 100)

# Scheduled departure = actual departure - delay. When the delay is larger than
# the minutes elapsed since midnight the scheduled time lies on an earlier day
# and the raw value is negative, which used to yield an invalid HHMM (e.g. -130
# instead of 2330). Spark's % keeps the sign of the dividend, so the
# ((x % 1440) + 1440) % 1440 form is used to wrap into the range [0, 1440).
raw_sched_min = dep_total_min - col("DEP_DELAY")
sched_min = ((raw_sched_min % 1440) + 1440) % 1440

# Back to HHMM. The 100.0 factor is kept from the previous version so the
# resulting column type stays the same as before.
sched_hhmm = (floor(sched_min / 60) * 100.0) + (sched_min % 60)

# Fill missing CRS_DEP_TIME using DEP_TIME - DEP_DELAY; existing values are kept.
delayed_flights = delayed_flights.withColumn(
    "CRS_DEP_TIME",
    when(col("CRS_DEP_TIME").isNull(), sched_hhmm).otherwise(col("CRS_DEP_TIME"))
)


In [21]:
from pyspark.sql.functions import col, lit

# Keep a row as soon as at least one of ARR_TIME / WHEELS_ON / TAXI_IN is
# present; rows where all three are missing are removed.
has_arrival_data = (
    col("ARR_TIME").isNotNull()
    | col("WHEELS_ON").isNotNull()
    | col("TAXI_IN").isNotNull()
)
delayed_flights = delayed_flights.where(has_arrival_data)

# Every remaining row gets the literal string "None" as its cancellation code.
delayed_flights = delayed_flights.withColumn("CANCELLATION_CODE", lit("None"))

In [22]:
# handling air time missing values
from pyspark.sql.functions import col, floor, when

# WHEELS_OFF and WHEELS_ON are handled as HHMM numbers: turn each into minutes
# since midnight, as expressions rather than temporary columns.
wheels_off_min = floor(col("WHEELS_OFF") / 100) * 60 + (col("WHEELS_OFF") % 100)
wheels_on_min = floor(col("WHEELS_ON") / 100) * 60 + (col("WHEELS_ON") % 100)

# A touchdown clock time earlier than the take-off clock time is treated as a
# flight crossing midnight (+1440 minutes).
# NOTE(review): this is a pure clock difference; if the two times are local
# times in different time zones the result is off by the offset - confirm.
airborne_min = when(
    wheels_on_min >= wheels_off_min,
    wheels_on_min - wheels_off_min
).otherwise(
    (wheels_on_min + 1440) - wheels_off_min
)

# Fill AIR_TIME only where it is null and WHEELS_ON is available.
needs_air_time = col("AIR_TIME").isNull() & col("WHEELS_ON").isNotNull()
delayed_flights = delayed_flights.withColumn(
    "AIR_TIME",
    when(needs_air_time, airborne_min).otherwise(col("AIR_TIME"))
)


In [23]:
# handling arr delay missing values
from pyspark.sql.functions import col, floor, when

# CRS_ARR_TIME and ARR_TIME are handled as HHMM numbers: convert each to
# minutes since midnight. Plain column expressions replace the helper columns.
crs_arr_total_min = floor(col("CRS_ARR_TIME") / 100) * 60 + (col("CRS_ARR_TIME") % 100)
arr_total_min = floor(col("ARR_TIME") / 100) * 60 + (col("ARR_TIME") % 100)

# Arrival delay is signed: a flight can land before its scheduled time. The
# previous version added 1440 to EVERY negative difference, turning an arrival
# 10 minutes early into a 1430-minute delay. Instead, fold the raw clock
# difference into the +/- 12 hour window around the scheduled arrival, which
# handles midnight crossings in both directions and keeps early arrivals negative.
# NOTE(review): assumes the real arrival delay is within 12 hours either way,
# since only clock times (no actual arrival date) are used here - confirm.
raw_arr_delay = arr_total_min - crs_arr_total_min
wrapped_arr_delay = (
    when(raw_arr_delay < -720, raw_arr_delay + 1440)
    .when(raw_arr_delay > 720, raw_arr_delay - 1440)
    .otherwise(raw_arr_delay)
)

# Fill ARR_DELAY only if it is null and both times are not null
delayed_flights = delayed_flights.withColumn(
    "ARR_DELAY",
    when(
        col("ARR_DELAY").isNull() & col("ARR_TIME").isNotNull() & col("CRS_ARR_TIME").isNotNull(),
        wrapped_arr_delay
    ).otherwise(col("ARR_DELAY"))
)


In [24]:
# handling actual elapsed time missing value
from pyspark.sql.functions import col, floor, when

# ARR_TIME and DEP_TIME are handled as HHMM numbers: turn each into minutes
# since midnight, as expressions rather than temporary columns.
arrival_min = floor(col("ARR_TIME") / 100) * 60 + (col("ARR_TIME") % 100)
departure_min = floor(col("DEP_TIME") / 100) * 60 + (col("DEP_TIME") % 100)

# An arrival clock time earlier than the departure clock time is treated as a
# flight crossing midnight (+1440 minutes).
# NOTE(review): this is a pure clock difference; if the two times are local
# times in different time zones the result is off by the offset - confirm.
gate_to_gate_min = when(
    arrival_min >= departure_min,
    arrival_min - departure_min
).otherwise(
    (arrival_min + 1440) - departure_min
)

# Fill ACTUAL_ELAPSED_TIME only where it is null and ARR_TIME is available.
needs_elapsed = col("ACTUAL_ELAPSED_TIME").isNull() & col("ARR_TIME").isNotNull()
delayed_flights = delayed_flights.withColumn(
    "ACTUAL_ELAPSED_TIME",
    when(needs_elapsed, gate_to_gate_min).otherwise(col("ACTUAL_ELAPSED_TIME"))
)


In [25]:
# handling crs elapsed time missing values
from pyspark.sql.functions import col, floor, when

# Scheduled arrival/departure are handled as HHMM numbers: turn each into
# minutes since midnight, as expressions rather than temporary columns.
sched_arr_min = floor(col("CRS_ARR_TIME") / 100) * 60 + (col("CRS_ARR_TIME") % 100)
sched_dep_min = floor(col("CRS_DEP_TIME") / 100) * 60 + (col("CRS_DEP_TIME") % 100)

# An arrival clock time earlier than the departure clock time is treated as a
# next-day arrival (+1440 minutes).
# NOTE(review): this is a pure clock difference; if the two times are local
# times in different time zones the result is off by the offset - confirm.
sched_elapsed = when(
    sched_arr_min >= sched_dep_min,
    sched_arr_min - sched_dep_min
).otherwise(
    (sched_arr_min + 1440) - sched_dep_min
)

# Fill CRS_ELAPSED_TIME only where it is null and CRS_ARR_TIME is available.
needs_sched_elapsed = col("CRS_ELAPSED_TIME").isNull() & col("CRS_ARR_TIME").isNotNull()
delayed_flights = delayed_flights.withColumn(
    "CRS_ELAPSED_TIME",
    when(needs_sched_elapsed, sched_elapsed).otherwise(col("CRS_ELAPSED_TIME"))
)


In [26]:
# Delay-cause columns: a missing value is replaced by 0.
columns_to_fill_with_0 = [
    "LATE_AIRCRAFT_DELAY",
    "SECURITY_DELAY",
    "NAS_DELAY",
    "WEATHER_DELAY",
    "CARRIER_DELAY",
]

delayed_flights = delayed_flights.na.fill(0, subset=columns_to_fill_with_0)

In [27]:
from pyspark.sql.functions import col

# Columns that must all be populated for a delayed flight to be kept.
columns_to_check = [
    "TAXI_IN",
    "AIR_TIME",
    "WHEELS_ON",
    "ACTUAL_ELAPSED_TIME",
    "ARR_TIME",
    "CRS_ARR_TIME",
    "CRS_ELAPSED_TIME"
]

# Keep only rows where every listed column is NOT null. Applying one
# isNotNull filter per column is the same as rejecting rows where any is null.
for required_column in columns_to_check:
    delayed_flights = delayed_flights.filter(col(required_column).isNotNull())


In [28]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = delayed_flights.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in delayed_flights.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in delayed_flights:", row_count)
print("-----------------------------------------")
for column, null_count in zip(delayed_flights.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in delayed_flights: 22265335
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 0 nulls
DEP_TIME: 0 nulls
DEP_DELAY: 0 nulls
TAXI_OUT: 0 nulls
WHEELS_OFF: 0 nulls
WHEELS_ON: 0 nulls
TAXI_IN: 0 nulls
CRS_ARR_TIME: 0 nulls
ARR_TIME: 0 nulls
ARR_DELAY: 0 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 0 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 0 nulls
ACTUAL_ELAPSED_TIME: 0 nulls
AIR_TIME: 0 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 0 nulls
WEATHER_DELAY: 0 nulls
NAS_DELAY: 0 nulls
SECURITY_DELAY: 0 nulls
LATE_AIRCRAFT_DELAY: 0 nulls


In [29]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = non_delayed_flights.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in non_delayed_flights.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in non_delayed_flights:", row_count)
print("-----------------------------------------")
for column, null_count in zip(non_delayed_flights.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in non_delayed_flights: 38298412
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 0 nulls
DEP_TIME: 0 nulls
DEP_DELAY: 0 nulls
TAXI_OUT: 0 nulls
WHEELS_OFF: 0 nulls
WHEELS_ON: 8538 nulls
TAXI_IN: 8538 nulls
CRS_ARR_TIME: 0 nulls
ARR_TIME: 8538 nulls
ARR_DELAY: 72192 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 38298412 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 12 nulls
ACTUAL_ELAPSED_TIME: 70449 nulls
AIR_TIME: 70449 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 36989899 nulls
WEATHER_DELAY: 36989899 nulls
NAS_DELAY: 36989899 nulls
SECURITY_DELAY: 36989899 nulls
LATE_AIRCRAFT_DELAY: 36989899 nulls


In [30]:
from pyspark.sql.functions import col, lit

# Keep a row as soon as at least one of ARR_TIME / WHEELS_ON / TAXI_IN is
# present; rows where all three are missing are removed.
has_arrival_data = (
    col("ARR_TIME").isNotNull()
    | col("WHEELS_ON").isNotNull()
    | col("TAXI_IN").isNotNull()
)
non_delayed_flights = non_delayed_flights.where(has_arrival_data)

# Every remaining row gets the literal string "None" as its cancellation code.
non_delayed_flights = non_delayed_flights.withColumn("CANCELLATION_CODE", lit("None"))

# Delay-cause columns: a missing value is replaced by 0.
columns_to_fill_with_0 = [
    "LATE_AIRCRAFT_DELAY",
    "SECURITY_DELAY",
    "NAS_DELAY",
    "WEATHER_DELAY",
    "CARRIER_DELAY",
]

non_delayed_flights = non_delayed_flights.na.fill(0, subset=columns_to_fill_with_0)

In [31]:
# handling arr delay missing values
from pyspark.sql.functions import col, floor, when

# CRS_ARR_TIME and ARR_TIME are handled as HHMM numbers: convert each to
# minutes since midnight. Plain column expressions replace the helper columns.
crs_arr_total_min = floor(col("CRS_ARR_TIME") / 100) * 60 + (col("CRS_ARR_TIME") % 100)
arr_total_min = floor(col("ARR_TIME") / 100) * 60 + (col("ARR_TIME") % 100)

# Arrival delay is signed: a flight can land before its scheduled time. The
# previous version added 1440 to EVERY negative difference, turning an arrival
# 10 minutes early into a 1430-minute delay. Instead, fold the raw clock
# difference into the +/- 12 hour window around the scheduled arrival, which
# handles midnight crossings in both directions and keeps early arrivals negative.
# NOTE(review): assumes the real arrival delay is within 12 hours either way,
# since only clock times (no actual arrival date) are used here - confirm.
raw_arr_delay = arr_total_min - crs_arr_total_min
wrapped_arr_delay = (
    when(raw_arr_delay < -720, raw_arr_delay + 1440)
    .when(raw_arr_delay > 720, raw_arr_delay - 1440)
    .otherwise(raw_arr_delay)
)

# Fill ARR_DELAY wherever it is null; if ARR_TIME or CRS_ARR_TIME is itself
# null the computed value is null and ARR_DELAY stays null.
non_delayed_flights = non_delayed_flights.withColumn(
    "ARR_DELAY",
    when(col("ARR_DELAY").isNull(), wrapped_arr_delay).otherwise(col("ARR_DELAY"))
)


In [32]:
# handling crs elapsed time missing values
from pyspark.sql.functions import col, floor, when

# Scheduled arrival/departure are handled as HHMM numbers: turn each into
# minutes since midnight, as expressions rather than temporary columns.
sched_arr_min = floor(col("CRS_ARR_TIME") / 100) * 60 + (col("CRS_ARR_TIME") % 100)
sched_dep_min = floor(col("CRS_DEP_TIME") / 100) * 60 + (col("CRS_DEP_TIME") % 100)

# An arrival clock time earlier than the departure clock time is treated as a
# next-day arrival (+1440 minutes).
# NOTE(review): this is a pure clock difference; if the two times are local
# times in different time zones the result is off by the offset - confirm.
sched_elapsed = when(
    sched_arr_min >= sched_dep_min,
    sched_arr_min - sched_dep_min
).otherwise(
    (sched_arr_min + 1440) - sched_dep_min
)

# Fill CRS_ELAPSED_TIME only where it is null.
non_delayed_flights = non_delayed_flights.withColumn(
    "CRS_ELAPSED_TIME",
    when(col("CRS_ELAPSED_TIME").isNull(), sched_elapsed).otherwise(col("CRS_ELAPSED_TIME"))
)


In [33]:
# handling actual elapsed time missing value
from pyspark.sql.functions import col, floor, when

# ARR_TIME and DEP_TIME are handled as HHMM numbers: turn each into minutes
# since midnight, as expressions rather than temporary columns.
arrival_min = floor(col("ARR_TIME") / 100) * 60 + (col("ARR_TIME") % 100)
departure_min = floor(col("DEP_TIME") / 100) * 60 + (col("DEP_TIME") % 100)

# An arrival clock time earlier than the departure clock time is treated as a
# flight crossing midnight (+1440 minutes).
# NOTE(review): this is a pure clock difference; if the two times are local
# times in different time zones the result is off by the offset - confirm.
gate_to_gate_min = when(
    arrival_min >= departure_min,
    arrival_min - departure_min
).otherwise(
    (arrival_min + 1440) - departure_min
)

# Fill ACTUAL_ELAPSED_TIME only where it is null.
non_delayed_flights = non_delayed_flights.withColumn(
    "ACTUAL_ELAPSED_TIME",
    when(col("ACTUAL_ELAPSED_TIME").isNull(), gate_to_gate_min).otherwise(col("ACTUAL_ELAPSED_TIME"))
)


In [34]:
# handling air time missing values
from pyspark.sql.functions import col, floor, when

# WHEELS_OFF and WHEELS_ON are handled as HHMM numbers: turn each into minutes
# since midnight, as expressions rather than temporary columns.
wheels_off_min = floor(col("WHEELS_OFF") / 100) * 60 + (col("WHEELS_OFF") % 100)
wheels_on_min = floor(col("WHEELS_ON") / 100) * 60 + (col("WHEELS_ON") % 100)

# A touchdown clock time earlier than the take-off clock time is treated as a
# flight crossing midnight (+1440 minutes).
# NOTE(review): this is a pure clock difference; if the two times are local
# times in different time zones the result is off by the offset - confirm.
airborne_min = when(
    wheels_on_min >= wheels_off_min,
    wheels_on_min - wheels_off_min
).otherwise(
    (wheels_on_min + 1440) - wheels_off_min
)

# Fill AIR_TIME wherever it is null (WHEELS_ON is not checked separately here).
non_delayed_flights = non_delayed_flights.withColumn(
    "AIR_TIME",
    when(col("AIR_TIME").isNull(), airborne_min).otherwise(col("AIR_TIME"))
)


In [35]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = non_delayed_flights.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in non_delayed_flights.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in non_delayed_flights:", row_count)
print("-----------------------------------------")
for column, null_count in zip(non_delayed_flights.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in non_delayed_flights: 38289874
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 0 nulls
DEP_TIME: 0 nulls
DEP_DELAY: 0 nulls
TAXI_OUT: 0 nulls
WHEELS_OFF: 0 nulls
WHEELS_ON: 0 nulls
TAXI_IN: 0 nulls
CRS_ARR_TIME: 0 nulls
ARR_TIME: 0 nulls
ARR_DELAY: 0 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 0 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 0 nulls
ACTUAL_ELAPSED_TIME: 0 nulls
AIR_TIME: 0 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 0 nulls
WEATHER_DELAY: 0 nulls
NAS_DELAY: 0 nulls
SECURITY_DELAY: 0 nulls
LATE_AIRCRAFT_DELAY: 0 nulls


In [36]:
# Recombine the four cleaned partitions into one DataFrame.
# unionByName matches columns by name, whereas union() matches them purely by
# position and would silently mix up columns if one partition's column order
# ever differed. The result keeps the column order of weird_canceled_flights.
merged_df = (
    weird_canceled_flights
    .unionByName(other_canceled_flights)
    .unionByName(delayed_flights)
    .unionByName(non_delayed_flights)
)

In [38]:
from pyspark.sql import functions as F

# Compute the row count and every per-column null count in ONE aggregation job.
# The previous version launched a separate full scan for each column.
# count(when(isNull, 1)) counts only the rows where the column is null and
# yields 0 (not NULL) when there are none.
null_stats = merged_df.agg(
    F.count(F.lit(1)),
    *[
        F.count(F.when(F.col(column).isNull(), 1))
        for column in merged_df.columns
    ],
).first()

# Position 0 holds the row count; the remaining positions follow column order.
row_count = null_stats[0]

print("Number of rows in merged dataframe:", row_count)
print("-----------------------------------------")
for column, null_count in zip(merged_df.columns, null_stats[1:]):
    print(f"{column}: {null_count} nulls")

Number of rows in merged dataframe: 61528418
-----------------------------------------
FL_DATE: 0 nulls
OP_CARRIER: 0 nulls
OP_CARRIER_FL_NUM: 0 nulls
ORIGIN: 0 nulls
DEST: 0 nulls
CRS_DEP_TIME: 0 nulls
DEP_TIME: 0 nulls
DEP_DELAY: 0 nulls
TAXI_OUT: 0 nulls
WHEELS_OFF: 0 nulls
WHEELS_ON: 0 nulls
TAXI_IN: 0 nulls
CRS_ARR_TIME: 0 nulls
ARR_TIME: 0 nulls
ARR_DELAY: 0 nulls
CANCELLED: 0 nulls
CANCELLATION_CODE: 0 nulls
DIVERTED: 0 nulls
CRS_ELAPSED_TIME: 0 nulls
ACTUAL_ELAPSED_TIME: 0 nulls
AIR_TIME: 0 nulls
DISTANCE: 0 nulls
CARRIER_DELAY: 0 nulls
WEATHER_DELAY: 0 nulls
NAS_DELAY: 0 nulls
SECURITY_DELAY: 0 nulls
LATE_AIRCRAFT_DELAY: 0 nulls


In [39]:
# Write the cleaned data as CSV with a header row. Spark creates "cleaned.csv"
# as an output location; with the default save mode the write raises an error
# if that location already exists, so re-running the notebook would fail here.
# mode("overwrite") replaces the previous output instead.
merged_df.write.mode("overwrite").option("header", True).csv("cleaned.csv")