In [58]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: D:\ID2223 Scalable Machine Learning\Lab 1\mlfs_a
HopsworksSettings initialized!


<span style="font-width:bold; font-size: 3rem; color:#333;">- Part 02: Daily Feature Pipeline for Air Quality (aqicn.org) and weather (openmeteo)</span>

## üóíÔ∏è This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python Orchestration tool to schedule this program to run daily.

### <span style='color:#ff5f27'> üìù Imports

In [59]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
import importlib
from mlfs.airquality import util
importlib.reload(util)
from mlfs import config
import json
import os
import warnings
warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> üåç Get the Sensor URL, Country, City, Street names from Hopsworks </span>

__Update the values in the cell below.__

__These should be the same values as in notebook 1 - the feature backfill notebook__


In [60]:
project = hopsworks.login(engine="python")
fs = project.get_feature_store() 
secrets = hopsworks.get_secrets_api()

# This line will fail if you have not registered the AQICN_API_KEY as a secret in Hopsworks
AQICN_API_KEY = secrets.get_secret("AQICN_API_KEY").value
location_str_1 = secrets.get_secret("SENSOR_LOCATION_JSON_1").value
location_str_2 = secrets.get_secret("SENSOR_LOCATION_JSON_2").value
location_str_3 = secrets.get_secret("SENSOR_LOCATION_JSON_3").value
location_str_4 = secrets.get_secret("SENSOR_LOCATION_JSON_4").value
location_str_5 = secrets.get_secret("SENSOR_LOCATION_JSON_5").value
location_1 = json.loads(location_str_1)
location_2 = json.loads(location_str_2)
location_3 = json.loads(location_str_3)
location_4 = json.loads(location_str_4)
location_5 = json.loads(location_str_5)

def get_info(location):
    country=location['country']
    city=location['city']
    street=location['street']
    aqicn_url=location['aqicn_url']
    latitude=location['latitude']
    longitude=location['longitude']
    csv=location['csv']
    return country,city,street,aqicn_url,latitude,longitude,csv

country_1,city_1,street_1,url_1,lat_1,long_1,csv_1 = get_info(location_1)
country_2,city_2,street_2,url_2,lat_2,long_2,csv_2 = get_info(location_2)
country_3,city_3,street_3,url_3,lat_3,long_3,csv_3 = get_info(location_3)
country_4,city_4,street_4,url_4,lat_4,long_4,csv_4 = get_info(location_4)
country_5,city_5,street_5,url_5,lat_5,long_5,csv_5 = get_info(location_5)

today = datetime.date.today()


2025-11-17 17:11:29,545 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-17 17:11:29,553 INFO: Initializing external client
2025-11-17 17:11:29,555 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-17 17:11:31,071 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1286306


### <span style="color:#ff5f27;"> üîÆ Get references to the Feature Groups </span>

In [61]:
# Retrieve feature groups
air_quality_fg = fs.get_feature_group(
    name='air_quality_a',
    version=1,
)
weather_fg = fs.get_feature_group(
    name='weather_a',
    version=1,
)

---

## <span style='color:#ff5f27'> üå´ Retrieve Today's Air Quality data (PM2.5) from the AQI API</span>


In [62]:
import requests
import pandas as pd

aq_today_df_1= util.get_pm25(url_1, country_1, city_1, street_1, today, AQICN_API_KEY)
aq_today_df_2= util.get_pm25(url_2, country_2, city_2, street_2, today, AQICN_API_KEY)
aq_today_df_3= util.get_pm25(url_3, country_3, city_3, street_3, today, AQICN_API_KEY)
aq_today_df_4= util.get_pm25(url_4, country_4, city_4, street_4, today, AQICN_API_KEY)
aq_today_df_5= util.get_pm25(url_5, country_5, city_5, street_5, today, AQICN_API_KEY)
# aq_today_df = aq_today_df.drop(['country','city','street','url'],axis = 'columns')
aq_today_df_1

Unnamed: 0,pm25,country,city,street,date,url
0,171.0,sweden,√ñrnsk√∂ldsvik,H√∂rnettv√§gen,2025-11-17,https://api.waqi.info/feed/A105325


In [63]:
aq_today_df_1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 6 columns):
 #   Column   Non-Null Count  Dtype         
---  ------   --------------  -----         
 0   pm25     1 non-null      float32       
 1   country  1 non-null      object        
 2   city     1 non-null      object        
 3   street   1 non-null      object        
 4   date     1 non-null      datetime64[ns]
 5   url      1 non-null      object        
dtypes: datetime64[ns](1), float32(1), object(4)
memory usage: 172.0+ bytes


## <span style='color:#ff5f27'> üå¶ Get Weather Forecast data</span>

In [64]:
def get_daily_df(city,latitude,longitude,street):
    hourly_df = util.get_hourly_weather_forecast(city, latitude, longitude)
    hourly_df = hourly_df.set_index('date')
    
    # We will only make 1 daily prediction, so we will replace the hourly forecasts with a single daily forecast
    # We only want the daily weather data, so only get weather at 12:00
    daily_df = hourly_df.between_time('11:59', '12:01')
    daily_df = daily_df.reset_index()
    daily_df['date'] = pd.to_datetime(daily_df['date']).dt.date
    daily_df['date'] = pd.to_datetime(daily_df['date'])
    daily_df['city'] = city
    daily_df['street'] = street
    return daily_df

daily_df_1 = get_daily_df(city_1,lat_1,long_1,street_1)
daily_df_2 = get_daily_df(city_2,lat_2,long_2,street_2)
daily_df_3 = get_daily_df(city_3,lat_3,long_3,street_3)
daily_df_4 = get_daily_df(city_4,lat_4,long_4,street_4)
daily_df_5 = get_daily_df(city_5,lat_5,long_5,street_5)

Coordinates 63.25¬∞N 18.75¬∞E
Elevation 25.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.5¬∞N 17.25¬∞E
Elevation 16.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.5¬∞N 17.25¬∞E
Elevation 16.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.5¬∞N 17.25¬∞E
Elevation 42.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.5¬∞N 17.25¬∞E
Elevation 16.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [65]:
def get_hourly_df(city,latitude,longitude):
    hourly_df = util.get_hourly_weather_forecast(city, latitude, longitude)
    hourly_df = hourly_df.set_index('date')
    return hourly_df
hourly_df_1 = get_hourly_df(city_1,lat_1,long_1)
hourly_df_2 = get_hourly_df(city_2,lat_2,long_2)
hourly_df_3 = get_hourly_df(city_3,lat_3,long_3)
hourly_df_4 = get_hourly_df(city_4,lat_4,long_4)
hourly_df_5 = get_hourly_df(city_5,lat_5,long_5)

Coordinates 63.25¬∞N 18.75¬∞E
Elevation 25.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.5¬∞N 17.25¬∞E
Elevation 16.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.5¬∞N 17.25¬∞E
Elevation 16.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.5¬∞N 17.25¬∞E
Elevation 42.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.5¬∞N 17.25¬∞E
Elevation 16.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


## <span style="color:#ff5f27;">‚¨ÜÔ∏è Uploading new data to the Feature Store</span>

In [66]:
def get_latest_day(csv_file):
    csv_file = f'{root_dir}/data/{csv_file}'
    util.check_file_path(csv_file)
    df = pd.read_csv(csv_file,  parse_dates=['date'], skipinitialspace=True)
    latest_day = df['date'].max()
    # latest_day = pd.to_datetime(latest_day)  # Keep it timezone-naive
    latest_day = pd.to_datetime(latest_day)
    return latest_day
latest_day_1 = get_latest_day(csv_1)
latest_day_2 = get_latest_day(csv_2)
latest_day_3 = get_latest_day(csv_3)
latest_day_4 = get_latest_day(csv_4)
latest_day_5 = get_latest_day(csv_5)

File successfully found at the path: D:\ID2223 Scalable Machine Learning\Lab 1\mlfs_a/data/H√∂rnettv√§gen_√ñrnsk√∂ldsvik.csv
File successfully found at the path: D:\ID2223 Scalable Machine Learning\Lab 1\mlfs_a/data/sundsvall_k√∂pmangatan.csv
File successfully found at the path: D:\ID2223 Scalable Machine Learning\Lab 1\mlfs_a/data/Korstav√§gen_Sundsvall_Sw.csv
File successfully found at the path: D:\ID2223 Scalable Machine Learning\Lab 1\mlfs_a/data/Bj√∂rneborgsgatan_Sk√∂nsmon.csv
File successfully found at the path: D:\ID2223 Scalable Machine Learning\Lab 1\mlfs_a/data/sundsvall_bergsgatan.csv


In [67]:
air = air_quality_fg.read()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.78s) 


In [68]:
def add_rolling_daily(air_quality_fg, latest_day,street,aq_today_df, country,city,aqicn_url):
    # New data from the latest date in .csv file
    aq_df_pseu = air_quality_fg.read()
    aq_df_pseu = aq_df_pseu[aq_df_pseu['street'] == street]
    aq_df_pseu['date'] = aq_df_pseu['date'].dt.tz_localize(None)
    filtered_df = aq_df_pseu[(aq_df_pseu['date']>= latest_day + datetime.timedelta(days = -3))]
    # Drop the unnecessary columns for lagging
    filtered_df = filtered_df.drop(['lagging1','lagging2','lagging3','country','city','street','url'],axis = 'columns')
    filtered_df = pd.concat([filtered_df,aq_today_df.drop(['country','city','street','url'],axis = 'columns')],ignore_index=True)
    filtered_df = filtered_df.set_index('date').sort_index()
    filtered_df.insert(1,'lagging1',filtered_df['pm25'].shift(1))
    filtered_df.insert(1,'lagging2',filtered_df['pm25'].shift(2))
    filtered_df.insert(1,'lagging3',filtered_df['pm25'].shift(3))
    
    filtered_df = filtered_df.reset_index()
    filtered_df = filtered_df[filtered_df['date'] > latest_day]
    filtered_df['country']=country
    filtered_df['city']=city
    filtered_df['street']=street
    filtered_df['url']=aqicn_url
    filtered_df['lagging1'] = filtered_df['lagging1'].astype('float32')
    filtered_df['lagging2'] = filtered_df['lagging2'].astype('float32')
    filtered_df['lagging3'] = filtered_df['lagging3'].astype('float32')
    return filtered_df

filtered_df_1 = add_rolling_daily(air_quality_fg,latest_day_1,street_1,aq_today_df_1,country_1,city_1,url_1)
filtered_df_2 = add_rolling_daily(air_quality_fg,latest_day_2,street_2,aq_today_df_2,country_2,city_2,url_2)
filtered_df_3 = add_rolling_daily(air_quality_fg,latest_day_3,street_3,aq_today_df_3,country_3,city_3,url_3)
filtered_df_4 = add_rolling_daily(air_quality_fg,latest_day_4,street_4,aq_today_df_4,country_4,city_4,url_4)
filtered_df_5 = add_rolling_daily(air_quality_fg,latest_day_5,street_5,aq_today_df_5,country_5,city_5,url_5)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.09s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.94s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.71s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.86s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.93s) 


In [69]:
filtered_df = pd.concat([filtered_df_1,filtered_df_2,filtered_df_3,filtered_df_4,filtered_df_5],axis = 0)

In [70]:
# Insert new data
air_quality_fg.insert(filtered_df,wait = True)

2025-11-17 17:11:51,378 INFO: 	4 expectation(s) included in expectation_suite.
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1286306/fs/1273933/fg/1721872


Uploading Dataframe: 0.00% |                                       | Rows 0/0 | Elapsed Time: 00:00 | Remaining Time: ?


Launching job: air_quality_a_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286306/jobs/named/air_quality_a_1_offline_fg_materialization/executions
2025-11-17 17:12:07,540 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-17 17:12:10,723 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-17 17:13:01,891 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-17 17:13:02,070 INFO: Waiting for log aggregation to finish.
2025-11-17 17:13:10,737 INFO: Execution finished successfully.


(Job('air_quality_a_1_offline_fg_materialization', 'SPARK'),
 {
   "success": false,
   "results": [
     {
       "success": false,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "lagging1",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 762075
         }
       },
       "result": {
         "observed_value": null,
         "element_count": 0,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-17T04:11:51.000377Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": false,
       "expectation_config": {
         "expectation_type": "expect_column_

In [71]:
air = air_quality_fg.read()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.77s) 


In [72]:
air[air['date']==air['date'].max()]

Unnamed: 0,date,pm25,lagging1,lagging2,lagging3,country,city,street,url
3829,2025-11-17 00:00:00+00:00,0.15,0.22,0.2,0.4,sweden,Sk√∂nsmon,Bj√∂rneborgsgatan,https://api.waqi.info/feed/A60889
4841,2025-11-17 00:00:00+00:00,0.0,0.1,0.2,0.1,sweden,Sundsvall,Korstav√§gen,https://api.waqi.info/feed/A351115
6310,2025-11-17 00:00:00+00:00,7.0,25.0,22.0,32.0,sweden,Sundsvall,K√∂pmangatan,https://api.waqi.info/feed/@10010
7624,2025-11-17 00:00:00+00:00,0.0,4.0,1.0,1.0,sweden,√ñrnsk√∂ldsvik,H√∂rnettv√§gen,https://api.waqi.info/feed/A105325
8572,2025-11-17 00:00:00+00:00,5.0,33.0,21.0,44.0,sweden,Sundsvall,Bergsgatan,https://api.waqi.info/feed/@13973


In [74]:
daily_df = pd.concat([daily_df_1,daily_df_2,daily_df_3,daily_df_4,daily_df_5], axis = 0)

In [75]:
# Insert new data
weather_fg.insert(daily_df, wait=True)

2025-11-17 17:13:42,365 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1286306/fs/1273933/fg/1721873


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 40/40 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_a_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286306/jobs/named/weather_a_1_offline_fg_materialization/executions
2025-11-17 17:13:59,975 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-17 17:14:06,396 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-17 17:16:17,929 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-17 17:16:18,109 INFO: Waiting for log aggregation to finish.
2025-11-17 17:16:40,297 INFO: Execution finished successfully.


(Job('weather_a_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 762077
         }
       },
       "result": {
         "observed_value": 1.2979984283447266,
         "element_count": 40,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-17T04:13:42.000365Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_typ

In [76]:
fs = project.get_feature_store() 
air_quality_fg = fs.get_feature_group(
    name='air_quality_a',
    version=1,
)
weather_fg = fs.get_feature_group(
    name='weather_a',
    version=1,
)

In [77]:
weather = weather_fg.read()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.89s) 


In [78]:
weather[weather['date'] == weather['date'].max()]

Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city,street
9675,2025-11-24 00:00:00+00:00,1.5,0.2,11.525623,88.210129,√ñrnsk√∂ldsvik,H√∂rnettv√§gen
9676,2025-11-24 00:00:00+00:00,1.85,0.1,9.885262,56.888649,Sundsvall,Bergsgatan
9688,2025-11-24 00:00:00+00:00,1.85,0.1,9.885262,56.888649,Sundsvall,Korstav√§gen
9689,2025-11-24 00:00:00+00:00,1.85,0.1,9.885262,56.888649,Sundsvall,K√∂pmangatan
9707,2025-11-24 00:00:00+00:00,1.7,0.1,9.885262,56.888649,Sk√∂nsmon,Bj√∂rneborgsgatan


## <span style="color:#ff5f27;">‚è≠Ô∏è **Next:** Part 03: Training Pipeline
 </span> 

In the following notebook you will read from a feature group and create training dataset within the feature store
