# <span style="font-weight:bold; font-size: 3rem; color:#1EB182;"> **Air Quality** </span><span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 04: Batch Inference</span>

## 🗒️ This notebook is divided into the following sections:

1. Download model and batch inference data
2. Make predictions, generate PNG for forecast
3. Store predictions in a monitoring feature group and generate PNG for hindcast

## <span style='color:#ff5f27'> 📝 Imports</span>

In [None]:
import sys
from pathlib import Path
import os

def is_google_colab() -> bool:
    """Return True when the notebook is running inside a Google Colab kernel.

    Detection is based on the string form of the active IPython shell
    containing "google.colab". When `get_ipython` is not defined at all (e.g.
    the notebook was exported to a .py file and run with plain `python`),
    this returns False instead of raising NameError.
    """
    try:
        shell = get_ipython()  # noqa: F821 - only defined when running under IPython
    except NameError:
        return False
    return "google.colab" in str(shell)

def clone_repository() -> None:
    """Clone the mlfs-book repository and change into its directory.

    Uses IPython shell (`!`) and magic (`%cd`) syntax, so this only runs
    inside an IPython/Colab kernel.
    """
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    """Install `uv`, then the project's dependencies (all extras) into the system Python.

    Reads `pyproject.toml` from the current directory, so it expects to run
    after `clone_repository()` has changed into the repository root. Uses
    IPython `!` shell syntax (IPython/Colab only).
    """
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml


# Resolve the repository root (`root_dir`, as a str) for the current environment.
if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip .../notebooks/airquality from the path if the notebook was started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir)
    print("Local environment")

# Add the root directory to the `PYTHONPATH` so the `mlfs` package (imported below) can be used from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")
else:
    print(f"Directory already on the PYTHONPATH: {root_dir}")

# Read the API keys and configuration variables from the file <root_dir>/.env (only if it exists)
from mlfs import config
if os.path.exists(f"{root_dir}/.env"):
    settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

In [None]:
import datetime
import pandas as pd
from xgboost import XGBRegressor
import hopsworks
import json
from mlfs.airquality import util

## <span style="color:#ff5f27;"> 📡 Connect to Hopsworks Feature Store </span>

In [None]:
project = hopsworks.login()
fs = project.get_feature_store()

secrets = hopsworks.get_secrets_api()

# Number of days to shift "today" back by (e.g. to re-run a past day's forecast).
# Defaults to 0 and can only be overridden from the command line (argv[8]).
# It must be bound on BOTH paths below, otherwise computing `today` raises
# NameError when the sensor location comes from the Hopsworks secret.
days_ago = 0

# Positional command-line arguments (when this notebook is run as a script):
#   1 csv_file, 2 aqicn_url, 3 country, 4 city, 5 street,
#   6 latitude, 7 longitude, 8 days_ago   (2, 6 and 7 are currently unused)
# A Jupyter/Colab kernel is itself launched with extra argv entries (typically
# `-f <connection-file>`), so an option-like argv[1] is NOT treated as a CSV
# file; in that case the location is read from the Hopsworks secret instead.
use_cli_args = len(sys.argv) > 2 and not sys.argv[1].startswith("-")

if use_cli_args:
    csv_file = sys.argv[1]
    # If csv_file is just a filename, look for it in the data/ directory
    if not os.path.isabs(csv_file) and not os.path.exists(csv_file):
        # Try data/ directory relative to root_dir
        data_path = os.path.join(root_dir, "data", csv_file)
        if os.path.exists(data_path):
            csv_file = data_path
            print(f"Found CSV file in data/ directory: {csv_file}")
        else:
            # Try current directory with data/ prefix
            data_path = os.path.join("data", csv_file)
            if os.path.exists(data_path):
                csv_file = data_path
                print(f"Found CSV file in data/ directory: {csv_file}")

    print(f"csv_file {csv_file}")
    country = sys.argv[3] if len(sys.argv) > 3 else None
    city = sys.argv[4] if len(sys.argv) > 4 else None
    street = sys.argv[5] if len(sys.argv) > 5 else None
    days_ago = int(sys.argv[8]) if len(sys.argv) > 8 else 0
    print("Using command-line arguments:")
    print(f"  CSV      = {csv_file}")
    print(f"  COUNTRY  = {country}")
    print(f"  CITY     = {city}")
    print(f"  STREET   = {street}")
else:
    # The sensor location is stored as a JSON secret with 'country', 'city' and 'street' keys.
    location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
    location = json.loads(location_str)
    country = location['country']
    city = location['city']
    street = location['street']

today = datetime.date.today() - datetime.timedelta(days=days_ago)
tomorrow = today + datetime.timedelta(days=1)
today

## <span style="color:#ff5f27;">🪝 Download the model from Model Registry</span>

In [None]:
# Version 1 of the XGBoost air-quality model, fetched from the project's Model Registry.
mr = project.get_model_registry()
retrieved_model = mr.get_model(version=1, name="air_quality_xgboost_model")

# Feature view the registered model is attached to.
fv = retrieved_model.get_feature_view()

# Pull the model artifacts down locally; download() gives back the local directory path.
saved_model_dir = retrieved_model.download()

In [None]:
# Start from an empty regressor and restore the trained model from the
# downloaded model.json artifact (XGBoost's native model format).
retrieved_xgboost_model = XGBRegressor()
retrieved_xgboost_model.load_model(f"{saved_model_dir}/model.json")

# Show the restored model as the cell output
retrieved_xgboost_model

## <span style="color:#ff5f27;">✨ Get Weather Forecast Features from the Feature Group</span>



In [None]:
# Weather feature group (v1); rows dated today or later form the inference batch.
weather_fg = fs.get_feature_group(name='weather', version=1)
batch_data = weather_fg.filter(weather_fg.date >= today).read()
batch_data

### <span style="color:#ff5f27;">🤖 Making the predictions</span>

In [None]:
from collections import deque
import numpy as np

# Model input columns: every weather column except the target/identifier columns.
# Taken from the batch already read above (same feature group, same schema)
# instead of reading the entire weather feature group again just for its header.
excluded_columns = {'pm25', 'date', 'city'}
columns = [col for col in batch_data.columns if col not in excluded_columns]
# NOTE(review): `columns` is fixed here, before the pm25_change_*/pm25_std_* lag
# columns are added below, so those lag features only reach the model if the
# weather feature group's schema already contains them. Confirm against the
# features the model was trained on.

lag_windows = [1, 2, 3, 7, 14, 21, 30]

# The loop below is autoregressive: each day's prediction is appended to the
# pm25 history that feeds the next day's lag features. Rows must therefore be
# processed in date order, and nothing here guarantees read() returns them sorted.
forecast_rows = batch_data.sort_values('date').reset_index(drop=True)
air_quality_fg = fs.get_feature_group(name='air_quality', version=1)
air_quality_df = air_quality_fg.read().sort_values('date')
# Observed pm25 values in date order (oldest first), keeping at most the last 90.
pm25_history = deque(air_quality_df['pm25'].dropna().astype(float).tolist(), maxlen=90)


def compute_pct_change(history: deque, window: int) -> float:
    """Relative change between the latest value and the value `window` steps earlier.

    Returns NaN when the history is too short or the earlier value is 0/None
    (the division would be undefined).
    """
    if len(history) <= window:
        return np.nan
    latest = history[-1]
    previous = history[-(window + 1)]
    if previous in (0, None):
        return np.nan
    return float((latest - previous) / previous)


def compute_std(history: deque, window: int) -> float:
    """Population std (ddof=0) of the last `window` values (or fewer, if the history is shorter).

    Returns NaN for an empty history and 0.0 when only one value is available.
    """
    if not history:
        return np.nan
    window_values = list(history)[-min(len(history), window):]
    if len(window_values) <= 1:
        return 0.0
    return float(np.std(window_values, ddof=0))


for idx in range(len(forecast_rows)):
    for window in lag_windows:
        forecast_rows.at[idx, f'pm25_change_{window}d'] = compute_pct_change(pm25_history, window)
        forecast_rows.at[idx, f'pm25_std_{window}d'] = compute_std(pm25_history, window)

    features = forecast_rows.loc[[idx], columns]
    # make prediction (negative concentrations are clipped to 0)
    prediction = np.clip(float(retrieved_xgboost_model.predict(features)[0]), 0, None)
    # store prediction
    forecast_rows.at[idx, 'predicted_pm25'] = prediction
    # add prediction to history for next iteration
    pm25_history.append(prediction)

batch_data = forecast_rows
# Cast every numeric column except the identifier columns to float32.
for col in batch_data.columns:
    if col not in ['city', 'date']:
        if pd.api.types.is_numeric_dtype(batch_data[col]):
            batch_data[col] = batch_data[col].astype('float32')
batch_data

In [None]:
# batch_data['predicted_pm25'] = retrieved_xgboost_model.predict(
#     batch_data[['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']])
# batch_data

In [None]:
# Inspect the columns, dtypes and non-null counts of the forecast batch.
batch_data.info()

### <span style="color:#ff5f27;">🤖 Saving the predictions (for monitoring) to a Feature Group</span>

In [None]:
# Identify which sensor these predictions belong to.
batch_data['street'] = street
batch_data['city'] = city
batch_data['country'] = country
# Sort by date BEFORE numbering the rows, so days_before_forecast_day counts up
# with the forecast date (1 = earliest date in the batch). Numbering first and
# sorting afterwards attached the counter to whatever row order read() returned.
# Assigning a range to a column is positional, so the sorted order is what gets numbered.
batch_data = batch_data.sort_values(by=['date'])
batch_data['days_before_forecast_day'] = range(1, len(batch_data) + 1)
batch_data

In [None]:
# Re-check dtypes after adding the street/city/country/days_before_forecast_day columns.
batch_data.info()

### Create Forecast Graph
Draw a graph of the predictions by date, save it as a PNG in the GitHub repo,
and show it on GitHub Pages.

In [None]:

# Render the forecast chart to a PNG under docs/ (presumably published via GitHub Pages).
# `plt` is whatever util.plot_air_quality_forecast returns; it only needs a .show() method here.
pred_file_path = f"{root_dir}/docs/air-quality/assets/img/pm25_forecast.png"
plt = util.plot_air_quality_forecast(city, street, batch_data, pred_file_path)

plt.show()

In [None]:
# Monitoring feature group for predictions (created on first use): one row per
# (city, street, date, days_before_forecast_day), with `date` as the event time.
monitor_fg = fs.get_or_create_feature_group(
    name='aq_predictions',
    version=1,
    description='Air Quality prediction monitoring',
    primary_key=['city', 'street', 'date', 'days_before_forecast_day'],
    event_time="date",
)

In [None]:
# Write this run's predictions to the monitoring feature group.
# NOTE(review): wait=True presumably blocks until ingestion completes; confirm against the hsfs docs.
monitor_fg.insert(batch_data, wait=True)

In [None]:
# We will create a hindcast chart for only the forecasts made 1 day beforehand
monitoring_df = monitor_fg.filter(monitor_fg.days_before_forecast_day == 1).read()
monitoring_df

In [None]:
# Read the observed air-quality measurements (the pm25 outcomes) for the hindcast.
air_quality_fg = fs.get_feature_group(name='air_quality', version=1)
air_quality_df = air_quality_fg.read()
air_quality_df

In [None]:
# Observed pm25 (outcomes) and the 1-day-ahead predictions to compare them with.
# Explicit copies: the 'date' columns are rewritten below, and assigning into a
# column slice of another frame triggers pandas' SettingWithCopyWarning.
outcome_df = air_quality_df[['date', 'pm25']].copy()
preds_df = monitoring_df[['date', 'predicted_pm25']].copy()


def _dates_to_utc(df: pd.DataFrame, name: str) -> None:
    """Rewrite ``df['date']`` in place as timezone-aware UTC timestamps.

    Naive values are interpreted as UTC and already tz-aware values are
    converted to UTC, so both frames end up with the same merge key type
    (``Series.dt.tz_localize`` alone raises on already tz-aware data).
    Best effort: on a parse/conversion error a warning is printed and the
    column is left unchanged.

    Args:
        df: Frame whose 'date' column is normalised.
        name: Frame name used in the warning message.
    """
    try:
        df['date'] = pd.to_datetime(df['date'], utc=True)
    except (TypeError, ValueError) as e:
        print(f"Warning: could not localize {name}['date']: {e}")


_dates_to_utc(preds_df, 'preds_df')
_dates_to_utc(outcome_df, 'outcome_df')

# Inner join (pandas default): keep only dates with both a prediction and an observed value.
hindcast_df = pd.merge(preds_df, outcome_df, on="date")
hindcast_df = hindcast_df.sort_values(by=['date'])

# If there are no outcomes for predictions yet, generate some predictions/outcomes from existing data
if hindcast_df.empty:
    hindcast_df = util.backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, retrieved_xgboost_model, columns)
hindcast_df

### Plot the hindcast comparing predicted with observed values (1-day prior forecast)

__This graph will be empty to begin with - this is normal.__

After a few days of predictions and observations, you will get data points in this graph.

In [None]:
# Plot predicted vs. observed pm25 for the 1-day-ahead forecasts; the helper is
# given a PNG output path under docs/.
hindcast_file_path = f"{root_dir}/docs/air-quality/assets/img/pm25_hindcast_1day.png"
plt = util.plot_air_quality_forecast(city, street, hindcast_df, hindcast_file_path, hindcast=True)
plt.show()

### Upload the prediction and hindcast dashboards (png files) to Hopsworks


In [None]:
# Upload the forecast and hindcast PNGs to the project's Resources/airquality area.
dataset_api = project.get_dataset_api()
str_today = today.strftime("%Y-%m-%d")
if not dataset_api.exists("Resources/airquality"):
    dataset_api.mkdir("Resources/airquality")
# Both dashboards go to the same per-city/street/day upload path.
upload_dir = f"Resources/airquality/{city}_{street}_{str_today}"
for png_path in (pred_file_path, hindcast_file_path):
    dataset_api.upload(png_path, upload_dir, overwrite=True)

proj_url = project.get_url()
print(f"See images in Hopsworks here: {proj_url}/settings/fb/path/Resources/airquality")

---