In [1]:
import os
import hopsworks
import pandas as pd
from dotenv import load_dotenv, find_dotenv

In [2]:
from initial_edge_data import get_historical_flights, create_dataframe_flights, get_weather_bulk, get_full_year_data

In [3]:
# Load the .env file located by find_dotenv() into the process environment;
# later cells read HOPSWORKS_PROJECT, HOPSWORKS_API_KEY and EDGE_API_KEY from it.
load_dotenv(find_dotenv())

True

In [3]:
# Log in to Hopsworks using the project name and API key from the environment.
# os.getenv returns None for an unset variable, so both arguments are None
# if the .env file was not loaded before this cell runs.
project = hopsworks.login(
    engine="python",
    project=os.getenv("HOPSWORKS_PROJECT"),
    api_key_value=os.getenv("HOPSWORKS_API_KEY"),
)
# Feature store handle used by every feature-group cell below.
fs = project.get_feature_store()

2026-01-02 23:34:19,534 INFO: Initializing external client
2026-01-02 23:34:19,535 INFO: Base URL: https://c.app.hopsworks.ai:443
2026-01-02 23:34:21,192 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1279162


In [8]:
# Fetch the flight history for the given date window through the project helper.
# NOTE(review): the recorded run took minutes, and a later cell reloads the same
# data from flights_data_2025.csv — confirm which of the two is the canonical source.
flights_df = get_full_year_data(start_date_str="2025-01-03", end_date_str="2025-12-30")
flights_df.head()

Fetching Yearly Data:  92%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñè| 12/13 [02:16<00:11, 11.36s/it]


Ingestion complete. Removed 726 duplicate rows.


Unnamed: 0,flight_iata,airline,dep_airport,dep_time_sched,dep_time_actual,dep_delay,arr_airport,arr_time_sched,arr_time_actual,arr_delay
0,wt436,swiftair,ema,2025-01-03 01:35:00,2025-01-03 01:49:00,14,cph,2025-01-03 04:20:00,2025-01-03 04:15:00,0
1,ag4215,asl airlines ireland,cdg,2025-01-03 03:36:00,2025-01-03 04:17:00,41,cph,2025-01-03 05:30:00,2025-01-03 05:49:00,19
2,sk974,sas,bkk,2025-01-02 22:50:00,2025-01-02 23:07:00,17,cph,2025-01-03 05:55:00,2025-01-03 04:54:00,0
3,ac7453,air canada,bkk,2025-01-03 00:05:00,2025-01-03 00:29:00,24,cph,2025-01-03 06:20:00,2025-01-03 05:55:00,0
4,tg950,thai airways international,bkk,2025-01-03 00:05:00,2025-01-03 00:29:00,24,cph,2025-01-03 06:20:00,2025-01-03 05:55:00,0


In [4]:
# Columns holding timestamps; they are parsed to datetimes while reading the CSV
time_cols = ['dep_time_sched', 'dep_time_actual', 'arr_time_sched', 'arr_time_actual']

# Reload the flights from the CSV snapshot (written by the to_csv cell further down).
# NOTE(review): on a fresh kernel this file must already exist on disk — confirm
# the intended execution order of the fetch / save / reload cells.
flights_df = pd.read_csv("flights_data_2025.csv", parse_dates=time_cols)

# Preview the first rows (this shows values only; use flights_df.dtypes to check types)
flights_df.head()


Unnamed: 0,flight_iata,airline,dep_airport,dep_time_sched,dep_time_actual,dep_delay,arr_airport,arr_time_sched,arr_time_actual,arr_delay
0,wt436,swiftair,ema,2025-01-03 01:35:00,2025-01-03 01:49:00,14,cph,2025-01-03 04:20:00,2025-01-03 04:15:00,0
1,ag4215,asl airlines ireland,cdg,2025-01-03 03:36:00,2025-01-03 04:17:00,41,cph,2025-01-03 05:30:00,2025-01-03 05:49:00,19
2,sk974,sas,bkk,2025-01-02 22:50:00,2025-01-02 23:07:00,17,cph,2025-01-03 05:55:00,2025-01-03 04:54:00,0
3,ac7453,air canada,bkk,2025-01-03 00:05:00,2025-01-03 00:29:00,24,cph,2025-01-03 06:20:00,2025-01-03 05:55:00,0
4,tg950,thai airways international,bkk,2025-01-03 00:05:00,2025-01-03 00:29:00,24,cph,2025-01-03 06:20:00,2025-01-03 05:55:00,0


In [5]:
# Clean the flight table in a single chain:
#   1. a missing actual departure/arrival time falls back to the scheduled time,
#   2. rows that still contain any NaN are discarded,
#   3. the index is rebuilt so row numbers run 0..n-1 without gaps.
flights_df = (
    flights_df
    .assign(
        dep_time_actual=lambda frame: frame['dep_time_actual'].fillna(frame['dep_time_sched']),
        arr_time_actual=lambda frame: frame['arr_time_actual'].fillna(frame['arr_time_sched']),
    )
    .dropna()
    .reset_index(drop=True)
)

In [6]:
# Spot-check the first rows after the cleaning step.
flights_df.head()

Unnamed: 0,flight_iata,airline,dep_airport,dep_time_sched,dep_time_actual,dep_delay,arr_airport,arr_time_sched,arr_time_actual,arr_delay
0,wt436,swiftair,ema,2025-01-03 01:35:00,2025-01-03 01:49:00,14,cph,2025-01-03 04:20:00,2025-01-03 04:15:00,0
1,ag4215,asl airlines ireland,cdg,2025-01-03 03:36:00,2025-01-03 04:17:00,41,cph,2025-01-03 05:30:00,2025-01-03 05:49:00,19
2,sk974,sas,bkk,2025-01-02 22:50:00,2025-01-02 23:07:00,17,cph,2025-01-03 05:55:00,2025-01-03 04:54:00,0
3,ac7453,air canada,bkk,2025-01-03 00:05:00,2025-01-03 00:29:00,24,cph,2025-01-03 06:20:00,2025-01-03 05:55:00,0
4,tg950,thai airways international,bkk,2025-01-03 00:05:00,2025-01-03 00:29:00,24,cph,2025-01-03 06:20:00,2025-01-03 05:55:00,0


In [7]:
# Number of flight rows remaining after cleaning.
len(flights_df)

284379

In [26]:
# Fetch version 1 of the "flights_fg" feature group, or define it if it does not exist yet.
# NOTE(review): flight_iata alone is the primary key while the data covers almost a
# full year — presumably the same flight number recurs on many days. Confirm this
# key is adequate, in particular for the online store enabled below.
flight_fg = fs.get_or_create_feature_group(
    name="flights_fg",
    version=1,
    primary_key=['flight_iata'],
    event_time='dep_time_sched',
    description="Historical flight data from Aviation Edge",
    online_enabled=True 
)

In [12]:
# Snapshot the flights to disk without the index column; this is the file the
# read_csv cell loads.
flights_df.to_csv("flights_data_2025.csv", index=False)

In [20]:
import json
import requests

API_KEY = os.getenv("EDGE_API_KEY")

def get_coordinates_airport(iata_code: str, timeout: float = 30) -> tuple:
    """Look up an airport's coordinates in the Aviation Edge airport database.

    Args:
        iata_code: IATA code of the airport to look up.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        ``(latitude, longitude)`` exactly as delivered by the API, or
        ``(None, None)`` when the request fails, the response is not valid
        JSON, the API reports an error, or no airport record is returned.
    """
    url = "https://aviation-edge.com/v2/public/airportDatabase"
    params = {
        "key": API_KEY,
        "codeIataAirport": iata_code
    }
    try:
        # Bound the wait so one stalled lookup cannot hang a whole batch.
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        airports = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # str(e) embeds the full request URL, which contains the API key, so
        # only the HTTP status (or the exception type) is reported.
        status = getattr(getattr(e, "response", None), "status_code", None)
        detail = f"HTTP {status}" if status is not None else type(e).__name__
        print(f"An error occurred while looking up airport {iata_code}: {detail}")
        return None, None

    # A successful lookup is a non-empty list of airport records; anything else
    # (empty list, or a dict such as {"error": "No Record Found"}) is a miss.
    if not isinstance(airports, list) or not airports:
        error = airports.get("error", "Unknown error") if isinstance(airports, dict) else "Unknown error"
        print(f"No data found for airport {iata_code} or API error:", error)
        return None, None

    airport = airports[0]
    lat = airport.get('latitudeAirport')
    lon = airport.get('longitudeAirport')
    return lat, lon
    
CACHE_FILE = "airport_coords_cache.json"

def get_coordinates_with_cache(iata_code: str):
    """Return an airport's coordinates, consulting a JSON file cache first.

    The cache file maps IATA codes to ``{"lat": ..., "lon": ...}``. On a cache
    miss the coordinates are requested via ``get_coordinates_airport`` and, if
    both values were obtained, written back to the cache file.

    Args:
        iata_code: IATA code of the airport to look up.

    Returns:
        ``(lat, lon)``; either value is ``None`` when the lookup failed.
    """
    # 1. Load existing cache if it exists
    cache = {}
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, 'r') as f:
                loaded = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            # A corrupt or half-written cache should not block lookups:
            # fall back to the API and rebuild the file from there.
            print(f"Ignoring unreadable cache file {CACHE_FILE}: {type(e).__name__}")
        else:
            if isinstance(loaded, dict):
                cache = loaded

    # 2. Return from cache if available
    if iata_code in cache:
        return cache[iata_code]['lat'], cache[iata_code]['lon']

    # 3. If not in cache, call Aviation Edge
    print(f"IATA {iata_code} not in cache. Calling Aviation Edge API...")
    lat, lon = get_coordinates_airport(iata_code)

    # 4. Save to cache for next time. Compare against None explicitly: a
    #    coordinate of 0 (equator / prime meridian) is valid but falsy.
    if lat is not None and lon is not None:
        cache[iata_code] = {'lat': lat, 'lon': lon}
        with open(CACHE_FILE, 'w') as f:
            json.dump(cache, f)

    return lat, lon

In [None]:
# Upload the flight rows into the flights feature group.
# NOTE(review): the recorded output shows rdkafka producers terminating with
# messages still queued — confirm that every row actually reached the feature group.
print("Inserting flight data...")
flight_fg.insert(flights_df)

Inserting flight data...




[A[A

[A[A%6|1767371494.750|FAIL|rdkafka#producer-1| [thrd:ssl://51.161.81.188:9093/bootstrap]: ssl://51.161.81.188:9093/1: Disconnected (after 99964ms in state UP, 1 identical error(s) suppressed)


[A[A%6|1767371495.901|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.208:9093/bootstrap]: ssl://51.161.81.208:9093/2: Disconnected (after 100402ms in state UP)


[A[A

[A[A%4|1767371498.515|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 17 messages (1227 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
%4|1767371498.516|TERMINATE|rdkafka#producer-3| [thrd:app]: Producer terminating with 172 messages (12230 bytes) still in queue or transit: use flush() to wait for outstanding message delivery


[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

[A[A

Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 284

Launching job: flights_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279162/jobs/named/flights_fg_1_offline_fg_materialization/executions


(Job('flights_fg_1_offline_fg_materialization', 'SPARK'), None)

%6|1767371546.777|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/bootstrap]: ssl://51.161.81.188:9093/1: Disconnected (after 49998ms in state UP, 1 identical error(s) suppressed)
%6|1767371596.907|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.208:9093/bootstrap]: ssl://51.161.81.208:9093/2: Disconnected (after 99959ms in state UP, 1 identical error(s) suppressed)
%6|1767371647.450|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/bootstrap]: ssl://51.161.81.188:9093/1: Disconnected (after 49990ms in state UP, 1 identical error(s) suppressed)
%6|1767371698.059|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.208:9093/bootstrap]: ssl://51.161.81.208:9093/2: Disconnected (after 50077ms in state UP, 1 identical error(s) suppressed)
%6|1767371748.440|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/bootstrap]: ssl://51.161.81.188:9093/1: Disconnected (after 100172ms in state UP, 1 identical error(s) suppressed)
%6|1767371799.128|FAIL|rdkafka#producer-5| [thrd:ssl://51.1

In [8]:
# Fetch weather and airport coordinates for the flights through the project helper.
# NOTE(review): the recorded run hit HTTP 429 (Too Many Requests) for several
# airports; the gap-fill cell further down re-fetches the airports that are missing.
weather_df, coords_df = get_weather_bulk(flights_df)

Error fetching weather for bva: 429 Client Error: Too Many Requests for url: https://archive-api.open-meteo.com/v1/archive?latitude=49.459488&longitude=2.110815&start_date=2025-01-02&end_date=2025-12-30&hourly=temperature_2m%2Cprecipitation%2Cwind_speed_10m%2Cvisibility&timezone=UTC
Error fetching weather for bhx: 429 Client Error: Too Many Requests for url: https://archive-api.open-meteo.com/v1/archive?latitude=52.45252&longitude=-1.733256&start_date=2025-01-02&end_date=2025-12-30&hourly=temperature_2m%2Cprecipitation%2Cwind_speed_10m%2Cvisibility&timezone=UTC
Error fetching weather for tku: 429 Client Error: Too Many Requests for url: https://archive-api.open-meteo.com/v1/archive?latitude=60.512794&longitude=22.28098&start_date=2025-01-02&end_date=2025-12-30&hourly=temperature_2m%2Cprecipitation%2Cwind_speed_10m%2Cvisibility&timezone=UTC
Error fetching weather for lca: 429 Client Error: Too Many Requests for url: https://archive-api.open-meteo.com/v1/archive?latitude=34.880867&longit

In [9]:
# Inspect the fetched weather table (visibility is NaN in every row of the recorded output).
weather_df

Unnamed: 0,weather_timestamp,temperature_2m,precipitation,wind_speed_10m,visibility,airport_iata
0,2025-01-02 00:00:00,3.1,0.0,10.2,,ema
1,2025-01-02 01:00:00,2.0,0.0,8.1,,ema
2,2025-01-02 02:00:00,0.7,0.0,5.4,,ema
3,2025-01-02 03:00:00,-0.6,0.0,6.3,,ema
4,2025-01-02 04:00:00,-0.7,0.0,6.8,,ema
...,...,...,...,...,...,...
2439355,2025-12-30 19:00:00,6.2,0.0,4.6,,bgw
2439356,2025-12-30 20:00:00,5.3,0.0,4.4,,bgw
2439357,2025-12-30 21:00:00,4.8,0.0,3.4,,bgw
2439358,2025-12-30 22:00:00,4.2,0.0,3.0,,bgw


In [10]:
# Flights with a strictly positive arrival delay.
delayed_flights = flights_df.loc[flights_df['arr_delay'].gt(0)]

In [11]:
# How many flights have arr_delay > 0.
len(delayed_flights)

95885

In [13]:
# Total number of flights, for comparison with the delayed-flight count.
len(flights_df)

284379

In [12]:
# Discard the visibility column from the weather table
weather_df = weather_df.drop('visibility', axis=1)

In [13]:
# Weather table: discard every row that still has a NaN before it goes to Hopsworks
weather_df = weather_df.dropna()

# Coordinates table: same NaN filter, then force the IATA code column to str
coords_df = coords_df.dropna().astype({'airport_iata': str})

In [14]:
# --- A. TIME-SERIES WEATHER FEATURE GROUP ---
# NOTE(review): airport_iata alone is the primary key although weather_df holds one
# row per airport per hour; a later cell builds cph_weather_fg keyed on
# (airport_iata, weather_timestamp) — confirm whether this group needs the same key.
weather_fg = fs.get_or_create_feature_group(
    name="weather_fg",
    version=1,
    primary_key=['airport_iata'],
    event_time='weather_timestamp', # Crucial for point-in-time joins
    description="Hourly historical weather data",
)
# Upload the cleaned hourly weather rows
weather_fg.insert(weather_df)

# --- B. STATIC AIRPORT METADATA FEATURE GROUP ---
airport_fg = fs.get_or_create_feature_group(
    name="airport_coords_fg",
    version=1,
    primary_key=['airport_iata'],
    # Note: No event_time here because coordinates are static metadata
    description="Latitude and Longitude for airports",
)
# Upload the airport coordinate rows
airport_fg.insert(coords_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1279162/fs/1265774/fg/1890647


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 2439360/2439360 | Elapsed Time: 02:20 | Remaining Time: 00:00


Launching job: weather_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279162/jobs/named/weather_fg_1_offline_fg_materialization/executions
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1279162/fs/1265774/fg/1890648


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 308/308 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: airport_coords_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279162/jobs/named/airport_coords_fg_1_offline_fg_materialization/executions


(Job('airport_coords_fg_1_offline_fg_materialization', 'SPARK'), None)

In [23]:
from tqdm import tqdm
import time
# 1. Identify departure airports that have flights but no weather rows yet
required_airports = set(flights_df['dep_airport'].unique())
existing_weather_airports = set(weather_df['airport_iata'].unique()) if 'weather_df' in locals() else set()

missing_weather_airports = required_airports - existing_weather_airports

# Always (re)define the result as an empty, correctly shaped frame, so the cells
# below neither hit a NameError nor re-upload a stale frame from an earlier run
# when there is nothing to fetch.
GAP_FILL_COLUMNS = ['weather_timestamp', 'temperature_2m', 'precipitation', 'wind_speed_10m', 'airport_iata']
new_weather_df = pd.DataFrame(columns=GAP_FILL_COLUMNS)

if not missing_weather_airports:
    print("‚úÖ Weather data is complete for all airports.")
else:
    print(f"üîç Missing weather for {len(missing_weather_airports)} airports. Starting gap fill...")

    # Cover the same date window as the scheduled departures
    start_date = flights_df['dep_time_sched'].min().strftime('%Y-%m-%d')
    end_date = flights_df['dep_time_sched'].max().strftime('%Y-%m-%d')

    weather_url = "https://archive-api.open-meteo.com/v1/archive"
    additional_weather_frames = []

    # sorted() makes the request order (and the row order of the result) reproducible
    for iata in tqdm(sorted(missing_weather_airports), desc="Filling Weather Gaps"):
        # Coordinates come from the local JSON cache, falling back to Aviation Edge
        lat, lon = get_coordinates_with_cache(iata)

        if lat is None or lon is None:
            print(f"Skipping {iata} - No coordinates available.")
            continue

        # Fetch only for this missing airport
        weather_params = {
            "latitude": lat, "longitude": lon,
            "start_date": start_date, "end_date": end_date,
            "hourly": "temperature_2m,precipitation,wind_speed_10m",
            "timezone": "UTC"
        }

        try:
            # Bound the wait so one stalled request cannot hang the whole loop
            res = requests.get(weather_url, params=weather_params, timeout=60)
            res.raise_for_status()
            w_data = res.json().get('hourly', {})

            # A response without hourly timestamps would only add a malformed frame
            if not w_data.get('time'):
                print(f"Skipping {iata} - No hourly weather returned.")
                continue

            temp_df = pd.DataFrame(w_data).rename(columns={'time': 'weather_timestamp'})
            temp_df['airport_iata'] = iata
            additional_weather_frames.append(temp_df)

        except Exception as e:
            # Best effort per airport: report the failure and move on to the next one
            print(f"‚ùå Failed to fetch weather for {iata}: {e}")

        finally:
            # Throttle after every attempt, failed ones included: the bulk fetch
            # earlier in this notebook was rejected with HTTP 429 by this API.
            time.sleep(0.5)

    # 2. Combine the per-airport frames into one frame holding only the new rows
    if additional_weather_frames:
        new_weather_df = pd.concat(additional_weather_frames, ignore_index=True)


üîç Missing weather for 32 airports. Starting gap fill...


Filling Weather Gaps:   6%|‚ñã         | 2/32 [00:01<00:27,  1.09it/s]

IATA qmi not in cache. Calling Aviation Edge API...


Filling Weather Gaps:   9%|‚ñâ         | 3/32 [00:02<00:21,  1.38it/s]

No data found for airport qmi or API error: No Record Found
Skipping qmi - No coordinates available.


Filling Weather Gaps:  12%|‚ñà‚ñé        | 4/32 [00:03<00:21,  1.28it/s]

IATA qky not in cache. Calling Aviation Edge API...


Filling Weather Gaps:  16%|‚ñà‚ñå        | 5/32 [00:03<00:19,  1.35it/s]

No data found for airport qky or API error: No Record Found
Skipping qky - No coordinates available.


Filling Weather Gaps:  47%|‚ñà‚ñà‚ñà‚ñà‚ñã     | 15/32 [00:12<00:15,  1.12it/s]

IATA qyf not in cache. Calling Aviation Edge API...


Filling Weather Gaps:  50%|‚ñà‚ñà‚ñà‚ñà‚ñà     | 16/32 [00:13<00:13,  1.18it/s]

No data found for airport qyf or API error: No Record Found
Skipping qyf - No coordinates available.


Filling Weather Gaps:  72%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñè  | 23/32 [00:20<00:07,  1.13it/s]

IATA ehu not in cache. Calling Aviation Edge API...


Filling Weather Gaps:  75%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñå  | 24/32 [00:23<00:14,  1.78s/it]

An error occurred: 404 Client Error: Not Found for url: https://aviation-edge.com/v2/public/airportDatabase?key=<REDACTED>&codeIataAirport=ehu
Skipping ehu - No coordinates available.


Filling Weather Gaps: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 32/32 [00:31<00:00,  1.03it/s]


In [25]:
# Normalise the gap-fill frame before upload.
# The rename is a no-op when no 'time' column is left, which is the case once the
# gap-fill loop has already renamed it on each per-airport frame.
new_weather_df.rename(columns={'time': 'weather_timestamp'}, inplace=True)
# Convert the timestamp column to datetimes
new_weather_df['weather_timestamp'] = pd.to_datetime(new_weather_df['weather_timestamp'])

In [26]:
# Upload the gap-fill rows into the existing weather feature group.
weather_fg.insert(new_weather_df)

Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 243936/243936 | Elapsed Time: 00:17 | Remaining Time: 00:00


Launching job: weather_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279162/jobs/named/weather_fg_1_offline_fg_materialization/executions


(Job('weather_fg_1_offline_fg_materialization', 'SPARK'), None)

In [27]:
# Get a reference to version 1 of the weather feature group
weather_fg = fs.get_feature_group("weather_fg", version=1)

# Read the entire table into a Pandas DataFrame.
# This rebinds weather_df, replacing the in-memory frame built earlier in the notebook.
weather_df = weather_fg.read()

# Report how many rows came back
print(f"Retrieved {len(weather_df)} rows from the Feature Group.")

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (56.56s) 
Retrieved 2683296 rows from the Feature Group.


In [29]:
# Requires weather_df as read back from the feature group (weather_df = weather_fg.read()).
# Keep only the rows whose airport code is 'cph'.
cph_weather_df = weather_df.loc[weather_df['airport_iata'].eq('cph')]

print(f"Number of rows for CPH: {len(cph_weather_df)}")

Number of rows for CPH: 8712


In [32]:
# Create (or fetch) a CPH-only weather feature group with a composite primary key:
# each row is identified by airport code plus hourly timestamp.
cph_fg = fs.get_or_create_feature_group(
    name="cph_weather_fg",
    version=2, # Use version 2 to avoid conflicts with your current one
    primary_key=['airport_iata', 'weather_timestamp'], # <--- BOTH are now PKs
    event_time='weather_timestamp',
    description="CPH weather with timestamp as primary key for arrival joins",
)

# Upload the CPH subset filtered out of weather_df
cph_fg.insert(cph_weather_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1279162/fs/1265774/fg/1890671


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 8712/8712 | Elapsed Time: 00:02 | Remaining Time: 00:00


Launching job: cph_weather_fg_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279162/jobs/named/cph_weather_fg_2_offline_fg_materialization/executions


(Job('cph_weather_fg_2_offline_fg_materialization', 'SPARK'), None)