# Batch Inference Pipeline

In this notebook, we will do the following tasks:
1. Create a batch inference pipeline using the pre-trained model.
2. Run the pipeline and get the predictions.


In [1]:
import hopsworks
import os
import json
import torch
import pandas as pd
import numpy as np
from dotenv import load_dotenv

# Load the.env file
load_dotenv()

# Get the environment variables
hopsworks_api_key = os.getenv("HOPSWORKS_API_KEY")


KeyboardInterrupt: 

In [None]:
project = hopsworks.login(api_key_value=str(hopsworks_api_key))
fs = project.get_feature_store()


2025-02-22 17:33:12,710 INFO: Initializing external client
2025-02-22 17:33:12,714 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-02-22 17:33:15,415 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1212597


### Get the model from model registry

In [None]:
mr = project.get_model_registry()

EVALUATION_METRIC="mean_squared_error"  
SORT_METRICS_BY="min" # your sorting criteria

# get best model based on custom metrics
best_model = mr.get_best_model("amazon_stock_price_prediction_model_torch",
                               EVALUATION_METRIC,
                               SORT_METRICS_BY)


In [None]:
# Load the pre-trained model
model_dir = "../models/amazon_stock_price_prediction_model_torch"
best_model.download(model_dir)
state_dict = torch.load(f"{model_dir}/model.pt", weights_only=True).to('cpu')


with open("../preprocessor/hyper_params.json", "r") as f:
    hyper_params = json.load(f)

Downloading model artifact (0 dirs, 1 files)... DONE

In [None]:
# Create the model
import torch
from torch import nn
class LSTMModel(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int, num_layers: int, device:str = 'cpu'):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.device = device
        
		# LSTM layer
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
	# forward pass
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(self.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(self.device)
        
        out, (_, _) = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out
        


In [None]:
model = LSTMModel(input_dim=hyper_params['input_size'], hidden_dim=hyper_params['hidden_size'], output_dim=hyper_params['forecast_steps'], num_layers=hyper_params['num_layers'], device='cpu').to('cpu')

# Load the trained model state_dict
model.load_state_dict(state_dict)


<All keys matched successfully>

### Get Feature view

In [None]:
amazon_fv = fs.get_feature_view("amazon_fv")





In [None]:
batch_data = amazon_fv.get_batch_data()


# get the last 24 days of data for window_size
sample  = batch_data.sort_values('datetime').drop('datetime', axis=1).tail(hyper_params['window_size'])

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.91s) 


In [None]:
import pandas as pd
batch_data['datetime'] = pd.to_datetime(batch_data['datetime'], utc=True)

batch_data = batch_data.sort_values('datetime')


In [None]:
model.eval()
with torch.inference_mode():
    outputs = model(torch.tensor(np.array(sample)).float().unsqueeze(0).to('cpu'))

In [None]:
outputs = outputs.reshape(-1, 1)

In [None]:
# Get the actual values for comparison
import yfinance as yf

# Get the last today data
actual_values = pd.DataFrame(yf.download('AMZN', period='1d', interval='1h', multi_level_index=False))['Close']

YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed


In [None]:
import dataframe_image as dfi
import pandas as pd

time_stamps = batch_data.tail(hyper_params['forecast_steps'])['datetime'].values[::-1]

predicted_df = pd.DataFrame(outputs, columns=["predicted"])
predicted_df = predicted_df.set_index(time_stamps)
predicted_df.reset_index(inplace=True)
predicted_df.rename(columns={"index": "datetime"}, inplace=True)
predicted_df['id'] = [str(date) for date in predicted_df['datetime']]

In [None]:
predicted_df

Unnamed: 0,datetime,predicted,id
0,2025-02-21 20:30:00,146.063446,2025-02-21 20:30:00
1,2025-02-21 19:30:00,146.107147,2025-02-21 19:30:00
2,2025-02-21 18:30:00,146.148193,2025-02-21 18:30:00
3,2025-02-21 17:30:00,146.188782,2025-02-21 17:30:00
4,2025-02-21 16:30:00,146.235748,2025-02-21 16:30:00
5,2025-02-21 15:30:00,146.287613,2025-02-21 15:30:00
6,2025-02-21 14:30:00,146.327774,2025-02-21 14:30:00


In [None]:
import dataframe_image as dfi

# Create the directory for assets
os.makedirs("../assets", exist_ok=True)

# Yesterdays predu
dfi.export(predicted_df.set_index('datetime').drop('id', axis=1), "../assets/todays_predictions.png", table_conversion='matplotlib')

In [None]:
#  Create feature group for storing predictions
amazon_stock_predictions_fg = fs.get_or_create_feature_group("amazon_stock_predictions", version=1, description="Predicted prices for Amazon stocks", online_enabled=True, primary_key=["id"], event_time=["datetime"])


In [None]:
# Create a feature store and store predictions
amazon_stock_predictions_fg.insert(predicted_df)

Uploading Dataframe: 100.00% |██████████| Rows 7/7 | Elapsed Time: 00:02 | Remaining Time: 00:00


Launching job: amazon_stock_predictions_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1212597/jobs/named/amazon_stock_predictions_1_offline_fg_materialization/executions


(Job('amazon_stock_predictions_1_offline_fg_materialization', 'SPARK'), None)

In [None]:
# Create a list of dictionaries with column names and descriptions
column_descriptions = [
    {'name': 'datetime', 'description': 'The date and time of the stock data'},
    {'name': 'predicted', 'description': 'The closing price of the stock'},
    {'name': 'id', 'description': 'primary key'}
]

for desc in column_descriptions:
    amazon_stock_predictions_fg.update_feature_description(desc['name'], desc['description'])

##### Plot Yesterdays Predictions and Actual Values

In [None]:
df = amazon_stock_predictions_fg.read()
df

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.79s) 


Unnamed: 0,datetime,predicted,id
0,2025-02-21 17:30:00+00:00,146.188782,2025-02-21 17:30:00
1,2025-02-21 16:30:00+00:00,146.235748,2025-02-21 16:30:00
2,2025-02-21 15:30:00+00:00,146.287613,2025-02-21 15:30:00
3,2025-02-21 14:30:00+00:00,146.327774,2025-02-21 14:30:00
4,2025-02-21 18:30:00+00:00,146.148193,2025-02-21 18:30:00
5,2025-02-21 19:30:00+00:00,146.107147,2025-02-21 19:30:00
6,2025-02-21 20:30:00+00:00,146.063446,2025-02-21 20:30:00


In [None]:
# predictions read last 7 values

predicted_df = df.tail(7)
actual_df = pd.DataFrame(yf.download('AMZN', period='1d', interval='1h', multi_level_index=False))['Close']
predicted_df['actual'] = actual_df.values

[*********************100%***********************]  1 of 1 completed

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy






In [None]:
predicted_df.drop('datetime', axis=1, inplace=True)
predicted_df = predicted_df.rename(columns={'predicted': 'Predicted', 'actual': 'Actual', 'id': 'datetime'})

In [None]:
import dataframe_image as dfi

dfi.export(predicted_df.set_index('datetime'), "../assets/yesterdays_predictions.png", table_conversion='matplotlib')