# <span style="font-width:bold; font-size: 3rem; color:#1EB182;"> **Air Quality** </span><span style="font-width:bold; font-size: 3rem; color:#333;">- Part 04: Batch Inference</span>

## üóíÔ∏è This notebook is divided into the following sections:

1. Download model and batch inference data
2. Make predictions, generate PNG for forecast
3. Store predictions in a monitoring feature group adn generate PNG for hindcast

## <span style='color:#ff5f27'> üìù Imports

In [None]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml


if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Settings are provided via Hopsworks secrets; no .env needed for this notebook.
# from mlfs import config
# settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

In [None]:
import datetime
import pandas as pd
from xgboost import XGBRegressor
import hopsworks
import json
from mlfs.airquality import util
import os
import numpy as np

In [None]:
today = datetime.datetime.now() - datetime.timedelta(0)
tomorrow = today + datetime.timedelta(days = 1)
today

## <span style="color:#ff5f27;"> üì° Connect to Hopsworks Feature Store </span>

In [None]:
project = hopsworks.login(engine="python")
fs = project.get_feature_store() 

# Read per-sensor secret via SENSOR_SLUG
SENSOR_SLUG = os.getenv("SENSOR_SLUG", "visby_ostra_tvargatan")
secrets = hopsworks.get_secrets_api()
location_str = secrets.get_secret(f"SENSOR_LOCATION_JSON_{SENSOR_SLUG.upper()}").value
location = json.loads(location_str)
country = location['country']
city = location['city']
street = location['street']
latitude = location.get('latitude')
longitude = location.get('longitude')

## <span style="color:#ff5f27;">ü™ù Download the model from Model Registry</span>

In [None]:
mr = project.get_model_registry()

# Retrieve latest per-sensor lag model
model_name = f"air_quality_xgboost_model_lags_{SENSOR_SLUG}"
retrieved_model = mr.get_model(
    name=model_name,
)

fv = retrieved_model.get_feature_view()

# Download the saved model artifacts to a local directory
saved_model_dir = retrieved_model.download()

In [None]:
# Loading the XGBoost regressor model and label encoder from the saved model directory
# retrieved_xgboost_model = joblib.load(saved_model_dir + "/xgboost_regressor.pkl")
retrieved_xgboost_model = XGBRegressor()

retrieved_xgboost_model.load_model(saved_model_dir + "/model.json")

# Displaying the retrieved XGBoost regressor model
retrieved_xgboost_model

## <span style="color:#ff5f27;">‚ú® Get Weather Forecast Features with Feature View   </span>



In [None]:
weather_fg = fs.get_feature_group(
    name='weather',
    version=1,
)
# Filter to target city and dates >= today
batch_data = weather_fg.filter((weather_fg.city == city) & (weather_fg.date >= today)).read()
batch_data

In [None]:
# Prepare lag features for D+1 prediction using latest history from air_quality v3
hist_fg = fs.get_feature_group(name='air_quality', version=3)
hist_df = hist_fg.filter(
    (hist_fg.city == city) & (hist_fg.street == street) & (hist_fg.country == country)
).read()
hist_df = hist_df.sort_values('date')
last_vals = hist_df['pm25'].dropna().tail(3).tolist()

# Initialize lag columns with NaN
for col in ['pm25_lag_1','pm25_lag_2','pm25_lag_3']:
    batch_data[col] = np.nan

if len(last_vals) >= 1:
    batch_data.loc[batch_data['date'].idxmin(), 'pm25_lag_1'] = np.float32(last_vals[-1])
if len(last_vals) >= 2:
    batch_data.loc[batch_data['date'].idxmin(), 'pm25_lag_2'] = np.float32(last_vals[-2])
if len(last_vals) >= 3:
    batch_data.loc[batch_data['date'].idxmin(), 'pm25_lag_3'] = np.float32(last_vals[-3])

# Ensure correct dtypes
batch_data[['pm25_lag_1','pm25_lag_2','pm25_lag_3']] = batch_data[['pm25_lag_1','pm25_lag_2','pm25_lag_3']].astype('float32')

batch_data.head(3)


### <span style="color:#ff5f27;">ü§ñ Making the predictions</span>

In [None]:
# Predict 7-day horizon recursively using lag model if available, else weather-only
try:
    expected = retrieved_xgboost_model.get_booster().feature_names or []
except Exception:
    expected = []
use_lags = any(f.startswith('pm25_lag_') for f in expected)

batch_data = batch_data.sort_values('date').copy()
batch_data['predicted_pm25'] = np.nan

if use_lags:
    # Initialize lags from history (computed earlier as last_vals)
    lags = last_vals[-3:] if len(last_vals) >= 3 else []
    # Ensure length 3 by prepending NaNs if needed
    while len(lags) < 3:
        lags.insert(0, np.nan)

    feature_order = expected if expected else ['pm25_lag_1','pm25_lag_2','pm25_lag_3','temperature_2m_mean','precipitation_sum','wind_speed_10m_max','wind_direction_10m_dominant']

    for idx, row in batch_data.iterrows():
        lag1, lag2, lag3 = lags[-1], lags[-2], lags[-3]
        if np.isnan(lag1) or np.isnan(lag2) or np.isnan(lag3):
            # cannot predict until we have 3 values; skip
            continue
        feat = {
            'pm25_lag_1': np.float32(lag1),
            'pm25_lag_2': np.float32(lag2),
            'pm25_lag_3': np.float32(lag3),
            'temperature_2m_mean': row['temperature_2m_mean'],
            'precipitation_sum': row['precipitation_sum'],
            'wind_speed_10m_max': row['wind_speed_10m_max'],
            'wind_direction_10m_dominant': row['wind_direction_10m_dominant'],
        }
        X = pd.DataFrame([[feat[k] for k in feature_order]], columns=feature_order)
        pred = retrieved_xgboost_model.predict(X)[0]
        batch_data.at[idx, 'predicted_pm25'] = float(pred)
        # roll lags forward with prediction
        lags.append(float(pred))
else:
    # Weather-only model: predict all rows
    features = ['temperature_2m_mean','precipitation_sum','wind_speed_10m_max','wind_direction_10m_dominant']
    batch_data['predicted_pm25'] = retrieved_xgboost_model.predict(batch_data[features])

# Restrict forecast plot to predicted rows (up to 7 days)
forecast_df = batch_data[batch_data['predicted_pm25'].notna()].head(7)
forecast_df

In [None]:
batch_data.info()

### <span style="color:#ff5f27;">ü§ñ Saving the predictions (for monitoring) to a Feature Group</span>

In [None]:
# Prepare rows to log for monitoring: only predicted rows (D+1..D+7)
forecast_df = forecast_df.copy()
forecast_df['street'] = street
forecast_df['city'] = city
forecast_df['country'] = country
# Days before forecast day: 1 for D+1, 2 for D+2, ...
forecast_df = forecast_df.sort_values(by=['date'])
forecast_df['days_before_forecast_day'] = range(1, len(forecast_df)+1)
forecast_df

In [None]:
batch_data.info()

### Create Forecast Graph
Draw a graph of the predictions with dates as a PNG and save it to the github repo
Show it on github pages

In [None]:

slug = SENSOR_SLUG
pred_file_path = f"{root_dir}/docs/air-quality/assets/img/pm25_forecast_{slug}.png"
plt = util.plot_air_quality_forecast(city, street, forecast_df, pred_file_path)

plt.show()

In [None]:
# Get or create feature group
monitor_fg = fs.get_or_create_feature_group(
    name='aq_predictions',
    description='Air Quality prediction monitoring',
    version=2,
    primary_key=['city','street','date','days_before_forecast_day'],
    event_time="date"
)

In [None]:
monitor_fg.insert(forecast_df, wait=True)

In [None]:
# We will create a hindcast chart for  only the forecasts made 1 day beforehand
monitoring_df = monitor_fg.filter(monitor_fg.days_before_forecast_day == 1).read()
monitoring_df

In [None]:
air_quality_fg = fs.get_feature_group(name='air_quality', version=3)
# Filter outcomes to this sensor
air_quality_df = air_quality_fg.filter(
    (air_quality_fg.city == city) & (air_quality_fg.street == street) & (air_quality_fg.country == country)
).read()
air_quality_df

In [None]:
#DEL

# Normalize dates to avoid timezone/time-of-day mismatches before hindcast merge
monitoring_df['date'] = pd.to_datetime(monitoring_df['date']).dt.tz_localize(None)
air_quality_df['date'] = pd.to_datetime(air_quality_df['date']).dt.tz_localize(None)
monitoring_df['date'] = pd.to_datetime(monitoring_df['date']).dt.normalize()
air_quality_df['date'] = pd.to_datetime(air_quality_df['date']).dt.normalize()


In [None]:
outcome_df = air_quality_df[['date', 'pm25']]
preds_df =  monitoring_df[['date', 'predicted_pm25']]

hindcast_df = pd.merge(preds_df, outcome_df, on="date")
hindcast_df = hindcast_df.sort_values(by=['date'])

# If there are no outcomes for predictions yet, generate some predictions/outcomes from existing data
if len(hindcast_df) == 0:
    hindcast_df = util.backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, retrieved_xgboost_model)
hindcast_df

### Plot the Hindcast comparing predicted with forecasted values (1-day prior forecast)

__This graph will be empty to begin with - this is normal.__

After a few days of predictions and observations, you will get data points in this graph.

In [None]:
hindcast_file_path = f"{root_dir}/docs/air-quality/assets/img/pm25_hindcast_1day_{slug}.png"
plt = util.plot_air_quality_forecast(city, street, hindcast_df, hindcast_file_path, hindcast=True)
plt.show()

### Upload the prediction and hindcast dashboards (png files) to Hopsworks


In [None]:
dataset_api = project.get_dataset_api()
str_today = today.strftime("%Y-%m-%d")
if dataset_api.exists("Resources/airquality") == False:
    dataset_api.mkdir("Resources/airquality")
dataset_api.upload(pred_file_path, f"Resources/airquality/{city}_{street}_{str_today}", overwrite=True)
dataset_api.upload(hindcast_file_path, f"Resources/airquality/{city}_{street}_{str_today}", overwrite=True)

proj_url = project.get_url()
print(f"See images in Hopsworks here: {proj_url}/settings/fb/path/Resources/airquality")

---