In [2]:
import sys, os

# Always go 1 level up from "transformation for use cases" to project root
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import plotly.express as px
from loading.loader import write_to_mysql

spark = SparkSession.builder \
    .appName("Airport Traffic Monitor") \
    .getOrCreate()

parquet_path = "/Users/parthmac/Desktop/Projects/Flight Data Engineering/data/wrangled data/flattened_flight_data_parquet"
df = spark.read.parquet(parquet_path)

df.printSchema()
df.show(10, truncate=False)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/02 13:48:49 WARN Utils: Your hostname, Parths-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.249 instead (on interface en0)
25/09/02 13:48:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/02 13:48:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/02 13:48:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- aircraft_iata: string (nullable = true)
 |-- aircraft_icao: string (nullable = true)
 |-- aircraft_icao24: string (nullable = true)
 |-- aircraft_manufacturer: string (nullable = true)
 |-- aircraft_model: string (nullable = true)
 |-- aircraft_registration: string (nullable = true)
 |-- airline_iata: string (nullable = true)
 |-- airline_icao: string (nullable = true)
 |-- airline_name: string (nullable = true)
 |-- arrival_actual: string (nullable = true)
 |-- arrival_actual_runway: string (nullable = true)
 |-- arrival_airport: string (nullable = true)
 |-- arrival_baggage: string (nullable = true)
 |-- arrival_delay: long (nullable = true)
 |-- arrival_estimated: string (nullable = true)
 |-- arrival_estimated_runway: string (nullable = true)
 |-- arrival_gate: string (nullable = true)
 |-- arrival_iata: string (nullable = true)
 |-- arrival_icao: string (nullable = true)
 |-- arrival_scheduled: string (nullable = true)
 |-- arrival_terminal: string (nullable = true)
 |--

25/09/02 13:48:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------------+-------------+---------------+---------------------+--------------+---------------------+------------+------------+-----------------+-------------------------+-------------------------+-------------------------------+---------------+-------------+-------------------------+-------------------------+------------+------------+------------+-------------------------+----------------+-------------------+-------------------------+-------------------------+-----------------------------------+---------------+-------------------------+--------------------------+--------------+--------------+--------------+-------------------------+------------------+------------------+------------------------------+------------------------------+------------------------------+-----------------------------+-----------------------------+-------------------------------+-----------+-----------+-------------+-----------+-------------+----+
|aircraft_iata|aircraft_icao|aircraft_icao24|aircraft_manufactu

In [3]:
flight_status_counts = df.groupBy("flight_status").agg(F.count("*").alias("count")).orderBy(F.desc("count"))
flight_status_counts.show(truncate=False)

+-------------+-----+
|flight_status|count|
+-------------+-----+
|landed       |79984|
|delayed      |10088|
|cancelled    |9928 |
+-------------+-----+



In [4]:

flight_status_counts_pd = flight_status_counts.toPandas()
# Custom colors for each status
colors = {
    'landed': '#2ca02c',       # green
    'delayed': '#ff7f0e',      # orange
    'cancelled': '#d62728',    # red
    'scheduled': '#1f77b4'     # blue
}

fig = px.pie(
    flight_status_counts_pd,
    names='flight_status',
    values='count',
    title='✈️ Flight Status Distribution',
    color='flight_status',
    color_discrete_map=colors,
    hole=0.4  # makes it a donut chart
)

# Add count + percentage inside slices
fig.update_traces(textinfo='percent+label', pull=[0.05, 0.05, 0.05, 0.05], marker=dict(line=dict(color='#000000', width=2)))

# Layout styling
fig.update_layout(
    title_font_size=24,
    legend_title_text='Flight Status',
    legend_font_size=14,
    margin=dict(t=50, b=0, l=0, r=0)
)

fig.show()


In [5]:
# Aggregate cancelled and delayed flights by date
status_trend = (
    df.filter(F.col("flight_status").isin("cancelled", "delayed"))
      .groupBy("flight_date", "flight_status")
      .agg(F.count("*").alias("count"))
      .orderBy("flight_date")
)

status_trend.show(10, truncate=False)

+-----------+-------------+-----+
|flight_date|flight_status|count|
+-----------+-------------+-----+
|2025-08-22 |cancelled    |1021 |
|2025-08-22 |delayed      |1016 |
|2025-08-23 |delayed      |1031 |
|2025-08-23 |cancelled    |996  |
|2025-08-24 |delayed      |995  |
|2025-08-24 |cancelled    |936  |
|2025-08-25 |delayed      |968  |
|2025-08-25 |cancelled    |1008 |
|2025-08-26 |cancelled    |995  |
|2025-08-26 |delayed      |1015 |
+-----------+-------------+-----+
only showing top 10 rows


In [6]:
# Convert to pandas
status_trend_pd = status_trend.toPandas()

# Create line chart with color by flight_status
fig = px.line(
    status_trend_pd,
    x="flight_date",
    y="count",
    color="flight_status",
    title="✈️ Cancelled vs Delayed Flights Trend Over Past 9 Days",
    markers=True,
    color_discrete_map={
        "cancelled": "#c1100d",  # blue (like your cancelled formatting)
        "delayed": "#b6d627"     # red for delayed
    }
)

# Style traces
fig.update_traces(
    line=dict(width=4),
    marker=dict(size=10, symbol="circle"),
    hovertemplate="Date: %{x}<br>%{legendgroup} Flights: %{y}"
)

# Layout improvements
fig.update_layout(
    title_font_size=24,
    xaxis_title="Flight Date",
    yaxis_title="Number of Flights",
    xaxis=dict(
        showgrid=True,
        gridcolor='lightgrey',
        tickangle=0,
        tickformat="%b %d"   # e.g., "Aug 01"
    ),
    yaxis=dict(
        showgrid=True,
        gridcolor='lightgrey'
    ),
    plot_bgcolor='white',
    hovermode='x unified',
    margin=dict(t=50, b=50, l=80, r=150),  # extra right margin for legend
    legend=dict(
        title="Flight Status",
        orientation="v",    # vertical stack
        yanchor="top", y=1,
        xanchor="left", x=1.05  # push legend outside right side
    )
)

# Highlight peaks for each status
for status, marker_symbol in [("cancelled", "diamond"), ("delayed", "star")]:
    df_status = status_trend_pd[status_trend_pd["flight_status"] == status]
    max_idx = df_status["count"].idxmax()
    fig.add_scatter(
        x=[df_status.loc[max_idx, "flight_date"]],
        y=[df_status.loc[max_idx, "count"]],
        mode="markers+text",
        marker=dict(
            size=14,
            symbol=marker_symbol,
            color="grey"
        ),
        text=[f"<b>Peak {status.capitalize()}: {df_status.loc[max_idx, 'count']}</b>"],
        textposition="top center",
        textfont=dict(size=11, color="black"),
        showlegend=False
    )

fig.show()

In [7]:
#saving the dataframes to mysql database after transformations
write_to_mysql(flight_status_counts, "flight_status_counts", mode="overwrite")
write_to_mysql(status_trend, "flight_status_trend", mode="overwrite")

✅ Data written to MySQL (localhost:3306/flight_db) → Table: flight_status_counts
✅ Data written to MySQL (localhost:3306/flight_db) → Table: flight_status_trend
