In [1]:
import pandas as pd
import requests
import os
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, trim, udf, row_number, monotonically_increasing_id, to_date, from_unixtime, substring
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql.window import Window

from dotenv import load_dotenv
from datetime import datetime, UTC

from geopy.distance import geodesic

import psycopg2
from sqlalchemy import create_engine, text


In [2]:
spark = SparkSession.builder.appName("SkyTrack DE") \
                            .config("spark.jars.packages", "org.postgresql:postgresql:42.2.27") \
                            .getOrCreate()

:: loading settings :: url = jar:file:/opt/anaconda3/envs/skytrack-env/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/admin/.ivy2/cache
The jars for the packages stored in: /Users/admin/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-894e4e47-fb5d-4ef9-8634-49b3561efe82;1.0
	confs: [default]
	found org.postgresql#postgresql;42.2.27 in central
	found org.checkerframework#checker-qual;3.5.0 in central
:: resolution report :: resolve 115ms :: artifacts dl 7ms
	:: modules in use:
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.postgresql#postgresql;42.2.27 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-------------------------------------------

In [3]:
spark

In [5]:
load_dotenv(override=True)

True

## Ingestion (Raw/Bronze Layer)

In [6]:
def fetch_and_save_flights(url):
    try:
        response = requests.get(url)
        data = response.json()

        timestamp = datetime.now(UTC).strftime("%Y-%m-%d-%H-%M-%S")
        out_dir= f"data/raw/{timestamp}"

        os.makedirs(out_dir, exist_ok=True)

        with open(f"{out_dir}/flights.json", "w") as file:
            json.dump(data, file)

    except Exception as e:
        print(f"Something went very wrong: {e}")


25/07/21 11:34:39 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [56]:
if __name__ == "__main__":
    url = "https://opensky-network.org/api/states/all"

    fetch_and_save_flights(url)

## Extraction

In [8]:
# Collect all raw folders from today

raw_data_path = "data/raw"
today = datetime.now(UTC).strftime("%Y-%m-%d")

# all_json_folders = os.listdir(raw_data_path)

# today_folders = []

# for folder in all_json_folders:
#     if today in folder:
#         today_folders.append(folder)


folders = [f"{raw_data_path}/{f}" for f in os.listdir(raw_data_path) if today in f]
   

# print(all_json_folders)
# print(today_folders)
# print(folders)

In [9]:
flights_df_from_file = spark.read.json(folders)
flights_df_from_file.show()

+--------------------+----------+
|              states|      time|
+--------------------+----------+
|[[4b1815, SWR3YX ...|1753087022|
+--------------------+----------+



## Transformation (Silver Layer)

In [10]:
flights_df_exploded = flights_df_from_file.select(
    col("time").alias("snapshot_time"),
    explode(col("states")).alias("states_array")
)

flights_df_exploded.show()

+-------------+--------------------+
|snapshot_time|        states_array|
+-------------+--------------------+
|   1753087022|[4b1815, SWR3YX  ...|
|   1753087022|[4b1818, SWR2HX  ...|
|   1753087022|[a8a812, N657PT  ...|
|   1753087022|[408120, , United...|
|   1753087022|[88044a, AIQ4112 ...|
|   1753087022|[aa56db, UAL1317 ...|
|   1753087022|[408124, EXS14H  ...|
|   1753087022|[39de4a, TVF97NG ...|
|   1753087022|[88044f, AIQ1041 ...|
|   1753087022|[39de4d, TVF29UL ...|
|   1753087022|[408121, VIR12E  ...|
|   1753087022|[39de4c, TVF97TG ...|
|   1753087022|[7c6b2b, JST248  ...|
|   1753087022|[801642, AXB264  ...|
|   1753087022|[880459, AIQ243  ...|
|   1753087022|[801645, AIC2717 ...|
|   1753087022|[a1abef, N2069Q  ...|
|   1753087022|[842194, ADO67   ...|
|   1753087022|[440da5, EJU81MA ...|
|   1753087022|[880454, AIQ5500 ...|
+-------------+--------------------+
only showing top 20 rows



In [11]:
flights_df = flights_df_exploded.select(
    col("snapshot_time"),
    col("states_array")[0].alias("icao24"),
    col("states_array")[1].alias("callsign"),
    col("states_array")[2].alias("origin_country"),
    col("states_array")[3].alias("time_position"),
    col("states_array")[4].alias("last_contact"),
    col("states_array")[5].alias("longitude").cast(DoubleType()),
    col("states_array")[6].alias("latitude").cast(DoubleType()),
    col("states_array")[7].alias("baro_altitude"),
    col("states_array")[8].alias("on_ground"),
    col("states_array")[9].alias("velocity"),
    col("states_array")[10].alias("true_track"),
    col("states_array")[11].alias("vertical_rate"),
    col("states_array")[12].alias("sensors"),
    col("states_array")[13].alias("geo_altitude"),
    col("states_array")[14].alias("squawk"),
    col("states_array")[15].alias("spi"),
    col("states_array")[16].alias("position_source"),
    col("states_array")[17].alias("category")
)

flights_df.show()

+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+
|snapshot_time|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|category|
+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+
|   1753087022|4b1815|SWR3YX  |   Switzerland|   1753087021|  1753087021|   4.4327| 42.5747|      11277.6|    false|  208.22|    210.43|            0|   NULL|     11772.9|  3071|false|              0|    NULL|
|   1753087022|4b1818|SWR2HX  |   Switzerland|   1753087021|  1753087021|   2.3704| 49.0855|      2484.12|    false|  156.29|      6.24|        17.56|   NULL|  

In [13]:
# Remove trailing spaces from callsign column (Trimming) 
flights_df = flights_df.withColumn("callsign", trim(col("callsign")))

# # Convert altitude (geometric and barometric) from meters to feat, for easy readability
# flights_df = flights_df.withColumn("baro_altitude_ft", col("baro_altitude") * 3.28084) \
#                       .withColumn("geo_altitude_ft", col("geo_altitude") * 3.28084)

# # Convert velocity from m/s to m/h, for easy readability
# flights_df = flights_df.withColumn("velocity_mph", col("velocity") * 2.23694)


print("Flight data extracted successfully.")

flights_df.show()


Flight data extracted successfully.
+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+
|snapshot_time|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|category|
+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+
|   1753087022|4b1815|  SWR3YX|   Switzerland|   1753087021|  1753087021|   4.4327| 42.5747|      11277.6|    false|  208.22|    210.43|            0|   NULL|     11772.9|  3071|false|              0|    NULL|
|   1753087022|4b1818|  SWR2HX|   Switzerland|   1753087021|  1753087021|   2.3704| 49.0855|      2484.12|    false|  156.29

In [14]:
# Deriving dim_airports

dim_airports = spark.read.csv("data/airports.csv", header= True, inferSchema=True)

dim_airports = dim_airports.select(
                            col("id"),
                            col("name"),
                            col("type"), 
                            col("latitude_deg").alias("latitude"), 
                            col("longitude_deg").alias("longitude"),
                            col("iso_country").alias("country"),
                            col("icao_code").alias("icao"),
                            col("iata_code").alias("iata"),
                            col("gps_code"),
                        ).dropDuplicates()

dim_airports.show()

[Stage 8:>                                                          (0 + 3) / 3]

+------+--------------------+--------------+------------------+-------------------+-------+----+----+--------+
|    id|                name|          type|          latitude|          longitude|country|icao|iata|gps_code|
+------+--------------------+--------------+------------------+-------------------+-------+----+----+--------+
| 31542|Gasmata Island Ai...| small_airport|    -6.27111005783|      150.330993652|     PG|AYGT| GMI|    AYGT|
|307247|     Woitape Airport| small_airport|         -8.545833|           147.2525|     PG|AYWT| WTP|    AYWT|
|  1826|Hudson's Hope Air...| small_airport|         56.035142|        -121.978326|     CA|CYNH| YNH|    CYNH|
|  1838|Rainbow Lake Airport|medium_airport|         58.491402|        -119.407997|     CA|CYOP| YOP|    CYOP|
|  1928|Montreal / Pierre...| large_airport|         45.467837|         -73.742294|     CA|CYUL| YUL|    CYUL|
|  2557|     Thisted Airport|medium_airport|         57.068802|            8.70522|     DK|EKTS| TED|    EKTS|
|

                                                                                

In [15]:
# Leaving the list of airports to only big, medium and small. Filtering out closed, 
# heliports, balloonport, large_airport, seaplane_base.. e.t.c

dim_airports = dim_airports.filter(dim_airports["type"].isin(['large_airport', 'medium_airport', 'small_airport']))

dim_airports.show(5)

[Stage 11:>                                                         (0 + 3) / 3]

+------+--------------------+--------------+--------------+-------------+-------+----+----+--------+
|    id|                name|          type|      latitude|    longitude|country|icao|iata|gps_code|
+------+--------------------+--------------+--------------+-------------+-------+----+----+--------+
| 31542|Gasmata Island Ai...| small_airport|-6.27111005783|150.330993652|     PG|AYGT| GMI|    AYGT|
|307247|     Woitape Airport| small_airport|     -8.545833|     147.2525|     PG|AYWT| WTP|    AYWT|
|  1826|Hudson's Hope Air...| small_airport|     56.035142|  -121.978326|     CA|CYNH| YNH|    CYNH|
|  1838|Rainbow Lake Airport|medium_airport|     58.491402|  -119.407997|     CA|CYOP| YOP|    CYOP|
|  1928|Montreal / Pierre...| large_airport|     45.467837|   -73.742294|     CA|CYUL| YUL|    CYUL|
+------+--------------------+--------------+--------------+-------------+-------+----+----+--------+
only showing top 5 rows



                                                                                

In [16]:
# Identifying null values in dim_airports

for column in dim_airports.columns:
    print(column, " NULLS ====> " , dim_airports.filter(dim_airports[column].isNull()).count())

id  NULLS ====>  0
name  NULLS ====>  0
type  NULLS ====>  0
latitude  NULLS ====>  0
longitude  NULLS ====>  0
country  NULLS ====>  0
icao  NULLS ====>  38715


                                                                                

iata  NULLS ====>  38609
gps_code  NULLS ====>  15108


In [17]:
# Fill up of unknown values

dim_airports = dim_airports.fillna({
    "icao": "Unknown",
    "iata": "Unknown",
    "gps_code": "Unknown"
})

In [18]:
# Defining a udf to calculate accurate distance

@udf(returnType=DoubleType())
def geo_distance_in_km(lat1,lon1, lat2, lon2):
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    return geodesic((lat1,lon1), (lat2,lon2)).km



In [19]:
#  Creating a bounding box for each flight's long and lat

delta = 1.5

flights_boundings = flights_df.withColumn("lat_min", col("latitude") - delta) \
                            .withColumn("lat_max", col("latitude") + delta) \
                            .withColumn("long_min", col("longitude") - delta) \
                            .withColumn("long_max", col("longitude") + delta)

flights_boundings.show()
                            

+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+--------+--------+--------------------+-------------------+
|snapshot_time|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|category| lat_min| lat_max|            long_min|           long_max|
+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+--------+--------+--------------------+-------------------+
|   1753087022|4b1815|  SWR3YX|   Switzerland|   1753087021|  1753087021|   4.4327| 42.5747|      11277.6|    false|  208.22|    210.43|            0|   NULL|     11772.9|  3071|false|         

In [20]:
# Join for flights and dim_airports (Only cross joining 
# flights with airports whose locations are within the flight's bounding box)

flights_x_airs = flights_boundings.alias("flight") \
                     .join(dim_airports.alias("airport"), \
                                (col("airport.latitude") >= col("flight.lat_min")) & (col("airport.latitude") <= col("flight.lat_max")) & \
                                (col("airport.longitude") >= col("flight.long_min")) & (col("airport.longitude") <= col("flight.long_max")) \
                                ) \
                     .select(
                         "flight.*",
                         col("airport.name").alias("airport_name"),
                         col("airport.id").alias("airport_id"),                
                         col("airport.name").alias("airport_name"),          
                         col("airport.type").alias("airport_type"),      
                         col("airport.latitude").alias("airport_lat"),
                         col("airport.longitude").alias("airport_long"),
                         col("airport.country").alias("airport_country"),
                         col("airport.icao").alias("airport_icao"),
                         col("airport.iata").alias("airport_iata"),
                         col("airport.gps_code").alias("airport_gps_code")
                     )

flights_x_airs.show()

25/07/21 11:36:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 54:>                                                         (0 + 1) / 1]

+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+-------+-------+------------------+--------+--------------------+----------+--------------------+--------------+-----------+------------+---------------+------------+------------+----------------+
|snapshot_time|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|category|lat_min|lat_max|          long_min|long_max|        airport_name|airport_id|        airport_name|  airport_type|airport_lat|airport_long|airport_country|airport_icao|airport_iata|airport_gps_code|
+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+----

                                                                                

In [21]:
# Calculate column distance_km between flights and airports to the joined df

flights_x_airs = flights_x_airs.withColumn("distance_km", 
                                            geo_distance_in_km(col("latitude"), col("longitude"), 
                                                               col("airport_lat"), col("airport_long") 
                                                            )
                                          )

flights_x_airs.show(5)

[Stage 58:>                                                         (0 + 1) / 1]

+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+-------+-------+------------------+--------+--------------------+----------+--------------------+--------------+-----------+------------+---------------+------------+------------+----------------+------------------+
|snapshot_time|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|category|lat_min|lat_max|          long_min|long_max|        airport_name|airport_id|        airport_name|  airport_type|airport_lat|airport_long|airport_country|airport_icao|airport_iata|airport_gps_code|       distance_km|
+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+

                                                                                

In [22]:

windowSpec = Window.partitionBy("snapshot_time", "icao24").orderBy("distance_km")

flights_x_airs_closests = flights_x_airs.withColumn("rank",
                                                    row_number().over(windowSpec)
                                                    )

flights_x_airs_closests.show()

[Stage 62:>                                                         (0 + 1) / 1]

+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+-------+-------+--------+--------+--------------------+----------+--------------------+--------------+-------------------+------------------+---------------+------------+------------+----------------+------------------+----+
|snapshot_time|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|category|lat_min|lat_max|long_min|long_max|        airport_name|airport_id|        airport_name|  airport_type|        airport_lat|      airport_long|airport_country|airport_icao|airport_iata|airport_gps_code|       distance_km|rank|
+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+-------

                                                                                

In [23]:
flights_x_airs_closests = flights_x_airs_closests.filter(col("rank") == 1).drop("rank")

In [24]:
flights_x_airs_closests.columns

fact_flights_df = flights_x_airs_closests.select(
    monotonically_increasing_id().alias("flight_id"),
    "snapshot_time",
    "icao24",
    "callsign",
    "origin_country",
    "time_position",
    "longitude",
    "latitude",
    col("geo_altitude").alias("altitude"),
    "on_ground",
    "velocity",
    col("true_track").alias("heading"),
    col("airport_id").alias("closest_airport_id"),
    col("distance_km").alias("airport_distance_km"),
    to_date(from_unixtime(col("snapshot_time"))).alias("flight_date")
)

fact_flights_df.show()



+---------+-------------+------+--------+--------------+-------------+---------+--------+--------+---------+--------+-------+------------------+-------------------+-----------+
|flight_id|snapshot_time|icao24|callsign|origin_country|time_position|longitude|latitude|altitude|on_ground|velocity|heading|closest_airport_id|airport_distance_km|flight_date|
+---------+-------------+------+--------+--------------+-------------+---------+--------+--------+---------+--------+-------+------------------+-------------------+-----------+
|        0|   1753087022|00834c| LNK861C|  South Africa|   1753086844|  29.5509|-25.5143| 9128.76|    false|  239.61|   67.4|              2807| 21.918568648399088| 2025-07-21|
|        1|   1753087022|00834d| LNK314X|  South Africa|   1753086938|  18.5998|-33.9718|    NULL|     true|     0.9|    315|              2775|  0.795863865822511| 2025-07-21|
|        2|   1753087022|00869f|  KEM901|  South Africa|   1753087020|  19.1401|-33.8569| 4434.84|    false|  147.9

                                                                                

In [25]:
fact_flights_df = fact_flights_df.withColumn("airline_code", substring(col("callsign"), 1, 3))

fact_flights_df.show(5)



+---------+-------------+------+--------+--------------+-------------+---------+--------+--------+---------+--------+-------+------------------+-------------------+-----------+------------+
|flight_id|snapshot_time|icao24|callsign|origin_country|time_position|longitude|latitude|altitude|on_ground|velocity|heading|closest_airport_id|airport_distance_km|flight_date|airline_code|
+---------+-------------+------+--------+--------------+-------------+---------+--------+--------+---------+--------+-------+------------------+-------------------+-----------+------------+
|        0|   1753087022|00834c| LNK861C|  South Africa|   1753086844|  29.5509|-25.5143| 9128.76|    false|  239.61|   67.4|              2807| 21.918568648399088| 2025-07-21|         LNK|
|        1|   1753087022|00834d| LNK314X|  South Africa|   1753086938|  18.5998|-33.9718|    NULL|     true|     0.9|    315|              2775|  0.795863865822511| 2025-07-21|         LNK|
|        2|   1753087022|00869f|  KEM901|  South A

                                                                                

In [26]:
dim_airports.show()

[Stage 79:>                                                         (0 + 3) / 3]

+------+--------------------+--------------+------------------+-------------------+-------+----+----+--------+
|    id|                name|          type|          latitude|          longitude|country|icao|iata|gps_code|
+------+--------------------+--------------+------------------+-------------------+-------+----+----+--------+
| 31542|Gasmata Island Ai...| small_airport|    -6.27111005783|      150.330993652|     PG|AYGT| GMI|    AYGT|
|307247|     Woitape Airport| small_airport|         -8.545833|           147.2525|     PG|AYWT| WTP|    AYWT|
|  1826|Hudson's Hope Air...| small_airport|         56.035142|        -121.978326|     CA|CYNH| YNH|    CYNH|
|  1838|Rainbow Lake Airport|medium_airport|         58.491402|        -119.407997|     CA|CYOP| YOP|    CYOP|
|  1928|Montreal / Pierre...| large_airport|         45.467837|         -73.742294|     CA|CYUL| YUL|    CYUL|
|  2557|     Thisted Airport|medium_airport|         57.068802|            8.70522|     DK|EKTS| TED|    EKTS|
|

                                                                                

In [27]:
fact_flights_x_airs = flights_x_airs_closests

fact_flights_x_airs.show()

[Stage 85:>                                                         (0 + 1) / 1]

+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+-------+------------+------+-----+---------------+--------+--------+-------------------+--------+--------+--------------------+----------+--------------------+--------------+-------------------+------------------+---------------+------------+------------+----------------+-------------------+
|snapshot_time|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|sensors|geo_altitude|squawk|  spi|position_source|category| lat_min|            lat_max|long_min|long_max|        airport_name|airport_id|        airport_name|  airport_type|        airport_lat|      airport_long|airport_country|airport_icao|airport_iata|airport_gps_code|        distance_km|
+-------------+------+--------+--------------+-------------+------------+---------+--------+-------------+--------

                                                                                

In [28]:
# Creating the Airline Dimension
dim_airline = spark.read.csv("data/silver/airlines.dat", header=False, inferSchema=True)

dim_airline = dim_airline.selectExpr(
    "_c0 as airline_id",
    "_c1 as airline_name",
    "_c2 as airline_alias",
    "_c4 as airline_icao",
    "_c5 as airline_callsign",
    "_c6 as country",
    "_c7 as active"
).filter((col("_c4") != "\\N") & (col("_c4").isNotNull()) & (col("_c4") != "N/A")) \
.drop("airline_id")

# Add montonically_increading_id for airline_id

dim_airline = dim_airline.withColumn("airline_id", monotonically_increasing_id() + 1)\
.select("airline_id", "airline_name", "airline_icao", "airline_callsign", "country", "active")

dim_airline.show()

+----------+--------------------+------------+----------------+--------------+------+
|airline_id|        airline_name|airline_icao|airline_callsign|       country|active|
+----------+--------------------+------------+----------------+--------------+------+
|         1|         135 Airways|         GNL|         GENERAL| United States|     N|
|         2|       1Time Airline|         RNX|         NEXTIME|  South Africa|     Y|
|         3|2 Sqn No 1 Elemen...|         WYT|            NULL|United Kingdom|     N|
|         4|     213 Flight Unit|         TFU|            NULL|        Russia|     N|
|         5|223 Flight Unit S...|         CHD|  CHKALOVSK-AVIA|        Russia|     N|
|         6|   224th Flight Unit|         TTF|      CARGO UNIT|        Russia|     N|
|         7|         247 Jet Ltd|         TWF|    CLOUD RUNNER|United Kingdom|     N|
|         8|         3D Aviation|         SEC|         SECUREX| United States|     N|
|         9|         40-Mile Air|         MLA|        

In [29]:
# Creating the Aircraft Dimension

dim_aircraft_base = fact_flights_df.select("icao24").dropna().dropDuplicates()

dim_aircraft_base.show(5)




+------+
|icao24|
+------+
|06a109|
|06a2f6|
|3455d9|
|348382|
|393f0c|
+------+
only showing top 5 rows



                                                                                

In [30]:
dim_aircraft_base.count()

                                                                                

6202

In [31]:
dim_aircraft = spark.read.csv("data/silver/aircrafts.csv", header= True, inferSchema=True)

# Clean column names

for old_column in dim_aircraft.columns:
    new_column = old_column.strip().strip("'")
    if old_column != new_column:
        dim_aircraft = dim_aircraft.withColumnRenamed(old_column, new_column)

dim_aircraft = dim_aircraft.select([
    "icao24",
    "model",
    "manufacturerName",
    "manufacturerIcao",
    "typecode",
    "icaoAircraftClass",
    "categoryDescription",
    "country",
    "engines",
    "built",
    "serialNumber",
    "registration",
    "status"
])

dim_aircraft.show(5)

                                                                                

+--------+---------+----------------+----------------+--------+-----------------+--------------------+-------+-------+-----+------------+------------+------+
|  icao24|    model|manufacturerName|manufacturerIcao|typecode|icaoAircraftClass| categoryDescription|country|engines|built|serialNumber|registration|status|
+--------+---------+----------------+----------------+--------+-----------------+--------------------+-------+-------+-----+------------+------------+------+
|'000000'|   unknow|              ''|              ''|      ''|               ''|                  ''|   NULL|     ''| NULL|          ''|          ''|    ''|
|'000001'|Antonov 2|              ''|    SHIJIAZHUANG|     AN2|              L1P|                  ''|   NULL|     ''| NULL|          ''|      SP-FGR|    ''|
|'000002'|       ''|              ''|              ''|      ''|               ''| Light (< 15500 lbs)|   NULL|     ''| NULL|          ''|          ''|    ''|
|'000003'|       ''|              ''|              '

In [32]:
# Final dim_aircraft, filtered to only those present in the fact table

dim_aircraft = dim_aircraft_base.join( dim_aircraft, ["icao24"], how="left")

dim_aircraft = dim_aircraft.withColumnRenamed("icao24", "aircraft_id")

dim_aircraft.show(5)



+-----------+-----+----------------+----------------+--------+-----------------+-------------------+-------+-------+-----+------------+------------+------+
|aircraft_id|model|manufacturerName|manufacturerIcao|typecode|icaoAircraftClass|categoryDescription|country|engines|built|serialNumber|registration|status|
+-----------+-----+----------------+----------------+--------+-----------------+-------------------+-------+-------+-----+------------+------------+------+
|     06a109| NULL|            NULL|            NULL|    NULL|             NULL|               NULL|   NULL|   NULL| NULL|        NULL|        NULL|  NULL|
|     06a2f6| NULL|            NULL|            NULL|    NULL|             NULL|               NULL|   NULL|   NULL| NULL|        NULL|        NULL|  NULL|
|     3455d9| NULL|            NULL|            NULL|    NULL|             NULL|               NULL|   NULL|   NULL| NULL|        NULL|        NULL|  NULL|
|     348382| NULL|            NULL|            NULL|    NULL|  

                                                                                

In [33]:
fact_flights_df = fact_flights_df.join(dim_airline, fact_flights_df.airline_code == dim_airline.airline_icao, how="left") \
                                .join(dim_aircraft, fact_flights_df.icao24 ==  dim_aircraft.aircraft_id, how="left")\
                                .select(["flight_id", "snapshot_time", "icao24", "callsign", "origin_country", "time_position",
                                         "longitude", "latitude", "altitude", "on_ground", "velocity", "heading", "closest_airport_id", 
                                         "airport_distance_km", "flight_date", "airline_code", "airline_id", "aircraft_id"]) \
                                .filter(col("airline_id").isNotNull())


fact_flights_df.show(5)



+---------+-------------+------+--------+--------------+-------------+---------+--------+--------+---------+--------+-------+------------------+-------------------+-----------+------------+----------+-----------+
|flight_id|snapshot_time|icao24|callsign|origin_country|time_position|longitude|latitude|altitude|on_ground|velocity|heading|closest_airport_id|airport_distance_km|flight_date|airline_code|airline_id|aircraft_id|
+---------+-------------+------+--------+--------------+-------------+---------+--------+--------+---------+--------+-------+------------------+-------------------+-----------+------------+----------+-----------+
|        0|   1753087022|00834c| LNK861C|  South Africa|   1753086844|  29.5509|-25.5143| 9128.76|    false|  239.61|   67.4|              2807| 21.918568648399088| 2025-07-21|         LNK|      1025|     00834c|
|        1|   1753087022|00834d| LNK314X|  South Africa|   1753086938|  18.5998|-33.9718|    NULL|     true|     0.9|    315|              2775|  0.

                                                                                

In [34]:
fact_flights_df.show(5)



+---------+-------------+------+--------+--------------+-------------+---------+--------+--------+---------+--------+-------+------------------+-------------------+-----------+------------+----------+-----------+
|flight_id|snapshot_time|icao24|callsign|origin_country|time_position|longitude|latitude|altitude|on_ground|velocity|heading|closest_airport_id|airport_distance_km|flight_date|airline_code|airline_id|aircraft_id|
+---------+-------------+------+--------+--------------+-------------+---------+--------+--------+---------+--------+-------+------------------+-------------------+-----------+------------+----------+-----------+
|        0|   1753087022|00834c| LNK861C|  South Africa|   1753086844|  29.5509|-25.5143| 9128.76|    false|  239.61|   67.4|              2807| 21.918568648399088| 2025-07-21|         LNK|      1025|     00834c|
|        1|   1753087022|00834d| LNK314X|  South Africa|   1753086938|  18.5998|-33.9718|    NULL|     true|     0.9|    315|              2775|  0.

                                                                                

In [35]:
#  Save dataframes to CSV

fact_flights_df.write.mode("append").parquet("data/gold/fact")
dim_aircraft.write.mode("overwrite").parquet("data/gold/aircrafts")
dim_airline.write.mode("overwrite").parquet("data/gold/airlines")
dim_airports.write.mode("overwrite").parquet("data/gold/airports")

                                                                                

## Loading

In [36]:
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME").lower()

In [37]:
# Creating the Database

try:
    conn = psycopg2.connect(f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/postgres")
    conn.autocommit = True

    cur = conn.cursor()

    cur.execute(f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{DB_NAME}'")

    exists = cur.fetchone()

    if exists:
        print(f"Database {DB_NAME} already exists.")
    else:
        cur.execute(f"CREATE DATABASE {DB_NAME}")
        print(f"Database {DB_NAME} created successfully.")

except Exception as e:
    print(f"Something went very wrong: {e}")

Database skytrack already exists.


In [39]:
# Creating Schema

url = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine =  create_engine(url)

with engine.begin() as conn:
    conn.execute(text("CREATE SCHEMA IF NOT EXISTS skytracker"))

conn.close()

In [41]:
# Loading data into the database

db_properties = {
    "user": DB_USER,
    "password": DB_PASS,
    "driver": "org.postgresql.Driver"
}

jdbc_url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"

fact_flights_df.write.jdbc(url=jdbc_url, table="skytracker.facts", mode="append", properties=db_properties)
dim_aircraft.write.jdbc(url=jdbc_url, table="skytracker.aircrafts", mode="append", properties=db_properties)
dim_airline.write.jdbc(url=jdbc_url, table="skytracker.airlines", mode="append", properties=db_properties)
dim_airports.write.jdbc(url=jdbc_url, table="skytracker.airports", mode="append", properties=db_properties)

                                                                                