In [1]:
%reload_ext autoreload
%autoreload 2

In [136]:
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Tuple, Union

import hopsworks
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.pipeline import Pipeline

import src.config as config
from src.data import prepare_feature_store_data_for_training
from src.paths import PARENT_DIR

In [None]:
"""
TODO: Wrap these into a function, get_feature_view()
"""

# connect to project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY,
)

# connect to feature store
feature_store = project.get_feature_store()

# connect to feature group
feature_group = feature_store.get_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
)

feature_view = feature_store.get_feature_view(
    name=config.FEATURE_VIEW_NAME, 
    version=config.FEATURE_VIEW_VERSION
)

2025-03-19 11:26:50,605 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-19 11:26:50,609 INFO: Initializing external client
2025-03-19 11:26:50,609 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-19 11:26:51,670 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1051798


In [None]:
def load_batch_of_features_for_inference(
    current_date: Union[str, date, datetime],
) -> pd.DataFrame:
    """
    Loads a batch of features from the Hopsworks feature store.

    The requested window runs from `config.DAYS_HISTORICAL + 14` days before
    `current_date` to 2 days after it. The extra days on each side are
    padding so that rows at the edges of the window are not lost.

    Args:
        current_date: reference date for inference. A `date` / `datetime`
            is used as-is; a string such as '2025-03-19' is parsed to a `date`.

    Returns:
        The batch read from the module-level `feature_view`, passed through
        `prepare_feature_store_data_for_training`.
    """
    # A raw string cannot be combined with `timedelta`; parse it so callers
    # can pass an ISO date string as the annotation allows.
    if isinstance(current_date, str):
        current_date = pd.Timestamp(current_date).date()

    # Adding some padding so that we don't lose any data
    to_date = current_date + timedelta(days=2)
    from_date = current_date - timedelta(days=config.DAYS_HISTORICAL + 14)

    data = feature_view.get_batch_data(
        start_time=from_date,
        end_time=to_date,
    )

    return prepare_feature_store_data_for_training(data)


def load_model_from_registry(
    hopsworks_model_dir: str = "models",
    pipeline_name: str = "preprocessing_pipeline",
    model_name: str = "lgbm",
) -> Tuple[BaseEstimator, Pipeline]:
    """
    Loads saved model files from the Hopsworks model registry.

    Downloads version `config.MODEL_VERSION` of `config.MODEL_NAME` through
    the module-level `project`. It then unpacks `<hopsworks_model_dir>.zip`
    into `PARENT_DIR / "downloaded_model_bundle"` and loads both pickles
    from that directory.

    Args:
        hopsworks_model_dir: name of folder holding the model artifact
            (the downloaded archive is expected at `<name>.zip`)
        pipeline_name: name of pickled preprocessing pipeline (without `.pkl`)
        model_name: name of pickled trained model (without `.pkl`)

    Returns:
        (model, preprocessing_pipeline)
    """
    import shutil

    import joblib

    model_registry = project.get_model_registry()
    model_files = model_registry.get_model(
        name=config.MODEL_NAME,
        version=config.MODEL_VERSION,
    )

    model_dir = model_files.download()

    filepath = Path(model_dir) / f"{hopsworks_model_dir}.zip"
    extract_dir = PARENT_DIR / "downloaded_model_bundle"
    # parents/exist_ok: create missing parent dirs too, and avoid the
    # check-then-create race of `exists()` followed by `mkdir`.
    extract_dir.mkdir(parents=True, exist_ok=True)

    shutil.unpack_archive(filepath, extract_dir)

    # SECURITY: joblib.load unpickles arbitrary objects and can execute code;
    # only point this at artifacts from a trusted model registry.
    preprocessing_pipeline = joblib.load(extract_dir / f"{pipeline_name}.pkl")
    model = joblib.load(extract_dir / f"{model_name}.pkl")

    return model, preprocessing_pipeline


def get_model_predictions(model: BaseEstimator,
                          preprocessing_pipeline: Pipeline,
                          X: pd.DataFrame,
                          features_end: str) -> pd.DataFrame:
    """
    Uses {model} to make predictions of daily demand for the day after {features_end}
    using features {X}.

    Rows of {X} whose `datetime` lies in
    [features_end - config.DAYS_HISTORICAL days, features_end] (both ends
    inclusive) are transformed with {preprocessing_pipeline} and passed to
    `model.predict`. Exactly one prediction per ba_code is expected.

    Args:
        model: a fitted SKLearn model
        preprocessing_pipeline: the fitted pipeline used to train {model}
        X: the features for inference; must have `datetime` and `ba_code` columns
        features_end: the date corresponding to the day before the prediction date,
            e.g. '2025-03-15' (anything `pd.Timestamp` accepts)

    Returns:
        A dataframe holding the predictions.

        Columns:
            ba_code: str
            predicted_demand: float64
            datetime: datetime64

    Raises:
        ValueError: if no rows of {X} fall inside the window, or if the model
            does not return exactly one prediction per ba_code.
    """
    features_end = pd.Timestamp(features_end)
    features_start = features_end - pd.offsets.Day(config.DAYS_HISTORICAL)

    # Filter to appropriate date range, then sort by BA and time. Predictions
    # are paired with ba_codes by position below, so the pipeline must see rows
    # in an order where "first appearance" and "sorted" ba_code order coincide.
    inference_data = X.loc[
        (X["datetime"] <= features_end) & (X["datetime"] >= features_start)
    ].sort_values(["ba_code", "datetime"], kind="stable")

    if inference_data.empty:
        raise ValueError(
            f"No feature rows between {features_start.date()} and "
            f"{features_end.date()}; cannot build inference features."
        )

    # Transform data with fitted pipeline
    inference_data_t = preprocessing_pipeline.transform(inference_data)

    predictions = model.predict(inference_data_t)

    # NOTE(review): assumes the pipeline emits one row per ba_code, ordered by
    # ba_code (or first appearance, which the sort above makes identical).
    ba_codes = inference_data["ba_code"].unique()
    if len(predictions) != len(ba_codes):
        raise ValueError(
            f"Expected one prediction per ba_code ({len(ba_codes)}), "
            f"got {len(predictions)}."
        )

    predictions_df = pd.DataFrame(
        {"ba_code": ba_codes, "predicted_demand": predictions}
    )

    # Predictions are for the day after {features_end}
    predictions_df["datetime"] = features_end + pd.offsets.Day(1)

    return predictions_df


def load_predictions_from_store():
    """
    Placeholder: will load a set of predictions from the feature store so
    they can be passed to the UI.

    Not implemented yet; currently returns None.
    """
    return None

In [139]:
# NOTE(review): datetime.now() uses the machine's local time zone — confirm
# this matches how the feature timestamps are stored.
current_date = datetime.now().date()

data = load_batch_of_features_for_inference(current_date)
# Peek at the first and last rows of the loaded batch.
display(data.head(), data.tail())

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.67s) 


Unnamed: 0,datetime,demand,ba_code
1878,2024-03-06,47943.0,AECI
17470,2024-03-07,51039.0,AECI
4528,2024-03-08,52012.0,AECI
7874,2024-03-09,56646.0,AECI
10260,2024-03-10,54399.0,AECI


Unnamed: 0,datetime,demand,ba_code
24857,2025-03-12,1849.0,WAUW
24961,2025-03-13,1989.0,WAUW
25002,2025-03-14,1988.0,WAUW
25100,2025-03-15,2053.0,WAUW
25157,2025-03-16,1965.0,WAUW


In [140]:
# Registry artifacts come back as (model, preprocessing_pipeline), in that order.
(model, preprocessing_pipeline) = load_model_from_registry()

Downloading model artifact (0 dirs, 1 files)... DONE

In [142]:
predictions = get_model_predictions(
    model=model,
    preprocessing_pipeline=preprocessing_pipeline,
    X=data,
    # Last day of observed features; the predictions target the following day.
    # NOTE(review): hardcoded — it will fall out of the loaded window as
    # `current_date` advances; consider deriving it from data["datetime"].
    features_end="2025-03-15",
)
predictions.head()

Unnamed: 0,ba_code,predicted_demand,datetime
0,AECI,56746.982152,2025-03-16
1,AVA,33245.561408,2025-03-16
2,AZPS,63720.183805,2025-03-16
3,BANC,40158.109385,2025-03-16
4,BPAT,138158.303792,2025-03-16
