In [1]:
import sys
from pathlib import Path
import warnings

warnings.filterwarnings("ignore", module="IPython")

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    if root_dir.parts[-1:] == ("src",):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ("airquality",):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ("notebooks",):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir)
    print("Local environment")

print(f"Root dir: {root_dir}")

if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")

from utils import config

settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Root dir: /Users/max/Repos/KTH/pm25-forecast-openmeteo-aqicn
Added the following directory to the PYTHONPATH: /Users/max/Repos/KTH/pm25-forecast-openmeteo-aqicn
HopsworksSettings initialized!


## Imports

In [2]:
import datetime
import pandas as pd
import hopsworks
import json
import warnings
import pandas as pd
from utils import airquality

warnings.filterwarnings("ignore")

## Setup

In [None]:
import signal
import time

def timeout_handler(signum, frame):
    raise TimeoutError("Hopsworks login timed out")

today = datetime.date.today()
print(f"[DEBUG] Starting hopsworks login at {datetime.datetime.now()}")

# Try login with timeout and retry
max_retries = 3
retry_delay = 10
project = None

for attempt in range(max_retries):
    try:
        print(f"[DEBUG] Login attempt {attempt + 1}/{max_retries}")
        
        # Set a 60-second timeout for the login
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(60)
        
        project = hopsworks.login(engine="python")
        
        # Cancel the alarm if successful
        signal.alarm(0)
        print(f"[DEBUG] Hopsworks login successful at {datetime.datetime.now()}")
        break
        
    except TimeoutError as e:
        signal.alarm(0)
        print(f"[DEBUG] Login attempt {attempt + 1} timed out: {e}")
        if attempt < max_retries - 1:
            print(f"[DEBUG] Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            print(f"[DEBUG] All login attempts failed")
            raise
    except Exception as e:
        signal.alarm(0)
        print(f"[DEBUG] Login attempt {attempt + 1} failed with error: {type(e).__name__}: {e}")
        if attempt < max_retries - 1:
            print(f"[DEBUG] Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            raise

if project is None:
    raise RuntimeError("Failed to login to Hopsworks after all retries")

print(f"[DEBUG] Getting feature store at {datetime.datetime.now()}")
fs = project.get_feature_store()
print(f"[DEBUG] Feature store retrieved at {datetime.datetime.now()}")

print(f"[DEBUG] Getting secrets API at {datetime.datetime.now()}")
secrets = hopsworks.get_secrets_api()
print(f"[DEBUG] Getting AQICN_API_KEY secret at {datetime.datetime.now()}")
AQICN_API_KEY = secrets.get_secret("AQICN_API_KEY").value
print(f"[DEBUG] AQICN_API_KEY retrieved at {datetime.datetime.now()}")

# Retrieve feature groups
print(f"[DEBUG] Getting air_quality_all feature group at {datetime.datetime.now()}")
air_quality_fg = fs.get_feature_group(
    name="air_quality_all",
    version=1,
)
print(f"[DEBUG] air_quality_all feature group retrieved at {datetime.datetime.now()}")

print(f"[DEBUG] Getting weather_all feature group at {datetime.datetime.now()}")
weather_fg = fs.get_feature_group(
    name="weather_all",
    version=1,
)
print(f"[DEBUG] weather_all feature group retrieved at {datetime.datetime.now()}")

Set SENSOR_CSV_FILE in .env with the relative path to a sensor to process it, or leave it unset to process all sensors in the `data` folder

In [4]:
sensor_csv_file = getattr(settings, 'SENSOR_CSV_FILE', None)

if sensor_csv_file:
    # Read one secret for single sensor mode
    _, _, _, _, _, sensor_id = airquality.read_sensor_data(sensor_csv_file)
    secret_name = f"SENSOR_LOCATION_JSON_{sensor_id}"
    location_str = secrets.get_secret(secret_name).value
    locations = {sensor_id: json.loads(location_str)}
else:
    # Read all individual secrets in batch mode
    all_secrets = secrets.get_secrets()
    locations = {}
    for secret in all_secrets:
        if secret.name.startswith("SENSOR_LOCATION_JSON_"):
            sensor_id = secret.name.replace("SENSOR_LOCATION_JSON_", "")
            location_str = secrets.get_secret(secret.name).value
            if location_str:
                locations[sensor_id] = json.loads(location_str)


## Helper Methods

In [5]:
def get_daily_weather_forecast(city, latitude, longitude):
    hourly_df = airquality.get_hourly_weather_forecast(city, latitude, longitude)
    hourly_df = hourly_df.set_index("date")
    daily_df = hourly_df.between_time("11:59", "12:01")
    daily_df = daily_df.reset_index()
    daily_df["date"] = pd.to_datetime(daily_df["date"]).dt.date
    daily_df["date"] = pd.to_datetime(daily_df["date"])
    daily_df["city"] = city
    return daily_df


def fetch_data_for_location(location):
    country = location["country"]
    city = location["city"]
    street = location["street"]
    aqicn_url = location["aqicn_url"]
    latitude = location["latitude"]
    longitude = location["longitude"]

    aq_today_df = airquality.get_pm25(aqicn_url, country, city, street, today, AQICN_API_KEY)
    daily_df = get_daily_weather_forecast(city, latitude, longitude)
    return aq_today_df, daily_df

## Script

In [6]:
aqs = []
weathers = []
for sensor, location in locations.items():
    aq_today_df, weather_daily_forecast_df = fetch_data_for_location(location)

    # Air quality FG shape
    aq_today_df = aq_today_df.assign(
        sensor_id=str(sensor),
        street=location["street"],
        city=location["city"],
        country=location["country"],
        feed_url=location["aqicn_url"],
    )
    aq_today_df["date"] = pd.to_datetime(aq_today_df["date"])

    # Weather FG shape
    weather_daily_forecast_df = weather_daily_forecast_df.assign(
        sensor_id=str(sensor),
        city=location["city"],
        latitude=location["latitude"],
        longitude=location["longitude"],
    )
    weather_daily_forecast_df["date"] = pd.to_datetime(
        weather_daily_forecast_df["date"]
    )

    aqs.append(aq_today_df)
    weathers.append(weather_daily_forecast_df)

In [7]:
aq_df = pd.concat(aqs)
aq_df["pm25"] = pd.to_numeric(aq_df["pm25"], errors="coerce").astype("float64")
aq_df["date"] = pd.to_datetime(aq_df["date"]).dt.tz_localize(None)
aq_df = aq_df.drop(columns=["url"], errors="ignore")

# Get historical data for rolling window and lagged features
historical_start = today - datetime.timedelta(days=4)
historical_df = pd.DataFrame()

# Read historical data from feature group and filter for the last 4 days
try:
    historical_df = air_quality_fg.read()
    if not historical_df.empty:
        historical_df["date"] = pd.to_datetime(historical_df["date"]).dt.tz_localize(None)
        historical_df = historical_df[
            (historical_df["date"].dt.date >= historical_start) & (historical_df["date"].dt.date < today)
        ][["date", "sensor_id", "pm25"]]
except Exception:
    pass


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.95s) 


In [8]:
# Combine historical + new data and calculate rolling window
combined_df = pd.concat([historical_df, aq_df], ignore_index=True) if not historical_df.empty else aq_df
combined_df = airquality.add_rolling_window_feature(combined_df, window_days=3, column="pm25", new_column="pm25_rolling_3d")
combined_df = airquality.add_lagged_features(combined_df, column="pm25", lags=[1, 2, 3])
combined_df = airquality.add_nearby_sensor_feature(combined_df, locations, column="pm25_lag_1d", n_closest=3)
aq_df = combined_df[combined_df["date"].dt.date == today].copy()
aq_df

Unnamed: 0,date,sensor_id,pm25,country,city,street,feed_url,pm25_rolling_3d,pm25_lag_1d,pm25_lag_2d,pm25_lag_3d,pm25_nearby_avg
72,2025-11-18,112672,11.0,Sweden,Gothenburg,Bågskyttegatan,https://api.waqi.info/feed/A112672/,1.683333,2.0,1.4,1.65,1.01
61,2025-11-18,154549,4.0,Sweden,Västra Göteborg,Järnbrottsgatan,https://api.waqi.info/feed/A154549/,1.166667,1.3,1.0,1.2,1.273333
71,2025-11-18,194215,7.0,Sweden,Torslanda,Norra Sävviksvägen,https://api.waqi.info/feed/A194215/,0.65,0.52,0.73,0.7,1.01
64,2025-11-18,404209,5.0,Sweden,Lindome,Högkullevägen,https://api.waqi.info/feed/A404209/,1.126667,1.13,0.95,1.3,1.376667
65,2025-11-18,59095,9.0,Sweden,Mölndal,Eklanda Slätt,https://api.waqi.info/feed/A59095/,1.0,0.9,1.1,1.0,1.376667
73,2025-11-18,59893,3.0,Sweden,Lundby,Londongatan,https://api.waqi.info/feed/A59893/,0.866667,1.0,0.8,0.8,1.373333
66,2025-11-18,60535,8.0,Sweden,Majorna-Linné,Annedal,https://api.waqi.info/feed/A60535/,1.233333,1.2,0.93,1.57,1.69
62,2025-11-18,60541,19.0,Sweden,Majorna-Linné,Prinsgatan,https://api.waqi.info/feed/A60541/,2.09,2.09,,,1.393333
67,2025-11-18,60853,20.0,Sweden,Majorna-Linné,Masthugget,https://api.waqi.info/feed/A60853/,0.72,0.73,0.6,0.83,1.846667
69,2025-11-18,61714,5.0,Sweden,Norra Hisingen,Nyhemsgatan,https://api.waqi.info/feed/A61714/,0.89,0.88,0.79,1.0,1.44


In [9]:
weather_df = pd.concat(weathers)
weather_df["date"] = pd.to_datetime(weather_df["date"])
weather_df

Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city,sensor_id,latitude,longitude
0,2025-11-18,3.70,0.0,4.334974,184.763550,Västra Göteborg,154549,57.678000,11.910000
1,2025-11-19,1.35,0.0,2.099143,300.963684,Västra Göteborg,154549,57.678000,11.910000
2,2025-11-20,-0.75,0.0,20.063339,9.293242,Västra Göteborg,154549,57.678000,11.910000
3,2025-11-21,0.25,0.0,4.802999,167.005386,Västra Göteborg,154549,57.678000,11.910000
4,2025-11-22,4.30,0.7,22.383780,207.801361,Västra Göteborg,154549,57.678000,11.910000
...,...,...,...,...,...,...,...,...,...
2,2025-11-20,-0.80,0.0,20.063339,9.293242,Centrum,69628,57.681718,11.970109
3,2025-11-21,0.20,0.0,4.802999,167.005386,Centrum,69628,57.681718,11.970109
4,2025-11-22,4.25,0.7,22.383780,207.801361,Centrum,69628,57.681718,11.970109
5,2025-11-23,6.15,0.0,7.628263,199.290146,Centrum,69628,57.681718,11.970109


In [10]:
air_quality_fg.insert(aq_df)
weather_fg.insert(weather_df)

2025-11-18 11:56:13,619 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279179/fs/1265797/fg/1737043


Uploading Dataframe: 100.00% |██████████| Rows 16/16 | Elapsed Time: 00:02 | Remaining Time: 00:00


Launching job: air_quality_all_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279179/jobs/named/air_quality_all_1_offline_fg_materialization/executions
2025-11-18 11:56:27,977 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279179/fs/1265797/fg/1738058


Uploading Dataframe: 100.00% |██████████| Rows 112/112 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_all_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279179/jobs/named/weather_all_1_offline_fg_materialization/executions


(Job('weather_all_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 769026
         }
       },
       "result": {
         "observed_value": 1.0182337760925293,
         "element_count": 112,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-18T10:56:27.000977Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_