In [55]:
import os
import requests
import json
import pandas as pd
from datetime import date, datetime, timedelta
import sys
import hopsworks
import great_expectations as ge

In [56]:
with open("../city_config/vastra_gotaland.json") as f:
    city_config = json.load(f)

FG_VERSIONS = city_config["fg_versions"]

SENSORS = city_config["sensors"]
SENSORS

[{'id': 'GBG_FEMMAN',
  'display_name': 'Göteborg Femman',
  'city': 'Göteborg',
  'street': 'Femman',
  'filename': 'data/goteborg-femman-air-quality.csv',
  'lat': 57.70805758067377,
  'lon': 11.969179894601},
 {'id': 'GBG_OVRE_HAGA',
  'display_name': 'Göteborg Haga',
  'city': 'Göteborg',
  'street': 'Haga',
  'filename': 'data/goteborg-haga-air-quality.csv',
  'lat': 57.4155,
  'lon': 11.5723}]

In [57]:
YESTERDAY = date.today() - timedelta(days=1)
START_DATE = END_DATE = YESTERDAY.isoformat()

In [58]:
MAX_RETRIES = 3
WAIT_SECONDS = 5

def fetch_json(url):
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt < MAX_RETRIES:
                time.sleep(WAIT_SECONDS)
            else:
                raise RuntimeError(f"Failed after {MAX_RETRIES} attempts")


In [59]:
# Air Quality checks
aq_expectation_suite = ge.core.ExpectationSuite("aq_daily_suite")
aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        "expect_column_values_to_not_be_null",
        kwargs={"column": "pm2_5"}
    )
)
aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        "expect_column_values_to_be_between",
        kwargs={"column": "pm2_5", "min_value": 0.0, "max_value": 500.0}
    )
)


# Weather checks
weather_expectation_suite = ge.core.ExpectationSuite("weather_daily_suite")
weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        "expect_column_values_to_not_be_null",
        kwargs={"column": "temperature_2m_max"}
    )
)
weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        "expect_column_values_to_be_between",
        kwargs={"column": "temperature_2m_max", "min_value": -60, "max_value": 60}
    )
)


{"expectation_type": "expect_column_values_to_be_between", "kwargs": {"column": "temperature_2m_max", "min_value": -60, "max_value": 60}, "meta": {}}

In [60]:
project = hopsworks.login(
    project=os.getenv("HOPSWORKS_PROJECT"),
    api_key_value=os.getenv("HOPSWORKS_API_KEY")
)

fs = project.get_feature_store()

2025-11-18 14:23:23,175 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-18 14:23:23,194 INFO: Initializing external client
2025-11-18 14:23:23,195 INFO: Base URL: https://c.app.hopsworks.ai:443
To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'







2025-11-18 14:23:25,353 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1271989


In [61]:
aqi_fg = fs.get_or_create_feature_group(
    name="air_quality",
    version=FG_VERSIONS["air_quality"],
    primary_key=["city", "street", "date"],
    event_time="date",
    expectation_suite=aq_expectation_suite
)

weather_fg = fs.get_or_create_feature_group(
    name="weather",
    version=FG_VERSIONS["weather"],
    primary_key=["city", "street", "date"],
    event_time="date",
    expectation_suite=weather_expectation_suite
)

forecast_fg = fs.get_or_create_feature_group(
    name="weather_forecast_features",
    version=FG_VERSIONS["weather_forecast_features"],
    primary_key=["city", "street", "date"],
    event_time="date"
)

In [62]:
YESTERDAY = date.today() - timedelta(days=1)
START_DATE = END_DATE = YESTERDAY.isoformat()

for SENSOR in SENSORS:

    LAT = SENSOR["lat"]
    LON = SENSOR["lon"]
    CITY = SENSOR["city"]
    STREET = SENSOR["street"]

    print(f"\n--- Updating {CITY}, {STREET} ---")

    # ---------- Build URLs ----------
    aq_url = (
        f"https://air-quality-api.open-meteo.com/v1/air-quality?"
        f"latitude={LAT}&longitude={LON}"
        f"&past_days=1"
        f"&hourly=pm2_5"
        "&timezone=Europe%2FBerlin"
    )

    weather_url = (
        f"https://archive-api.open-meteo.com/v1/archive?"
        f"latitude={LAT}&longitude={LON}"
        f"&start_date={START_DATE}&end_date={END_DATE}"
        f"&daily=wind_speed_10m_max,wind_gusts_10m_max,wind_direction_10m_dominant,temperature_2m_max"
        "&timezone=Europe%2FBerlin"
    )

    forecast_url = (
        f"https://api.open-meteo.com/v1/forecast?"
        f"latitude={LAT}&longitude={LON}"
        f"&daily=wind_speed_10m_max,wind_gusts_10m_max,wind_direction_10m_dominant,temperature_2m_max"
        "&timezone=Europe%2FBerlin"
    )

    # ---------- Fetch Air Quality ----------
    aq_data = fetch_json(aq_url)
    air_df = pd.DataFrame({
        "date": aq_data["hourly"]["time"],
        "pm2_5": aq_data["hourly"]["pm2_5"]
    })
    air_df["date"] = pd.to_datetime(air_df["date"])
    air_df = air_df[air_df["date"].dt.hour == 12].copy()
    air_df["date"] = air_df["date"].dt.floor("D")

    air_df["country"] = "Sweden"
    air_df["city"] = CITY
    air_df["street"] = STREET

    # ---------- Fetch Weather ----------
    w_data = fetch_json(weather_url)
    weather_df = pd.DataFrame(w_data["daily"])
    weather_df["date"] = pd.to_datetime(weather_df["time"])
    weather_df.drop(columns=["time"], inplace=True)

    weather_df["country"] = "Sweden"
    weather_df["city"] = CITY
    weather_df["street"] = STREET

    # ---------- Fetch Forecast ----------
    f_data = fetch_json(forecast_url)
    forecast_df = pd.DataFrame(f_data["daily"])
    forecast_df["date"] = pd.to_datetime(forecast_df["time"])
    forecast_df.drop(columns=["time"], inplace=True)

    forecast_df["city"] = CITY
    forecast_df["street"] = STREET

    # ---------- Insert ----------
    aqi_fg.insert(air_df, write_options={"wait_for_job": False})
    weather_fg.insert(weather_df, write_options={"wait_for_job": False})
    forecast_fg.insert(forecast_df, write_options={"wait_for_job": False})

    print(f"✓ Updated: {CITY}, {STREET}")



--- Updating Göteborg, Femman ---
2025-11-18 14:23:29,046 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271989/fs/1258587/fg/1718981


Uploading Dataframe: 100.00% |██████████| Rows 6/6 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: air_quality_3_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271989/jobs/named/air_quality_3_offline_fg_materialization/executions
2025-11-18 14:23:42,652 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271989/fs/1258587/fg/1718980


Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_4_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271989/jobs/named/weather_4_offline_fg_materialization/executions


Uploading Dataframe: 100.00% |██████████| Rows 7/7 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_forecast_features_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271989/jobs/named/weather_forecast_features_2_offline_fg_materialization/executions
✓ Updated: Göteborg, Femman

--- Updating Göteborg, Haga ---
2025-11-18 14:24:10,426 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271989/fs/1258587/fg/1718981


Uploading Dataframe: 100.00% |██████████| Rows 6/6 | Elapsed Time: 00:01 | Remaining Time: 00:00


Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/ID2223_Lab_1_Axel_Kajsa/Resources/jobs/air_quality_3_offline_fg_materialization/config_1763471171218) to trigger the materialization job again.

2025-11-18 14:24:18,992 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271989/fs/1258587/fg/1718980


Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:00 | Remaining Time: 00:00


Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/ID2223_Lab_1_Axel_Kajsa/Resources/jobs/weather_4_offline_fg_materialization/config_1763471146913) to trigger the materialization job again.



Uploading Dataframe: 100.00% |██████████| Rows 7/7 | Elapsed Time: 00:01 | Remaining Time: 00:00


Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/ID2223_Lab_1_Axel_Kajsa/Resources/jobs/weather_forecast_features_2_offline_fg_materialization/config_1763471797582) to trigger the materialization job again.

✓ Updated: Göteborg, Haga
