### Setting up the proper environment: make sure you have run 00_colab_setup.ipynb first, so you are in the same conda environment as me and have the packages from requirements.txt installed with the proper versions:

### Also, in the top right of VS Code, make sure to change the kernel/environment to: 'airline-delay-prediction'

In [1]:
from pathlib import Path
import os
def to_repo_root(start=Path.cwd()):
    for p in [start, *start.parents]:
        if (p/"src").exists() and (p/"requirements.txt").exists():
            os.chdir(p); print("Project root:", p); return
    raise SystemExit("Could not locate project root (needs ./src and ./requirements.txt)")
to_repo_root()


Project root: /Users/nikhilroy/Documents/MSML610/repo


In [2]:
from pathlib import Path
import shutil

# Clear out any previous ETL output so the run below starts from an empty directory
enriched_dir = Path("data/processed/flights_enriched")
print("Removing:", enriched_dir.resolve())
shutil.rmtree(enriched_dir, ignore_errors=True)
enriched_dir.mkdir(parents=True, exist_ok=True)  # recreate empty dir


Removing: /Users/nikhilroy/Documents/MSML610/repo/data/processed/flights_enriched


In [3]:
!conda run -n airline-delay-prediction python src/spark_etl.py


Wrote: data/processed/flights_enriched

25/11/13 16:17:53 WARN Utils: Your hostname, Nikhils-MacBook-Pro-9.local resolves to a loopback address: 127.0.0.1; using 192.168.0.22 instead (on interface en0)
25/11/13 16:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/13 16:17:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/13 16:18:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/11/13 16:18:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


Ok cool, now that you've run it, the very top line of the output above should read: Wrote: data/processed/flights_enriched

This is where the processed data is stored, as Parquet files. I chose Parquet so I can read the data back in with Spark, which processes it in parallel across partitions and can handle much larger volumes of data than pandas (pandas needs the whole dataset in memory on one machine). This is also good practice for me: it gives me the opportunity to learn Spark and use it in a real project application.

### Sanity check (post-ETL):

I want to confirm three things after the ETL script runs:

1. Spark actually wrote data/processed/flights_enriched/ and the row count matches the raw flights.csv (i.e., my joins didn’t duplicate or drop rows).

2. The schema has the columns I expect (keys, coords, dates, and the is_delayed label).

3. The delayed-flight rate (BTS rule: ARRIVAL_DELAY ≥ 15) looks reasonable, and there aren’t obvious null issues in the critical columns. If these checks pass, I’m confident the ETL stage is behaving as intended.
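Checks 1 and 3 can be illustrated in miniature. This is a pandas sketch, not the Spark code in src/spark_etl.py. It assumes the coordinates come from an airports lookup keyed by airport code; the lookup's column names (IATA_CODE, LATITUDE, LONGITUDE) and the toy rows are hypothetical.

It shows three things:

* A left join keeps every flight.
* validate="many_to_one" fails loudly if an airport code appears twice in the lookup, which is the usual way a join silently duplicates rows.
* The label follows the ARRIVAL_DELAY ≥ 15 rule while leaving missing delays missing. That matches the NULL bucket in the label counts.

```python
import pandas as pd

# Toy stand-ins for flights.csv and an airports lookup (codes like "XYZ" are made up)
flights = pd.DataFrame({
    "ORIGIN_AIRPORT": ["ANC", "LAX", "SFO", "XYZ"],
    "ARRIVAL_DELAY": [-22.0, 31.0, None, 15.0],
})
airports = pd.DataFrame({
    "IATA_CODE": ["ANC", "LAX", "SFO"],
    "LATITUDE": [61.17432, 33.94254, 37.619],
    "LONGITUDE": [-149.99619, -118.40807, -122.37484],
})

def enrich(flights: pd.DataFrame, airports: pd.DataFrame) -> pd.DataFrame:
    lookup = airports.rename(columns={
        "IATA_CODE": "ORIGIN_AIRPORT", "LATITUDE": "ORIGIN_LAT", "LONGITUDE": "ORIGIN_LON",
    })
    # Left join keeps unmatched flights (their coords become NaN);
    # validate raises MergeError if a code is duplicated in the lookup
    out = flights.merge(lookup, on="ORIGIN_AIRPORT", how="left", validate="many_to_one")
    assert len(out) == len(flights), "join changed the row count"

    # BTS rule: delayed if ARRIVAL_DELAY >= 15 minutes; a missing delay gives a missing label
    out["is_delayed"] = pd.array(
        [pd.NA if pd.isna(d) else int(d >= 15) for d in out["ARRIVAL_DELAY"]],
        dtype="Int64",
    )
    return out

enriched = enrich(flights, airports)
print(enriched)
```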

In [4]:
from pathlib import Path
parts = sorted(Path("data/processed/flights_enriched").glob("part-*.parquet"))
print("Num parts:", len(parts))
print([p.name for p in parts[:8]])


Num parts: 8
['part-00000-b537f10e-86cd-4237-b433-81146249a4ce-c000.snappy.parquet', 'part-00001-b537f10e-86cd-4237-b433-81146249a4ce-c000.snappy.parquet', 'part-00002-b537f10e-86cd-4237-b433-81146249a4ce-c000.snappy.parquet', 'part-00003-b537f10e-86cd-4237-b433-81146249a4ce-c000.snappy.parquet', 'part-00004-b537f10e-86cd-4237-b433-81146249a4ce-c000.snappy.parquet', 'part-00005-b537f10e-86cd-4237-b433-81146249a4ce-c000.snappy.parquet', 'part-00006-b537f10e-86cd-4237-b433-81146249a4ce-c000.snappy.parquet', 'part-00007-b537f10e-86cd-4237-b433-81146249a4ce-c000.snappy.parquet']


In [6]:
! git lfs install
! git lfs pull


Updated Git hooks.
Git LFS initialized.


In [5]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("sanity")
         .config("spark.sql.parquet.enableVectorizedReader","false")
         .config("spark.sql.parquet.mergeSchema","false")
         .getOrCreate())
df = spark.read.parquet("data/processed/flights_enriched")  # relative to the project root set by to_repo_root()
print(df.count())
spark.stop()

25/11/13 16:19:21 WARN Utils: Your hostname, Nikhils-MacBook-Pro-9.local resolves to a loopback address: 127.0.0.1; using 192.168.0.22 instead (on interface en0)
25/11/13 16:19:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/13 16:19:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

5819079

In [6]:
# Sanity-check the ETL output (row counts, schema, label distribution, null coverage) in this notebook's kernel
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("etl_sanity_check").getOrCreate()

enriched_path = "data/processed/flights_enriched"
df = spark.read.parquet(enriched_path)
raw = spark.read.option("header", True).csv("data/raw/flights.csv")

print("=== ETL Output Exists / Counts ===")
out_rows = df.count()
raw_rows = raw.count()
diff = out_rows - raw_rows
print(f"Enriched rows: {out_rows:,}")
print(f"Raw flights rows: {raw_rows:,}")
print(f"Row delta (enriched - raw): {diff:+,}  <-- should be 0 in a clean, row-preserving join")

print("\n=== Schema (expect keys, coords, dates, label) ===")
df.printSchema()

print("\n=== Preview ===")
df.select(
    "YEAR","MONTH","DAY","FL_DATE",
    "AIRLINE","AIRLINE_NAME",
    "ORIGIN_AIRPORT","DESTINATION_AIRPORT",
    "ORIGIN_LAT","ORIGIN_LON","DEST_LAT","DEST_LON",
    "ARRIVAL_DELAY","is_delayed"
).show(10, truncate=False)

print("\n=== Label Sanity ===")
delay_rate = df.select(F.mean(F.col("is_delayed").cast("double")).alias("delay_rate")).first()["delay_rate"]
df.groupBy("is_delayed").count().orderBy("is_delayed").show()
print(f"Delayed rate: {delay_rate:.3f}")

print("\n=== Null Coverage on Critical Columns ===")
critical = [
    "AIRLINE","AIRLINE_NAME","ORIGIN_AIRPORT","DESTINATION_AIRPORT",
    "ORIGIN_LAT","ORIGIN_LON","DEST_LAT","DEST_LON",
    "ARRIVAL_DELAY","FL_DATE","is_delayed"
]
cover = df.select([
    (1 - (F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)) / F.count(F.lit(1)))).alias(f"{c}_non_null")
    for c in critical
]).first().asDict()
for k, v in cover.items():
    print(f"{k:>20}: {v:.3f}")

issues = []
if diff != 0:
    issues.append(f"Row mismatch: enriched={out_rows} vs raw={raw_rows} (delta {diff:+})")
for c in ["AIRLINE","ORIGIN_AIRPORT","DESTINATION_AIRPORT","ARRIVAL_DELAY","is_delayed","FL_DATE"]:
    if cover.get(f"{c}_non_null", 0) < 0.95:
        issues.append(f"High nulls in {c} (non-null rate={cover.get(f'{c}_non_null', 0):.3f})")

print("\n=== Summary ===")
if issues:
    print("Potential issues detected:")
    for i in issues:
        print(" -", i)
else:
    print("ETL looks good: row count matches, schema sane, label distribution reasonable, and nulls minimal.")

spark.stop()


=== ETL Output Exists / Counts ===

Enriched rows: 5,819,079
Raw flights rows: 5,819,079
Row delta (enriched - raw): +0  <-- should be 0 in a clean, row-preserving join

=== Schema (expect keys, coords, dates, label) ===
root
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: double (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- WHEELS_ON: string (nullable = t

+----+-----+---+----------+-------+----------------------+--------------+-------------------+----------+----------+--------+----------+-------------+----------+
|YEAR|MONTH|DAY|FL_DATE   |AIRLINE|AIRLINE_NAME          |ORIGIN_AIRPORT|DESTINATION_AIRPORT|ORIGIN_LAT|ORIGIN_LON|DEST_LAT|DEST_LON  |ARRIVAL_DELAY|is_delayed|
+----+-----+---+----------+-------+----------------------+--------------+-------------------+----------+----------+--------+----------+-------------+----------+
|2015|1    |1  |2015-01-01|AS     |Alaska Airlines Inc.  |ANC           |SEA                |61.17432  |-149.99619|47.44898|-122.30931|-22.0        |0         |
|2015|1    |1  |2015-01-01|AA     |American Airlines Inc.|LAX           |PBI                |33.94254  |-118.40807|26.68316|-80.09559 |-9.0         |0         |
|2015|1    |1  |2015-01-01|US     |US Airways Inc.       |SFO           |CLT                |37.619    |-122.37484|35.21401|-80.94313 |5.0          |0         |
|2015|1    |1  |2015-01-01|AA     

+----------+-------+
|is_delayed|  count|
+----------+-------+
|      NULL| 105071|
|         0|4650569|
|         1|1063439|
+----------+-------+

Delayed rate: 0.186

=== Null Coverage on Critical Columns ===

    AIRLINE_non_null: 1.000
AIRLINE_NAME_non_null: 1.000
ORIGIN_AIRPORT_non_null: 1.000
DESTINATION_AIRPORT_non_null: 1.000
 ORIGIN_LAT_non_null: 0.916
 ORIGIN_LON_non_null: 0.916
   DEST_LAT_non_null: 0.916
   DEST_LON_non_null: 0.916
ARRIVAL_DELAY_non_null: 0.982
    FL_DATE_non_null: 1.000
 is_delayed_non_null: 0.982

=== Summary ===
ETL looks good: row count matches, schema sane, label distribution reasonable, and nulls minimal.


Just a quick note while we're here (in case you don't want to run the code above or read through my sanity-check output):

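18.6% of the flights with a known arrival delay are delayed, so our y classes are imbalanced and accuracy is a misleading score here: a model that always predicts "on time" would already be about 81.4% accurate while catching zero delays!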
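To put numbers on that, here is the arithmetic for a trivial baseline that predicts "on time" for every flight. It uses the is_delayed counts from the sanity-check output, with the NULL labels left out. It is plain Python; no model or library is assumed.

```python
# Label counts copied from the is_delayed groupBy output above (NULL labels excluded)
n_on_time = 4_650_569
n_delayed = 1_063_439
n_total = n_on_time + n_delayed

# Baseline "model": predict is_delayed = 0 for every flight
true_neg, false_neg = n_on_time, n_delayed
true_pos, false_pos = 0, 0

accuracy = (true_pos + true_neg) / n_total
recall = true_pos / (true_pos + false_neg)  # share of delayed flights actually caught
precision = true_pos / (true_pos + false_pos) if (true_pos + false_pos) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

print(f"accuracy={accuracy:.3f} recall={recall:.3f} f1={f1:.3f}")
# → accuracy=0.814 recall=0.000 f1=0.000
```

A model that never flags a delay still scores about 81% accuracy with zero recall. Metrics centred on the delayed class are more informative for this problem: precision, recall, F1, or PR-AUC.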

### Next, run the second Python script below, merge_weather.py (it uses weather_meteostat.py as a helper module)

### Be aware: this step takes a while to run. The dataset provided with the instructions had no weather information, so I researched and found a compatible external source, Meteostat, whose hourly weather observations can be matched to each flight:

In [7]:
# Weather join (Meteostat)

!conda run -n airline-delay-prediction python src/merge_weather.py


Wrote merged dataset: data/processed/flights_with_weather.parquet, shape=(5819079, 54)




Ok cool, now that you've run it, the last line of the output above should read: Wrote merged dataset: data/processed/flights_with_weather.parquet, shape=(5819079, 54). That means all 5,819,079 flights were kept, now with 54 columns.

### Doing a quick sanity check for merge_weather.py:

In [8]:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("wx_sanity").getOrCreate()
wx = spark.read.parquet("data/processed/flights_with_weather.parquet")

print("Rows:", wx.count())
wx.select(
    (1 - F.avg(F.when(F.col("station_id").isNull(), 1).otherwise(0))).alias("station_id_non_null"),
    (1 - F.avg(F.when(F.col("temp").isNull(), 1).otherwise(0))).alias("temp_non_null"),
    (1 - F.avg(F.when(F.col("rhum").isNull(), 1).otherwise(0))).alias("rhum_non_null"),
    (1 - F.avg(F.when(F.col("prcp").isNull(), 1).otherwise(0))).alias("prcp_non_null"),
    (1 - F.avg(F.when(F.col("wspd").isNull(), 1).otherwise(0))).alias("wspd_non_null")
).show()

wx.select("ORIGIN_AIRPORT","station_id","dep_hour_rounded","temp","rhum","prcp","wspd","pres","is_delayed")\
  .orderBy(F.desc("prcp")).show(10, truncate=False)

spark.stop()


Rows: 5819079

+-------------------+------------------+------------------+------------------+------------------+
|station_id_non_null|     temp_non_null|     rhum_non_null|     prcp_non_null|     wspd_non_null|
+-------------------+------------------+------------------+------------------+------------------+
| 0.9156619114468114|0.8449048380336476|0.8447427848977476|0.7838001511923107|0.8440383778945088|
+-------------------+------------------+------------------+------------------+------------------+

+--------------+----------+-------------------+----+-----+----+----+------+----------+
|ORIGIN_AIRPORT|station_id|dep_hour_rounded   |temp|rhum |prcp|wspd|pres  |is_delayed|
+--------------+----------+-------------------+----+-----+----+----+------+----------+
|RSW           |KRSW0     |2015-07-26 16:00:00|23.3|100.0|71.9|33.5|1014.2|0.0       |
|RSW           |KRSW0     |2015-07-26 16:00:00|23.3|100.0|71.9|33.5|1014.2|0.0       |
|RSW           |KRSW0     |2015-07-26 16:00:00|23.3|100.0|71.9|33.5|1014.2|1.0       |
|RSW           |KRSW0     |2015-07-26 16:00:00|23.3|100.0|71.9|33.5|1014.2|1.0       |
|RSW           |KRSW0     |2015-07-26 16:00:00|23.3|100.0|71.9|33.5|1014.2|1.0       |
|CHS           |72208     |2015-08-31 11:00:00|23.9|94.0 |71.4|0.0 |1016.4|0.0       |
|CHS           |72208     |2015-08-31 11:00:00|23.9|94.0 |71.4|0.0 |1016.4|1.0       |
|CHS           |72208     |2015-08-31 11:00:00|23.9|94.0 |71.4|0.0 |1016.4|0.0       |
|CHS           |72208     |2015-08-31 11:00

### Weather sanity: 

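Mapping each origin airport to its nearest Meteostat station worked for ~92% of flights. The ~8% without a station roughly matches the ~8.4% of rows missing origin coordinates in the ETL check, which makes sense: a flight with no coordinates can't be matched to a station. The core weather fields (temp, rhum, wspd) are present ~84–85% of the time; prcp is lower (~78%) but still usable. This is good enough to keep weather in the feature set. Multiple flights can share the exact same hourly weather (same station + hour), so seeing duplicate weather values in the preview is expected.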
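Here is a minimal sketch of the matching idea, under stated assumptions. I have not reproduced the actual logic of merge_weather.py / weather_meteostat.py here, so several pieces are illustrative:

* "Nearest" is assumed to mean haversine distance.
* dep_hour_rounded is assumed to round to the nearest hour. The real scripts may floor to the hour instead, or rely on Meteostat's own station lookup.
* The function names and the station ids/coordinates are hypothetical.

The HHMM parsing follows the raw data, where SCHEDULED_DEPARTURE "5" means 00:05.

```python
import math
from datetime import date, datetime, timedelta

EARTH_RADIUS_KM = 6371.0

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(min(1.0, math.sqrt(a)))

def nearest_station(lat, lon, stations):
    """Return the id of the closest (id, lat, lon) station, or None when coordinates are missing."""
    if lat is None or lon is None:
        return None  # no airport coordinates means no station can be matched
    station_id, _, _ = min(stations, key=lambda s: haversine_km(lat, lon, s[1], s[2]))
    return station_id

def dep_hour_rounded(flight_date, hhmm):
    """Convert an HHMM departure ("5" = 00:05, "1635" = 16:35) to a timestamp rounded to the nearest hour."""
    hhmm = int(hhmm)
    ts = datetime.combine(flight_date, datetime.min.time()) + timedelta(hours=hhmm // 100, minutes=hhmm % 100)
    # Add 30 minutes, then truncate: round-half-up to the hour (may roll over to the next day)
    return (ts + timedelta(minutes=30)).replace(minute=0, second=0, microsecond=0)

# Hypothetical station list: ids and coordinates are made up for illustration
stations = [("STN_ANC", 61.17, -150.00), ("STN_SEA", 47.45, -122.31)]

# The weather join key is then (station id, rounded hour), shared by every flight in that slot
key = (nearest_station(61.17432, -149.99619, stations), dep_hour_rounded(date(2015, 1, 1), "5"))
print(key)
```

Because every flight from the same airport in the same hour gets the same key, those flights pick up identical weather rows. That is exactly the duplication seen in the preview.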

## Final Sanity Check Before Moving On to EDA + Modeling:

In [9]:
import pandas as pd
import pyarrow.dataset as ds

# Load entire merged dataset into memory (EDA-level inspection)
dataset = ds.dataset("data/processed/flights_with_weather.parquet", format="parquet")
df = dataset.to_table().to_pandas()

# Fix numeric + label columns
numeric_cols = [
    "DEPARTURE_DELAY", "ARRIVAL_DELAY", "DISTANCE", "AIR_TIME",
    "temp", "rhum", "prcp", "wspd", "pres"
]
for c in numeric_cols:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

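if "is_delayed" in df.columns:
    # Keep missing labels missing (nullable Int64). fillna(0) would silently mark the ~105k
    # flights without an ARRIVAL_DELAY (e.g. cancelled/diverted) as on-time
    df["is_delayed"] = df["is_delayed"].astype("Int64")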

# --- Print column names + data types ---
print("=== ALL COLUMNS ({} total) ===".format(len(df.columns)))
for i, col in enumerate(df.columns, start=1):
    print(f"{i:02d}. {col}  ({df[col].dtype})")

print("\n=== QUICK SUMMARY ===")
print(f"Total rows: {len(df):,}")
print(f"Unique airlines: {df['AIRLINE_NAME'].nunique() if 'AIRLINE_NAME' in df.columns else 'N/A'}")
print(f"Unique origin airports: {df['ORIGIN_AIRPORT'].nunique() if 'ORIGIN_AIRPORT' in df.columns else 'N/A'}")
print(f"Unique destination airports: {df['DESTINATION_AIRPORT'].nunique() if 'DESTINATION_AIRPORT' in df.columns else 'N/A'}")
print(f"Delayed flights (is_delayed=1): {df['is_delayed'].sum():,}")
print(f"On-time flights (is_delayed=0): {(df['is_delayed']==0).sum():,}")

print("\nSHOWING THE TOP 10 ROWS AS AN EXAMPLE, TO GET A FEEL FOR THE FINAL DATASET MERGED WITH WEATHER")

df.head(10)


=== ALL COLUMNS (54 total) ===
01. DESTINATION_AIRPORT  (object)
02. ORIGIN_AIRPORT  (object)
03. AIRLINE  (object)
04. YEAR  (int32)
05. MONTH  (int32)
06. DAY  (int32)
07. DAY_OF_WEEK  (object)
08. FLIGHT_NUMBER  (object)
09. TAIL_NUMBER  (object)
10. SCHEDULED_DEPARTURE  (object)
11. DEPARTURE_TIME  (object)
12. DEPARTURE_DELAY  (float64)
13. TAXI_OUT  (object)
14. WHEELS_OFF  (object)
15. SCHEDULED_TIME  (object)
16. ELAPSED_TIME  (object)
17. AIR_TIME  (float64)
18. DISTANCE  (float64)
19. WHEELS_ON  (object)
20. TAXI_IN  (object)
21. SCHEDULED_ARRIVAL  (object)
22. ARRIVAL_TIME  (object)
23. ARRIVAL_DELAY  (float64)
24. DIVERTED  (float64)
25. CANCELLED  (float64)
26. CANCELLATION_REASON  (object)
27. AIR_SYSTEM_DELAY  (object)
28. SECURITY_DELAY  (object)
29. AIRLINE_DELAY  (object)
30. LATE_AIRCRAFT_DELAY  (object)
31. WEATHER_DELAY  (object)
32. FL_DATE  (object)
33. dep_hour_rounded  (datetime64[us])
34. is_delayed  (int64)
35. AIRLINE_NAME  (object)
36. ORIGIN_CITY  (object)

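| | DESTINATION_AIRPORT | ORIGIN_AIRPORT | AIRLINE | YEAR | MONTH | DAY | DAY_OF_WEEK | FLIGHT_NUMBER | TAIL_NUMBER | SCHEDULED_DEPARTURE | ... | temp | dwpt | rhum | prcp | snow | wdir | wspd | wpgt | pres | tsun |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | SEA | ANC | AS | 2015 | 1 | 1 | 4 | 98 | N407AS | 5 | ... | 3.3 | 0.5 | 82.0 | 1.3 | | 150.0 | 11.2 | | 1024.8 | |
| 1 | PBI | LAX | AA | 2015 | 1 | 1 | 4 | 2336 | N3KUAA | 10 | ... | 9.4 | -10.2 | 24.0 | 0.0 | | | 0.0 | | 1018.0 | |
| 2 | CLT | SFO | US | 2015 | 1 | 1 | 4 | 840 | N171US | 20 | ... | 10.0 | 2.8 | 61.0 | 0.0 | | | 0.0 | | 1018.5 | |
| 3 | MIA | LAX | AA | 2015 | 1 | 1 | 4 | 258 | N3HYAA | 20 | ... | 9.4 | -10.2 | 24.0 | 0.0 | | | 0.0 | | 1018.0 | |
| 4 | ANC | SEA | AS | 2015 | 1 | 1 | 4 | 135 | N527AS | 25 | ... | 0.0 | -5.6 | 66.0 | 0.0 | | 30.0 | 14.8 | | 1032.2 | |
| 5 | MSP | SFO | DL | 2015 | 1 | 1 | 4 | 806 | N3730B | 25 | ... | 10.0 | 2.8 | 61.0 | 0.0 | | | 0.0 | | 1018.5 | |
| 6 | MSP | LAS | NK | 2015 | 1 | 1 | 4 | 612 | N635NK | 25 | ... | 2.8 | -8.9 | 42.0 | 0.0 | | 170.0 | 9.4 | | 1015.4 | |
| 7 | CLT | LAX | US | 2015 | 1 | 1 | 4 | 2013 | N584UW | 30 | ... | 9.4 | -10.2 | 24.0 | 0.0 | | | 0.0 | | 1018.0 | |
| 8 | DFW | SFO | AA | 2015 | 1 | 1 | 4 | 1112 | N3LAAA | 30 | ... | 10.0 | 2.8 | 61.0 | 0.0 | | | 0.0 | | 1018.5 | |
| 9 | ATL | LAS | DL | 2015 | 1 | 1 | 4 | 1173 | N826DN | 30 | ... | 2.8 | -8.9 | 42.0 | 0.0 | | 170.0 | 9.4 | | 1015.4 | |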


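Units: Meteostat's default metric units (temperature and dew point in °C, precipitation in mm, wind speed and gusts in km/h, pressure in hPa)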