In [0]:
from pyspark.sql.functions import col

In [0]:
# Volume folders: raw yearly inputs and the destination for cleaned copies.
DATA_DIR = "/Volumes/flight/default/flightdelay"
OUTPUT_DIR = "/Volumes/flight/default/cleaned_flightdelay"

# One CSV per year, 2016 through 2024 inclusive, in chronological order.
files = [f"flight_with_weather_{year}.csv" for year in range(2016, 2025)]

In [0]:
# Columns checked for NULLs and used as the dropna subset: five delay / taxi /
# air-time columns, then the O_ and D_ prefixed TEMP, PRCP and WSPD columns.
important_columns = [
    "DEP_DELAY", "ARR_DELAY", "TAXI_OUT", "TAXI_IN", "AIR_TIME",
    *(
        f"{prefix}_{measure}"
        for prefix in ("O", "D")
        for measure in ("TEMP", "PRCP", "WSPD")
    ),
]

In [0]:
# Announce every yearly file. The loop body only prints a banner and assigns
# file_path, so once the loop ends file_path refers to the last entry of files.
for file in files:
    print(
        "===================================",
        f"Processing file: {file}",
        "===================================",
        sep="\n",
    )
    file_path = f"{DATA_DIR}/{file}"

Processing file: flight_with_weather_2016.csv
Processing file: flight_with_weather_2017.csv
Processing file: flight_with_weather_2018.csv
Processing file: flight_with_weather_2019.csv
Processing file: flight_with_weather_2020.csv
Processing file: flight_with_weather_2021.csv
Processing file: flight_with_weather_2022.csv
Processing file: flight_with_weather_2023.csv
Processing file: flight_with_weather_2024.csv


In [0]:
    # Load the CSV at file_path, treating the first line as the header and
    # letting Spark infer a type for each column.
    df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(file_path)
    )

In [0]:
    # Show the column names and types that the inferSchema read assigned to df.
    print("Schema:")
    df.printSchema()

Schema:
root
 |-- FL_DATE: timestamp (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: double (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: timestamp (nullable = true)
 |-- DEP_TIME: timestamp (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: timestamp (nullable = true)
 |-- WHEELS_ON: timestamp (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: timestamp (nullable = true)
 |-- ARR_TIME: timestamp (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- FLIGHTS: double (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- ORIGIN_INDEX: integer (nullable = true)


In [0]:
# Total number of rows in the freshly loaded frame (triggers a Spark job).
print(f"Rows before cleaning: {df.count()}")

Rows before cleaning: 6284841


In [0]:
# Report how many rows are NULL in each important column.
# All counts come from a single aggregation, so the source is scanned once
# instead of once per column; the printed lines are the same as before.
print("NULL count before cleaning:")
null_counts = df.selectExpr(
    *[f"count_if(`{c}` IS NULL) AS `{c}`" for c in important_columns]
).first()
for c in important_columns:
    print(f"{c} -> {null_counts[c]}")

NULL count before cleaning:
DEP_DELAY -> 0
ARR_DELAY -> 0
TAXI_OUT -> 0
TAXI_IN -> 0
AIR_TIME -> 0
O_TEMP -> 513
O_PRCP -> 513
O_WSPD -> 513
D_TEMP -> 750
D_PRCP -> 750
D_WSPD -> 750


In [0]:
# Keep only rows where none of the important columns is NULL.
df_clean = df.na.drop(subset=important_columns)

In [0]:
# Row count that survives the NULL filter (triggers a Spark job).
print(f"Rows after cleaning: {df_clean.count()}")

Rows after cleaning: 6283578


In [0]:
# Confirm that no important column still contains NULLs after the filter.
# One aggregation yields every count, so the source is scanned once rather
# than once per column; the printed lines are the same as before.
print("NULL count after cleaning:")
null_counts_after = df_clean.selectExpr(
    *[f"count_if(`{c}` IS NULL) AS `{c}`" for c in important_columns]
).first()
for c in important_columns:
    print(c, "->", null_counts_after[c])


NULL count after cleaning:
DEP_DELAY -> 0
ARR_DELAY -> 0
TAXI_OUT -> 0
TAXI_IN -> 0
AIR_TIME -> 0
O_TEMP -> 0
O_PRCP -> 0
O_WSPD -> 0
D_TEMP -> 0
D_PRCP -> 0
D_WSPD -> 0


In [0]:
# Destination folder for cleaned output; this repeats the value already
# assigned to OUTPUT_DIR in the configuration cell near the top.
OUTPUT_DIR = "/Volumes/flight/default/cleaned_flightdelay"

In [0]:
# Destination folder for the cleaned copy of the file currently held in df_clean.
# output_path is derived here so this cell does not rely on a variable that is
# only assigned by a later cell. `file` is the loop variable left behind by the
# "Processing file" loop, i.e. the last entry of files, which is also the file
# that df / df_clean were read from.
output_path = f"{OUTPUT_DIR}/{file.replace('.csv', '')}"

# Write the cleaned rows as CSV with a header row, replacing any previous output.
(
    df_clean.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)

print(f"Cleaned file saved at: {output_path}")

Cleaned file saved at: /Volumes/flight/default/cleaned_flightdelay/flight_with_weather_2024


In [0]:
from pyspark.sql.functions import col

In [0]:
# Source folder with the raw yearly CSVs, and target folder for cleaned output.
INPUT_DIR, OUTPUT_DIR = (
    "/Volumes/flight/default/flightdelay",
    "/Volumes/flight/default/cleaned_flightdelay",
)

In [0]:
# Yearly input files, 2016 through 2024 inclusive, in chronological order.
files = [f"flight_with_weather_{year}.csv" for year in range(2016, 2025)]

In [0]:
# Columns that must all be non-NULL for a row to be kept by the cleaning loop.
important_columns = (
    ["DEP_DELAY", "ARR_DELAY", "TAXI_OUT", "TAXI_IN", "AIR_TIME"]
    + [f"O_{suffix}" for suffix in ("TEMP", "PRCP", "WSPD")]
    + [f"D_{suffix}" for suffix in ("TEMP", "PRCP", "WSPD")]
)

In [0]:
def _count_nulls(frame, columns):
    """Return {column: number of NULL rows} for `columns`.

    All counts are computed by one aggregation, so `frame` is evaluated once
    rather than once per column.
    """
    row = frame.selectExpr(
        *[f"count_if(`{name}` IS NULL) AS `{name}`" for name in columns]
    ).first()
    return {name: row[name] for name in columns}


# For every yearly file: read it, report row and NULL counts, drop rows with a
# NULL in any important column, report again, and write the cleaned rows to a
# per-year folder under OUTPUT_DIR.
for file in files:

    print("\n========================================")
    print("Processing file:", file)
    print("========================================")

    # ---- Read file ----
    input_path = f"{INPUT_DIR}/{file}"
    df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(input_path)
    )

    # ---- Before cleaning checks ----
    print("Rows before cleaning:", df.count())

    nulls_before = _count_nulls(df, important_columns)
    for c in important_columns:
        print(c, "NULL rows:", nulls_before[c])

    # ---- Cleaning ----
    df_clean = df.dropna(subset=important_columns)

    # ---- After cleaning checks ----
    print("Rows after cleaning:", df_clean.count())

    nulls_after = _count_nulls(df_clean, important_columns)
    for c in important_columns:
        print(c, "NULL rows after:", nulls_after[c])

    # ---- Output path ----
    # Each file gets its own folder named after the file without ".csv", so
    # the overwrite below only replaces that year's output.
    year_folder = file.replace(".csv", "")
    output_path = f"{OUTPUT_DIR}/{year_folder}"

    print("Saving cleaned data to:", output_path)

    # ---- Save cleaned file ----
    (
        df_clean.write
        .mode("overwrite")
        .option("header", True)
        .csv(output_path)
    )

    print("Saved successfully ✔")



Processing file: flight_with_weather_2016.csv
Rows before cleaning: 5537987
DEP_DELAY NULL rows: 0
ARR_DELAY NULL rows: 0
TAXI_OUT NULL rows: 0
TAXI_IN NULL rows: 0
AIR_TIME NULL rows: 0
O_TEMP NULL rows: 8995
O_PRCP NULL rows: 8995
O_WSPD NULL rows: 8995
D_TEMP NULL rows: 9079
D_PRCP NULL rows: 9079
D_WSPD NULL rows: 9079
Rows after cleaning: 5519913
DEP_DELAY NULL rows after: 0
ARR_DELAY NULL rows after: 0
TAXI_OUT NULL rows after: 0
TAXI_IN NULL rows after: 0
AIR_TIME NULL rows after: 0
O_TEMP NULL rows after: 0
O_PRCP NULL rows after: 0
O_WSPD NULL rows after: 0
D_TEMP NULL rows after: 0
D_PRCP NULL rows after: 0
D_WSPD NULL rows after: 0
Saving cleaned data to: /Volumes/flight/default/cleaned_flightdelay/flight_with_weather_2016
Saved successfully ✔

Processing file: flight_with_weather_2017.csv
Rows before cleaning: 5575872
DEP_DELAY NULL rows: 0
ARR_DELAY NULL rows: 0
TAXI_OUT NULL rows: 0
TAXI_IN NULL rows: 0
AIR_TIME NULL rows: 0
O_TEMP NULL rows: 980
O_PRCP NULL rows: 980
O_

In [0]:
# List the cleaned-output folder; as the last expression of the cell, the
# returned listing is what the notebook displays.
dbutils.fs.ls("/Volumes/flight/default/cleaned_flightdelay")

[FileInfo(path='dbfs:/Volumes/flight/default/cleaned_flightdelay/flight_with_weather_2016/', name='flight_with_weather_2016/', size=0, modificationTime=1768965555517),
 FileInfo(path='dbfs:/Volumes/flight/default/cleaned_flightdelay/flight_with_weather_2017/', name='flight_with_weather_2017/', size=0, modificationTime=1768965555517),
 FileInfo(path='dbfs:/Volumes/flight/default/cleaned_flightdelay/flight_with_weather_2018/', name='flight_with_weather_2018/', size=0, modificationTime=1768965555517),
 FileInfo(path='dbfs:/Volumes/flight/default/cleaned_flightdelay/flight_with_weather_2019/', name='flight_with_weather_2019/', size=0, modificationTime=1768965555517),
 FileInfo(path='dbfs:/Volumes/flight/default/cleaned_flightdelay/flight_with_weather_2020/', name='flight_with_weather_2020/', size=0, modificationTime=1768965555517),
 FileInfo(path='dbfs:/Volumes/flight/default/cleaned_flightdelay/flight_with_weather_2021/', name='flight_with_weather_2021/', size=0, modificationTime=17689655

In [0]:
# Folder read by the datatype-processing loop, and folder it writes to.
CLEAN_DIR, TYPED_DIR = (
    "/Volumes/flight/default/cleaned_flightdelay",
    "/Volumes/flight/default/typed_flightdelay",
)

In [0]:
%sql
-- Create the volume that TYPED_DIR points at. IF NOT EXISTS keeps this cell
-- re-runnable: without it the statement fails once the volume already exists.
CREATE VOLUME IF NOT EXISTS flight.default.typed_flightdelay;

In [0]:
# Per-year folder names under CLEAN_DIR, 2016 through 2024 inclusive.
folders = [f"flight_with_weather_{year}" for year in range(2016, 2025)]

In [0]:
from pyspark.sql.functions import col, try_to_timestamp, date_format, expr
from pyspark.sql.types import IntegerType, DoubleType

In [0]:
# Column groups are identical for every folder, so they are defined once
# instead of being rebuilt on each iteration.

# Columns re-parsed as timestamps and then reduced to an "HH:mm" string.
time_cols = [
    "CRS_DEP_TIME", "DEP_TIME",
    "WHEELS_OFF", "WHEELS_ON",
    "CRS_ARR_TIME", "ARR_TIME"
]

# Columns cast to int.
int_cols = [
    "MONTH",
    "DAY_OF_MONTH",
    "DAY_OF_WEEK",
    "ORIGIN_INDEX",
    "DEST_INDEX"
]

# Columns cast to double.
double_cols = [
    "DEP_DELAY", "ARR_DELAY",
    "TAXI_OUT", "TAXI_IN",
    "CRS_ELAPSED_TIME",
    "ACTUAL_ELAPSED_TIME",
    "AIR_TIME",
    "FLIGHTS",
    "O_TEMP", "O_PRCP", "O_WSPD",
    "D_TEMP", "D_PRCP", "D_WSPD",
    "O_LATITUDE", "O_LONGITUDE",
    "D_LATITUDE", "D_LONGITUDE"
]

# For every cleaned yearly folder: read the CSV (all columns as strings),
# convert each column group to its target type, report any NULLs, and write
# the result as CSV under TYPED_DIR.
for folder in folders:

    print("\n========================================")
    print("Final datatype processing for:", folder)
    print("========================================")

    # -------- READ CLEANED DATA (ALL STRING) --------
    input_path = f"{CLEAN_DIR}/{folder}"
    df = spark.read.option("header", True).csv(input_path)

    print("Schema BEFORE:")
    df.printSchema()

    # -------- 1. FL_DATE → DATETIME (TimestampType) --------
    df = df.withColumn(
        "FL_DATE",
        try_to_timestamp(col("FL_DATE"))
    )

    # Rows whose FL_DATE came back NULL from the conversion are dropped.
    df = df.filter(col("FL_DATE").isNotNull())

    # -------- 2. FLIGHT NUMBER (STRING → DOUBLE → INT SAFE) --------
    # Two steps: the value is first cast to double and the double is then
    # cast to int.
    df = df.withColumn(
        "OP_CARRIER_FL_NUM",
        expr("try_cast(OP_CARRIER_FL_NUM as double)")
    )

    df = df.withColumn(
        "OP_CARRIER_FL_NUM",
        expr("try_cast(OP_CARRIER_FL_NUM as int)")
    )

    # -------- 3. TIME COLUMNS (STRING → HH:mm) --------
    for c in time_cols:
        df = df.withColumn(
            c,
            date_format(
                try_to_timestamp(col(c)),
                "HH:mm"
            )
        )

    # -------- 4. INTEGER COLUMNS --------
    for c in int_cols:
        df = df.withColumn(
            c,
            expr(f"try_cast({c} as int)")
        )

    # -------- 5. DOUBLE COLUMNS --------
    for c in double_cols:
        df = df.withColumn(
            c,
            expr(f"try_cast({c} as double)")
        )

    # -------- 6. COLUMN NAMES (First Letter Capital) --------
    for c in df.columns:
        # c[:1] (rather than c[0]) keeps an empty column name from raising.
        new_col = c[:1].upper() + c[1:]
        if new_col != c:
            df = df.withColumnRenamed(c, new_col)

    # -------- 7. FINAL NULL VALIDATION --------
    # One aggregation returns the NULL count of every column, so the cast
    # pipeline above is evaluated once here instead of once per column.
    print("Final NULL check:")
    null_row = df.selectExpr(
        *[f"count_if(`{c}` IS NULL) AS `{c}`" for c in df.columns]
    ).first()
    for c in df.columns:
        nulls = null_row[c]
        if nulls > 0:
            print(f"{c} -> {nulls}")

    print("Schema AFTER:")
    df.printSchema()

    # -------- 8. SAVE FINAL TYPED DATA --------
    output_path = f"{TYPED_DIR}/{folder}"

    (
        df.write
        .mode("overwrite")
        .option("header", True)
        .csv(output_path)
    )

    print("Final typed data saved to:", output_path)


Final datatype processing for: flight_with_weather_2016
Schema BEFORE:
root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_TIME: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_TIME: string (nullable = true)
 |-- ARR_DELAY: string (nullable = true)
 |-- CRS_ELAPSED_TIME: string (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- FLIGHTS: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY_OF_MONTH: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- 

In [0]:
# Read the 2020 typed output back (header row, no inferSchema) to spot-check it.
final_path = "/Volumes/flight/default/typed_flightdelay/flight_with_weather_2020"

df_final = spark.read.option("header", True).csv(final_path)

# NOTE(review): the recorded output below shows ten rows, so the positional 5
# does not appear to limit the row count; confirm which arguments display()
# actually honors before relying on them.
df_final.display(5, truncate=False)

FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,ORIGIN_INDEX,DEST_INDEX,O_TEMP,O_PRCP,O_WSPD,D_TEMP,D_PRCP,D_WSPD,O_LATITUDE,O_LONGITUDE,D_LATITUDE,D_LONGITUDE
2020-05-31T00:00:00.000Z,AA,2574,STL,PHX,07:45,07:41,-4.0,9.0,07:50,08:37,3.0,09:13,08:40,-33.0,208.0,179.0,167.0,1.0,5,31,7,294,239,17.2,0.0,11.2,31.7,0.0,7.6,38.74769,-90.35999,33.43417,-112.00806
2020-05-31T00:00:00.000Z,WN,1763,STL,PHX,15:35,15:29,-6.0,10.0,15:39,16:23,7.0,16:55,16:30,-25.0,200.0,181.0,164.0,1.0,5,31,7,294,239,23.9,0.0,5.4,35.0,0.0,5.4,38.74769,-90.35999,33.43417,-112.00806
2020-06-01T00:00:00.000Z,AA,2574,STL,PHX,07:45,07:41,-4.0,13.0,07:54,08:39,4.0,09:13,08:43,-30.0,208.0,182.0,165.0,1.0,6,1,1,294,239,17.8,0.0,7.6,32.2,0.0,11.2,38.74769,-90.35999,33.43417,-112.00806
2020-06-01T00:00:00.000Z,WN,1763,STL,PHX,15:35,15:31,-4.0,10.0,15:41,16:26,7.0,16:55,16:33,-22.0,200.0,182.0,165.0,1.0,6,1,1,294,239,25.6,0.0,18.4,33.3,0.0,9.4,38.74769,-90.35999,33.43417,-112.00806
2020-06-02T00:00:00.000Z,WN,1763,STL,PHX,15:35,15:31,-4.0,7.0,15:38,16:21,4.0,16:55,16:25,-30.0,200.0,174.0,163.0,1.0,6,2,2,294,239,28.3,0.0,18.4,35.6,0.0,16.6,38.74769,-90.35999,33.43417,-112.00806
2020-06-03T00:00:00.000Z,AA,2574,STL,PHX,07:45,07:41,-4.0,12.0,07:53,08:45,3.0,09:13,08:48,-25.0,208.0,187.0,172.0,1.0,6,3,3,294,239,22.8,0.0,5.4,33.3,0.0,13.0,38.74769,-90.35999,33.43417,-112.00806
2020-06-03T00:00:00.000Z,WN,1763,STL,PHX,15:35,15:32,-3.0,7.0,15:39,16:30,17.0,16:55,16:47,-8.0,200.0,195.0,171.0,1.0,6,3,3,294,239,30.0,0.0,13.0,35.6,0.0,0.0,38.74769,-90.35999,33.43417,-112.00806
2020-06-04T00:00:00.000Z,AA,2574,STL,PHX,07:45,07:37,-8.0,12.0,07:49,08:54,4.0,09:04,08:58,-6.0,199.0,201.0,185.0,1.0,6,4,4,294,239,22.2,0.3,29.5,35.6,0.0,16.6,38.74769,-90.35999,33.43417,-112.00806
2020-06-04T00:00:00.000Z,WN,1763,STL,PHX,15:35,15:27,-8.0,9.0,15:36,16:30,6.0,16:55,16:36,-19.0,200.0,189.0,174.0,1.0,6,4,4,294,239,24.4,0.0,13.0,36.1,0.0,5.4,38.74769,-90.35999,33.43417,-112.00806
2020-06-05T00:00:00.000Z,AA,2574,STL,PHX,07:45,07:38,-7.0,10.0,07:48,08:53,6.0,09:04,08:59,-5.0,199.0,201.0,185.0,1.0,6,5,5,294,239,20.0,0.0,0.0,33.9,0.0,14.8,38.74769,-90.35999,33.43417,-112.00806


In [0]:
# Folder holding the typed multi-part output, and folder for the single-file copies.
SOURCE_DIR, TARGET_DIR = (
    "/Volumes/flight/default/typed_flightdelay",
    "/Volumes/flight/default/final_single_csv",
)

In [0]:
# Per-year folder names under SOURCE_DIR, 2016 through 2024 inclusive.
years = [f"flight_with_weather_{year}" for year in range(2016, 2025)]

In [0]:
# For every yearly folder under SOURCE_DIR: read its part files, write them
# back as a single CSV part under TARGET_DIR, and rename that part to <year>.csv.
for y in years:

    print("\n==============================")
    print("Combining year:", y)
    print("==============================")

    input_path = f"{SOURCE_DIR}/{y}"

    # Safety check: skip the folder when it contains no CSV files.
    # The listing is kept under its own name (not `files`) so the notebook-level
    # list of CSV filenames used by the cleaning loops is not overwritten with
    # file-listing objects.
    source_entries = dbutils.fs.ls(input_path)
    csv_files = [entry for entry in source_entries if entry.name.endswith(".csv")]

    if not csv_files:
        print("⚠️ No CSV files found, skipping:", y)
        continue

    # Read all part files of the folder, with schema inference enabled.
    df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(input_path)
    )

    print("Rows:", df.count())

    # Write a single CSV: coalesce(1) collapses the data to one partition so
    # that one part file is produced.
    output_path = f"{TARGET_DIR}/{y}"

    (
        df.coalesce(1)
        .write
        .mode("overwrite")
        .option("header", True)
        .csv(output_path)
    )

    # Rename the generated part file to <year>.csv inside the same folder.
    for part in dbutils.fs.ls(output_path):
        if part.name.startswith("part-") and part.name.endswith(".csv"):
            dbutils.fs.mv(
                part.path,
                f"{output_path}/{y}.csv"
            )

    print("✅ Single CSV created for:", y)


Combining year: flight_with_weather_2016
Rows: 5519913
✅ Single CSV created for: flight_with_weather_2016

Combining year: flight_with_weather_2017
Rows: 5573738
✅ Single CSV created for: flight_with_weather_2017

Combining year: flight_with_weather_2018
Rows: 6985579
✅ Single CSV created for: flight_with_weather_2018

Combining year: flight_with_weather_2019
Rows: 7160502
✅ Single CSV created for: flight_with_weather_2019

Combining year: flight_with_weather_2020
Rows: 4311232
✅ Single CSV created for: flight_with_weather_2020

Combining year: flight_with_weather_2021
Rows: 5754634
✅ Single CSV created for: flight_with_weather_2021

Combining year: flight_with_weather_2022
Rows: 6412255
✅ Single CSV created for: flight_with_weather_2022

Combining year: flight_with_weather_2023
Rows: 6644052
✅ Single CSV created for: flight_with_weather_2023

Combining year: flight_with_weather_2024
Rows: 6283578
✅ Single CSV created for: flight_with_weather_2024
