In [90]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: /Users/davidbazalduamendez/Documents/GitHub/Air-Quality-Prediction-Service-Solna
HopsworksSettings initialized!


<span style="font-width:bold; font-size: 3rem; color:#333;">- Part 02: Daily Feature Pipeline for Air Quality (aqicn.org) and weather (openmeteo)</span>

## üóíÔ∏è This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python Orchestration tool to schedule this program to run daily.

### <span style='color:#ff5f27'> üìù Imports

In [91]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util_2
from mlfs import config
import json
import os
import warnings
warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> üåç Get the Sensor URL, Country, City, Street names from Hopsworks </span>

__Update the values in the cell below.__

__These should be the same values as in notebook 1 - the feature backfill notebook__


In [92]:
project = hopsworks.login(engine="python")
fs = project.get_feature_store()
secrets = hopsworks.get_secrets_api()

AQICN_API_KEY = secrets.get_secret("AQICN_API_KEY").value

sensors_json = secrets.get_secret("SENSORS_MADRID_JSON").value
SENSORS = json.loads(sensors_json)

today = datetime.date.today()

SENSORS

location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value

location = json.loads(location_str)

location

2025-11-17 18:52:57,009 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-17 18:52:57,037 INFO: Initializing external client
2025-11-17 18:52:57,037 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-17 18:52:58,522 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1271984


{'country': 'spain',
 'city': 'madrid',
 'street': 'castellana',
 'aqicn_url': 'https://api.waqi.info/feed/@3231',
 'latitude': '40.4397',
 'longitude': '-3.6882'}

### <span style="color:#ff5f27;"> üîÆ Get references to the Feature Groups </span>

In [93]:
# Retrieve feature groups
air_quality_fg = fs.get_feature_group(
    name="air_quality_madrid",
    version=1
)

weather_fg = fs.get_feature_group(
    name="weather_madrid_sensors",
    version=1
)


---

## <span style='color:#ff5f27'> üå´ Retrieve Today's Air Quality data (PM2.5) from the AQI API</span>


In [94]:
SENSORS[0], SENSORS[0].keys()

({'sensor_id': 'casa_de_campo',
  'csv_file': 'casa-de-campo-air-quality.csv',
  'city': 'madrid',
  'country': 'spain',
  'street': 'casa-de-campo',
  'latitude': 40.42,
  'longitude': -3.753,
  'url': 'https://api.waqi.info/feed/@3228'},
 dict_keys(['sensor_id', 'csv_file', 'city', 'country', 'street', 'latitude', 'longitude', 'url']))

In [95]:
all_rows = []

for sensor in SENSORS:

    try:
        df = util_2.get_pm25(
            sensor["url"],
            sensor["country"],
            sensor["city"],
            sensor["street"],
            today,
            AQICN_API_KEY
        )

        df["sensor_id"] = sensor["sensor_id"]

        all_rows.append(df)

        print(f"OK ‚Üí {sensor['street']}")

    except Exception as e:
        print(f"ERROR ‚Üí {sensor['street']}: {e}")

# merge all dataframes
aq_today_df = pd.concat(all_rows, ignore_index=True)

# Assign proper data types
aq_today_df["date"] = pd.to_datetime(aq_today_df["date"])
aq_today_df["pm25"] = aq_today_df["pm25"].astype("float64")  # o float32 si tu FG lo tiene as√≠

# Select only the columns needed for insertion
aq_today_df = aq_today_df[["date", "pm25", "sensor_id", "city", "country", "street"]]
aq_today_df.head()


OK ‚Üí casa-de-campo
OK ‚Üí castellana
OK ‚Üí cuatro-caminos
OK ‚Üí escuelas-aguirre
OK ‚Üí fernandez-ladreda
OK ‚Üí mendez-alvaro
OK ‚Üí plaza-de-castilla


Unnamed: 0,date,pm25,sensor_id,city,country,street
0,2025-11-17,34.0,casa_de_campo,madrid,spain,casa-de-campo
1,2025-11-17,13.0,castellana,madrid,spain,castellana
2,2025-11-17,21.0,cuatro_caminos,madrid,spain,cuatro-caminos
3,2025-11-17,25.0,escuelas_aguirre,madrid,spain,escuelas-aguirre
4,2025-11-17,21.0,fernandez_ladreda,madrid,spain,fernandez-ladreda


In [96]:
aq_today_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 6 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   date       7 non-null      datetime64[ns]
 1   pm25       7 non-null      float64       
 2   sensor_id  7 non-null      object        
 3   city       7 non-null      object        
 4   country    7 non-null      object        
 5   street     7 non-null      object        
dtypes: datetime64[ns](1), float64(1), object(4)
memory usage: 464.0+ bytes


## <span style='color:#ff5f27'> üå¶ Get Weather Forecast data</span>

In [97]:
from mlfs.airquality.util_2 import get_hourly_weather_forecast
weather_rows = []

for sensor in SENSORS:
    try:
        hourly_df = get_hourly_weather_forecast(
            sensor["city"],
            sensor["latitude"],
            sensor["longitude"]
        )
        hourly_df = hourly_df.set_index("date")

        daily_df = hourly_df.between_time("11:59", "12:01").reset_index()

        daily_df["date"] = pd.to_datetime(daily_df["date"]).dt.date
        daily_df["date"] = pd.to_datetime(daily_df["date"])

        daily_df["sensor_id"] = sensor["sensor_id"]
        daily_df["city"] = sensor["city"]
        daily_df["country"] = sensor["country"]
        daily_df["street"] = sensor["street"]

        weather_rows.append(daily_df)

        print(f"OK forecast ‚Üí {sensor['street']}")

    except Exception as e:
        print(f"ERROR forecast ‚Üí {sensor['street']}: {e}")

weather_forecast_df = pd.concat(weather_rows, ignore_index=True)


Coordinates: 40.5¬∞N -3.75¬∞E
Elevation: 649.0 m asl
Timezone difference to GMT+0: 0s
OK forecast ‚Üí casa-de-campo
Coordinates: 40.5¬∞N -3.75¬∞E
Elevation: 698.0 m asl
Timezone difference to GMT+0: 0s
OK forecast ‚Üí castellana
Coordinates: 40.5¬∞N -3.75¬∞E
Elevation: 708.0 m asl
Timezone difference to GMT+0: 0s
OK forecast ‚Üí cuatro-caminos
Coordinates: 40.5¬∞N -3.75¬∞E
Elevation: 695.0 m asl
Timezone difference to GMT+0: 0s
OK forecast ‚Üí escuelas-aguirre
Coordinates: 40.5¬∞N -3.75¬∞E
Elevation: 639.0 m asl
Timezone difference to GMT+0: 0s
OK forecast ‚Üí fernandez-ladreda
Coordinates: 40.5¬∞N -3.75¬∞E
Elevation: 606.0 m asl
Timezone difference to GMT+0: 0s
OK forecast ‚Üí mendez-alvaro
Coordinates: 40.5¬∞N -3.75¬∞E
Elevation: 749.0 m asl
Timezone difference to GMT+0: 0s
OK forecast ‚Üí plaza-de-castilla


In [98]:
weather_forecast_df.sort_values('date', inplace=True)
weather_forecast_df

Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,cloud_cover_max,relative_humidity_2m_mean,dew_point_2m_max,sensor_id,city,country,street
0,2025-11-16,12.9,0.1,17.287498,238.627045,34.0,70.0,7.559474,casa_de_campo,madrid,spain,casa-de-campo
21,2025-11-16,12.6,0.1,17.287498,238.627045,34.0,70.0,7.271856,escuelas_aguirre,madrid,spain,escuelas-aguirre
28,2025-11-16,13.0,0.1,17.287498,238.627045,34.0,70.0,7.655344,fernandez_ladreda,madrid,spain,fernandez-ladreda
42,2025-11-16,12.25,0.1,17.287498,238.627045,34.0,70.0,6.936285,plaza_castilla,madrid,spain,plaza-de-castilla
7,2025-11-16,12.6,0.1,17.287498,238.627045,34.0,70.0,7.271856,castellana,madrid,spain,castellana
35,2025-11-16,13.2,0.1,17.287498,238.627045,34.0,70.0,7.847077,mendez_alvaro,madrid,spain,mendez-alvaro
14,2025-11-16,12.55,0.1,17.287498,238.627045,34.0,70.0,7.223919,cuatro_caminos,madrid,spain,cuatro-caminos
1,2025-11-17,12.35,0.0,5.351785,42.273628,48.0,69.0,6.822279,casa_de_campo,madrid,spain,casa-de-campo
29,2025-11-17,12.45,0.0,5.351785,42.273628,48.0,69.0,6.917996,fernandez_ladreda,madrid,spain,fernandez-ladreda
43,2025-11-17,11.7,0.0,5.351785,42.273628,48.0,69.0,6.200077,plaza_castilla,madrid,spain,plaza-de-castilla


In [99]:
weather_forecast_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 49 entries, 0 to 48
Data columns (total 12 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         49 non-null     datetime64[ns]
 1   temperature_2m_mean          49 non-null     float32       
 2   precipitation_sum            49 non-null     float32       
 3   wind_speed_10m_max           49 non-null     float32       
 4   wind_direction_10m_dominant  49 non-null     float32       
 5   cloud_cover_max              49 non-null     float32       
 6   relative_humidity_2m_mean    49 non-null     float32       
 7   dew_point_2m_max             49 non-null     float32       
 8   sensor_id                    49 non-null     object        
 9   city                         49 non-null     object        
 10  country                      49 non-null     object        
 11  street                       49 non-null     object 

## <span style="color:#ff5f27;">‚¨ÜÔ∏è Uploading new data to the Feature Store</span>

In [100]:
# Read historical air quality data
history_df = air_quality_fg.read()

history_df["date"] = pd.to_datetime(history_df["date"], utc=True).dt.tz_convert(None)

aq_today_df["date"] = pd.to_datetime(aq_today_df["date"], utc=True).dt.tz_convert(None)

processed_rows = []

for (country, city, street, sensor_id), df_today_sensor in aq_today_df.groupby(
    ["country", "city", "street", "sensor_id"]
):

    hist_sensor = history_df[
        (history_df["country"] == country) &
        (history_df["city"] == city) &
        (history_df["street"] == street) &
        (history_df["sensor_id"] == sensor_id)
    ].sort_values("date")

    last_days_sensor = hist_sensor.tail(3)

    df_window = pd.concat([last_days_sensor, df_today_sensor], ignore_index=True)

    df_window["date"] = pd.to_datetime(df_window["date"])

    df_window = util_2.add_lags_and_rolling(df_window)

    df_window["is_weekend"] = (df_window["date"].dt.weekday >= 5).astype(int)

    df_today_processed = df_window[df_window["date"].dt.date == pd.to_datetime(today).date()].copy()

    processed_rows.append(df_today_processed)

aq_today_processed = pd.concat(processed_rows, ignore_index=True)
aq_today_processed.head(7)


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.77s) 


Unnamed: 0,date,pm25,sensor_id,city,country,street,pm25_lag1,pm25_lag2,pm25_lag3,pm25_roll3,is_weekend
0,2025-11-17,21.0,casa_de_campo,madrid,spain,casa-de-campo,25.0,13.0,,19.666667,0
1,2025-11-17,34.0,casa_de_campo,madrid,spain,casa-de-campo,21.0,25.0,13.0,26.666667,0
2,2025-11-17,30.0,castellana,madrid,spain,castellana,17.0,29.0,,25.333333,0
3,2025-11-17,13.0,castellana,madrid,spain,castellana,30.0,17.0,29.0,20.0,0
4,2025-11-17,24.0,cuatro_caminos,madrid,spain,cuatro-caminos,25.0,11.0,,20.0,0
5,2025-11-17,21.0,cuatro_caminos,madrid,spain,cuatro-caminos,24.0,25.0,11.0,23.333333,0
6,2025-11-17,25.0,escuelas_aguirre,madrid,spain,escuelas-aguirre,14.0,17.0,51.0,18.666667,0


In [101]:
# Insert new data
air_quality_fg.insert(aq_today_processed)

2025-11-17 18:53:14,505 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271984/fs/1258583/fg/1718838


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 11/11 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_madrid_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271984/jobs/named/air_quality_madrid_1_offline_fg_materialization/executions


(Job('air_quality_madrid_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 758971
         }
       },
       "result": {
         "observed_value": 5.0,
         "element_count": 11,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-17T05:53:14.000505Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_e

In [102]:
# Insert new data
weather_fg.insert(weather_forecast_df, wait=True)

2025-11-17 18:53:28,855 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271984/fs/1258583/fg/1718839


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 49/49 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_madrid_sensors_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271984/jobs/named/weather_madrid_sensors_1_offline_fg_materialization/executions
2025-11-17 18:53:46,274 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-17 18:53:52,644 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-17 18:56:13,063 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-17 18:56:13,223 INFO: Waiting for log aggregation to finish.
2025-11-17 18:56:21,840 INFO: Execution finished successfully.


(Job('weather_madrid_sensors_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 758972
         }
       },
       "result": {
         "observed_value": 2.9024126529693604,
         "element_count": 49,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-17T05:53:28.000855Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "ex

## <span style="color:#ff5f27;">‚è≠Ô∏è **Next:** Part 03: Training Pipeline
 </span> 

In the following notebook you will read from a feature group and create training dataset within the feature store
