In [1]:
# Location being backfilled. LAT/LON are sent to the Open-Meteo archive; CITY is
# written into the `city` column, which is part of the (city, date) primary key.
CITY = "Hagsätra, Stockholm"
LAT = 59.2371
LON = 17.9819
# AQICN station identifier for this location (not referenced by the backfill cells).
AQICN_STATION_ID = "A59356"

In [2]:
import os
import datetime as dt
from typing import Dict, Any

import pandas as pd
import requests
import hopsworks

In [3]:
# Local AQICN daily-statistics export; point elsewhere via the AQ_CSV_PATH env var.
AQ_CSV_PATH = os.environ.get("AQ_CSV_PATH", "data/aqicn_historical.csv")

# Number of days of weather history to backfill (the window ends yesterday).
BACKFILL_DAYS = 400

In [4]:
# Log in to Hopsworks and open the project's feature store; `fs` is used below to
# get or create the "weather" and "air_quality" feature groups.
project = hopsworks.login()
fs = project.get_feature_store()

2025-11-01 19:12:07,726 INFO: Initializing external client
2025-11-01 19:12:07,727 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-11-01 19:12:09,223 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1278100


In [5]:
# Confirm which Hopsworks project the login resolved to before writing any data.
print("Connected to project:", project.name)

Connected to project: airqualityapp


## Helpers

In [6]:
def _get_open_meteo_archive(lat: float, lon: float, start: str, end: str) -> Dict[str, Any]:
    """Download daily ERA5 weather for one location from the Open-Meteo archive.

    Args:
        lat, lon: Coordinates of the location.
        start, end: Inclusive date range as ISO strings (YYYY-MM-DD).

    Returns:
        The decoded JSON response. Dates are requested with ``timezone=UTC``.

    Raises:
        requests.HTTPError: On a non-2xx response (via ``raise_for_status``).
    """
    daily_variables = ",".join(
        [
            "wind_speed_10m_max",
            "wind_direction_10m_dominant",
            "wind_gusts_10m_max",
            "temperature_2m_max",
            "temperature_2m_min",
            "precipitation_sum",
        ]
    )
    query = "&".join(
        [
            f"latitude={lat}",
            f"longitude={lon}",
            f"daily={daily_variables}",
            f"start_date={start}",
            f"end_date={end}",
            "timezone=UTC",
        ]
    )
    response = requests.get(f"https://archive-api.open-meteo.com/v1/era5?{query}", timeout=60)
    response.raise_for_status()
    return response.json()

In [7]:
def _to_utc_midnight(values):
    """Parse ``values`` as UTC-aware datetimes and truncate them to midnight.

    Unparseable entries become ``NaT`` (``errors="coerce"``). A Series input comes
    back as a Series (normalized through its ``.dt`` accessor). Any other result of
    ``pd.to_datetime``, e.g. a DatetimeIndex for list input, is normalized directly.
    """
    parsed = pd.to_datetime(values, utc=True, errors="coerce")
    # Series expose datetime methods through `.dt`; DatetimeIndex has them directly.
    target = parsed.dt if isinstance(parsed, pd.Series) else parsed
    return target.normalize()


def _weather_payload_to_df(payload: Dict[str, Any], city: str) -> pd.DataFrame:
    """Flatten an Open-Meteo ``daily`` payload into one row per (city, day).

    Args:
        payload: Parsed archive JSON; must contain a ``"daily"`` mapping holding
            ``"time"`` plus the six daily variables requested from the archive.
        city: Label written to every row's ``city`` column.

    Returns:
        DataFrame with columns ``city, date, temp_max, temp_min, precip_sum,
        wind_speed_max, wind_gusts_max, wind_direction_dominant``, sorted by
        ``date`` (UTC midnight), with unparseable dates dropped.
    """
    daily = payload["daily"]

    # Feature name -> Open-Meteo daily variable. The insertion order here is also
    # the output column order after city/date.
    renames = {
        "temp_max": "temperature_2m_max",
        "temp_min": "temperature_2m_min",
        "precip_sum": "precipitation_sum",
        "wind_speed_max": "wind_speed_10m_max",
        "wind_gusts_max": "wind_gusts_10m_max",
        "wind_direction_dominant": "wind_direction_10m_dominant",
    }

    columns: Dict[str, Any] = {"date": _to_utc_midnight(daily["time"]), "city": city}
    for feature, api_name in renames.items():
        columns[feature] = daily[api_name]

    ordered = ["city", "date", *renames]
    frame = pd.DataFrame(columns)[ordered]
    return frame.sort_values("date").dropna(subset=["date"])

In [8]:
def _read_aq_csv_with_stats_schema(path: str, city: str) -> pd.DataFrame:
    """
    Read an AQICN daily-statistics CSV and reduce it to (city, date, pm2_5).

    Expected schema (header matching is case-insensitive and ignores surrounding
    whitespace, so padded headers such as " median" are accepted):
        date,min,max,median,q1,q3,stdev,count
    The 'median' column is used as pm2_5. Dates may be ISO with 'Z' (UTC).

    Args:
        path: Location of the CSV file.
        city: Label written to every row's ``city`` column.

    Returns:
        DataFrame with columns ``city``, ``date`` (UTC midnight) and ``pm2_5``,
        sorted by date with a fresh 0..n-1 index. Rows with an unparseable date
        or a non-numeric median are dropped. When a date appears more than once,
        the first occurrence in file order is kept. pm2_5 is clipped to [0, 1000].

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If no 'date' or 'median' column can be found.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(
            f"Air-quality CSV not found at '{path}'. "
            "Expected columns: date,min,max,median,q1,q3,stdev,count"
        )
    df = pd.read_csv(path)

    # Flexible column name handling: case-insensitive and whitespace-tolerant.
    # str() guards against non-string labels, which have no .lower()/.strip().
    cols = {str(c).strip().lower(): c for c in df.columns}
    date_col = cols.get("date")
    median_col = cols.get("median")

    if date_col is None or median_col is None:
        raise ValueError(
            f"CSV must contain at least 'date' and 'median' columns. Found: {list(df.columns)}"
        )

    out = pd.DataFrame(
        {
            "city": city,
            "date": _to_utc_midnight(df[date_col]),
            "pm2_5": pd.to_numeric(df[median_col], errors="coerce"),
        }
    )

    # Basic hygiene: drop NaT dates / NaN pm2_5, then dedupe *before* sorting so that
    # "first" means first in file order (the default sort_values kind is not stable,
    # so deduping after the sort would keep an arbitrary row per date).
    out = out.dropna(subset=["date", "pm2_5"])
    out = out.drop_duplicates(subset=["city", "date"], keep="first")
    out = out.sort_values("date").reset_index(drop=True)
    out["pm2_5"] = out["pm2_5"].clip(lower=0, upper=1000)  # µg/m³, generous upper bound

    return out

## Building the weather backfill

In [9]:
# Use the *UTC* calendar date. The archive is requested with timezone=UTC and dates are
# stored as UTC midnights. A local-clock "today" (e.g. shortly after local midnight in
# Stockholm) can make "yesterday" a UTC day that has not finished yet.
today = dt.datetime.now(dt.timezone.utc).date()
start_date = (today - dt.timedelta(days=BACKFILL_DAYS)).isoformat()
# Inclusive end date: the window is [today - BACKFILL_DAYS, yesterday], i.e. BACKFILL_DAYS days.
end_date = (today - dt.timedelta(days=1)).isoformat()

print(f"Fetching weather archive for {CITY}: {start_date} to {end_date}")

Fetching weather archive for Hagsätra, Stockholm: 2024-09-27 to 2025-10-31


In [10]:
weather_payload = _get_open_meteo_archive(LAT, LON, start=start_date, end=end_date)
weather_df = _weather_payload_to_df(weather_payload, CITY)

# (city, date) is the feature group's primary key, so duplicate keys would collide on
# insert. Fail fast here instead of discovering it in the feature store.
assert not weather_df.duplicated(subset=["city", "date"]).any(), "duplicate (city, date) rows in weather_df"

# The archive may return nulls for days it has no data for yet (NOTE(review): typically
# the most recent days). Surface them rather than pushing empty features unnoticed.
n_incomplete_weather_rows = int(weather_df.drop(columns=["city", "date"]).isna().any(axis=1).sum())

print("Weather rows:", len(weather_df))
if n_incomplete_weather_rows:
    print(f"Warning: {n_incomplete_weather_rows} weather rows have missing values")

Weather rows: 400


In [11]:
# Preview the first rows of the frame that is pushed to the "weather" feature group below.
display(weather_df.head())

Unnamed: 0,city,date,temp_max,temp_min,precip_sum,wind_speed_max,wind_gusts_max,wind_direction_dominant
0,"Hagsätra, Stockholm",2024-09-27 00:00:00+00:00,13.5,7.7,20.9,13.0,36.4,313
1,"Hagsätra, Stockholm",2024-09-28 00:00:00+00:00,11.2,5.2,1.9,14.4,31.0,285
2,"Hagsätra, Stockholm",2024-09-29 00:00:00+00:00,10.6,4.5,0.0,16.7,38.9,277
3,"Hagsätra, Stockholm",2024-09-30 00:00:00+00:00,12.4,2.5,0.0,8.8,22.3,181
4,"Hagsätra, Stockholm",2024-10-01 00:00:00+00:00,12.0,4.2,0.8,12.5,25.6,75


## Reading Air Quality History

In [12]:
# Load the air-quality history. Only the expected failure modes are tolerated:
# - an unreadable or missing file (OSError, which includes FileNotFoundError);
# - a malformed CSV (ValueError, which includes pandas' ParserError/EmptyDataError
#   and the helper's missing-column error).
# In those cases the notebook continues with weather only. Anything else, e.g. a bug
# in the helper, surfaces as an error instead of being silently swallowed.
try:
    aq_df = _read_aq_csv_with_stats_schema(AQ_CSV_PATH, CITY)
except (OSError, ValueError) as e:
    # Empty frame with the expected columns; the air-quality push cell skips empty frames.
    aq_df = pd.DataFrame(columns=["city", "date", "pm2_5"])
    print("Air-quality CSV not loaded:", e)
else:
    print("Air-quality rows:", len(aq_df))
    display(aq_df.head())

Air-quality rows: 671


Unnamed: 0,city,date,pm2_5
0,"Hagsätra, Stockholm",2019-12-09 00:00:00+00:00,2.1
1,"Hagsätra, Stockholm",2019-12-10 00:00:00+00:00,1.58
2,"Hagsätra, Stockholm",2019-12-11 00:00:00+00:00,2.34
3,"Hagsätra, Stockholm",2019-12-12 00:00:00+00:00,15.44
4,"Hagsätra, Stockholm",2019-12-13 00:00:00+00:00,9.23


## Pushing weather to Hopsworks

In [None]:
WEATHER_VERSION = 4

# Feature group keyed by (city, date); `date` also serves as the event time.
weather_fg = fs.get_or_create_feature_group(
    name="weather",
    version=WEATHER_VERSION,
    primary_key=["city", "date"],
    event_time="date",
    description="Daily weather for each city (Open-Meteo ERA5 archive).",
)

# Same guard as the air-quality push: an insert launches a materialization job and
# waits for it, so don't do that for an empty frame.
if len(weather_df) > 0:
    weather_fg.insert(weather_df, write_options={"wait_for_job": True})
    print("Weather backfill done.")
else:
    print("Skipped weather insert (no rows).")

Uploading Dataframe: 100.00% |██████████| Rows 400/400 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: weather_4_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1278100/jobs/named/weather_4_offline_fg_materialization/executions
2025-11-01 19:29:14,417 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-01 19:29:20,784 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-01 19:31:30,891 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2025-11-01 19:31:37,229 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-01 19:31:37,380 INFO: Waiting for log aggregation to finish.
2025-11-01 19:31:45,980 INFO: Execution finished successfully.
Weather backfill done.


## Pushing air quality to Hopsworks

In [23]:
# Named constant (like WEATHER_VERSION) so bumping the schema version is a one-line change.
AIR_QUALITY_VERSION = 5

# Feature group keyed by (city, date); `date` also serves as the event time.
air_quality_fg = fs.get_or_create_feature_group(
    name="air_quality",
    version=AIR_QUALITY_VERSION,
    primary_key=["city", "date"],
    event_time="date",
    description="Daily PM2.5 per city (AQICN, using daily median).",
)

# aq_df is empty when the CSV could not be loaded; skip the insert/materialization job then.
if len(aq_df) > 0:
    print("Inserting air-quality backfill…")
    air_quality_fg.insert(aq_df, write_options={"wait_for_job": True})
    print("Air-quality backfill done.")
else:
    print("Skipped air-quality insert (no rows). Rerun after adding the CSV.")

Inserting air-quality backfill…


Uploading Dataframe: 100.00% |██████████| Rows 671/671 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_5_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1278100/jobs/named/air_quality_5_offline_fg_materialization/executions
2025-11-01 19:39:38,585 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-01 19:39:41,743 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-01 19:41:19,814 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2025-11-01 19:41:22,990 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-01 19:41:23,130 INFO: Waiting for log aggregation to finish.
2025-11-01 19:41:31,720 INFO: Execution finished successfully.
Air-quality backfill done.


In [None]:
print("Reading a small sample from each Feature Group…")

# Read both feature groups back (weather first, then air quality) and keep the five
# most recent rows for CITY from each as a quick round-trip check.
w_sample, aq_sample = (
    frame[frame["city"] == CITY].sort_values("date").tail(5)
    for frame in (weather_fg.read(), air_quality_fg.read())
)

display(w_sample, aq_sample)

Reading a small sample from each Feature Group…
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.67s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.72s) 


Unnamed: 0,city,date,temp_max,temp_min,precip_sum,wind_speed_max,wind_gusts_max,wind_direction_dominant
224,"Hagsätra, Stockholm",2025-10-27 00:00:00+00:00,8.2,3.4,3.8,10.7,20.2,203
103,"Hagsätra, Stockholm",2025-10-28 00:00:00+00:00,8.9,2.7,0.4,9.7,22.3,248
197,"Hagsätra, Stockholm",2025-10-29 00:00:00+00:00,6.4,2.7,0.3,10.4,19.8,234
72,"Hagsätra, Stockholm",2025-10-30 00:00:00+00:00,9.7,4.3,17.1,17.0,34.2,124
371,"Hagsätra, Stockholm",2025-10-31 00:00:00+00:00,7.5,5.0,0.0,17.5,34.6,320


Unnamed: 0,city,date,pm2_5
159,"Hagsätra, Stockholm",2025-10-28 00:00:00+00:00,3.9
308,"Hagsätra, Stockholm",2025-10-29 00:00:00+00:00,5.33
116,"Hagsätra, Stockholm",2025-10-30 00:00:00+00:00,4.5
652,"Hagsätra, Stockholm",2025-10-31 00:00:00+00:00,1.8
581,"Hagsätra, Stockholm",2025-11-01 00:00:00+00:00,2.23


: 