In [1]:
import sys
from pathlib import Path
import os

def is_google_colab() -> bool:
    """Report whether the notebook is running inside Google Colab.

    Detection is a substring test for "google.colab" in the string form of
    the object returned by get_ipython().
    """
    shell_description = str(get_ipython())
    return "google.colab" in shell_description

def clone_repository() -> None:
    """Clone the mlfs-book repository from GitHub and change into the checkout.

    The body consists of IPython shell (`!`) and line (`%`) magics, so this
    function can only run inside an IPython/Jupyter kernel.
    """
    !git clone https://github.com/featurestorebook/mlfs-book.git
    # %cd changes the kernel's working directory to the fresh checkout
    %cd mlfs-book

def install_dependencies() -> None:
    """Install/upgrade uv with pip, then install the project requirements with uv.

    The uv command reads `pyproject.toml` from the current working directory
    and is invoked with `--all-extras` and `--system`. Both commands are
    IPython shell magics, so this only runs inside an IPython/Jupyter kernel.
    """
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

# Resolve the repository root (`root_dir`, always a str) for the current environment.
if is_google_colab():
    # On Colab the repository is cloned first; clone_repository() leaves the
    # working directory inside the checkout, which then is the root.
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Walk up to the repository root when the notebook was started from
    # <root>/notebooks/airquality or from <root>/notebooks.
    if root_dir.name == 'airquality':
        root_dir = root_dir.parent
    if root_dir.name == 'notebooks':
        root_dir = root_dir.parent
    root_dir = str(root_dir)
    print("Local environment")

# Add the root directory to sys.path so the `mlfs` package can be imported from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Load the settings, preferring the file <root_dir>/.env when it exists.
from mlfs import config
env_file = f"{root_dir}/.env"
if os.path.exists(env_file):
    settings = config.HopsworksSettings(_env_file=env_file)
else:
    # Always bind `settings`: the login cell reads settings.HOPSWORKS_API_KEY and
    # would otherwise fail with a NameError when no .env file is present
    # (e.g. on a fresh Colab checkout).
    print(f"No .env file found at {env_file}; using the default settings sources")
    settings = config.HopsworksSettings()

Local environment
Added the following directory to the PYTHONPATH: /Users/mac/Documents/Documents/ID2223_Scalable/lab1_new/Air_Quality_Prediction


ModuleNotFoundError: No module named 'pydantic'

# <span style="font-weight:bold; font-size: 3rem; color:#333;">Training Pipeline</span>

## 🗒️ This notebook is divided into the following sections:

1. Select features for the model and create a Feature View with the selected features  
2. Add lag features and create a new Feature Group    
3. Create training data using the feature view
4. Train baseline model (only weather features) and enhanced model (weather+lagged features)
5. Evaluate model performance
6. Save model to model registry

### <span style='color:#ff5f27'> 📝 Imports </span>

In [None]:
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from xgboost import XGBRegressor
from xgboost import plot_importance
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import hopsworks
from mlfs.airquality import util
import json

import warnings
# NOTE: this installs a blanket "ignore" filter, so every Python warning raised
# after this point (including deprecation warnings) is hidden.
warnings.filterwarnings("ignore")

## <span style="color:#ff5f27;"> 📡 Connect to Hopsworks Feature Store </span>

In [None]:
# If the loaded settings carry a Hopsworks API key, export it as the
# HOPSWORKS_API_KEY environment variable before logging in.
if settings.HOPSWORKS_API_KEY is not None:
    api_key = settings.HOPSWORKS_API_KEY.get_secret_value()
    os.environ['HOPSWORKS_API_KEY'] = api_key

project = hopsworks.login()
fs = project.get_feature_store()

# The sensor location is kept as a JSON document in the SENSOR_LOCATION_JSON secret.
secrets = hopsworks.get_secrets_api()
location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
location = json.loads(location_str)

country = location['country']
city = location['city']
street = location['street']

In [None]:
# Look up version 1 of the two source feature groups used by this pipeline
air_quality_fg = fs.get_feature_group(name='air_quality', version=1)
weather_fg = fs.get_feature_group(name='weather', version=1)

--- 

## <span style="color:#ff5f27;"> Add Lagged Features  </span>

Lag features can capture patterns in time series. We will add PM2.5 values from 1, 2, and 3 days ago as new features.


In [None]:
# Load the full air quality feature group into a DataFrame and summarise it
print("üìä Reading Air Quality data from Feature Group...")
air_quality_df = air_quality_fg.read()

print(f"Original rows: {len(air_quality_df)}")
date_column = air_quality_df['date']
print(f"Date range: {date_column.min()} to {date_column.max()}")
print(f"\nOriginal columns: {air_quality_df.columns.tolist()}")


In [None]:
# Derive lagged PM2.5 columns (lags 1, 2 and 3) with the project helper
print("\nüîÑ Adding lag features (PM2.5 from 1, 2, 3 days ago)...")
air_quality_with_lags = util.add_lagged_features(air_quality_df, target_column='pm25', lags=[1, 2, 3])

print("\n‚úÖ Lag features created successfully!")
lag_column_names = [name for name in air_quality_with_lags.columns if 'lag_' in name]
print(f"New columns: {lag_column_names}")
print(f"Rows after removing NaN: {len(air_quality_with_lags)}")

# Preview the target next to its lagged copies
preview_columns = ['date', 'city', 'pm25', 'lag_1_pm25', 'lag_2_pm25', 'lag_3_pm25']
display(air_quality_with_lags[preview_columns].head(10))

### Save data with lagged features to Feature Group

We create a new Feature Group, `air_quality_with_lags`, to store the data with its lagged features so that it can be reused in future training and inference.


In [None]:
# Fetch the lagged-feature Feature Group, creating it on first use
print("üíæ Creating/retrieving Feature Group 'air_quality_with_lags'...")

lag_feature_group_spec = {
    "name": 'air_quality_with_lags',
    "version": 1,
    "description": 'Air Quality data with 1, 2, 3-day lagged PM2.5 features',
    "primary_key": ['city', 'street', 'date'],
    "event_time": 'date',
    "online_enabled": False,
}
air_quality_lag_fg = fs.get_or_create_feature_group(**lag_feature_group_spec)

# Write the lagged data and wait for the ingestion job before moving on
print(f"üìù Inserting {len(air_quality_with_lags)} records into Feature Group...")
air_quality_lag_fg.insert(
    air_quality_with_lags,
    write_options={"wait_for_job": True},
)

print("\n‚úÖ Feature Group 'air_quality_with_lags' updated successfully!")
print(f"   Total features: {len(air_quality_with_lags.columns)}")
print(f"   Total records: {len(air_quality_with_lags)}")

--- 

## <span style="color:#ff5f27;"> 🖍 Create Feature Views </span>

We will create two Feature Views for model comparison:
1. **Baseline Feature View**: Weather features only (4 features)
2. **Enhanced Feature View**: Weather + lagged features (7 features)

In [None]:
# 1. Baseline query: target and date from air quality, joined with the weather features on city
print("üìã Creating Baseline Feature View (weather features only)...")
baseline_target_query = air_quality_fg.select(['pm25', 'date'])
baseline_weather_query = weather_fg.select_features()
selected_features_baseline = baseline_target_query.join(baseline_weather_query, on=['city'])

In [None]:
# 2. Enhanced query: target, date and the three lag columns, joined with the weather features on city
print("üìã Creating Enhanced Feature View (weather + lagged features)...")
enhanced_columns = ['pm25', 'date', 'lag_1_pm25', 'lag_2_pm25', 'lag_3_pm25']
enhanced_target_query = air_quality_lag_fg.select(enhanced_columns)
enhanced_weather_query = weather_fg.select_features()
selected_features_enhanced = enhanced_target_query.join(enhanced_weather_query, on=['city'])

print("‚úÖ Feature queries created!")

### About Feature Views

`Feature Views` are selections of features from different **Feature Groups** that form the input/output API (schema) for a model. They can create **Training Data** and retrieve **Inference Data**.

In [None]:
# Register (or fetch) the baseline Feature View, with pm25 as the label
print("üíæ Creating Baseline Feature View...")
feature_view_baseline = fs.get_or_create_feature_view(
    name='air_quality_fv_baseline',
    version=1,
    query=selected_features_baseline,
    labels=['pm25'],
    description="Baseline: weather features only with air quality as target",
)
print("‚úÖ Baseline Feature View created!")

In [None]:
# Register (or fetch) the enhanced Feature View, with pm25 as the label
print("üíæ Creating Enhanced Feature View...")
feature_view_enhanced = fs.get_or_create_feature_view(
    name='air_quality_fv_enhanced',
    version=1,
    query=selected_features_enhanced,
    labels=['pm25'],
    description="Enhanced: weather + lagged PM2.5 features with air quality as target",
)
print("‚úÖ Enhanced Feature View created!")

## <span style="color:#ff5f27;">🪝 Prepare Training and Test Data </span>

We use a time-series split: data before `start_date_test_data` for training, data after for testing.

In [None]:
# First day of the test period, as an ISO date string and as a datetime
start_date_test_data = "2025-05-01"
test_start = datetime.strptime(
    start_date_test_data,
    "%Y-%m-%d",
)

In [None]:
# Split the baseline Feature View (weather features only) at test_start
print("üìä Loading Baseline training data...")
baseline_split = feature_view_baseline.train_test_split(test_start=test_start)
X_train_base, X_test_base, y_train, y_test = baseline_split
print(f"‚úÖ Baseline data: {len(X_train_base)} train, {len(X_test_base)} test")

In [None]:
# Split the enhanced Feature View (weather + lagged features) at test_start
print("\nüìä Loading Enhanced training data...")
enhanced_split = feature_view_enhanced.train_test_split(test_start=test_start)
X_train_enh, X_test_enh, y_train_enh, y_test_enh = enhanced_split
print(f"‚úÖ Enhanced data: {len(X_train_enh)} train, {len(X_test_enh)} test")

In [None]:
# Peek at the first rows of the baseline training features
print("Baseline features:")
baseline_preview = X_train_base.head()
display(baseline_preview)

In [None]:
# Peek at the first rows of the enhanced training features
print("Enhanced features:")
enhanced_preview = X_train_enh.head()
display(enhanced_preview)

In [None]:
# Build the model inputs: the same frames without the 'date' column
X_train_base_features = X_train_base.drop(columns='date')
X_test_base_features = X_test_base.drop(columns='date')
X_train_enh_features = X_train_enh.drop(columns='date')
X_test_enh_features = X_test_enh.drop(columns='date')

# Report the number and names of the remaining feature columns
baseline_feature_count = X_train_base_features.shape[1]
enhanced_feature_count = X_train_enh_features.shape[1]
print(f"Baseline features: {baseline_feature_count}")
print(f"Enhanced features: {enhanced_feature_count}")
print(f"\nBaseline columns: {X_train_base_features.columns.tolist()}")
print(f"Enhanced columns: {X_train_enh_features.columns.tolist()}")

In [None]:
# Size of the baseline label frames, plus a preview of the training labels
train_label_count, test_label_count = len(y_train), len(y_test)
print(f"Target variable (PM2.5): {train_label_count} train, {test_label_count} test")
display(y_train.head())

---

## <span style="color:#ff5f27;">🧬 Model Training and Comparison</span>

We will train two XGBoost regression models:
1. **Baseline Model**: Using 4 weather features only
2. **Enhanced Model**: Using 4 weather features + 3 lagged features

By comparing the two models' performance, we verify if lagged features improve prediction accuracy.

In [None]:
# Fit the baseline XGBoost regressor on the weather-only features
print("\nüöÄ Training Baseline model (weather features only)...")
baseline_hyperparameters = {
    "n_estimators": 100,
    "max_depth": 5,
    "learning_rate": 0.1,
    "random_state": 42,  # fixed seed for reproducible training
}
model_baseline = XGBRegressor(**baseline_hyperparameters)
model_baseline.fit(X_train_base_features, y_train)
print("‚úÖ Baseline model training completed!")

In [None]:
# Score the baseline model on its test split
print("\nüìä Evaluating Baseline model performance...")
y_pred_baseline = model_baseline.predict(X_test_base_features)

# The label frame has a single column; take it as a Series for the metric functions
y_true_baseline = y_test.iloc[:, 0]
mse_baseline = mean_squared_error(y_true_baseline, y_pred_baseline)
rmse_baseline = np.sqrt(mse_baseline)
mae_baseline = mean_absolute_error(y_true_baseline, y_pred_baseline)
r2_baseline = r2_score(y_true_baseline, y_pred_baseline)

print("\n‚úÖ Baseline Model Results (Weather Features Only):")
print(f"  MSE:  {mse_baseline:.4f}")
print(f"  RMSE: {rmse_baseline:.4f}")
print(f"  MAE:  {mae_baseline:.4f}")
print(f"  R¬≤:   {r2_baseline:.4f}")

In [None]:
# Fit the enhanced XGBoost regressor on weather + lagged features
print("\nüöÄ Training Enhanced model (weather + lagged features)...")
enhanced_hyperparameters = {
    "n_estimators": 100,
    "max_depth": 5,
    "learning_rate": 0.1,
    "random_state": 42,  # fixed seed for reproducible training
}
model_enhanced = XGBRegressor(**enhanced_hyperparameters)
model_enhanced.fit(X_train_enh_features, y_train_enh)
print("‚úÖ Enhanced model training completed!")

In [None]:
# Score the enhanced model on its test split
print("\nüìä Evaluating Enhanced model performance...")
y_pred_enhanced = model_enhanced.predict(X_test_enh_features)

# The label frame has a single column; take it as a Series for the metric functions
y_true_enhanced = y_test_enh.iloc[:, 0]
mse_enhanced = mean_squared_error(y_true_enhanced, y_pred_enhanced)
rmse_enhanced = np.sqrt(mse_enhanced)
mae_enhanced = mean_absolute_error(y_true_enhanced, y_pred_enhanced)
r2_enhanced = r2_score(y_true_enhanced, y_pred_enhanced)

print("\n‚úÖ Enhanced Model Results (Weather + Lagged Features):")
print(f"  MSE:  {mse_enhanced:.4f}")
print(f"  RMSE: {rmse_enhanced:.4f}")
print(f"  MAE:  {mae_enhanced:.4f}")
print(f"  R¬≤:   {r2_enhanced:.4f}")

---

## <span style="color:#ff5f27;">📊 Performance Comparison and Analysis</span>

In [None]:
# Side-by-side table of the two models' metrics
comparison_columns = {
    'Model': ['Baseline (Weather Only)', 'Enhanced (Weather + Lags)'],
    'Features': [X_train_base_features.shape[1], X_train_enh_features.shape[1]],
    'MSE': [mse_baseline, mse_enhanced],
    'RMSE': [rmse_baseline, rmse_enhanced],
    'MAE': [mae_baseline, mae_enhanced],
    'R¬≤': [r2_baseline, r2_enhanced],
}
comparison_df = pd.DataFrame(comparison_columns)

banner = "=" * 80
print(banner)
print("MODEL PERFORMANCE COMPARISON")
print(banner)
display(comparison_df)

# Relative change of each metric, in percent of the baseline value.
# For the error metrics a positive number means the enhanced model is better.
mse_improve = (mse_baseline - mse_enhanced) / mse_baseline * 100
rmse_improve = (rmse_baseline - rmse_enhanced) / rmse_baseline * 100
mae_improve = (mae_baseline - mae_enhanced) / mae_baseline * 100
if r2_baseline != 0:
    r2_improve = (r2_enhanced - r2_baseline) / abs(r2_baseline) * 100
else:
    # A baseline R² of exactly zero gives no meaningful relative change
    r2_improve = 0

print("\n" + banner)
print("üìà PERFORMANCE IMPROVEMENT:")
print(banner)
print(f"  MSE decreased:  {mse_improve:+.2f}%")
print(f"  RMSE decreased: {rmse_improve:+.2f}%")
print(f"  MAE decreased:  {mae_improve:+.2f}%")
print(f"  R¬≤ improved:    {r2_improve:+.2f}%")

lags_helped = r2_enhanced > r2_baseline
if lags_helped:
    print("\n‚úÖ CONCLUSION: Lagged features significantly improved model performance!")
else:
    print("\n‚ö†Ô∏è CONCLUSION: Lagged features did not improve performance, needs further investigation.")

### Feature Importance Tables

In [None]:
# Tabulate each model's feature importances, most important first
section_rule = "=" * 80
print("\n" + section_rule)
print("FEATURE IMPORTANCE ANALYSIS")
print(section_rule)

# Baseline model
baseline_importance_raw = pd.DataFrame({
    'feature': X_train_base_features.columns,
    'importance': model_baseline.feature_importances_,
})
feature_importance_baseline = baseline_importance_raw.sort_values('importance', ascending=False)

print("\nüìä Baseline Model Feature Importance (4 weather features):")
display(feature_importance_baseline)

# Enhanced model
enhanced_importance_raw = pd.DataFrame({
    'feature': X_train_enh_features.columns,
    'importance': model_enhanced.feature_importances_,
})
feature_importance_enhanced = enhanced_importance_raw.sort_values('importance', ascending=False)

print("\nüìä Enhanced Model Feature Importance (7 features):")
display(feature_importance_enhanced)

# After sorting, the first row of each table is that model's top feature
top_baseline_feature = feature_importance_baseline.iloc[0]['feature']
top_enhanced_feature = feature_importance_enhanced.iloc[0]['feature']

print("\nüí° Key Observations:")
print(f"  - Top feature in Baseline: {top_baseline_feature}")
print(f"  - Top feature in Enhanced: {top_enhanced_feature}")
if 'lag_' in top_enhanced_feature:
    print("  - Lagged features dominate in Enhanced model, explaining the performance gain!")

In [None]:
# Assemble one frame per model: actual pm25, predicted pm25 and date, ordered by date
print("\nüìä Preparing prediction results for visualization...")

# Predictions and dates are attached positionally (plain arrays), not by index
df_baseline = y_test.assign(
    predicted_pm25=y_pred_baseline,
    date=X_test_base['date'].values,
).sort_values(by=['date'])

df_enhanced = y_test_enh.assign(
    predicted_pm25=y_pred_enhanced,
    date=X_test_enh['date'].values,
).sort_values(by=['date'])

In [None]:
# Show the first ten rows of each model's prediction frame
print("\nüìä Baseline Model - Prediction Samples:")
baseline_sample = df_baseline.head(10)
display(baseline_sample)

print("\nüìä Enhanced Model - Prediction Samples:")
enhanced_sample = df_enhanced.head(10)
display(enhanced_sample)

In [None]:
# Make sure the output folder for figures exists (relative to the working directory)
images_dir = "air_quality_model/images"
os.makedirs(name=images_dir, exist_ok=True)
print(f"Images directory: {images_dir}")

In [None]:
# Draw and save one hindcast chart per model with the project plotting helper
print("\nüìä Plotting PM2.5 predictions...")

# Baseline model
print("\n1. Baseline Model (Weather Only):")
file_path_baseline = f"{images_dir}/pm25_hindcast_baseline.png"
plt_obj_baseline = util.plot_air_quality_forecast(
    city, street, df_baseline, file_path_baseline, hindcast=True
)
plt_obj_baseline.show()
print(f"‚úÖ Baseline plot saved: {file_path_baseline}")

# Enhanced model
print("\n2. Enhanced Model (Weather + Lagged Features):")
file_path_enhanced = f"{images_dir}/pm25_hindcast_enhanced.png"
plt_obj_enhanced = util.plot_air_quality_forecast(
    city, street, df_enhanced, file_path_enhanced, hindcast=True
)
plt_obj_enhanced.show()
print(f"‚úÖ Enhanced plot saved: {file_path_enhanced}")

print(f"\nüìä Both prediction plots saved to: {images_dir}")

In [None]:
# Compare the two models' feature importances with XGBoost's plot_importance
print("\nüìä Plotting feature importance comparison...")

# One figure, two panels: baseline on the left, enhanced on the right
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

importance_panels = [
    (axes[0], model_baseline, f'Baseline Model (R¬≤={r2_baseline:.4f})'),
    (axes[1], model_enhanced, f'Enhanced Model (R¬≤={r2_enhanced:.4f})'),
]
for panel_ax, panel_model, panel_title in importance_panels:
    plt.sca(panel_ax)
    plot_importance(
        panel_model,
        ax=panel_ax,
        max_num_features=10,
        importance_type='weight',
        show_values=True,
    )
    panel_ax.set_title(panel_title, fontsize=14, fontweight='bold', pad=15)
    panel_ax.set_xlabel('F score (Weight)', fontsize=12)
    panel_ax.grid(axis='x', alpha=0.3)

plt.tight_layout()

# Write the figure to the images directory before showing it
feature_importance_path = f"{images_dir}/feature_importance_comparison.png"
plt.savefig(feature_importance_path, dpi=300, bbox_inches='tight')
print(f"‚úÖ Feature importance comparison plot saved: {feature_importance_path}")

plt.show()

---

## <span style='color:#ff5f27'>🗄 Save Models and Upload to Hopsworks</span>

We save both models locally and upload both of them to the Hopsworks Model Registry, so the Baseline and Enhanced models can be compared there as well.

In [None]:
# Write each model to <model dir>/model.json, creating the directory if needed
print("\nüíæ Saving models locally...")

# Baseline model
baseline_model_dir = "air_quality_model"
os.makedirs(baseline_model_dir, exist_ok=True)
baseline_model_file = f"{baseline_model_dir}/model.json"
model_baseline.save_model(baseline_model_file)
print(f"  ‚úÖ Baseline model saved: {baseline_model_dir}/model.json")

# Enhanced model
enhanced_model_dir = "air_quality_model_enhanced"
os.makedirs(enhanced_model_dir, exist_ok=True)
enhanced_model_file = f"{enhanced_model_dir}/model.json"
model_enhanced.save_model(enhanced_model_file)
print(f"  ‚úÖ Enhanced model saved: {enhanced_model_dir}/model.json")

In [None]:
# Collect the metrics stored alongside each model in the registry.
# Values are cast to built-in float/int because Hopsworks expects native numeric types.
print("\nüìä Preparing model metrics...")

# Baseline model metrics
res_dict_baseline = dict(
    MSE=float(mse_baseline),
    RMSE=float(rmse_baseline),
    MAE=float(mae_baseline),
    R2=float(r2_baseline),
    n_features=int(X_train_base_features.shape[1]),
)

# Enhanced model metrics, plus the baseline R2 and the relative R2 change for reference
res_dict_enhanced = dict(
    MSE=float(mse_enhanced),
    RMSE=float(rmse_enhanced),
    MAE=float(mae_enhanced),
    R2=float(r2_enhanced),
    n_features=int(X_train_enh_features.shape[1]),
    baseline_R2=float(r2_baseline),
    improvement_percent=float(r2_improve),
)

print(f"  Baseline metrics: MSE={mse_baseline:.2f}, R¬≤={r2_baseline:.4f}")
print(f"  Enhanced metrics: MSE={mse_enhanced:.2f}, R¬≤={r2_enhanced:.4f}, Improvement={r2_improve:+.2f}%")

In [None]:
# Register both models in the Hopsworks Model Registry and upload their local directories
print("\nüóÑ Uploading models to Hopsworks Model Registry...")
mr = project.get_model_registry()

# 1. Baseline model
print("\n1Ô∏è‚É£ Uploading Baseline model...")
baseline_model_name = "air_quality_xgboost_model_baseline"
aq_model_baseline = mr.python.create_model(
    name=baseline_model_name,
    metrics=res_dict_baseline,
    feature_view=feature_view_baseline,
    description="Air Quality (PM2.5) predictor - Baseline model using weather features only",
)
aq_model_baseline.save(baseline_model_dir)
print("  ‚úÖ Baseline model uploaded!")
print(f"     Model: {baseline_model_name}")
print("     Feature View: air_quality_fv_baseline")
print(f"     R¬≤ score: {r2_baseline:.4f}")

# 2. Enhanced model
print("\n2Ô∏è‚É£ Uploading Enhanced model...")
enhanced_model_name = "air_quality_xgboost_model_enhanced"
aq_model_enhanced = mr.python.create_model(
    name=enhanced_model_name,
    metrics=res_dict_enhanced,
    feature_view=feature_view_enhanced,
    description="Air Quality (PM2.5) predictor with lagged features - Enhanced model using weather + 1,2,3-day lagged PM2.5",
)
aq_model_enhanced.save(enhanced_model_dir)
print("  ‚úÖ Enhanced model uploaded!")
print(f"     Model: {enhanced_model_name}")
print("     Feature View: air_quality_fv_enhanced")
print(f"     R¬≤ score: {r2_enhanced:.4f}")
print(f"     Improvement: {r2_improve:+.2f}%")

closing_rule = "=" * 80
print(f"\n{closing_rule}")
print("üéâ Both models successfully uploaded to Hopsworks Model Registry!")
print(f"{closing_rule}")

---
## <span style="color:#ff5f27;">⏭️ **Next:** Part 04: Batch Inference</span>

In the following notebook you will use your model for Batch Inference.
