# Inference Pipeline
Predicts delays for the next hour given the delays from the last three 30-minute intervals.

### Imports

In [1]:
from datetime import date
import hopsworks
import os
import pandas as pd
from pathlib import Path
import sys
import joblib

# Locate the repository root. The notebook may be started from the repo root,
# from `src/`, or from `src/pipeline/`, so strip those trailing directories.
root_path = Path().absolute()
if root_path.name == 'pipeline':
    root_path = root_path.parent
if root_path.name == 'src':
    root_path = root_path.parent
root_dir = str(root_path)

# Relative paths used later in the notebook (e.g. 'data/') resolve from the root.
os.chdir(root_dir)
# Make the project root importable explicitly instead of relying on the
# working directory happening to be on sys.path.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Root dir: {Path.cwd()}")

# Import only the helper this notebook uses; a wildcard import would hide
# where the name comes from and could shadow other notebook variables.
from src.data_utils.plots import plot_metro_delay_predictions

Root dir: /Users/serkan/ID2223-project


### Connect to Hopsworks

In [2]:
# Name of the Hopsworks project to log in to.
# Set this to None if the project is your main project in Hopsworks.
project_name = 'metro_delay_prediction'

# Only pass `project` to the login call when a name is configured.
login_kwargs = {'project': project_name} if project_name else {}
project = hopsworks.login(**login_kwargs)

fs = project.get_feature_store()

2026-01-03 16:40:39,505 INFO: Initializing external client
2026-01-03 16:40:39,505 INFO: Base URL: https://c.app.hopsworks.ai:443
To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'







2026-01-03 16:40:41,533 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1330333


### Retrieve Model from Model Registry and load model + artifacts

In [3]:
# Look up the registered regressor and download its artifacts locally.
mr = project.get_model_registry()
retrieved_model = mr.get_model(name='xgb_regressor', version=3)
model_dir = retrieved_model.download()

# The download directory holds the model plus the two label encoders.
# NOTE(review): joblib.load unpickles its input, so only load artifacts that
# come from a registry you trust.
xgb_model = joblib.load(os.path.join(model_dir, 'model.joblib'))
line_encoder = joblib.load(os.path.join(model_dir, 'line_encoder.pkl'))
day_encoder = joblib.load(os.path.join(model_dir, 'day_encoder.pkl'))

Downloading: 0.000%|          | 0/229483 elapsed<00:00 remaining<?

Downloading model artifact (0 dirs, 1 files)... 

Downloading: 0.000%|          | 0/517 elapsed<00:00 remaining<?

Downloading model artifact (0 dirs, 2 files)... 

Downloading: 0.000%|          | 0/579 elapsed<00:00 remaining<?

Downloading model artifact (0 dirs, 3 files)... DONE

### Fetch features and take most recent data

In [4]:
# Version 1 of the delay feature group, read with online=True.
features_fg = fs.get_feature_group(name="delay_features_fg", version=1)
features_df = features_fg.read(online=True)

In [5]:
# Keep only the newest row per metro line: order by time, then take the
# last row of each line's group.
latest_df = (
    features_df
    .sort_values("timestamp")
    .groupby("line")
    .tail(1)
    .reset_index(drop=True)
)
print(latest_df)

            timestamp        timestamp_str          line   delay_60  delay_30  \
0 2026-01-03 16:39:00  2026-01-03T16:39:00   Röda linjen  -7.040000     -2.60   
1 2026-01-03 16:39:00  2026-01-03T16:39:00    Blå linjen -23.583333    -24.75   
2 2026-01-03 16:39:00  2026-01-03T16:39:00  Gröna linjen  67.483871     85.00   

   delay_current  
0      -2.520000  
1     -18.666667  
2      84.000000  


### Encode non-numerical features and set up data for inference

In [6]:
# Derive the weekday name from the timestamp, then run both categorical
# columns through the encoders downloaded with the model.
latest_df["day"] = latest_df["timestamp"].dt.day_name()
for column, encoder in (("day", day_encoder), ("line", line_encoder)):
    latest_df[f"{column}_encoded"] = encoder.transform(latest_df[column])
print(latest_df.head())





            timestamp        timestamp_str          line   delay_60  delay_30  \
0 2026-01-03 16:39:00  2026-01-03T16:39:00   Röda linjen  -7.040000     -2.60   
1 2026-01-03 16:39:00  2026-01-03T16:39:00    Blå linjen -23.583333    -24.75   
2 2026-01-03 16:39:00  2026-01-03T16:39:00  Gröna linjen  67.483871     85.00   

   delay_current       day  day_encoded  line_encoded  
0      -2.520000  Saturday            2             2  
1     -18.666667  Saturday            2             0  
2      84.000000  Saturday            2             1  


In [7]:
# Columns passed to the model: the two encoded categoricals followed by the
# three delay features.
delay_cols = ["delay_60", "delay_30", "delay_current"]
inference_df = latest_df[["line_encoded", "day_encoded", *delay_cols]].copy()

# Rows with a missing delay feature are dropped from the model input.
X = inference_df.dropna(subset=delay_cols).reset_index(drop=True)
print(X)

   line_encoded  day_encoded   delay_60  delay_30  delay_current
0             2            2  -7.040000     -2.60      -2.520000
1             0            2 -23.583333    -24.75     -18.666667
2             1            2  67.483871     85.00      84.000000


### Make predictions

In [8]:
# Run the loaded regressor on the prepared feature rows in X.
# NOTE(review): presumably returns one prediction per row of X, in row order — confirm.
preds = xgb_model.predict(X)
print(preds)

[-13.137091 -29.50791   41.85341 ]


In [9]:
# Rebuild exactly the rows that were scored. The feature-preparation cell
# drops rows with a missing delay feature before prediction, so `preds` can be
# shorter than `latest_df`; assigning it onto the unfiltered frame would raise
# a length-mismatch error. Applying the same filter here keeps every
# prediction paired with its own line and timestamp.
scored_df = latest_df.dropna(
    subset=["delay_60", "delay_30", "delay_current"]
).reset_index(drop=True)
if len(scored_df) != len(preds):
    raise ValueError(
        f"Got {len(preds)} predictions for {len(scored_df)} scored rows; "
        "re-run the feature preparation and prediction cells so they are in sync."
    )

# Build a fresh frame instead of adding columns to `inference_df`, so this
# cell can be re-run without changing the model input frame.
result = pd.DataFrame({
    "line": line_encoder.inverse_transform(scored_df["line_encoded"]),
    "timestamp": scored_df["timestamp"],
    # String form of the timestamp, used as part of the primary key on upload.
    "timestamp_pk": scored_df["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S"),
    "prediction": preds,
    # Delay observed at prediction time, stored alongside the prediction.
    "delay_hind": scored_df["delay_current"],
})
print(result)



           line           timestamp         timestamp_pk  prediction  \
0   Röda linjen 2026-01-03 16:39:00  2026-01-03T16:39:00  -13.137091   
1    Blå linjen 2026-01-03 16:39:00  2026-01-03T16:39:00  -29.507910   
2  Gröna linjen 2026-01-03 16:39:00  2026-01-03T16:39:00   41.853409   

   delay_hind  
0   -2.520000  
1  -18.666667  
2   84.000000  


### Upload predictions to Hopsworks

In [10]:
# Feature group holding one row per (line, timestamp_pk) prediction; it is
# created on first use and reused afterwards.
monitor_fg = fs.get_or_create_feature_group(
    name='sl_prediction',
    version=1,
    description='SL metro lines prediction monitoring',
    primary_key=['line', 'timestamp_pk'],
    event_time='timestamp',
)

# Upload the new predictions with wait=True.
monitor_fg.insert(result, wait=True)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1330333/fs/1320008/fg/1897803


Uploading Dataframe: 100.00% |██████████| Rows 3/3 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: sl_prediction_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1330333/jobs/named/sl_prediction_1_offline_fg_materialization/executions
2026-01-03 16:41:10,314 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-03 16:41:13,560 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-03 16:42:44,070 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-03 16:42:44,265 INFO: Waiting for log aggregation to finish.
2026-01-03 16:42:56,438 INFO: Execution finished successfully.


(Job('sl_prediction_1_offline_fg_materialization', 'SPARK'), None)

### Get all predictions and update plots

In [None]:
# Read back every stored prediction from version 1 of the monitoring feature group.
predictions_fg = fs.get_feature_group(name="sl_prediction", version=1)
predictions_df = predictions_fg.read()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.70s) 


In [12]:
# Relative path, resolved against the project root set as the working directory in the first cell.
# NOTE(review): presumably the plots are written under this directory — confirm in src/data_utils/plots.py.
file_path = Path('data/')
plot_metro_delay_predictions(predictions_df, file_path, hindcast=True)


