# Set Paths

Commenting out for security reasons

In [0]:
# raw_path = temporarily commenting
# processed_path = temporarily commenting

# Extract Airport Data

In [0]:
airports_df = spark.read.option("header", True).option("inferSchema", True).csv(raw_path + "airports.csv")
airports_with_icao_df = spark.read.option("header", True).option("inferSchema", True).csv(raw_path + "airports_with_icao.csv")

# Access and Extract Flight and Weather API Data

In [0]:
import requests
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, DoubleType, BooleanType, ArrayType
)

## Flight Data Extraction

In [0]:
# URL for the OpenSky API endpoint
url = "https://opensky-network.org/api/states/all"

# Request data from the API
response = requests.get(url)
data = response.json()

# Extract the list of flight states
states = data.get('states', [])

In [0]:
def convert_row(row):
    # Make a copy of the row (which is a list)
    new_row = list(row)
    # For fields: longitude (index 5), latitude (6), baro_altitude (7), velocity (9),
    # true_track (10), vertical_rate (11), geo_altitude (13)
    for idx in [5, 6, 7, 9, 10, 11, 13]:
        val = new_row[idx]
        if val is not None:
            new_row[idx] = float(val)
    return new_row

# Convert each row accordingly
states = [convert_row(row) for row in states]

In [0]:
# Define an explicit schema based on the API documentation
schema = StructType([
    StructField("icao24", StringType(), True),
    StructField("callsign", StringType(), True),
    StructField("origin_country", StringType(), True),
    StructField("time_position", LongType(), True),
    StructField("last_contact", LongType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("baro_altitude", DoubleType(), True),
    StructField("on_ground", BooleanType(), True),
    StructField("velocity", DoubleType(), True),
    StructField("true_track", DoubleType(), True),
    StructField("vertical_rate", DoubleType(), True),
    StructField("sensors", ArrayType(LongType()), True),
    StructField("geo_altitude", DoubleType(), True),
    StructField("squawk", StringType(), True),
    StructField("spi", BooleanType(), True),
    StructField("position_source", LongType(), True)
])

In [0]:
# Create a Spark DataFrame using the explicit schema
flights_df = spark.createDataFrame(states, schema=schema)

In [0]:
flights_df_clean = flights_df.drop("sensors")

In [0]:
flights_df_clean.show(5)

+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+------------+------+-----+---------------+
|icao24|callsign|origin_country|time_position|last_contact|longitude|latitude|baro_altitude|on_ground|velocity|true_track|vertical_rate|geo_altitude|squawk|  spi|position_source|
+------+--------+--------------+-------------+------------+---------+--------+-------------+---------+--------+----------+-------------+------------+------+-----+---------------+
|4b1816|SWR935L |   Switzerland|   1742905483|  1742905483|   21.581| 39.1825|     10965.18|    false|  218.93|    321.49|         0.33|    11308.08|  6627|false|              0|
|ab1644|UAL1666 | United States|   1742905483|  1742905483| -77.2351| 38.1137|     11262.36|    false|  262.12|     33.07|          0.0|    11186.16|  1011|false|              0|
|4b1819|SWR252B |   Switzerland|   1742905482|  1742905482|  16.4373| 45.5644|     10652.76|    false|   

### Load Flight Data

In [0]:
import re

### Function below used to ensure the file name stays persistent when updating files.

In [0]:
from pyspark.sql.functions import col

In [0]:
def clean_column_names(df):
    new_cols = [col(c).alias(c.lower().replace(" ", "_")) for c in df.columns]
    return df.select(*new_cols)

In [0]:
def save_stable_csv(df, temp_path, final_path, filename="data.csv"):
    # Clean column names
    df = clean_column_names(df)

    # Write to temp folder
    df.coalesce(1).write.option("header", True).mode("overwrite").csv(temp_path)

    # List files in the temp directory
    files = dbutils.fs.ls(temp_path)

    # Find the CSV part file
    part_file = [f.path for f in files if re.match(r".*part.*\.csv$", f.name)]
    if part_file:
        # Rename/move to stable path
        dbutils.fs.mv(part_file[0], f"{final_path}/{filename}", True)

    # Remove temp directory
    dbutils.fs.rm(temp_path, True)

## Weather Data Extraction

In [0]:
import ftplib
import io
import re
from pyspark.sql import Row
from datetime import datetime

In [0]:
def download_metar_file(hour_utc="00"):
    filename = f"{hour_utc}Z.TXT"
    host = "tgftp.nws.noaa.gov"
    dir_path = "/data/observations/metar/cycles/"

    # Connect to FTP
    ftp = ftplib.FTP(host)
    ftp.login()
    ftp.cwd(dir_path)

    # Download file content
    r = io.BytesIO()
    ftp.retrbinary(f"RETR {filename}", r.write)
    ftp.quit()
    
    # Convert bytes to string
    content = r.getvalue().decode("utf-8", errors="ignore")
    return content

In [0]:
# Example: pull METARs from 00Z
metar_data = download_metar_file("00")
print(metar_data[:1000])  # preview first 1000 chars

2025/03/24 23:50
KANR 242350Z AUTO 08016KT 10SM SCT018 SCT024 BKN036 24/19 A3010 RMK A01

2025/03/24 23:45
KGUL 242345Z AUTO 02003KT 2 1/2SM BR CLR 23/21 A2997 RMK A01

2025/03/24 23:50
KIKT 242350Z AUTO 07020KT 10SM CLR 20/14 A3002 RMK A01

2025/03/24 23:50
KXER 242350Z AUTO 08017KT 4SM BR 21/20 A3002 RMK A01

2025/03/24 23:45
KGUL 242345Z AUTO 02003KT 2 1/2SM BR CLR 23/21 A2997 RMK A01

2025/03/24 23:50
KIKT 242350Z AUTO 07020KT 10SM CLR 20/14 A3002 RMK A01

2025/03/24 23:50
KANR 242350Z AUTO 08016KT 10SM SCT018 SCT024 BKN036 24/19 A3010 RMK A01

2025/03/24 23:50
KXER 242350Z AUTO 08017KT 4SM BR 21/20 A3002 RMK A01

2025/03/24 23:55
KBFR 242355Z AUTO 27007KT 10SM SCT120 11/M06 A2997 RMK AO2

2025/03/24 23:55
KBFR 242355Z AUTO 27007KT 10SM SCT120 11/M06 A2997 RMK AO2

2025/03/24 23:45
KGUL 242345Z AUTO 02003KT 2 1/2SM BR CLR 23/21 A2997 RMK A01

2025/03/24 23:50
KIKT 242350Z AUTO 07020KT 10SM CLR 20/14 A3002 RMK A01

2025/03/24 23:55
KBFR 242355Z AUTO 27007KT 10SM SCT120 11/M06 A2997 

## Weather Data Parsing and Transformation

In [0]:
def parse_metar_entries(raw_data):
    entries = raw_data.strip().split("\n\n")  # Split by METAR blocks
    parsed = []

    for entry in entries:
        lines = entry.strip().split("\n")
        if len(lines) < 2:
            continue  # Skip bad entries
        try:
            timestamp_str = lines[0]
            metar_line = lines[1]
            icao = metar_line.split()[0]
            timestamp = datetime.strptime(timestamp_str, "%Y/%m/%d %H:%M")
            parsed.append(Row(icao_code=icao, metar=metar_line, timestamp=timestamp))
        except Exception as e:
            print(f"Skipping entry due to error: {e}")
            continue
    
    return parsed

In [0]:
parsed_rows = parse_metar_entries(metar_data)

In [0]:
weather_df = spark.createDataFrame(parsed_rows)
weather_df.show(truncate=False)

+---------+-----------------------------------------------------------------------+-------------------+
|icao_code|metar                                                                  |timestamp          |
+---------+-----------------------------------------------------------------------+-------------------+
|KANR     |KANR 242350Z AUTO 08016KT 10SM SCT018 SCT024 BKN036 24/19 A3010 RMK A01|2025-03-24 23:50:00|
|KGUL     |KGUL 242345Z AUTO 02003KT 2 1/2SM BR CLR 23/21 A2997 RMK A01           |2025-03-24 23:45:00|
|KIKT     |KIKT 242350Z AUTO 07020KT 10SM CLR 20/14 A3002 RMK A01                 |2025-03-24 23:50:00|
|KXER     |KXER 242350Z AUTO 08017KT 4SM BR 21/20 A3002 RMK A01                   |2025-03-24 23:50:00|
|KGUL     |KGUL 242345Z AUTO 02003KT 2 1/2SM BR CLR 23/21 A2997 RMK A01           |2025-03-24 23:45:00|
|KIKT     |KIKT 242350Z AUTO 07020KT 10SM CLR 20/14 A3002 RMK A01                 |2025-03-24 23:50:00|
|KANR     |KANR 242350Z AUTO 08016KT 10SM SCT018 SCT024 BKN036 2

In [0]:
def safe_parse_metar(metar_row):
    try:
        metar = metar_row.metar
        icao = metar_row.icao_code
        timestamp = metar_row.timestamp

        # REGEX patterns
        wind_pattern = r"(\d{3}|VRB)(\d{2})KT"
        visibility_pattern = r"(\d{1,2})SM"
        temp_dew_pattern = r"(M?\d{2})/(M?\d{2})"
        altimeter_pattern = r"A(\d{4})"

        # Match and parse
        wind_match = re.search(wind_pattern, metar)
        vis_match = re.search(visibility_pattern, metar)
        temp_match = re.search(temp_dew_pattern, metar)
        alt_match = re.search(altimeter_pattern, metar)

        def parse_temp(val):
            return -int(val[1:]) if val.startswith("M") else int(val)

        wind_dir = int(wind_match.group(1).replace("VRB", "0")) if wind_match else None
        wind_speed = int(wind_match.group(2)) if wind_match else None
        wind_speed_mph = round(wind_speed * 1.15078, 2) if wind_speed is not None else None

        visibility = int(vis_match.group(1)) if vis_match else None
        temperature = parse_temp(temp_match.group(1)) if temp_match else None
        dewpoint = parse_temp(temp_match.group(2)) if temp_match else None
        altimeter = float(alt_match.group(1)) / 100 if alt_match else None

        return Row(
            icao_code=icao,
            timestamp=timestamp,
            metar=metar,
            wind_dir=wind_dir,
            wind_speed=wind_speed,
            wind_speed_mph=wind_speed_mph,
            visibility=visibility,
            temperature=temperature,
            dewpoint=dewpoint,
            altimeter=altimeter
        )

    except Exception as e:
        print(f"[Parse Error] ICAO={metar_row.icao_code}, Error={str(e)}")
        return None

## Transform parsed Metar Data Safely to ensure Accuracy and Data Integrity

In [0]:
parsed_weather_rdd = weather_df.rdd.map(safe_parse_metar).filter(lambda row: row is not None)

In [0]:
parsed_weather_df = spark.createDataFrame(parsed_weather_rdd)

In [0]:
parsed_weather_df = parsed_weather_df.drop("metar")

In [0]:
parsed_weather_df = parsed_weather_df.dropDuplicates(["icao_code", "timestamp"])

In [0]:
parsed_weather_df = parsed_weather_df.dropna()

Show Parsed Weather Dataframe

In [0]:
parsed_weather_df.show(truncate=False)

+---------+-------------------+--------+----------+--------------+----------+-----------+--------+---------+
|icao_code|timestamp          |wind_dir|wind_speed|wind_speed_mph|visibility|temperature|dewpoint|altimeter|
+---------+-------------------+--------+----------+--------------+----------+-----------+--------+---------+
|KATW     |2025-03-24 23:45:00|300     |7         |8.06          |10        |1          |-3      |29.85    |
|KAFP     |2025-03-24 23:50:00|210     |4         |4.6           |10        |18         |13      |29.87    |
|KCRQ     |2025-03-24 23:53:00|220     |6         |6.9           |5         |15         |12      |30.02    |
|PABI     |2025-03-24 23:53:00|80      |8         |9.21          |10        |3          |-11     |29.83    |
|KSJN     |2025-03-24 23:54:00|300     |7         |8.06          |10        |22         |-6      |30.18    |
|KFST     |2025-03-24 23:53:00|310     |6         |6.9           |10        |29         |0       |30.0     |
|KTRI     |2025-03-

# Join Parsed Weather with Airport Data

In [0]:
airports_with_weather_df = airports_with_icao_df.join(parsed_weather_df, on="icao_code", how="inner")

In [0]:
airports_with_weather_df = airports_with_weather_df.dropDuplicates(["icao_code"])

In [0]:
airports_with_weather_df.show(truncate=False)

+---------+----+-----+--------------+--------------------------------------------+-----------------+-------------------+------------+---------+-----------+----------+-----------------+-----------------+---------+----------+----------------------------------------+-------------------------------------------------------------------+-------------------+-------------------+--------+----------+--------------+----------+-----------+--------+---------+
|icao_code|id  |ident|type          |name                                        |latitude_deg     |longitude_deg      |elevation_ft|continent|iso_country|iso_region|municipality     |scheduled_service|iata_code|local_code|home_link                               |wikipedia_link                                                     |keywords           |timestamp          |wind_dir|wind_speed|wind_speed_mph|visibility|temperature|dewpoint|altimeter|
+---------+----+-----+--------------+--------------------------------------------+-----------------+

# Saving airports_with_weather file

# Saving processed_airports file

In [0]:
from pyspark.sql.functions import col

In [0]:


def clean_column_names(df):
    new_cols = [col(c).alias(c.lower().replace(" ", "_")) for c in df.columns]
    return df.select(*new_cols)

In [0]:
weather_cols = [
    "icao_code", "timestamp", "wind_dir", "wind_speed", "wind_speed_mph",
    "visibility", "temperature", "dewpoint", "altimeter"
]

weather_only_df = airports_with_weather_df.select(weather_cols)

In [0]:
processed_airports_df = airports_df.join(weather_only_df, on="icao_code", how="left")

In [0]:
processed_airports_df = processed_airports_df.dropDuplicates()

In [0]:
processed_airports_df

DataFrame[icao_code: string, id: int, ident: string, type: string, name: string, latitude_deg: double, longitude_deg: double, elevation_ft: int, continent: string, iso_country: string, iso_region: string, municipality: string, scheduled_service: string, iata_code: string, gps_code: string, local_code: string, home_link: string, wikipedia_link: string, keywords: string, timestamp: timestamp, wind_dir: bigint, wind_speed: bigint, wind_speed_mph: double, visibility: bigint, temperature: bigint, dewpoint: bigint, altimeter: double]

In [0]:
processed_airports_df.show(truncate=False)

+---------+------+-----+--------------+--------------------------------------+------------------+-------------------+------------+---------+-----------+----------+------------------+-----------------+---------+--------+----------+----------------------------+--------------------------------------------------------------------+----------+-------------------+--------+----------+--------------+----------+-----------+--------+---------+
|icao_code|id    |ident|type          |name                                  |latitude_deg      |longitude_deg      |elevation_ft|continent|iso_country|iso_region|municipality      |scheduled_service|iata_code|gps_code|local_code|home_link                   |wikipedia_link                                                      |keywords  |timestamp          |wind_dir|wind_speed|wind_speed_mph|visibility|temperature|dewpoint|altimeter|
+---------+------+-----+--------------+--------------------------------------+------------------+-------------------+---------

# Overwrite and save csv Files

Commenting out for security reasons

In [0]:
save_stable_csv(
    df=flights_df_clean,
    #temp_path=temporarily commenting,
    #final_path=temporarily commenting,
    filename="flights.csv"
)

In [None]:
save_stable_csv(
    df=processed_airports_df,
    #temp_path=temporarily commenting,
    #final_path=temporarily commenting,
    filename="processed_airports.csv"
)

NameError: name 'save_stable_csv' is not defined

In [0]:
save_stable_csv(
    df=airports_with_weather_df,
    #temp_path=temporarily commenting,
    #final_path= temporarily commenting,
    filename="airports_with_weather.csv"
)

In [0]:
dbutils.fs.head("abfss://processed@flightdata2025.dfs.core.windows.net/flights_csv/flights.csv", 1024)

[Truncated to first 1024 bytes]


'icao24,callsign,origin_country,time_position,last_contact,longitude,latitude,baro_altitude,on_ground,velocity,true_track,vertical_rate,geo_altitude,squawk,spi,position_source\n4b1816,SWR935L,Switzerland,1742905483,1742905483,21.581,39.1825,10965.18,false,218.93,321.49,0.33,11308.08,6627,false,0\nab1644,UAL1666,United States,1742905483,1742905483,-77.2351,38.1137,11262.36,false,262.12,33.07,0.0,11186.16,1011,false,0\n4b1819,SWR252B,Switzerland,1742905482,1742905482,16.4373,45.5644,10652.76,false,240.9,117.33,0.0,10675.62,3056,false,0\naa3cbd,N759P,United States,1742905482,1742905482,-81.548,40.8857,609.6,false,82.78,122.31,-0.33,586.74,5550,false,0\ne8027d,LPE2304,Chile,1742905479,1742905479,-77.2976,-12.0977,2545.08,false,146.19,267.98,13.98,,,false,0\n801638,AXB2749,India,1742905482,1742905483,75.8739,27.5647,8458.2,false,230.37,36.92,-9.75,8846.82,,false,0\na3feec,N357BE,United States,1742905482,1742905482,-102.4475,46.0617,11582.4,false,193.54,232.89,-0.33,11529.06,3653,false,0\n39