In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Creating the Silver Schema
spark.sql("CREATE SCHEMA IF NOT EXISTS silver") 

DataFrame[]

In [0]:
df_bronze = spark.read.table("raw.flights_bronze")

In [0]:
# This turns one row with 100 flights into 100 separate rows
df_exploded = df_bronze.select(
    col("time"),
    explode(col("states")).alias("flight_data") # Each 'flight_data' is now a single array [id, callsign...]
)

In [0]:
df_exploded.display()

time,flight_data
1768700788,"List(e49406, , Brazil, null, 1768700720, null, null, null, false, 263.57, 32.73, -0.33, null, null, null, false, 0)"
1768700788,"List(a5a8e2, N464EG , United States, 1768700786, 1768700786, -116.1705, 43.5033, 2773.68, false, 121.01, 232.25, 20.16, null, 2941.32, null, false, 0)"
1768700788,"List(c822af, GBA510 , New Zealand, 1768700704, 1768700704, 174.7657, -37.0177, 60.96, false, 46.37, 70.56, -1.95, null, 91.44, null, false, 0)"
1768700788,"List(ac494e, CMD3 , United States, 1768700786, 1768700786, -121.3852, 38.933, 396.24, false, 65.48, 135.64, 0, null, 434.34, null, false, 0)"
1768700788,"List(aa56da, UAL1776 , United States, 1768700787, 1768700787, -84.8793, 41.8072, 9448.8, false, 259.75, 95, 0, null, 8999.22, null, false, 0)"
1768700788,"List(7c6b2f, JST459 , Australia, 1768700787, 1768700787, 152.9338, -29.0553, 5646.42, false, 196.22, 209.19, 5.85, null, 5859.78, null, false, 0)"
1768700788,"List(801641, AXB2934 , India, 1768700778, 1768700778, 77.5689, 13.1907, 1562.1, false, 86.43, 90.34, -4.88, null, 1676.4, null, false, 0)"
1768700788,"List(ab6fdd, AAL2797 , United States, 1768700745, 1768700745, -86.6704, 36.1237, null, true, 5.14, 45, null, null, null, 2510, false, 0)"
1768700788,"List(801645, AIC2953 , India, 1768700786, 1768700786, 73.0654, 19.0914, 906.78, false, 87.98, 269.33, -1.3, null, 861.06, null, false, 0)"
1768700788,"List(842194, ADO58 , Japan, 1768700728, 1768700728, 139.7862, 35.556399999999996, null, true, 0, 267.19, null, null, null, 4740, false, 0)"


In [0]:
#Extracting columns by Index (Mapping based on OpenSky Documentation)
df_parsed = df_exploded.select(
    # Converting Unix timestamp to readable time
    from_unixtime(col("time")).alias("request_time_utc"),
    
    # Extracting specific fields from the array (casting to correct types)
    col("flight_data")[0].cast("string").alias("icao24"),          # Unique ICAO ID
    col("flight_data")[1].cast("string").alias("callsign"),        # Flight number (e.g., UAL123)
    col("flight_data")[2].cast("string").alias("origin_country"),
    col("flight_data")[5].cast("double").alias("longitude"),
    col("flight_data")[6].cast("double").alias("latitude"),
    col("flight_data")[7].cast("double").alias("baro_altitude"),   # Altitude in meters
    col("flight_data")[9].cast("double").alias("velocity"),        # Ground speed in m/s
    col("flight_data")[10].cast("double").alias("true_track"),     # Heading (Degrees)
    col("flight_data")[13].cast("double").alias("geo_altitude")    # Geometric altitude
)

In [0]:
df_parsed.display()

request_time_utc,icao24,callsign,origin_country,longitude,latitude,baro_altitude,velocity,true_track,geo_altitude
2026-01-18 01:46:28,e49406,,Brazil,,,,263.57,32.73,
2026-01-18 01:46:28,a5a8e2,N464EG,United States,-116.1705,43.5033,2773.68,121.01,232.25,2941.32
2026-01-18 01:46:28,c822af,GBA510,New Zealand,174.7657,-37.0177,60.96,46.37,70.56,91.44
2026-01-18 01:46:28,ac494e,CMD3,United States,-121.3852,38.933,396.24,65.48,135.64,434.34
2026-01-18 01:46:28,aa56da,UAL1776,United States,-84.8793,41.8072,9448.8,259.75,95.0,8999.22
2026-01-18 01:46:28,7c6b2f,JST459,Australia,152.9338,-29.0553,5646.42,196.22,209.19,5859.78
2026-01-18 01:46:28,801641,AXB2934,India,77.5689,13.1907,1562.1,86.43,90.34,1676.4
2026-01-18 01:46:28,ab6fdd,AAL2797,United States,-86.6704,36.1237,,5.14,45.0,
2026-01-18 01:46:28,801645,AIC2953,India,73.0654,19.0914,906.78,87.98,269.33,861.06
2026-01-18 01:46:28,842194,ADO58,Japan,139.7862,35.5564,,0.0,267.19,


In [0]:
# Filtering out bad data (Flights on the ground)
df_clean = df_parsed.filter(
    col("latitude").isNotNull() & 
    col("longitude").isNotNull() 
)

### Exploratory Data Analysis (EDA)


In [0]:
# Basic Statistical Summary
display(df_clean.select("velocity", "baro_altitude", "true_track").summary())

summary,velocity,baro_altitude,true_track
count,5121.0,5121.0,5121.0
mean,183.87538371411884,7182.865553602792,183.08025385666872
stddev,74.0836997892721,4160.312787506365,103.6952918332027
min,0.0,7.62,0.0
25%,132.61,2948.94,89.63
50%,203.68,8877.3,186.01
75%,237.2,10668.0,273.88
max,341.85,37581.84,359.85


In [0]:
# Top 10 Countries by Flight Count
country_counts = (df_clean
    .groupBy("origin_country")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)

display(country_counts)

origin_country,count
United States,2790
Canada,276
Australia,215
China,204
Japan,174
India,128
Turkey,103
United Arab Emirates,94
Republic of Korea,78
New Zealand,67


Databricks visualization. Run in Databricks to view.

In [0]:
# We take a sample because plotting 10,000+ points can be slow
df_sample = df_clean.sample(fraction=0.1, seed=42) 

display(df_sample.select("baro_altitude", "velocity"))

baro_altitude,velocity
9761.22,198.41
2575.56,145.83
8724.9,262.65
670.56,49.48
1325.88,72.81
11285.22,258.35
10058.4,262.76
10972.8,130.99
10972.8,211.99
11887.2,248.01


Databricks visualization. Run in Databricks to view.

In [0]:
#  Geospatial Plot
display(df_clean.select("latitude", "longitude", "origin_country"))

latitude,longitude,origin_country
43.5033,-116.1705,United States
-37.0177,174.7657,New Zealand
38.933,-121.3852,United States
41.8072,-84.8793,United States
-29.0553,152.9338,Australia
13.1907,77.5689,India
19.0914,73.0654,India
44.8632,-98.863,United States
27.0067,76.8424,India
-32.1824,115.93,Australia


Databricks visualization. Run in Databricks to view.

In [0]:
# Save as Silver Table
(df_clean.write
    .format("delta")
    .mode("overwrite") # For now, I overwrite. In prod,  would 'merge'.
    .option("overwriteSchema", "true")
    .saveAsTable("silver.flights_parsed")
)

print("Silver table 'silver.flights_parsed' created successfully!")

Silver table 'silver.flights_parsed' created successfully!


In [0]:
%sql
SELECT 
    request_time_utc, 
    callsign, 
    origin_country, 
    latitude, 
    longitude, 
    velocity 
FROM silver.flights_parsed 
WHERE origin_country = 'Australia' --Let's check my local flights!
LIMIT 10;

request_time_utc,callsign,origin_country,latitude,longitude,velocity
2026-01-18 01:46:28,JST459,Australia,-29.0553,152.9338,196.22
2026-01-18 01:46:28,KXW,Australia,-32.1824,115.93,50.05
2026-01-18 01:46:28,JST409,Australia,-29.3391,152.2932,230.11
2026-01-18 01:46:28,JST135,Australia,-43.1683,171.7357,197.73
2026-01-18 01:46:28,JST438,Australia,-29.3516,152.8731,262.65
2026-01-18 01:46:28,JST833,Australia,-37.1429,144.7266,141.58
2026-01-18 01:46:28,JST296,Australia,-39.2843,174.5412,185.75
2026-01-18 01:46:28,JST164,Australia,-37.8609,174.0996,212.19
2026-01-18 01:46:28,JST235,Australia,-37.2758,174.9545,178.05
2026-01-18 01:46:28,FD517,Australia,-34.2835,138.2933,121.61


# Weather

In [0]:
# Read from the RAW TABLE (Standard Practice)
df_weather_bronze = spark.read.table("raw.weather_bronze")

In [0]:
# Flatten and Clean
df_weather_silver = df_weather_bronze.select(
    # Create the location keys
    col("latitude").cast("decimal(4,2)").alias("latitude"),
    col("longitude").cast("decimal(5,2)").alias("longitude"),
    col("elevation"),
    
    # Extract timestamp
    to_timestamp(col("current_weather.time")).alias("observation_time"),
    
    # Extract weather metrics
    col("current_weather.temperature").alias("temperature_c"),
    col("current_weather.windspeed").alias("wind_speed_kmh"),
    col("current_weather.winddirection").alias("wind_direction"),
    col("current_weather.weathercode").alias("weather_code"),
    col("current_weather.is_day").cast("boolean").alias("is_day")
)

In [0]:
df_weather_silver.display()

latitude,longitude,elevation,observation_time,temperature_c,wind_speed_kmh,wind_direction,weather_code,is_day
-17.0,145.75,0.0,2026-01-18T03:45:00Z,31.2,7.7,229,3,True
-12.5,130.88,27.0,2026-01-18T03:45:00Z,28.6,12.2,332,80,True
-42.88,147.38,27.0,2026-01-18T03:45:00Z,18.0,19.8,61,3,True
-37.75,145.0,21.0,2026-01-18T03:45:00Z,28.5,8.6,165,0,True
-28.25,153.5,7.0,2026-01-18T03:45:00Z,25.6,13.0,182,80,True
-32.0,115.88,16.0,2026-01-18T03:45:00Z,21.5,17.7,213,0,True
-34.88,138.63,38.0,2026-01-18T03:45:00Z,35.7,7.4,317,2,True
-27.5,153.0,14.0,2026-01-18T03:45:00Z,28.7,9.8,126,80,True
-35.25,149.13,574.0,2026-01-18T03:45:00Z,18.3,28.0,122,3,True
-33.88,151.13,0.0,2026-01-18T03:45:00Z,20.5,15.8,135,80,True


In [0]:
df_weather_clean = df_weather_silver.dropDuplicates(["latitude", "longitude", "observation_time"])

In [0]:
(df_weather_clean.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver.weather_parsed")
)

print("Weather data successfully transformed to 'silver.weather_parsed'")

Weather data successfully transformed to 'silver.weather_parsed'
