In [1]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    """Report whether the notebook is running inside a Google Colab kernel.

    The check inspects the string form of the active IPython shell, which
    contains ``google.colab`` when running under Colab.
    """
    shell_description = str(get_ipython())
    return "google.colab" in shell_description

def clone_repository() -> None:
    """Clone the mlfs-book repository and make it the working directory.

    Uses IPython syntax: `!` runs a shell command and `%cd` changes the kernel's
    working directory, so this only works inside an IPython/Jupyter kernel.
    """
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    """Install the project's Python requirements into the current environment.

    First installs/upgrades `uv` with pip, then uses `uv pip install` to install the
    requirements declared in `pyproject.toml` (with all optional extras) into the
    system interpreter (`--system`). Uses IPython `!` shell commands and expects the
    working directory to contain `pyproject.toml`.
    """
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # If the notebook was started from notebooks/airquality/ or notebooks/, walk up to the
    # directory above them (innermost directory checked first).
    for notebook_subdir in ("airquality", "notebooks"):
        if root_dir.parts[-1:] == (notebook_subdir,):
            root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir)
    print("Local environment")

# Add the root directory to `sys.path` so the `mlfs` package can be imported from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")
else:
    print(f"Directory already in the PYTHONPATH: {root_dir}")

# Load the settings from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: /Users/emaminotti/ID2223-ScalableMLDL
HopsworksSettings initialized!


<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 02: Daily Feature Pipeline for Air Quality (aqicn.org) and weather (openmeteo)</span>

## 🗒️ This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python orchestration tool to schedule this program to run daily.

### <span style='color:#ff5f27'> üìù Imports

In [2]:
# Standard library
import datetime
import json
import os
import time
import warnings

# Third-party
import hopsworks
import pandas as pd
import requests

# Project-local (importable because the root directory was added to sys.path above)
from mlfs import config
from mlfs.airquality import util

# Silence all warnings for the rest of the session.
warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> üåç Get the Sensor URL, Country, City, Street names from Hopsworks </span>

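__The API key and sensor configuration are read from Hopsworks secrets in the cell below; update the hard-coded `city` and `country` values there if needed.__

__These should be the same values as in notebook 1 - the feature backfill notebook.__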


In [3]:
project = hopsworks.login(engine="python")
fs = project.get_feature_store()
secrets = hopsworks.get_secrets_api()

# API key for the AQICN (aqicn.org) air-quality API.
AQICN_API_KEY = secrets.get_secret("AQICN_API_KEY").value

# Multi-sensor configuration: a JSON object keyed by sensor name. Each entry is read for
# 'latitude'/'longitude' below and for 'street'/'api_url' when fetching air-quality data.
location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
sensor_config = json.loads(location_str)
if not sensor_config:
    # Fail with a clear message instead of an IndexError when picking the first sensor.
    raise ValueError("The SENSOR_LOCATION_JSON secret does not define any sensors.")

print(f"‚úÖ Loaded configuration for {len(sensor_config)} sensors: {list(sensor_config.keys())}")

# All sensors are in the same city, so the weather data uses the first sensor's coordinates.
first_sensor = next(iter(sensor_config.values()))
latitude = first_sensor['latitude']
longitude = first_sensor['longitude']
# City/country are not part of sensor_config, so they are set here.
city = "Malmo"
country = "Sweden"

today = datetime.date.today()

2025-11-18 08:54:10,299 INFO: Initializing external client
2025-11-18 08:54:10,300 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-18 08:54:12,030 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1267872
‚úÖ Loaded configuration for 2 sensors: ['radhuset', 'dalaplan']


### <span style="color:#ff5f27;"> üîÆ Get references to the Feature Groups </span>

In [4]:
# Retrieve the Feature Groups
# Look up the feature groups this pipeline writes to; both are pinned to version 2.
FEATURE_GROUP_VERSION = 2
air_quality_fg, weather_fg = (
    fs.get_feature_group(name=fg_name, version=FEATURE_GROUP_VERSION)
    for fg_name in ("air_quality", "weather")
)

print("‚úÖ Successfully retrieved Feature Groups.")

‚úÖ Successfully retrieved Feature Groups.


---

## <span style='color:#ff5f27'> üå´ Retrieve Today's Air Quality data (PM2.5) from the AQI API</span>


In [5]:
# Fetch today's PM2.5 reading for every configured sensor and combine them into one frame
# (`requests` and `pandas` are already imported in the imports cell).
sensor_frames = []

for sensor_name, sensor_cfg in sensor_config.items():
    # Named `sensor_cfg` rather than `config` so the loop does not shadow the
    # `mlfs.config` module imported earlier in the notebook.
    print(f"Fetching today's data for {sensor_name} ({sensor_cfg['street']})...")

    try:
        # Fetch data for the specific sensor
        df_sensor = util.get_pm25(
            sensor_cfg['api_url'],
            country,
            city,
            sensor_cfg['street'],
            today,
            AQICN_API_KEY,
        )

        # Add identifying columns
        df_sensor['street'] = sensor_cfg['street']
        df_sensor['city'] = city
        df_sensor['country'] = country
        df_sensor['url'] = sensor_cfg['api_url']

        sensor_frames.append(df_sensor)
        time.sleep(1)  # Politeness delay between API calls

    except Exception as e:
        # Best effort per sensor: report the failure and continue with the others.
        print(f"‚ö†Ô∏è Failed to fetch data for {sensor_name}: {e}")

if not sensor_frames:
    # Stop here rather than letting later cells fail on a missing `aq_today_df`
    # (or silently reuse a stale one left over from an earlier run in this kernel).
    raise RuntimeError("No air-quality data could be fetched for any sensor.")

aq_today_df = pd.concat(sensor_frames, ignore_index=True)
print(f"‚úÖ Fetched {len(aq_today_df)} rows.")
display(aq_today_df)

Fetching today's data for radhuset (radhuset)...
Fetching today's data for dalaplan (dalaplan)...
‚úÖ Fetched 2 rows.


Unnamed: 0,pm25,country,city,street,date,url
0,16.0,Sweden,Malmo,radhuset,2025-11-18,https://api.waqi.info/feed/@7866
1,17.0,Sweden,Malmo,dalaplan,2025-11-18,https://api.waqi.info/feed/@10433


In [6]:
# Check dtypes and non-null counts of today's readings before computing lag features.
aq_today_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2 entries, 0 to 1
Data columns (total 6 columns):
 #   Column   Non-Null Count  Dtype         
---  ------   --------------  -----         
 0   pm25     2 non-null      float32       
 1   country  2 non-null      object        
 2   city     2 non-null      object        
 3   street   2 non-null      object        
 4   date     2 non-null      datetime64[ns]
 5   url      2 non-null      object        
dtypes: datetime64[ns](1), float32(1), object(4)
memory usage: 220.0+ bytes


### Update daily air quality data with lag features

Each daily update now also computes the PM2.5 values from the previous 1, 2, and 3 days for each sensor (city and street).  
This allows the feature store to include short-term pollution history for each observation.

In [7]:
# Compute lag features before inserting into Hopsworks
aq_today_df = aq_today_df.sort_values(by=["city", "street", "date"])
for lag in [1, 2, 3]:
    aq_today_df[f"pm25_lag_{lag}"] = aq_today_df.groupby(["city", "street"])["pm25"].shift(lag)

aq_today_df = aq_today_df.dropna(subset=["pm25_lag_1", "pm25_lag_2", "pm25_lag_3"]).reset_index(drop=True)

### Insert into new Feature Group version

Since the schema now includes lag features, we use version 2 of the Feature Group `air_quality`.

In [8]:
# Retrieve the SHARED Feature Group
air_quality_fg = fs.get_feature_group(name='air_quality', version=2)

# 2. Insert the dataframe that contains the LAG features
print(f"Inserting {len(aq_today_df)} rows for sensors: {aq_today_df['street'].unique()}...")

air_quality_fg.insert(aq_today_df)

Inserting 0 rows for sensors: []...
2025-11-18 08:54:38,644 INFO: 	1 expectation(s) included in expectation_suite.
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1267872/fs/1254483/fg/1703575


Uploading Dataframe: 0.00% |          | Rows 0/0 | Elapsed Time: 00:00 | Remaining Time: ?


Launching job: air_quality_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1267872/jobs/named/air_quality_2_offline_fg_materialization/executions


(Job('air_quality_2_offline_fg_materialization', 'SPARK'),
 {
   "success": false,
   "results": [
     {
       "success": false,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 747765
         }
       },
       "result": {
         "observed_value": null,
         "element_count": 0,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-18T07:54:38.000643Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expect

## <span style='color:#ff5f27'> üå¶ Get Weather Forecast data</span>

In [9]:
# Fetch the daily weather forecast at the first sensor's coordinates (`latitude`/`longitude`
# set when the sensor configuration was loaded; `requests`/`pandas` come from the imports cell).
print(f"Fetching 7-DAY weather forecast for {city} ({latitude}, {longitude})...")

# Define API parameters
params = {
    "latitude": latitude,
    "longitude": longitude,
    "daily": "temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,relative_humidity_2m_mean",
    "timezone": "auto",
    # Request 7 days explicitly (the API's default) so the "7-day" labels stay accurate.
    "forecast_days": 7,
}

# Call the Open-Meteo Forecast API. The timeout keeps a scheduled run from hanging, and
# raise_for_status surfaces HTTP errors instead of a KeyError on data['daily'].
response = requests.get("https://api.open-meteo.com/v1/forecast", params=params, timeout=30)
response.raise_for_status()
data = response.json()
daily = data['daily']

# Process into DataFrame (one row per forecast day)
daily_df = pd.DataFrame({
    'date': pd.to_datetime(daily['time']),
    'temperature_2m_mean': daily['temperature_2m_mean'],
    'precipitation_sum': daily['precipitation_sum'],
    'wind_speed_10m_max': daily['wind_speed_10m_max'],
    'wind_direction_10m_dominant': daily['wind_direction_10m_dominant'],
    'humidity_mean': daily['relative_humidity_2m_mean'],
})
daily_df['city'] = city

print(f"‚úÖ Fetched 7-day forecast. Inserting {len(daily_df)} rows.")
daily_df.head()

Fetching 7-DAY weather forecast for Malmo (55.6059, 13.0007)...
‚úÖ Fetched 7-day forecast. Inserting 7 rows.


Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,humidity_mean,city
0,2025-11-18,5.2,0.2,25.2,265,84,Malmo
1,2025-11-19,4.1,12.9,15.8,212,88,Malmo
2,2025-11-20,2.8,4.4,29.2,317,86,Malmo
3,2025-11-21,3.0,0.0,21.0,291,72,Malmo
4,2025-11-22,5.1,0.0,26.7,239,85,Malmo


In [10]:
# Check dtypes and non-null counts of the forecast before inserting it into the weather feature group.
daily_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 7 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         7 non-null      datetime64[ns]
 1   temperature_2m_mean          7 non-null      float64       
 2   precipitation_sum            7 non-null      float64       
 3   wind_speed_10m_max           7 non-null      float64       
 4   wind_direction_10m_dominant  7 non-null      int64         
 5   humidity_mean                7 non-null      int64         
 6   city                         7 non-null      object        
dtypes: datetime64[ns](1), float64(3), int64(2), object(1)
memory usage: 524.0+ bytes


## <span style="color:#ff5f27;">⬆️ Uploading new data to the Feature Store</span>

In [11]:
# Insert the dataframe with the computed LAG features
air_quality_fg.insert(aq_today_df)

2025-11-18 08:55:14,208 INFO: 	1 expectation(s) included in expectation_suite.
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1267872/fs/1254483/fg/1703575


Uploading Dataframe: 0.00% |          | Rows 0/0 | Elapsed Time: 00:00 | Remaining Time: ?


(Job('air_quality_2_offline_fg_materialization', 'SPARK'),
 {
   "success": false,
   "results": [
     {
       "success": false,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 747765
         }
       },
       "result": {
         "observed_value": null,
         "element_count": 0,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-18T07:55:14.000208Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expect

In [12]:
# Insert new data
weather_fg.insert(daily_df, wait=True)

2025-11-18 08:55:20,627 INFO: 	3 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1267872/fs/1254483/fg/1721825


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 7/7 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: weather_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1267872/jobs/named/weather_2_offline_fg_materialization/executions
2025-11-18 08:55:36,726 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2025-11-18 08:55:39,949 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-18 08:57:05,686 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-18 08:57:05,840 INFO: Waiting for log aggregation to finish.
2025-11-18 08:57:17,635 INFO: Execution finished successfully.


(Job('weather_2_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_values_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": 0.0,
           "max_value": 500.0
         },
         "meta": {
           "expectationId": 762032
         }
       },
       "result": {
         "element_count": 7,
         "missing_count": 0,
         "missing_percent": 0.0,
         "unexpected_count": 0,
         "unexpected_percent": 0.0,
         "unexpected_percent_total": 0.0,
         "unexpected_percent_nonmissing": 0.0,
         "partial_unexpected_list": []
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-18T07:55:20.000627Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_tracebac

## <span style="color:#ff5f27;">⏭️ **Next:** Part 03: Training Pipeline</span>

In the following notebook, you will read from a feature group and create a training dataset within the feature store.
