# <span style="font-width:bold; font-size: 3rem; color:#1EB182;"> **Air Quality** </span><span style="font-width:bold; font-size: 3rem; color:#333;">- Part 04: Batch Inference</span>

## üóíÔ∏è This notebook is divided into the following sections:

1. Download model and batch inference data
2. Make predictions, generate PNG for forecast
3. Store predictions in a monitoring feature group adn generate PNG for hindcast

## <span style='color:#ff5f27'> üìù Imports

In [None]:
import sys
from pathlib import Path
import os

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml


if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Read the API keys and configuration variables from the file <root_dir>/.env
from mlfs import config
if os.path.exists(f"{root_dir}/.env"):
    settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

In [None]:
import datetime
import pandas as pd
from xgboost import XGBRegressor
import hopsworks
import json
from mlfs.airquality import util

In [None]:
today = datetime.datetime.now() - datetime.timedelta(0)
tomorrow = today + datetime.timedelta(days = 1)
today

## <span style="color:#ff5f27;"> üì° Connect to Hopsworks Feature Store </span>

In [None]:
project = hopsworks.login()
fs = project.get_feature_store() 

secrets = hopsworks.get_secrets_api()

In [None]:
streets = ["gulangyu", "hongwen"]
locations = []
for s in streets:
    # This line will fail if you have not registered the AQICN_API_KEY as a secret in Hopsworks
    location_str = secrets.get_secret(f"SENSOR_LOCATION_JSON_{s.upper()}").value
    location = json.loads(location_str)
    locations.append(location)

    country=location['country']
    city=location['city']
    street=location['street']
    aqicn_url=location['aqicn_url']
    latitude=location['latitude']
    longitude=location['longitude']

locations

## <span style="color:#ff5f27;">ü™ù Download the model from Model Registry</span>

In [None]:
mr = project.get_model_registry()


retrieved_model = mr.get_model(
    name="air_quality_xgboost_model",
    version=4,
)

fv = retrieved_model.get_feature_view()

# Download the saved model artifacts to a local directory
saved_model_dir = retrieved_model.download()

In [None]:
# Loading the XGBoost regressor model and label encoder from the saved model directory
# retrieved_xgboost_model = joblib.load(saved_model_dir + "/xgboost_regressor.pkl")
retrieved_xgboost_model = XGBRegressor()

retrieved_xgboost_model.load_model(saved_model_dir + "/model.json")

# Displaying the retrieved XGBoost regressor model
retrieved_xgboost_model

## <span style="color:#ff5f27;">‚ú® Get Weather Forecast Features with Feature View   </span>



In [None]:
weather_fg = fs.get_feature_group(
    name='weather',
    version=1,
)

air_quality_fg = fs.get_feature_group(
    name='air_quality',
    version=2,
)

# today does not start from 00:00:00, so batch data contains 6 days from tomorrow
batch_data = weather_fg.filter(weather_fg.date >= today).read()
batch_data.sort_values(by=['date'],inplace=True)
batch_data


### <span style="color:#ff5f27;">ü§ñ Making the predictions</span>

In [None]:
aq_df = air_quality_fg.read()
aq_df.sort_values(by=['street','date'],inplace=True)
aq_df["date"] = pd.to_datetime(aq_df["date"])

weather_fg_df = weather_fg.filter(weather_fg.date <= aq_df['date'].iloc[-1]).read()
weather_fg_df.sort_values(by=['date'],inplace=True)

In [None]:
aq_df

In [None]:
aq_df['date'].iloc[-1]

In [None]:
street_cols = [c for c in aq_df.columns if c.startswith("street_")]

weather_cols = [
    "temperature_2m_mean",
    "precipitation_sum",
    "wind_speed_10m_max",
    "wind_direction_10m_dominant",
]

FEATURE_COLUMNS = ["pm25_rolling"] + street_cols + weather_cols

max_date = batch_data['date'].max()

for street_name in aq_df["street"].unique():
    while aq_df.loc[aq_df["street"] == street_name, "date"].max() < max_date:
        street_hist = aq_df[aq_df["street"] == street_name].sort_values("date")
        last_row = street_hist.iloc[-1]

        df = weather_fg_df[weather_cols].iloc[[-1]].copy()

        # add pm25_rolling feature from this street's last row
        df.insert(0, "pm25_rolling", last_row["pm25_rolling"])

        for col in street_cols:
            df[col] = last_row[col]

        df = df[FEATURE_COLUMNS]

        # rolling target feature: mean of last 3 pm25 for THIS street
        pm25_rolling_next = street_hist["pm25"].iloc[-3:].mean()

        new_row = {
            "date": last_row["date"] + datetime.timedelta(days=1),
            "pm25": float(retrieved_xgboost_model.predict(df)[0]),
            "pm25_rolling": pm25_rolling_next,
            "country": country,
            "city": city,
            "street": street_name,
        }

        for col in street_cols:
            new_row[col] = last_row[col]

        aq_df = pd.concat([aq_df, pd.DataFrame([new_row])], ignore_index=True)

aq_df.sort_values(by=["street", "date"], inplace=True)
aq_df

In [None]:
aq_df[aq_df["street"] == "gulangyu"][-15:]

In [None]:
aq_df[aq_df["street"] == "hongwen"][-15:]

In [None]:
start_date = batch_data['date'].min()

predicted_pm25s = aq_df[aq_df['date'] >= start_date][["date","pm25", "city", "country", "street"]]

predicted_pm25s

In [None]:
streets = predicted_pm25s['street'].unique()
streets_df = pd.DataFrame({'street': streets})

batch_expanded = batch_data.merge(streets_df, how='cross')

predicted_pm25s['date'] = pd.to_datetime(predicted_pm25s['date'], utc=True)
batch_expanded['date'] = pd.to_datetime(batch_expanded['date'], utc=True)

batch_with_pm25 = batch_expanded.merge(
    predicted_pm25s,
    on=['date', 'street'],
    how='left'
)

batch_with_pm25 = batch_with_pm25.drop(columns=['city_y'])
batch_with_pm25 = batch_with_pm25.rename(columns={'city_x': 'city'})

batch_with_pm25.rename(columns={'pm25': 'predicted_pm25'}, inplace=True)
batch_with_pm25["predicted_pm25"] = batch_with_pm25["predicted_pm25"].astype("float32")
batch_with_pm25

In [None]:
batch_data = batch_with_pm25
batch_data.info()

### <span style="color:#ff5f27;">ü§ñ Saving the predictions (for monitoring) to a Feature Group</span>

In [None]:
base_date = start_date
batch_data['days_before_forecast_day'] = (
    (batch_data['date'] - base_date).dt.days + 1
)
batch_data

In [None]:
batch_data.info()

### Create Forecast Graph
Draw a graph of the predictions with dates as a PNG and save it to the github repo
Show it on github pages

In [None]:
batch_data[batch_data['street'] == 'gulangyu']
# batch_data[batch_data['street'] == 'hongwen']


In [None]:
for s in streets:
    pred_file_path = f"{root_dir}/docs/air-quality/assets/img/pm25_forecast_{s}.png"
    plt = util.plot_air_quality_forecast(city, s, batch_data, pred_file_path)
    plt.show()

In [None]:
# Get or create feature group
monitor_fg = fs.get_or_create_feature_group(
    name='aq_predictions',
    description='Air Quality prediction monitoring',
    version=3,
    primary_key=['city','street','date','days_before_forecast_day'],
    event_time="date"
)

In [None]:
monitor_fg.insert(batch_data, wait=True)

In [None]:
# We will create a hindcast chart for only the forecasts made 1 day beforehand
monitoring_df = monitor_fg.filter(monitor_fg.days_before_forecast_day == 1).read()
monitoring_df

In [None]:
air_quality_fg = fs.get_feature_group(name='air_quality', version=2)
air_quality_df = air_quality_fg.read()
air_quality_df

In [None]:
outcome_df = air_quality_df[['date', 'pm25', "street"]]
preds_df =  monitoring_df[['date', 'predicted_pm25', "street"]]

hindcast_df = pd.merge(preds_df, outcome_df, on=["date", "street"])
hindcast_df = hindcast_df.sort_values(by=['date'])

# If there are no outcomes for predictions yet, generate some predictions/outcomes from existing data
if len(hindcast_df) == 0:
    hindcast_df = util.backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, retrieved_xgboost_model)
hindcast_df

### Plot the Hindcast comparing predicted with forecasted values (1-day prior forecast)

__This graph will be empty to begin with - this is normal.__

After a few days of predictions and observations, you will get data points in this graph.

In [None]:
hindcast_df

In [None]:
streets

In [None]:
for s in streets:
    hindcast_file_path = f"{root_dir}/docs/air-quality/assets/img/pm25_hindcast_1day_{s}.png"
    plt = util.plot_air_quality_forecast(city, s, hindcast_df, hindcast_file_path, hindcast=True)
    plt.show()

### Upload the prediction and hindcast dashboards (png files) to Hopsworks


In [None]:
dataset_api = project.get_dataset_api()
str_today = today.strftime("%Y-%m-%d")
if dataset_api.exists("Resources/airquality") == False:
    dataset_api.mkdir("Resources/airquality")
for s in streets:
    dataset_api.upload(pred_file_path, f"Resources/airquality/{city}_{s}_{str_today}", overwrite=True)
    dataset_api.upload(hindcast_file_path, f"Resources/airquality/{city}_{s}_{str_today}", overwrite=True)

proj_url = project.get_url()
print(f"See images in Hopsworks here: {proj_url}/settings/fb/path/Resources/airquality")

---