In [56]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: /Users/sambarati/Documents/GitHub/mlfs-book
HopsworksSettings initialized!


# <span style="font-width:bold; font-size: 3rem; color:#333;">Training Pipeline</span>

## üóíÔ∏è This notebook is divided into the following sections:

1. Select features for the model and create a Feature View with the selected features
2. Create training data using the feature view
3. Train model
4. Evaluate model performance
5. Save model to model registry

### <span style='color:#ff5f27'> üìù Imports

In [57]:
import os
from datetime import datetime, timedelta
import pandas as pd
import matplotlib.pyplot as plt
from xgboost import XGBRegressor
from xgboost import plot_importance
from sklearn.metrics import mean_squared_error, r2_score
import hopsworks
from mlfs.airquality import util
import json
from sklearn.metrics import mean_absolute_error
import numpy as np

import warnings
warnings.filterwarnings("ignore")

## <span style="color:#ff5f27;"> üì° Connect to Hopsworks Feature Store </span>

In [58]:
# Check if HOPSWORKS_API_KEY env variable is set or if it is set in ~/.env
if settings.HOPSWORKS_API_KEY is not None:
    api_key = settings.HOPSWORKS_API_KEY.get_secret_value()
    os.environ['HOPSWORKS_API_KEY'] = api_key
project = hopsworks.login(engine="python")
fs = project.get_feature_store() 

secrets = hopsworks.get_secrets_api()
sensors = secrets.get_secret("BIRMINGHAM_SENSOR_LOCATIONS").value
sensors = json.loads(sensors)

2025-11-18 10:46:33,931 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-18 10:46:33,933 INFO: Initializing external client
2025-11-18 10:46:33,933 INFO: Base URL: https://c.app.hopsworks.ai:443
Connection closed.
2025-11-18 10:46:33,933 INFO: Initializing external client
2025-11-18 10:46:33,933 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-18 10:46:35,347 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1267871

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1267871


In [59]:
print(sensors)

[{'country': 'United Kingdom', 'city': 'Birmingham', 'street': 'Birmingham A4540 Roadside', 'aqicn_url': 'https://api.waqi.info/feed/@10101/', 'latitude': '52.476145', 'longitude': '-1.874978', 'csv_file': 'birmingham-a4540-roadside-air-quality.csv'}, {'country': 'United Kingdom', 'city': 'Birmingham', 'street': 'Birmingham Ladywood', 'aqicn_url': 'https://api.waqi.info/feed/@11652/', 'latitude': '52.481346', 'longitude': '-1.918235', 'csv_file': 'birmingham-ladywood-air-quality.csv'}, {'country': 'United Kingdom', 'city': 'Birmingham', 'street': 'Coventry Allesley', 'aqicn_url': 'https://api.waqi.info/feed/@8913/', 'latitude': '52.411628', 'longitude': '-1.560189', 'csv_file': 'coventry-allesley-air-quality.csv'}]


In [60]:
air_quality_fg_2 = fs.get_feature_group(
    name='air_quality',
    version=2,
)

weather_fg = fs.get_feature_group(
    name='weather',
    version=1,
)

In [61]:
type(air_quality_fg_2)

hsfs.feature_group.FeatureGroup

In [62]:
df = air_quality_fg_2.read()
df.head()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.40s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.40s) 


Unnamed: 0,date,pm25,pm_25_1_day_lag,pm_25_2_day_lag,pm_25_3_day_lag,country,city,street,url
0,2022-03-06 00:00:00+00:00,118.0,94.0,112.0,109.0,France,Paris,Boulevard Peripherique Est,https://api.waqi.info/feed/@3088/
1,2023-12-28 00:00:00+00:00,41.0,45.0,41.0,32.0,France,Paris,Boulevard Peripherique Est,https://api.waqi.info/feed/@3088/
2,2018-11-27 00:00:00+00:00,86.0,89.0,70.0,72.0,France,Paris,Boulevard Peripherique Est,https://api.waqi.info/feed/@3088/
3,2015-10-24 00:00:00+00:00,69.0,68.0,61.0,77.0,France,Paris,Boulevard Peripherique Est,https://api.waqi.info/feed/@3088/
4,2014-12-28 00:00:00+00:00,83.0,58.0,84.0,78.0,France,Paris,Boulevard Peripherique Est,https://api.waqi.info/feed/@3088/


In [63]:
df.filter(df.street == 'Birmingham A4540 Roadside')

0
1
2
3
4
...
12823
12824
12825
12826
12827


--- 

## <span style="color:#ff5f27;"> üñç Feature View Creation and Retrieving </span>

### Feature Views

`Feature Views` are selections of features from different **Feature Groups** that make up the input and output API (or schema) for a model. A **Feature Views** can create **Training Data** and also be used in Inference to retrieve inference data.

The Feature Views allows a schema in form of a query with filters, defining a model target feature/label and additional transformation functions (declarative feature encoding).

In order to create Feature View we can use `FeatureStore.get_or_create_feature_view()` method.

You can specify the following parameters:

- `name` - name of a feature group.

- `version` - version of a feature group.

- `labels`- our target variable.

- `transformation_functions` - declarative feature encoding (not used here)

- `query` - selected features/labels for the model 

## <span style="color:#ff5f27;">ü™ù Split the training data into train/test data sets </span>

We use a time-series split here, with training data before this date `start_date_test_data` and test data after this date

In [64]:
start_date_test_data = "2025-05-01"
# Convert string to datetime object
test_start = datetime.strptime(start_date_test_data, "%Y-%m-%d")

The `Feature View` is now saved in Hopsworks and you can retrieve it using `FeatureStore.get_feature_view(name='...', version=1)`.

---

## <span style="color:#ff5f27;">üß¨ Modeling</span>

We will train a regression model to predict pm25 using our 4 features (wind_speed, wind_dir, temp, precipitation)

In [65]:
city = sensors[2]['city']
street = sensors[2]['street']

print(f"Training model for: {city} - {street}")

selected_features = air_quality_fg_2.select([
    'pm25', 
    'date',
    'city',
    'street',
    'pm_25_1_day_lag',
    'pm_25_2_day_lag', 
    'pm_25_3_day_lag'
]).join(weather_fg.select_features(), on=['city']).filter(
    (air_quality_fg_2.city == city) & (air_quality_fg_2.street == street)
)

Training model for: Birmingham - Coventry Allesley
2025-11-18 10:46:41,088 INFO: Using ['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant'] from feature group `weather` as features for the query. To include primary key and event time use `select_all`.


In [66]:
feature_view = fs.get_or_create_feature_view(
    name=f'air_quality_{city.lower().replace(" ", "_")}_{street.lower().replace(" ", "_")}_fv',
    description=f"Air quality model for {city} - {street}",
    version=1,
    labels=['pm25'],
    query=selected_features,
)

Feature view created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1267871/fs/1262659/fv/air_quality_birmingham_coventry_allesley_fv/version/1


In [67]:
X_train, X_test, y_train, y_test = feature_view.train_test_split(
    test_start=test_start
)

X_features = X_train.drop(columns=['date', 'city', 'street'])
X_test_features = X_test.drop(columns=['date', 'city', 'street'])

print(f"Features: {list(X_features.columns)}")
print(f"Training samples: {len(X_train)}, Test samples: {len(X_test)}")

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.37s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.37s) 
Features: ['pm_25_1_day_lag', 'pm_25_2_day_lag', 'pm_25_3_day_lag', 'temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']
Training samples: 3240, Test samples: 197
Features: ['pm_25_1_day_lag', 'pm_25_2_day_lag', 'pm_25_3_day_lag', 'temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']
Training samples: 3240, Test samples: 197


In [68]:
print(f"Training model for {street}...")
xgb_regressor = XGBRegressor(random_state=42)
xgb_regressor.fit(X_features, y_train)

Training model for Coventry Allesley...


In [69]:
y_pred = xgb_regressor.predict(X_test_features)
mse = mean_squared_error(y_test.iloc[:,0], y_pred)
r2 = r2_score(y_test.iloc[:,0], y_pred)
mae = mean_absolute_error(y_test.iloc[:,0], y_pred)
rmse = np.sqrt(mse)

print(f"\nModel Performance for {street}")
print(f"MSE:  {mse:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"MAE:  {mae:.4f}")
print(f"R¬≤:   {r2:.4f}")


Model Performance for Coventry Allesley
MSE:  147.3570
RMSE: 12.1391
MAE:  8.1477
R¬≤:   0.0947


In [70]:
model_dir = f"air_quality_model_{city.lower().replace(' ', '_')}_{street.lower().replace(' ', '_')}"
if not os.path.exists(model_dir):
    os.mkdir(model_dir)

---

## <span style='color:#ff5f27'>üóÑ Model Registry</span>

One of the features in Hopsworks is the model registry. This is where you can store different versions of models and compare their performance. Models from the registry can then be served as API endpoints.

In [71]:
xgb_regressor.save_model(model_dir + "/model.json")
print(f"Model saved to {model_dir}")

Model saved to air_quality_model_birmingham_coventry_allesley


In [72]:
res_dict = { 
    "MSE": float(mse),
    "RMSE": float(rmse),
    "MAE": float(mae),
    "R2": float(r2),
}

In [73]:
mr = project.get_model_registry()

model_name = f"air_quality_{city.lower().replace(' ', '_')}_{street.lower().replace(' ', '_')}"

aq_model = mr.python.create_model(
    name=model_name, 
    metrics=res_dict,
    feature_view=feature_view,
    description=f"Air Quality Predictor for {city} - {street}",
)

aq_model.save(model_dir)
print(f"Model '{model_name}' uploaded to registry")

  0%|          | 0/6 [00:00<?, ?it/s]

Uploading /Users/sambarati/Documents/GitHub/mlfs-book/notebooks/airquality/air_quality_model_birmingham_covent‚Ä¶

Uploading /Users/sambarati/Documents/GitHub/mlfs-book/notebooks/airquality/model_schema.json: 0.000%|         ‚Ä¶

Model created, explore it at https://c.app.hopsworks.ai:443/p/1267871/models/air_quality_birmingham_coventry_allesley/1
Model 'air_quality_birmingham_coventry_allesley' uploaded to registry


---
## <span style="color:#ff5f27;">‚è≠Ô∏è **Next:** Part 04: Batch Inference</span>

In the following notebook you will use your model for Batch Inference.
