# Daily Feature Pipeline
* Retrieve yesterday's data for flights, weather and the calendar
* Add these new data to the Feature Store 

## OpenSky Recent Data 
* Use the OpenSky api to retrieve the most recent flight landing data, to update our feature group

In [1]:
import pandas as pd 
import os
import datetime
import requests 
import hopsworks
import numpy as np

In [None]:
import os
import hopsworks

# Authenticate against Hopsworks. In CI the API key comes from the
# HOPSWORKS_API_KEY environment variable; without it we fall back to the
# default login flow.
api_key = os.getenv("HOPSWORKS_API_KEY")  # will exist in GitHub Actions

if api_key:
    project = hopsworks.login(api_key_value=api_key)
else:
    project = hopsworks.login()  # local (uses your existing auth)

# hopsworks.login() returns a project handle, not a feature store. The upload
# cells call fs.get_feature_group(...), which is a feature-store method, so the
# feature store has to be fetched from the project explicitly.
fs = project.get_feature_store()

# Secrets API handle used to read the OpenSky client credentials.
secrets = hopsworks.get_secrets_api()

2026-01-11 16:14:22,399 INFO: Closing external client and cleaning up certificates.
Connection closed.
2026-01-11 16:14:22,406 INFO: Initializing external client
2026-01-11 16:14:22,407 INFO: Base URL: https://c.app.hopsworks.ai:443
2026-01-11 16:14:24,000 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1286325


In [16]:
# ICAO code of the airport whose arrivals are counted (the summary print
# refers to it as Arlanda).
ICAO = "ESSA"

# OpenSky client credentials are read from the Hopsworks secrets API instead of
# being hard-coded in the notebook.
CLIENT_ID, CLIENT_SECRET = (
    secrets.get_secret(secret_name).value
    for secret_name in ("OPENSKY_CLIENT_ID", "OPENSKY_CLIENT_SECRET")
)

def get_access_token():
    """Request an access token from the OpenSky auth server.

    Performs a client-credentials grant using the module-level CLIENT_ID and
    CLIENT_SECRET.

    Returns:
        str: The access token to send as a Bearer token.

    Raises:
        requests.HTTPError: If the auth server answers with an error status.
        requests.Timeout: If the auth server does not answer within the timeout.
        RuntimeError: If the response body carries no "access_token" value.
    """
    auth_url = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"
    data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET
    }
    # Without a timeout a stalled connection would hang the scheduled run forever.
    response = requests.post(auth_url, data=data, timeout=30)
    response.raise_for_status()

    token = response.json().get("access_token")
    if not token:
        # Fail here with a clear message rather than sending "Bearer None" later.
        raise RuntimeError("OpenSky auth response did not contain an access_token")
    return token

def fetch_yesterday_data(token):
    """Count yesterday's arrivals at the airport identified by ICAO.

    "Yesterday" is the previous calendar day in Europe/Stockholm, regardless of
    the timezone of the machine running the notebook, so the result refers to
    the same day as the Stockholm-based weather/calendar features.

    Args:
        token: OpenSky access token, sent as a Bearer token.

    Returns:
        tuple: (yesterday as datetime.date, number of arrivals as int). The
        count is 0 when OpenSky answers 404, which its REST documentation
        describes as "no flights found for the given period".

    Raises:
        requests.HTTPError: For any 4xx/5xx status other than 404, so that
            auth, rate-limit or server failures are not recorded as 0 landings.
        requests.Timeout: If the API does not answer within the timeout.
    """
    stockholm_tz = "Europe/Stockholm"

    # Take the local date first, then step back one calendar day.
    yesterday = pd.Timestamp.now(tz=stockholm_tz).date() - datetime.timedelta(days=1)

    # Window: local midnight up to 23:59:59 of the same local day.
    # DateOffset moves by calendar day, so the window stays correct on days
    # that are 23 or 25 hours long because of DST changes.
    day_start = pd.Timestamp(yesterday.isoformat(), tz=stockholm_tz)
    day_end = day_start + pd.DateOffset(days=1) - pd.Timedelta(seconds=1)
    start_ts = int(day_start.timestamp())
    end_ts = int(day_end.timestamp())

    api_url = "https://opensky-network.org/api/flights/arrival"
    query = {"airport": ICAO, "begin": start_ts, "end": end_ts}
    headers = {"Authorization": f"Bearer {token}"}

    # Without a timeout a stalled connection would hang the scheduled run forever.
    response = requests.get(api_url, params=query, headers=headers, timeout=30)

    if response.status_code == 200:
        flights = response.json()
        return yesterday, len(flights)

    if response.status_code != 404:
        # Raises for 4xx/5xx; other unexpected statuses fall through below.
        response.raise_for_status()

    print(f"API Error: {response.status_code}")
    return yesterday, 0

# Authenticate, then count yesterday's arrivals. `flight_date` and
# `total_landings` are consumed by the upload cells further down.
token = get_access_token()
flight_date, total_landings = fetch_yesterday_data(token)

# NOTE(review): a count of 0 can also come from the "API Error" path of
# fetch_yesterday_data, not only from a day without landings. Check the printed
# output before trusting a 0.
print(f"Arlanda had {total_landings} landings on date: {flight_date}")

API Error: 404
Arlanda had 0 landings on date: 2026-01-10


## Uploading new data to the Feature Store 

**New Flight Data**

In [4]:
# Single-row frame for yesterday's landings, with explicit dtypes:
# `date` becomes a pandas datetime and `total_landings` a 64-bit integer.
new_flight_data = pd.DataFrame(
    {"date": [flight_date], "total_landings": [total_landings]}
).assign(
    date=lambda frame: pd.to_datetime(frame["date"]),
    total_landings=lambda frame: frame["total_landings"].astype(np.int64),
)

In [5]:
# Look up version 1 of the flight feature group that receives the new row.
flight_data_fg = fs.get_feature_group(name="flight_data_arlanda", version=1)

In [6]:
# Upload yesterday's row; wait=True asks insert() to wait for the ingestion job.
flight_data_fg.insert(
    new_flight_data,
    wait=True,
)

Uploading Dataframe: 100.00% |█| Rows 1/1 | Elapsed Time: 00:00 | Remaining Tim


Launching job: flight_data_arlanda_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286325/jobs/named/flight_data_arlanda_1_offline_fg_materialization/executions
2026-01-11 15:22:20,647 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-11 15:22:23,860 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-11 15:22:27,075 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-11 15:24:09,856 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-11 15:24:10,054 INFO: Waiting for log aggregation to finish.
2026-01-11 15:24:39,105 INFO: Execution finished successfully.


(Job('flight_data_arlanda_1_offline_fg_materialization', 'SPARK'), None)

## Weather and Calendar Recent Data
* Download yesterday's calendar and weather data and upload it to Hopsworks

In [7]:
import holidays

**Specify yesterday's date**

In [8]:
# Yesterday's calendar date in Stockholm local time.
# The local date is taken first and one calendar day is subtracted afterwards.
# Subtracting pd.Timedelta(days=1) from a tz-aware midnight moves exactly
# 24 hours, which on the day after the spring DST change lands at 23:00 two
# days back and therefore yields the wrong date.
run_date = (
    pd.Timestamp.now(tz="Europe/Stockholm").date()
    - datetime.timedelta(days=1)
)

**Function for retrieving weather data for a specific date**

In [9]:
def fetch_weather_for_date(date):
    """Fetch daily weather aggregates for one date from the Open-Meteo archive API.

    Args:
        date: Object with a strftime() method (e.g. datetime.date); used as both
            start and end of the requested range.

    Returns:
        pandas.DataFrame: One row per day returned by the API, with columns
        `date` (pandas datetime), `tavg`, `prcp`, `snow` and `wspd`.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer within the timeout.
        KeyError: If the response has no "daily" section.
    """
    url = "https://archive-api.open-meteo.com/v1/archive"
    day = date.strftime("%Y-%m-%d")

    params = {
        # Fixed query location.
        "latitude": 59.3293,
        "longitude": 18.0686,
        "start_date": day,
        "end_date": day,
        "daily": [
            "temperature_2m_mean",  # daily mean air temperature, 2 m above ground
            "precipitation_sum",  # total precipitation for the day
            "snowfall_sum",  # total snowfall for the day
            "windspeed_10m_max"  # maximum wind speed of the day, 10 m above ground
        ],
        "timezone": "Europe/Stockholm"
    }

    # Without a timeout a stalled connection would hang the scheduled run forever.
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()

    daily = r.json()["daily"]

    # Map the API field names to the short column names of the feature group.
    df = pd.DataFrame(daily).rename(columns={
        "time": "date",
        "temperature_2m_mean": "tavg",
        "precipitation_sum": "prcp",
        "snowfall_sum": "snow",
        "windspeed_10m_max": "wspd"
    })

    df["date"] = pd.to_datetime(df["date"])
    return df

**Function for retrieving holiday and calendar data**

In [10]:

def calendar_features(date):
    """Derive calendar and Swedish-holiday features for a single date.

    Args:
        date: Date-like object providing weekday(), isocalendar() and month.

    Returns:
        dict: np.int64 values keyed by `day_of_week`, `is_weekend`,
        `week_of_year`, `month` and `is_holiday`.
    """
    swedish_holidays = holidays.Sweden()
    weekday = date.weekday()  # Monday is 0, so 5 and 6 are Saturday and Sunday

    raw_values = {
        "day_of_week": weekday,
        "is_weekend": weekday >= 5,
        "week_of_year": date.isocalendar().week,
        "month": date.month,
        "is_holiday": date in swedish_holidays,
    }
    # Cast every value (including the booleans) to a 64-bit integer.
    return {name: np.int64(value) for name, value in raw_values.items()}

**Retrieve yesterday's calendar, holiday and weather data**

In [11]:
# Fetch both inputs for the run date.
weather = fetch_weather_for_date(run_date)
cal = calendar_features(run_date)

# Combine the first weather row and the calendar flags into one feature row.
features = pd.DataFrame([{
    "date": run_date,
    **{column: weather.loc[0, column] for column in ("tavg", "prcp", "snow", "wspd")},
    **cal,
}])

features

Unnamed: 0,date,tavg,prcp,snow,wspd,day_of_week,is_weekend,week_of_year,month,is_holiday
0,2026-01-10,-6.1,0.0,0.0,26.6,5,1,2,1,0


**Upload yesterday's data to the feature group in Hopsworks**

In [12]:
# Look up version 1 of the weather/calendar feature group that receives the new row.
weather_cal_fg = fs.get_feature_group(name="stockholm_weather_calendar_features", version=1)

In [13]:
# Convert the plain date to a pandas datetime before uploading, matching the
# dtype handling used for the flight data.
features["date"] = pd.to_datetime(features["date"])

# Upload the row; the "wait_for_job" write option is set so the call waits for the job.
weather_cal_fg.insert(features, write_options={"wait_for_job": True})

Uploading Dataframe: 100.00% |█| Rows 1/1 | Elapsed Time: 00:00 | Remaining Tim


Launching job: stockholm_weather_calendar_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286325/jobs/named/stockholm_weather_calendar_features_1_offline_fg_materialization/executions
2026-01-11 15:25:19,811 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-11 15:25:23,046 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-11 15:25:26,245 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-11 15:27:15,280 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2026-01-11 15:27:18,481 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-11 15:27:18,667 INFO: Waiting for log aggregation to finish.
2026-01-11 15:27:30,757 INFO: Execution finished successfully.


(Job('stockholm_weather_calendar_features_1_offline_fg_materialization', 'SPARK'),
 None)