In [1]:
#PHASE 1 — Basic Data Inspection (Pandas)

In [2]:
import pandas as pd

DATA_PATH = "C:/Users/Lenovo/PycharmProjects/data analysis and big data/flight_delays.csv"

# Load data
df = pd.read_csv(DATA_PATH)

# Basic inspection
print(df.head())
print(df.info())
print(df.describe())

   FlightID    Airline  FlightNumber Origin Destination ScheduledDeparture  \
0         1     United          4558    ORD         MIA   2024-09-01 08:11   
1         2      Delta          8021    LAX         MIA   2024-09-01 10:25   
2         3  Southwest          7520    DFW         SFO   2024-09-01 16:53   
3         4      Delta          2046    ORD         BOS   2024-09-01 14:44   
4         5      Delta          6049    LAX         SEA   2024-09-01 01:51   

    ActualDeparture  ScheduledArrival     ActualArrival  DelayMinutes  \
0  2024-09-01 08:30  2024-09-01 12:11  2024-09-01 12:19             8   
1  2024-09-01 10:41  2024-09-01 13:25  2024-09-01 13:27             2   
2  2024-09-01 17:05  2024-09-01 17:53  2024-09-01 18:07            14   
3  2024-09-01 15:04  2024-09-01 18:44  2024-09-01 18:34           -10   
4  2024-09-01 02:08  2024-09-01 05:51  2024-09-01 06:15            24   

           DelayReason  Cancelled  Diverted AircraftType TailNumber  Distance  
0           

In [3]:
#PHASE 2 — Basic Cleaning

In [4]:
# Remove rows with missing Airline or DelayMinutes
df = df.dropna(subset=["Airline", "DelayMinutes"])

# Keep only delayed flights
df = df[df["DelayMinutes"] > 0]
df

Unnamed: 0,FlightID,Airline,FlightNumber,Origin,Destination,ScheduledDeparture,ActualDeparture,ScheduledArrival,ActualArrival,DelayMinutes,DelayReason,Cancelled,Diverted,AircraftType,TailNumber,Distance
0,1,United,4558,ORD,MIA,2024-09-01 08:11,2024-09-01 08:30,2024-09-01 12:11,2024-09-01 12:19,8,Weather,True,False,Boeing 737,N71066,1031
1,2,Delta,8021,LAX,MIA,2024-09-01 10:25,2024-09-01 10:41,2024-09-01 13:25,2024-09-01 13:27,2,Air Traffic Control,True,True,Airbus A320,N22657,1006
2,3,Southwest,7520,DFW,SFO,2024-09-01 16:53,2024-09-01 17:05,2024-09-01 17:53,2024-09-01 18:07,14,Weather,True,True,Boeing 737,N95611,2980
4,5,Delta,6049,LAX,SEA,2024-09-01 01:51,2024-09-01 02:08,2024-09-01 05:51,2024-09-01 06:15,24,Air Traffic Control,False,True,Boeing 737,N27417,2298
6,7,Southwest,4188,ORD,JFK,2024-09-01 05:47,2024-09-01 06:02,2024-09-01 10:47,2024-09-01 10:54,7,Weather,False,True,Boeing 777,N25382,1674
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1747620,1747621,Southwest,3902,DFW,SEA,2024-09-01 19:21,2024-09-01 19:31,2024-09-02 01:21,2024-09-02 01:36,15,Weather,True,False,Boeing 737,N30929,1253
1747621,1747622,American Airlines,5071,DFW,SEA,2024-09-01 10:22,2024-09-01 10:38,2024-09-01 13:22,2024-09-01 13:27,5,Weather,False,False,Boeing 737,N22395,2887
1747623,1747624,United,2155,ATL,SEA,2024-09-01 20:43,2024-09-01 20:54,2024-09-01 21:43,2024-09-01 22:03,20,Weather,True,False,Boeing 777,N68154,2185
1747625,1747626,Delta,2940,JFK,SEA,2024-09-01 08:04,2024-09-01 08:07,2024-09-01 13:04,2024-09-01 13:20,16,Maintenance,False,True,Airbus A320,N94591,2793


In [5]:
#PHASE 3 — Exploratory Analysis

In [6]:
airline_summary = (
    df.groupby("Airline")["DelayMinutes"]
    .agg(["count","mean","max","std"])
    .sort_values("mean", ascending=False)
)

print(airline_summary.head())

                    count       mean  max       std
Airline                                            
Southwest          320306  15.508426   30  8.660510
American Airlines  319545  15.503347   30  8.669587
United             319245  15.494739   30  8.652382
Delta              319658  15.487230   30  8.639548


In [7]:
# PHASE 4 — Business Metric

In [8]:
import numpy as np

airline_summary["Severity_Index"] = (
    airline_summary["mean"] * np.log(airline_summary["count"])
)

print(airline_summary.head())

                    count       mean  max       std  Severity_Index
Airline                                                            
Southwest          320306  15.508426   30  8.660510      196.600818
American Airlines  319545  15.503347   30  8.669587      196.499549
United             319245  15.494739   30  8.652382      196.375894
Delta              319658  15.487230   30  8.639548      196.300749


In [9]:
# PHASE 5 — Move to Production Abstraction

In [10]:
DELAY_COL = "DelayMinutes"
GROUP_COL = "Airline"
TOP_N = 5

def severity(avg, count):
    return avg * np.log(count)

In [11]:
# PHASE 6 — Pandas Production Engine

In [12]:
def pandas_engine(path):
    df = pd.read_csv(path)
    df = df[df[DELAY_COL] > 0]

    stats = (
        df.groupby(GROUP_COL)[DELAY_COL]
        .agg(["count","mean","max","std"])
        .rename(columns={
            "count":"Total_Delayed",
            "mean":"Avg_Delay",
            "max":"Max_Delay",
            "std":"Std_Delay"
        })
        .sort_values("Avg_Delay", ascending=False)
    )

    stats["Severity_Index"] = severity(stats["Avg_Delay"], stats["Total_Delayed"])
    return stats.head(TOP_N)

In [13]:
# PHASE 7 — Polars Production Engine

In [23]:
import polars as pl

def polars_engine(path):
    df = pl.read_csv(path)
    df = df.filter(pl.col(DELAY_COL) > 0)

    return (
        df.group_by(GROUP_COL)
        .agg([
            pl.len().alias("Total_Delayed"),
            pl.mean(DELAY_COL).alias("Avg_Delay"),
            pl.max(DELAY_COL).alias("Max_Delay"),
            pl.std(DELAY_COL).alias("Std_Delay")
        ])
        .with_columns(
            (pl.col("Avg_Delay") * pl.col("Total_Delayed").log()).alias("Severity_Index")
        )
        .sort("Avg_Delay", descending=True)
        .head(TOP_N)
    )

In [15]:
# PHASE 8 — Dask Production Engine

In [16]:
import dask.dataframe as dd

def dask_engine(path):
    df = dd.read_csv(path)
    df = df[df[DELAY_COL] > 0]

    stats = df.groupby(GROUP_COL).agg({
        DELAY_COL: ["count","mean","max","std"]
    })

    stats.columns = ["Total_Delayed","Avg_Delay","Max_Delay","Std_Delay"]
    stats["Severity_Index"] = stats["Avg_Delay"] * np.log(stats["Total_Delayed"])

    return stats.sort_values("Avg_Delay", ascending=False).head(TOP_N)

In [17]:
# PHASE 9 — PySpark Production Engine

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, max, stddev, count, log

def spark_engine(path):
    spark = SparkSession.builder.appName("FlightAnalytics").getOrCreate()
    df = spark.read.csv(path, header=True, inferSchema=True)

    df = df.filter(col(DELAY_COL) > 0)

    return (
        df.groupBy(GROUP_COL)
        .agg(
            count(DELAY_COL).alias("Total_Delayed"),
            mean(DELAY_COL).alias("Avg_Delay"),
            max(DELAY_COL).alias("Max_Delay"),
            stddev(DELAY_COL).alias("Std_Delay")
        )
        .withColumn("Severity_Index", col("Avg_Delay") * log(col("Total_Delayed")))
        .orderBy(col("Avg_Delay").desc())
        .limit(TOP_N)
    )

ModuleNotFoundError: No module named 'pyspark'

In [19]:
# PHASE 10 — Orchestration Layer

In [20]:
def run_pipeline():
    print("\n--- PANDAS ---")
    print(pandas_engine(DATA_PATH))

    print("\n--- POLARS ---")
    print(polars_engine(DATA_PATH))

    print("\n--- DASK ---")
    print(dask_engine(DATA_PATH))

    print("\n--- PYSPARK ---")
    spark_engine(DATA_PATH).show()

In [21]:
# PHASE 11 - Execute

In [24]:
run_pipeline()


--- PANDAS ---
                   Total_Delayed  Avg_Delay  Max_Delay  Std_Delay  \
Airline                                                             
Southwest                 320306  15.508426         30   8.660510   
American Airlines         319545  15.503347         30   8.669587   
United                    319245  15.494739         30   8.652382   
Delta                     319658  15.487230         30   8.639548   

                   Severity_Index  
Airline                            
Southwest              196.600818  
American Airlines      196.499549  
United                 196.375894  
Delta                  196.300749  

--- POLARS ---
shape: (4, 6)
┌───────────────────┬───────────────┬───────────┬───────────┬───────────┬────────────────┐
│ Airline           ┆ Total_Delayed ┆ Avg_Delay ┆ Max_Delay ┆ Std_Delay ┆ Severity_Index │
│ ---               ┆ ---           ┆ ---       ┆ ---       ┆ ---       ┆ ---            │
│ str               ┆ u32           ┆ f64       ┆ 

NameError: name 'spark_engine' is not defined