# <span style="font-weight:bold; font-size: 3rem; color:#1EB182;"> **Air Quality** </span><span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 04: Batch Inference</span>

## 🗒️ This notebook is divided into the following sections:

1. Download model and batch inference data
2. Make predictions, generate PNG for forecast
3. Store predictions in a monitoring feature group and generate PNG for hindcast

## <span style='color:#ff5f27'> 📝 Imports</span>

In [1]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml


if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip trailing notebooks/airquality components from the root path if the notebook was started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

# Add the root directory to the `PYTHONPATH` so that the `mlfs` Python module can be imported from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Read the API keys and configuration variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: /Users/sebastian/Documents/KTH/Year 1/ID2223-SML/mlfs-book
HopsworksSettings initialized!
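The environment check only strips a trailing `airquality` component and then a trailing `notebooks` component from the working directory. The same logic as a pure function, so it can be checked without a kernel or filesystem; `resolve_repo_root` is a hypothetical name, and `PurePosixPath` is used here instead of the notebook's `Path` so the result does not depend on the OS:

```python
from pathlib import PurePosixPath

def resolve_repo_root(cwd: str) -> str:
    """Strip a trailing 'airquality' and then a trailing 'notebooks' component,
    mirroring the root_dir logic in the cell above (hypothetical helper)."""
    root = PurePosixPath(cwd)
    if root.parts[-1:] == ("airquality",):
        root = PurePosixPath(*root.parts[:-1])
    if root.parts[-1:] == ("notebooks",):
        root = PurePosixPath(*root.parts[:-1])
    return str(root)

print(resolve_repo_root("/home/user/mlfs-book/notebooks/airquality"))  # /home/user/mlfs-book
```

Directories that merely contain `airquality` somewhere in the middle of the path are left alone, because only the last component is compared.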


In [2]:
import datetime
import pandas as pd
from xgboost import XGBRegressor
import hopsworks
import json
from mlfs.airquality import util
import os

In [3]:
today = datetime.datetime.now()
tomorrow = today + datetime.timedelta(days = 1)
today

datetime.datetime(2025, 11, 18, 15, 15, 39, 16748)

## <span style="color:#ff5f27;"> 📡 Connect to Hopsworks Feature Store </span>

In [4]:
project = hopsworks.login(engine="python", project="air_quality_prediction")
fs = project.get_feature_store() 

secrets = hopsworks.get_secrets_api()
# location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
# location = json.loads(location_str)
# country=location['country']
# city=location['city']
# street=location['street']

sensor_secret_names = [
    "SENSOR_LOCATION_bankgatan_JSON",
    "SENSOR_LOCATION_linakersvagen_JSON",
    "SENSOR_LOCATION_trollebergsvagen_JSON",
]

sensors = {}
for name in sensor_secret_names:
    data = json.loads(secrets.get_secret(name).value)
    sensors[data["street"]] = {
        "country":   data["country"],
        "city":      data["city"],
        "street":    data["street"],
        "latitude":  data["latitude"],
        "longitude": data["longitude"],
        "aqicn_url": data["aqicn_url"],
    }

2025-11-18 15:15:39,026 INFO: Initializing external client
2025-11-18 15:15:39,026 INFO: Base URL: https://c.app.hopsworks.ai:443
To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'







2025-11-18 15:15:40,527 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1271977
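Each `SENSOR_LOCATION_*_JSON` secret is parsed into a dict keyed by street. The sketch below repeats that step offline: the JSON payloads are hypothetical stand-ins for `secrets.get_secret(name).value`, and the explicit missing-field check is an addition (the cell above would raise a bare `KeyError` on the first missing field). Keying by street also means two secrets with the same `street` value would silently overwrite each other.

```python
import json

# Fields the notebook copies out of each sensor secret
REQUIRED_KEYS = ("country", "city", "street", "latitude", "longitude", "aqicn_url")

def build_sensor_index(payloads: dict) -> dict:
    """Map street name -> sensor metadata, given {secret_name: json_string}."""
    index = {}
    for name, raw in payloads.items():
        data = json.loads(raw)
        missing = [k for k in REQUIRED_KEYS if k not in data]
        if missing:
            raise KeyError(f"secret {name} lacks fields {missing}")
        # Keep only the expected fields; extra keys in the JSON are ignored
        index[data["street"]] = {k: data[k] for k in REQUIRED_KEYS}
    return index

# Hypothetical payloads; the real values live in Hopsworks secrets
example_payloads = {
    "SENSOR_LOCATION_example_a_JSON": json.dumps({
        "country": "example-country", "city": "example-city", "street": "street-a",
        "latitude": 0.0, "longitude": 0.0, "aqicn_url": "https://example.org/a",
    }),
    "SENSOR_LOCATION_example_b_JSON": json.dumps({
        "country": "example-country", "city": "example-city", "street": "street-b",
        "latitude": 1.0, "longitude": 1.0, "aqicn_url": "https://example.org/b",
        "extra": "ignored",
    }),
}

example_sensors = build_sensor_index(example_payloads)
print(sorted(example_sensors))  # ['street-a', 'street-b']
```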


In [5]:
from sklearn.preprocessing import LabelEncoder


air_quality_fg = fs.get_feature_group(
    name='air_quality',
    version=1
)

air_quality_df_full = air_quality_fg.read()
streets_sorted = sorted(air_quality_df_full['street'].dropna().unique().tolist())
street_encoder = LabelEncoder().fit(streets_sorted)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.92s) 
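`LabelEncoder` assigns each street its position in the sorted list of unique street names (`classes_`), and the prediction loop casts that code to `float32` as the `street_encode` feature. A dependency-free sketch of the same mapping, for illustration only (`fit_street_codes` is hypothetical; filtering out `None` plays the role of `dropna()`). Because the codes depend only on sorted order, and assuming training used the same sorted encoding, refitting on a feature group that has gained or lost a street can shift the codes the model sees at inference time.

```python
def fit_street_codes(streets):
    """Assign integer codes in sorted order of the unique, non-null names,
    like sklearn's LabelEncoder (whose classes_ are the sorted unique values)."""
    classes = sorted({s for s in streets if s is not None})
    return {street: code for code, street in enumerate(classes)}

example_codes = fit_street_codes(
    ["trollebergsvägen", "bankgatan", "linåkersvägen", "bankgatan", None]
)
print(example_codes)  # {'bankgatan': 0, 'linåkersvägen': 1, 'trollebergsvägen': 2}
```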


## <span style="color:#ff5f27;">🪝 Download the model from Model Registry</span>

In [6]:
mr = project.get_model_registry()

retrieved_model = mr.get_model(
    name="air_quality_xgboost_model",
    version=3,
)

fv = retrieved_model.get_feature_view()

# Download the saved model artifacts to a local directory
saved_model_dir = retrieved_model.download()

2025-11-18 15:15:48,633 INFO: Initializing for batch retrieval of feature vectors



Downloading model artifact (1 dirs, 5 files)... DONE

In [7]:
# Load the XGBoost regressor from the JSON model file in the downloaded model directory
# retrieved_xgboost_model = joblib.load(saved_model_dir + "/xgboost_regressor.pkl")
retrieved_xgboost_model = XGBRegressor()

retrieved_xgboost_model.load_model(saved_model_dir + "/model.json")

# Displaying the retrieved XGBoost regressor model
retrieved_xgboost_model

## <span style="color:#ff5f27;">✨ Get Weather Forecast Features from the Weather Feature Group</span>



In [8]:
weather_fg = fs.get_feature_group(
    name='weather',
    version=1,
)




### <span style="color:#ff5f27;">🤖 Making the predictions</span>

In [9]:
# Reset `today` to midnight so that date differences count whole calendar days
today = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

monitor_fg = fs.get_or_create_feature_group(
    name='aq_predictions_clean',
    description='Air Quality prediction monitoring',
    version=1,
    primary_key=['city','street','date','days_before_forecast_day'],
    event_time="date",
)

monitor_fg

<hsfs.feature_group.FeatureGroup at 0x139ba37c0>

In [10]:
all_preds_for_city = []  

weather_base = weather_fg.filter(weather_fg.date >= today).read().copy()
weather_base['date'] = pd.to_datetime(weather_base['date']).dt.tz_localize(None)
weather_base = weather_base.sort_values('date').reset_index(drop=True)
weather_base['days_before_forecast_day'] = ((weather_base['date'] - today).dt.days)
weather_base = weather_base.query('days_before_forecast_day >= 1')

for street, meta in sensors.items():

    batch_data = weather_base.copy()
    batch_data['country'] = meta['country']
    batch_data['city']    = meta['city']
    batch_data['street']  = meta['street']

    batch_data['street_encode'] = street_encoder.transform(batch_data['street']).astype('float32')

    # Read the history per street so that a sensor without (enough) data is detected and skipped
    hist_aq = air_quality_fg.filter(
        (air_quality_fg.street == street) & 
        (air_quality_fg.date < today)
    ).read()
    
    if hist_aq.empty:
        print(f"[WARN] No historical air quality for {street}; skipping.")
        continue
    
    hist_aq['date'] = pd.to_datetime(hist_aq['date']).dt.tz_localize(None)
    hist_aq = hist_aq.sort_values('date').tail(3).reset_index(drop=True)  # keep only the three most recent days
    
    if len(hist_aq) < 3:
        print(f"[WARN] Less than 3 days of history for {street}; skipping.")
        continue

    predictions = []
    
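    # Rows are sorted by date; enumerate yields the forecast step (0 = first forecast day)
    # independently of the DataFrame's index labels, which need not start at 0 after filtering.
    for forecast_idx, (_, row) in enumerate(batch_data.iterrows()):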
        
        if forecast_idx == 0:
            print("Using 3 lagged historical feature.")
            lag_1 = hist_aq.iloc[-1]['pm25']
            lag_2 = hist_aq.iloc[-2]['pm25']
            lag_3 = hist_aq.iloc[-3]['pm25']
        elif forecast_idx == 1:
            print("Using 2 lagged historical feature.")
            lag_1 = predictions[0]
            lag_2 = hist_aq.iloc[-1]['pm25']
            lag_3 = hist_aq.iloc[-2]['pm25']
        elif forecast_idx == 2:
            print("Using 1 lagged historical feature.")
            lag_1 = predictions[1]
            lag_2 = predictions[0]
            lag_3 = hist_aq.iloc[-1]['pm25']
        else:
            print("Using only predicted data as lagged feature.")
            lag_1 = predictions[forecast_idx - 1]
            lag_2 = predictions[forecast_idx - 2]
            lag_3 = predictions[forecast_idx - 3]
        
        features = [
            row['temperature_2m_mean'],
            row['precipitation_sum'],
            row['wind_speed_10m_max'],
            row['wind_direction_10m_dominant'],
            row['street_encode'],
            lag_1,
            lag_2,
            lag_3
        ]
        
        pred = retrieved_xgboost_model.predict([features])[0]
        predictions.append(pred)

    batch_data['predicted_pm25'] = predictions
    
    to_insert = batch_data.drop(columns=['street_encode']).copy()
    to_insert['country'] = to_insert['country'].astype(str)
    to_insert['city'] = to_insert['city'].astype(str)
    to_insert['street'] = to_insert['street'].astype(str)
    to_insert['date'] = pd.to_datetime(to_insert['date']).dt.tz_localize(None)
    to_insert['days_before_forecast_day'] = to_insert['days_before_forecast_day'].astype('int64')

    all_preds_for_city.append(to_insert)

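# Concatenate the per-street forecasts into one DataFrame for upload
if all_preds_for_city:
    final_to_insert = pd.concat(all_preds_for_city).reset_index(drop=True)
    # Insertion into the monitoring feature group is commented out in this run
    # monitor_fg.insert(final_to_insert, wait=True)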

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.37s) 


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.86s) 
Using 3 lagged historical feature.
Using 2 lagged historical feature.
Using 1 lagged historical feature.
Using only predicted data as lagged feature.
Using only predicted data as lagged feature.
Using only predicted data as lagged feature.
Using only predicted data as lagged feature.


2025-11-18 15:15:58,884 ERROR: Parser Error: . Detail: Python exception: Traceback (most recent call last):
  File "/usr/src/app/src/server.py", line 142, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/src/app/src/server.py", line 166, in wrapper
    result = func(instance, *args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/src/app/src/server.py", line 196, in do_get
    return self._read_query(context, path, command)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

FeatureStoreException: Could not read data using Hopsworks Query Service.
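The loop above builds the three lag features for each forecast day from the most recent observed `pm25` values first, and from earlier predictions once those run out (recursive multi-step forecasting). A minimal, model-agnostic sketch of the same windowing, assuming the model is any callable that takes the feature list `[weather..., lag_1, lag_2, lag_3]` and returns a float; `recursive_forecast` is a hypothetical helper, not part of `mlfs`. It counts steps itself instead of relying on DataFrame index labels.

```python
from typing import Callable, List, Sequence

def recursive_forecast(
    history: Sequence[float],
    exog_rows: Sequence[Sequence[float]],
    predict: Callable[[List[float]], float],
    n_lags: int = 3,
) -> List[float]:
    """Forecast len(exog_rows) steps ahead, feeding predictions back in as lags.

    history   -- observed values, oldest first (only the last n_lags are used)
    exog_rows -- per-step features placed before the lags (e.g. weather)
    predict   -- callable mapping one feature list to a prediction
    """
    if len(history) < n_lags:
        raise ValueError(f"need at least {n_lags} observed values, got {len(history)}")
    window = [float(v) for v in history[-n_lags:]]  # oldest ... newest
    preds: List[float] = []
    for row in exog_rows:
        lags = window[::-1]  # lag_1 = newest value, lag_n = oldest
        y_hat = float(predict(list(row) + lags))
        preds.append(y_hat)
        window = window[1:] + [y_hat]  # slide: drop the oldest, append the prediction
    return preds

# Toy model: the mean of the three lag features
toy = recursive_forecast([3.0, 6.0, 9.0], [[0.0]] * 4, lambda f: sum(f[-3:]) / 3)
print(toy[:2])  # [6.0, 7.0]
```

With the notebook's objects, `predict` could be something like `lambda f: float(retrieved_xgboost_model.predict([f])[0])`, mirroring the call inside the loop. The step-by-step lag assignments in the loop (3, 2, 1, then 0 historical values) fall out of the sliding window automatically.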

In [None]:
print(predictions)

[3.3871136, 2.360062, 11.490718, 10.554521, 12.044299, 9.002187, 11.92575]


In [None]:
batch_data.dtypes

date                           datetime64[us]
temperature_2m_mean                   float32
precipitation_sum                     float32
wind_speed_10m_max                    float32
wind_direction_10m_dominant           float32
city                                   object
country                                object
street                                 object
days_before_forecast_day                int64
street_encode                         float32
dtype: object

In [None]:
print("Timedelta:")
print((batch_data['date'] - today).dt.days + 1)

Timedelta:
0    2
1    3
2    4
3    5
4    6
5    7
6    8
Name: date, dtype: int64
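`days_before_forecast_day` is the whole-day difference between each weather row's date and midnight today, and only rows with a horizon of at least one day are kept. The debug cell above prints `(date - today).dt.days + 1`, so its values (2 to 8) are one more than the stored horizons. A minimal pandas sketch of the horizon computation, assuming a timezone-naive `today`; `add_forecast_horizon` is a hypothetical helper, and its final `reset_index` renumbers rows 0..n-1 so positional logic can rely on them:

```python
import pandas as pd

def add_forecast_horizon(weather: pd.DataFrame, today) -> pd.DataFrame:
    """Add days_before_forecast_day relative to midnight of `today` and keep
    only future days (horizon >= 1), renumbering rows from 0."""
    out = weather.copy()
    out["date"] = pd.to_datetime(out["date"])
    if out["date"].dt.tz is not None:
        out["date"] = out["date"].dt.tz_localize(None)  # drop tz, keep wall-clock time
    midnight = pd.Timestamp(today).normalize()
    out = out.sort_values("date")
    out["days_before_forecast_day"] = (out["date"] - midnight).dt.days
    return out[out["days_before_forecast_day"] >= 1].reset_index(drop=True)
```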


### <span style="color:#ff5f27;">🤖 Saving the predictions (for monitoring) to a Feature Group</span>
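The monitoring feature group `monitor_fg` was created above with the primary key `['city', 'street', 'date', 'days_before_forecast_day']`. Before inserting `final_to_insert`, a cheap local check that the key columns exist, are non-null, and are unique can catch problems such as the same street being processed twice. A minimal pandas sketch; `check_primary_key` is a hypothetical helper, and how Hopsworks itself handles duplicate keys on insert is not covered here.

```python
import pandas as pd

# Primary key used when monitor_fg was created
MONITOR_PRIMARY_KEY = ["city", "street", "date", "days_before_forecast_day"]

def check_primary_key(df: pd.DataFrame, key=MONITOR_PRIMARY_KEY) -> pd.DataFrame:
    """Raise if key columns are missing, contain nulls, or repeat; else return df."""
    missing = [c for c in key if c not in df.columns]
    if missing:
        raise KeyError(f"missing primary-key columns: {missing}")
    if df[key].isna().any().any():
        raise ValueError("primary-key columns contain nulls")
    dupes = df[df.duplicated(subset=key, keep=False)]
    if not dupes.empty:
        raise ValueError(f"{len(dupes)} rows share a primary key with another row")
    return df
```

In the notebook this would be called as `check_primary_key(final_to_insert)` just before `monitor_fg.insert(...)`.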

### Create Forecast Graph
Draw a graph of the predictions against their dates, save it as a PNG in the GitHub repo, and show it on GitHub Pages.

In [None]:
from pathlib import Path
import matplotlib.pyplot as plt

# Locate the repository root by walking up from the current directory until a
# `docs` folder is found (the working directory may differ from the repo root).
def find_repo_root():
    here = Path.cwd()
    for p in [here, *here.parents]:
        if (p / "docs").exists():
            return p
    return here

project_root = find_repo_root()

# The PNGs are written under docs/ so that they can be published with GitHub Pages
img_dir = project_root / "docs" / "air-quality" / "assets" / "img"
img_dir.mkdir(parents=True, exist_ok=True)

# `today` was reset to midnight before the predictions were made
today_str = today.strftime("%Y-%m-%d")

dataset_api = project.get_dataset_api()
if not dataset_api.exists("Resources/airquality"):
    dataset_api.mkdir("Resources/airquality")
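`find_repo_root` walks up from the current directory until it finds a `docs` folder and falls back to the current directory otherwise. A variant that takes the start directory and the marker name as parameters (both additions for testability, not in the original; `find_repo_root_from` is a hypothetical name) can be exercised against a temporary directory tree:

```python
from pathlib import Path

def find_repo_root_from(start, marker: str = "docs") -> Path:
    """Walk from `start` up through its parents and return the first directory
    containing `marker`; fall back to `start` itself if none does."""
    start = Path(start).resolve()
    for candidate in (start, *start.parents):
        if (candidate / marker).exists():
            return candidate
    return start
```

Because the search stops at the first match, a `docs` folder inside a notebook subdirectory would win over the one at the repository root.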



In [None]:
city = next(iter(sensors.values()))['city']  # since all sensors are in the same city
preds_1day_city = monitor_fg.filter(
    (monitor_fg.city == city) &
    (monitor_fg.days_before_forecast_day == 1)
).read().copy()

def _norm_dates(series):
    """Parse dates, drop any timezone (converting to UTC first) and truncate to midnight."""
    s = pd.to_datetime(series, errors="coerce")
    if s.dt.tz is not None:
        s = s.dt.tz_convert(None)
    return s.dt.normalize()

for street, meta in sensors.items():

    preds_1day = preds_1day_city[preds_1day_city['street'] == street].copy()

    outcomes = air_quality_df_full[
        (air_quality_df_full['city'] == meta['city']) &
        (air_quality_df_full['street'] == street)
    ][['date', 'pm25']].copy()


    try:
        nday_df = next(df for df in all_preds_for_city if df['street'].iloc[0] == street)
    except StopIteration:
        print(f"[WARN] No in-memory forecast DF found for street '{street}'. Skipping plots.")
        continue


    safe_city = str(meta['city']).replace(' ', '_').lower()
    safe_street = str(street).replace(' ', '_').lower()

    pred_file_path = str(img_dir / f"{safe_city}_{safe_street}_pm25_forecast.png")
    hindcast_file_path = str(img_dir / f"{safe_city}_{safe_street}_pm25_hindcast_1day.png")


    print(
        street,
        "rows:", len(nday_df),
        "nans:", nday_df['predicted_pm25'].isna().sum() if 'predicted_pm25' in nday_df.columns else 'col-missing',
        "dates:", nday_df['date'].min(), "→", nday_df['date'].max()
    )
    plt = util.plot_air_quality_forecast(meta['city'], street, nday_df, pred_file_path)
    plt.close()


    if preds_1day.empty or outcomes.empty:
        print(f"[WARN] Empty preds_1day or outcomes for '{street}'. Skipping hindcast plot.")
    else:
  
        preds_1day['date'] = _norm_dates(preds_1day['date'])
        outcomes['date']   = _norm_dates(outcomes['date'])

        min_d, max_d = outcomes['date'].min(), outcomes['date'].max()
        preds_1day = preds_1day[(preds_1day['date'] >= min_d) & (preds_1day['date'] <= max_d)].copy()

        hindcast_df = (
            preds_1day[['date', 'predicted_pm25']]
            .merge(outcomes, on='date', how='inner')
            .sort_values('date')
        )

        if hindcast_df.empty:
            print(f"[WARN] No overlapping dates for hindcast at '{street}'. Skipping hindcast plot.")
        else:
            plt = util.plot_air_quality_forecast(
                meta['city'], street, hindcast_df, hindcast=True, file_path=hindcast_file_path
            )
            plt.close()


    hops_dir = f"Resources/airquality/{safe_city}_{safe_street}_{today_str}"
    dataset_api.upload(pred_file_path, hops_dir, overwrite=True)
    if Path(hindcast_file_path).exists():
        dataset_api.upload(hindcast_file_path, hops_dir, overwrite=True)

print(f"The images are saved here: {project.get_url()}/settings/fb/path/Resources/airquality")

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.10s) 
bankgatan rows: 7 nans: 0 dates: 2025-11-19 00:00:00 → 2025-11-25 00:00:00

[WARN] No overlapping dates for hindcast at 'bankgatan'. Skipping hindcast plot.


Uploading /Users/sebastian/Documents/KTH/Year 1/ID2223-SML/mlfs-book/docs/air-quality/assets/img/lund_bankgata…

Uploading /Users/sebastian/Documents/KTH/Year 1/ID2223-SML/mlfs-book/docs/air-quality/assets/img/lund_bankgata…

[WARN] No in-memory forecast DF found for street 'linåkersvägen'. Skipping plots.
[WARN] No in-memory forecast DF found for street 'trollebergsvägen'. Skipping plots.
The images are saved here: https://c.app.hopsworks.ai:443/p/1271977/settings/fb/path/Resources/airquality
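The PNG file names and the Hopsworks upload directory are derived from the city, the street, and `today_str`. A small sketch of that naming scheme (`artifact_paths` is a hypothetical helper; `PurePosixPath` is used so the output is identical on every OS, whereas the notebook uses the local `Path`). Only spaces are replaced and case is lowered, so a street such as `linåkersvägen` keeps its non-ASCII characters in the file name.

```python
import datetime
from pathlib import PurePosixPath

def artifact_paths(city, street, img_dir, day: datetime.date) -> dict:
    """Build the forecast/hindcast PNG paths and the Hopsworks upload directory."""
    safe_city = str(city).replace(" ", "_").lower()
    safe_street = str(street).replace(" ", "_").lower()
    stem = f"{safe_city}_{safe_street}"
    img_dir = PurePosixPath(img_dir)
    return {
        "forecast_png": str(img_dir / f"{stem}_pm25_forecast.png"),
        "hindcast_png": str(img_dir / f"{stem}_pm25_hindcast_1day.png"),
        "hops_dir": f"Resources/airquality/{stem}_{day.strftime('%Y-%m-%d')}",
    }

example_paths = artifact_paths("Lund", "Bank Gatan", "docs/air-quality/assets/img", datetime.date(2025, 11, 18))
print(example_paths["hops_dir"])  # Resources/airquality/lund_bank_gatan_2025-11-18
```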


### Plot the Hindcast comparing predicted with observed values (1-day prior forecast)

__This graph will be empty to begin with - this is normal.__

After a few days of predictions and observations, you will get data points in this graph.
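Why the graph starts out empty: the hindcast is an inner join, on calendar day, between the 1-day-ahead predictions stored in `monitor_fg` and the observed `pm25` values, so a row appears only once a forecast's target day also has an observation. A pandas sketch of that join with hypothetical helper names (`normalize_dates`, `build_hindcast`); it checks the timezone explicitly instead of relying on a try/except around `tz_convert`. Since the join is inner, dates outside the observed range are dropped without a separate range filter.

```python
import pandas as pd

def normalize_dates(series: pd.Series) -> pd.Series:
    """Parse dates, convert tz-aware values to naive UTC, and truncate to midnight."""
    s = pd.to_datetime(series, errors="coerce")
    if s.dt.tz is not None:
        s = s.dt.tz_convert(None)  # convert to UTC, then drop the timezone
    return s.dt.normalize()

def build_hindcast(preds: pd.DataFrame, outcomes: pd.DataFrame) -> pd.DataFrame:
    """Pair each 1-day-ahead prediction with the observation for the same day."""
    p = preds[["date", "predicted_pm25"]].copy()
    o = outcomes[["date", "pm25"]].copy()
    p["date"] = normalize_dates(p["date"])
    o["date"] = normalize_dates(o["date"])
    return p.merge(o, on="date", how="inner").sort_values("date").reset_index(drop=True)
```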

### Upload the prediction and hindcast dashboards (PNG files) to Hopsworks


---