In [14]:
import hopsworks
import numpy as np
import pandas as pd
from dotenv import load_dotenv
import os
import matplotlib.pyplot as plt
from xgboost import XGBClassifier
import joblib
import os
from pathlib import Path
from sklearn.metrics import (
    precision_score,
    recall_score,
    accuracy_score,
    f1_score,
    mean_absolute_error
)
from datetime import datetime, timedelta

In [2]:
load_dotenv()
project_name = os.getenv("HOPSWORKS_PROJECT")
api_key = os.getenv("HOPSWORKS_API_KEY")
test_start_string = os.getenv("TEST_START_DATE")
test_start_date = pd.to_datetime(test_start_string).date()

project = hopsworks.login(project=project_name, api_key_value=api_key)
fs = project.get_feature_store()

model_name = "occupancy_xgboost_model_new"
model_version = 2
today = pd.Timestamp.utcnow().floor("H")

2026-01-07 10:53:48,607 INFO: Initializing external client
2026-01-07 10:53:48,607 INFO: Base URL: https://c.app.hopsworks.ai:443
2026-01-07 10:53:50,127 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1271989


In [3]:
# Features used for training
VEHICLE_FEATURES = [
    "trip_id",
    "vehicle_id",
    "max_speed",
    "n_positions",
    "lat_mean",
    "lon_mean",
    "hour",
    "day_of_week",
]

WEATHER_FEATURES = [
    "temperature_2m",
    "precipitation",
    "cloud_cover",
    "wind_speed_10m",
    "snowfall",
    "rain"
]

HOLIDAY_FEATURES = [
    "is_work_free",
    "is_red_day",
    "is_day_before_holiday",
]

# Target variable
TARGET = "occupancy_mode"

In [4]:
vehicle_trip_agg_fg = fs.get_feature_group(
    name="vehicle_trip_agg_fg",
    version=2
)

weather_fg = fs.get_feature_group("weather_hourly_fg", version=1)
weather_df = weather_fg.read()
weather_df["datetime_hour"] = pd.to_datetime(weather_df["timestamp"]).dt.floor("H")

holiday_fg = fs.get_feature_group("swedish_holidays_fg", version=1)
holiday_df = holiday_fg.read()
holiday_df["datetime_hour"] = pd.to_datetime(holiday_df["date"]).dt.floor("H")

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.97s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.72s) 


In [5]:
print(len(weather_df))
print(len(holiday_df))

1488
730


In [6]:
monitor_df = vehicle_trip_agg_fg.read()
print(len(monitor_df))
monitor_df["window_start"] = pd.to_datetime(monitor_df["window_start"])
monitor_df["datetime_hour"] = monitor_df["window_start"].dt.floor("H")
monitor_df["_date"] = monitor_df["window_start"].dt.date

# Merge weather by datetime_hour
monitor_df = monitor_df.merge(
    weather_df[["datetime_hour"] + WEATHER_FEATURES],
    on="datetime_hour",
    how="left"
)

print(len(monitor_df))

# Merge holidays by day
holiday_df["date"] = pd.to_datetime(holiday_df["date"]).dt.date
monitor_df = monitor_df.merge(
    holiday_df[["date"] + HOLIDAY_FEATURES],
    left_on="_date",
    right_on="date",
    how="left"
)
print(len(monitor_df))

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (114.67s) 
6442370
6442370
6442370


In [7]:
# Download the model from the model registry

mr = project.get_model_registry()

retrieved_model = mr.get_model(
    name=model_name,
    version=model_version,
)

fv = retrieved_model.get_feature_view()

# Download the saved model artifacts to a local directory
saved_model_dir = retrieved_model.download()

2026-01-07 10:57:01,077 INFO: Initializing for batch retrieval of feature vectors


Downloading: 0.000%|          | 0/1747261 elapsed<00:00 remaining<?

Downloading model artifact (0 dirs, 1 files)... 

Downloading: 0.000%|          | 0/43482 elapsed<00:00 remaining<?

Downloading model artifact (0 dirs, 2 files)... DONE

In [8]:
print(fv.name)

occupancy_fv


In [9]:
model = XGBClassifier()
model.load_model(Path(saved_model_dir) / "model.json")

In [17]:
static_trip_info_fg = fs.get_feature_group(
    name="static_trip_info_fg",
    version=1
)

static_df = static_trip_info_fg.read()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (2.19s) 


In [21]:
# Linköping bus station coordinates as default monitoring point: https://traveling.com/sv/buss/station/linkoeping-busstation 
DEFAULT_LAT = 58.419274
DEFAULT_LON = 15.619256

In [47]:
import requests
import time

OPENMETEO_FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
WEATHER_VARIABLES = [
    "temperature_2m",
    "precipitation",
    "cloud_cover",
    "wind_speed_10m",
    "rain",
    "snowfall"
]

MAX_RETRIES = 5
INITIAL_DELAY = 1  # seconds

def get_weather_for_prediction(lat: float, lon: float, forecast_days: int) -> dict:
    """
    Get weather forecast for a specific location and time with exponential backoff.

    Args:
        lat: Latitude
        lon: Longitude
        target_datetime: Target datetime for prediction

    Returns:
        Dict with weather features for the model
    """
    # now = datetime.now()
    # days_ahead = (target_datetime.date() - now.date()).days

    # if days_ahead > 16:
    #     print(f"Warning: Date too far in future, using default weather")
    #     return _default_weather()

    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": ",".join(WEATHER_VARIABLES),
        "timezone": "Europe/Stockholm",
        "forecast_days": forecast_days,
    }

    delay = INITIAL_DELAY
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = requests.get(OPENMETEO_FORECAST_URL, params=params, timeout=30)
            if response.status_code == 200:
                data = response.json()
                hourly = data.get("hourly", {})
                if not hourly:
                    return _default_weather()

                # Match target hour
                times = hourly.get("time", [])
                try:
                    idx = times.index(target_str)
                except ValueError:
                    target_hour = target_datetime.hour
                    target_date = target_datetime.strftime("%Y-%m-%d")
                    for i, t in enumerate(times):
                        if t.startswith(target_date) and f"T{target_hour:02d}:" in t:
                            idx = i
                            break
                    else:
                        print(f"Could not find matching hour for {target_datetime}")
                        return _default_weather()

                return {
                    "temperature_2m": hourly.get("temperature_2m", [None])[idx] or 10.0,
                    "precipitation": hourly.get("precipitation", [None])[idx] or 0.0,
                    "cloud_cover": hourly.get("cloud_cover", [None])[idx] or 50.0,
                    "wind_speed_10m": hourly.get("wind_speed_10m", [None])[idx] or 5.0,
                    "rain": hourly.get("rain", [None])[idx] or 0.0,
                    "snowfall": hourly.get("snowfall", [None])[idx] or 0.0,
                }

            else:
                print(f"Weather API returned status {response.status_code}. Retrying in {delay}s...")
                time.sleep(delay)
                delay *= 2  # exponential backoff

        except requests.RequestException as e:
            print(f"Weather API request failed: {e}. Retrying in {delay}s...")
            time.sleep(delay)
            delay *= 2

    print(f"Max retries reached for {target_datetime}, returning default weather")
    return _default_weather()


def fetch_weather_all_hours(lat, lon, forecast_days=7):
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": ",".join(WEATHER_VARIABLES),
        "timezone": "Europe/Stockholm",
        "forecast_days": forecast_days
    }
    try:
        resp = requests.get(OPENMETEO_FORECAST_URL, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json().get("hourly", {})
        return data
    except requests.RequestException as e:
        print(f"Weather API request failed: {e}, returning defaults")
        # Build default hourly data for each variable
        return {var: [0.0]*(forecast_days*24) for var in WEATHER_VARIABLES} | {"time": []}


def _default_weather() -> dict:
    """Return default weather values."""
    return {
        "temperature_2m": 10.0,
        "precipitation": 0.0,
        "cloud_cover": 50.0,
        "wind_speed_10m": 5.0,
        "rain": 0.0,
        "snowfall": 0.0
    }


In [34]:
# Svenska Dagar API
SVENSKA_DAGAR_API_URL = "https://sholiday.faboul.se/dagar/v2.1"


def get_holiday_features(target_datetime: datetime) -> dict:
    """
    Get holiday features for a specific date.

    Args:
        target_datetime: Target datetime

    Returns:
        Dict with holiday features for the model
    """
    try:
        date = target_datetime.date()
        url = f"{SVENSKA_DAGAR_API_URL}/{date.year}/{date.month:02d}/{date.day:02d}"

        response = requests.get(url, timeout=30)

        if response.status_code != 200:
            print(f"Holiday API error: {response.status_code}")
            return _default_holidays(target_datetime)

        data = response.json()
        days = data.get("dagar", [])

        if not days:
            return _default_holidays(target_datetime)

        day = days[0]

        return {
            "is_work_free": day.get("arbetsfri dag") == "Ja",
            "is_red_day": day.get("röd dag") == "Ja",
            "is_day_before_holiday": day.get("dag före arbetsfri helgdag") == "Ja",
            "holiday_name": day.get("helgdag"),
            "day_of_week": int(day.get("dag i vecka", target_datetime.weekday() + 1)) - 1,  # Convert to 0-indexed
        }

    except Exception as e:
        print(f"Error fetching holiday data: {e}")
        return _default_holidays(target_datetime)


def _default_holidays(target_datetime: datetime) -> dict:
    """Return default holiday values based on day of week."""
    day_of_week = target_datetime.weekday()

    # Weekends are typically work-free
    is_weekend = day_of_week >= 5

    return {
        "is_work_free": is_weekend,
        "is_red_day": day_of_week == 6,  # Sundays are red days
        "is_day_before_holiday": False,
        "holiday_name": None,
        "day_of_week": day_of_week,
    }


In [73]:
forecast_days = 7
start_date = today
hours = list(range(5, 24))          # hours relevant for predicting occupancy

FEATURE_ORDER = [
    "trip_id",
    "vehicle_id",
    "max_speed",
    "n_positions",
    "lat_mean",
    "lon_mean",
    "hour",
    "day_of_week",
    "temperature_2m",
    "precipitation",
    "cloud_cover",
    "wind_speed_10m",
    "snowfall", 
    "rain",  
    "is_work_free",
    "is_red_day",
    "is_day_before_holiday",
]

DEFAULT_VEHICLE_FEATURES = {
    "max_speed": 25.0,      # typical max speed (in m/s, i.e. 90 km/h)
    "n_positions": 30,      # typical GPS points per trip window
}

forecast_dates = [today + timedelta(days=i) for i in range(1, forecast_days + 1)]
hours = list(range(5, 24))
forecast_times = [datetime.combine(d, datetime.min.time()).replace(hour=h)
                  for d in forecast_dates for h in hours]

weather_data = fetch_weather_all_hours(DEFAULT_LAT, DEFAULT_LON, forecast_days=forecast_days)
time_list = weather_data.get("time", [])
weather_by_dt = {}

for i, t in enumerate(time_list):
    dt = datetime.strptime(t, "%Y-%m-%dT%H:%M")
    weather_by_dt[dt] = {var: weather_data[var][i] for var in WEATHER_VARIABLES}

weather_rows = []
for dt in forecast_times:
    row = {"window_start": dt}
    row.update(weather_by_dt.get(dt, _default_weather()))
    weather_rows.append(row)

weather_df = pd.DataFrame(weather_rows)

holiday_rows = []
for d in forecast_dates:
    holidays = get_holiday_features(datetime.combine(d, datetime.min.time()))
    holiday_cleaned = {k: int(v) if v is not None else 0 for k, v in holidays.items()}
    holiday_rows.append({"_date": d, **holiday_cleaned})
holiday_df = pd.DataFrame(holiday_rows)

# Feature table per trip
forecast_rows = []
for trip in static_df["trip_id"].unique():
    for dt in forecast_times:
        row = {
            "window_start": dt,
            "trip_id": trip,
            "vehicle_id": 0,
            "lat_mean": DEFAULT_LAT,
            "lon_mean": DEFAULT_LON,
            "hour": dt.hour,
            "day_of_week": dt.weekday(),
            "max_speed": DEFAULT_VEHICLE_FEATURES["max_speed"],
            "n_positions": DEFAULT_VEHICLE_FEATURES["n_positions"],
        }
        forecast_rows.append(row)

forecast_features = pd.DataFrame(forecast_rows)


In [75]:
print(len(forecast_features))
print(forecast_features.columns)

print(len(weather_df))
print(weather_df.columns)

print(len(holiday_df))
print(holiday_df.columns)

forecast_features["_date"] = forecast_features["window_start"].dt.date
forecast_features['_date'] = pd.to_datetime(forecast_features['_date'], utc=True)
print(forecast_features['_date'].dtype)
print(holiday_df['_date'].dtype)

2149679
Index(['window_start', 'trip_id', 'vehicle_id', 'lat_mean', 'lon_mean', 'hour',
       'day_of_week', 'max_speed', 'n_positions'],
      dtype='object')
133
Index(['window_start', 'temperature_2m', 'precipitation', 'cloud_cover',
       'wind_speed_10m', 'rain', 'snowfall'],
      dtype='object')
7
Index(['_date', 'is_work_free', 'is_red_day', 'is_day_before_holiday',
       'holiday_name', 'day_of_week'],
      dtype='object')
datetime64[ns, UTC]
datetime64[ns, UTC]


In [78]:
print(forecast_features.columns)

Index(['window_start', 'trip_id', 'vehicle_id', 'lat_mean', 'lon_mean', 'hour',
       'day_of_week', 'max_speed', 'n_positions', '_date', 'temperature_2m',
       'precipitation', 'cloud_cover', 'wind_speed_10m', 'rain', 'snowfall'],
      dtype='object')


In [None]:
forecast_features = forecast_features.merge(weather_df, on="window_start", how="left")
forecast_features["_date"] = forecast_features["window_start"].dt.date
forecast_features['_date'] = pd.to_datetime(forecast_features['_date'], utc=True)
forecast_features = forecast_features.merge(holiday_df, on="_date", how="left")

In [None]:
print(forecast_features.columns)
print(len(forecast_features))

forecast_features = forecast_features.drop(['precipitation_x', 'temperature_2m_x', 'cloud_cover_x', 'wind_speed_10m_x', 'rain_x',
        'snowfall_x'], axis=1)

forecast_features = forecast_features.rename(columns={
    "precipitation_y": "precipitation",
    "temperature_2m_y": "temperature_2m",
    "cloud_cover_y": "cloud_cover",
    "wind_speed_10m_y": "wind_speed_10m",
    "rain_y": "rain",
    "snowfall_y": "snowfall"
})

Index(['window_start', 'trip_id', 'vehicle_id', 'lat_mean', 'lon_mean', 'hour',
       'day_of_week_x', 'max_speed', 'n_positions', '_date',
       'temperature_2m_y', 'precipitation_y', 'cloud_cover_y',
       'wind_speed_10m_y', 'rain_y', 'snowfall_y', 'is_work_free',
       'is_red_day', 'is_day_before_holiday', 'holiday_name', 'day_of_week_y'],
      dtype='object')
2149679


In [None]:
forecast_features = forecast_features.drop(['day_of_week_x'], axis=1)

forecast_features = forecast_features.rename(columns={
    "day_of_week_y": "day_of_week",
})

In [88]:
print(forecast_features.columns)
print(len(forecast_features))

Index(['window_start', 'trip_id', 'vehicle_id', 'lat_mean', 'lon_mean', 'hour',
       'max_speed', 'n_positions', '_date', 'temperature_2m', 'precipitation',
       'cloud_cover', 'wind_speed_10m', 'rain', 'snowfall', 'is_work_free',
       'is_red_day', 'is_day_before_holiday', 'holiday_name', 'day_of_week'],
      dtype='object')
2149679


In [89]:
X_forecast = forecast_features[FEATURE_ORDER]
proba = model.predict_proba(X_forecast)

forecast_features["predicted_occupancy_mode"] = np.argmax(proba, axis=1)
forecast_features["predicted_confidence"] = proba.max(axis=1)
forecast_features["predicted_proba"] = proba.tolist()

In [90]:
print(len(forecast_features))

2149679


In [91]:
hindcast_features = monitor_df[monitor_df["window_start"] < today]

print(hindcast_features.columns)

Index(['trip_id', 'window_start', 'vehicle_id', 'avg_speed', 'max_speed',
       'n_positions', 'speed_std', 'lat_min', 'lat_max', 'lat_mean', 'lon_min',
       'lon_max', 'lon_mean', 'bearing_min', 'bearing_max', 'occupancy_mode',
       'date_x', 'hour', 'day_of_week', 'datetime_hour', '_date',
       'temperature_2m', 'precipitation', 'cloud_cover', 'wind_speed_10m',
       'snowfall', 'rain', 'date_y', 'is_work_free', 'is_red_day',
       'is_day_before_holiday'],
      dtype='object')


In [94]:
hindcast_features = hindcast_features.drop(['date_x'], axis=1)

hindcast_features = hindcast_features.rename(columns={
    "date_y": "date",
})

In [95]:
print(hindcast_features.columns)

Index(['trip_id', 'window_start', 'vehicle_id', 'avg_speed', 'max_speed',
       'n_positions', 'speed_std', 'lat_min', 'lat_max', 'lat_mean', 'lon_min',
       'lon_max', 'lon_mean', 'bearing_min', 'bearing_max', 'occupancy_mode',
       'hour', 'day_of_week', 'datetime_hour', '_date', 'temperature_2m',
       'precipitation', 'cloud_cover', 'wind_speed_10m', 'snowfall', 'rain',
       'date', 'is_work_free', 'is_red_day', 'is_day_before_holiday'],
      dtype='object')


In [97]:
required_features = VEHICLE_FEATURES + WEATHER_FEATURES + HOLIDAY_FEATURES
missing = [c for c in required_features if c not in forecast_features.columns]
if missing:
    raise ValueError(f"Missing features from feature view: {missing}")

forecast_features["predicted_occupancy_mode"] = model.predict(
    forecast_features[required_features]
)

probas = model.predict_proba(forecast_features[required_features])

forecast_features["predicted_confidence"] = np.max(probas, axis=1)  # between 0 and 1

forecast_out = forecast_features[[
    "window_start",
    "trip_id",
    "predicted_occupancy_mode",
    "predicted_confidence",
]]

In [98]:
for col in HOLIDAY_FEATURES:
    if col in hindcast_features.columns:
        hindcast_features = hindcast_features.dropna(subset=HOLIDAY_FEATURES)
        hindcast_features[col] = hindcast_features[col].astype(int)

In [99]:
hindcast_features['trip_id'] = pd.to_numeric(hindcast_features['trip_id'], errors='coerce')
hindcast_features = hindcast_features.dropna(subset=['trip_id'])

hindcast_features['vehicle_id'] = pd.to_numeric(hindcast_features['vehicle_id'], errors='coerce')
hindcast_features = hindcast_features.dropna(subset=['vehicle_id'])

In [100]:
hindcast_features["predicted_occupancy_mode"] = model.predict(
    hindcast_features[VEHICLE_FEATURES + WEATHER_FEATURES + HOLIDAY_FEATURES]
)

hindcast_out = hindcast_features[[
    "window_start",
    "trip_id",
    "occupancy_mode",               # ground truth
    "predicted_occupancy_mode"
]].rename(columns={
    "occupancy_mode": "actual_occupancy_mode"
})


In [101]:
forecast_out.columns

Index(['window_start', 'trip_id', 'predicted_occupancy_mode',
       'predicted_confidence'],
      dtype='object')

In [102]:
print(hindcast_out.columns)

Index(['window_start', 'trip_id', 'actual_occupancy_mode',
       'predicted_occupancy_mode'],
      dtype='object')


In [None]:
# Merge with static trip information for analytics

forecast_out = forecast_out.merge(
    static_df[
        [
            "trip_id",
            "route_id",
            "route_short_name",
            "route_long_name",
            "route_desc"
        ]
    ],
    on="trip_id",
    how="left"
)


hindcast_out = hindcast_out.merge(
    static_df[
        [
            "trip_id",
            "route_id",
            "route_short_name",
            "route_long_name",
            "route_desc"
        ]
    ],
    on="trip_id",
    how="left"
)

print(hindcast_out.columns)

Index(['window_start', 'trip_id', 'actual_occupancy_mode',
       'predicted_occupancy_mode', 'route_id', 'route_short_name',
       'route_long_name', 'route_desc'],
      dtype='object')


KeyError: "['route_id_x', 'route_short_name_x', 'route_long_name_x', 'route_desc_x'] not found in axis"

In [104]:
print(hindcast_out.columns)

Index(['window_start', 'trip_id', 'actual_occupancy_mode',
       'predicted_occupancy_mode', 'route_id', 'route_short_name',
       'route_long_name', 'route_desc'],
      dtype='object')


In [105]:
y_true = hindcast_out["actual_occupancy_mode"]
y_pred = hindcast_out["predicted_occupancy_mode"]

precision = precision_score(y_true, y_pred, average="weighted", zero_division=0)
recall = recall_score(y_true, y_pred, average="weighted", zero_division=0)
accuracy = accuracy_score(y_true, y_pred)
f1_weighted = f1_score(y_true, y_pred, average="weighted")
mae = mean_absolute_error(y_true, y_pred)

print(f"\n  Results:")
print(f"    Accuracy:  {accuracy:.4f}")
print(f"    Precision: {precision:.4f} (weighted)")
print(f"    Recall:    {recall:.4f} (weighted)")
print(f"    F1 Score:  {f1_weighted:.4f} (weighted)")
print(f"    MAE:  {mae:.4f}")

hindcast_out["precision"] = precision
hindcast_out["recall"] = recall
hindcast_out["accuracy"] = accuracy
hindcast_out["mae"] = mae
hindcast_out["f1_weighted"] = f1_weighted
hindcast_out["model_version"] = model_version


  Results:
    Accuracy:  0.7190
    Precision: 0.6792 (weighted)
    Recall:    0.7190 (weighted)
    F1 Score:  0.6882 (weighted)
    MAE:  0.2952


In [106]:
forecast_fg = fs.get_or_create_feature_group(
    name="forecast_fg",
    version=1,
    primary_key=["window_start", "trip_id"],
    description="Forward bus occupancy predictions",
    online_enabled=False
)

In [108]:
forecast_out = forecast_out.rename(columns={
    "route_id_y": "route_id",
    "route_short_name_y": "route_short_name",
    "route_long_name_y": "route_long_name",
    "route_desc_y": "route_desc"
})


In [109]:
print(len(forecast_out))
print(forecast_out.columns)

5324256
Index(['window_start', 'trip_id', 'predicted_occupancy_mode',
       'predicted_confidence', 'route_id', 'route_short_name',
       'route_long_name', 'route_desc'],
      dtype='object')


In [110]:
forecast_out = forecast_out.drop(columns=["route_short_name", 
                                          "route_long_name", "route_desc"])

print(forecast_out.columns)

Index(['window_start', 'trip_id', 'predicted_occupancy_mode',
       'predicted_confidence', 'route_id'],
      dtype='object')


In [111]:
print(len(forecast_out))

5324256


In [112]:
forecast_fg.insert(
    forecast_out,
    write_options={"wait_for_job": True,"overwrite": True}   # only include latest forecast for that trip by overwriting
)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1271989/fs/1258587/fg/1908135


Uploading Dataframe: 100.00% |██████████| Rows 5324256/5324256 | Elapsed Time: 01:24 | Remaining Time: 00:00


Launching job: forecast_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271989/jobs/named/forecast_fg_1_offline_fg_materialization/executions
2026-01-07 12:35:04,465 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-07 12:35:07,630 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-07 12:35:10,801 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-07 12:41:09,504 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-07 12:41:09,747 INFO: Waiting for log aggregation to finish.
2026-01-07 12:41:41,872 INFO: Execution finished successfully.


(Job('forecast_fg_1_offline_fg_materialization', 'SPARK'), None)

In [113]:
hindcast_out = hindcast_out.drop(columns=["route_long_name", "route_short_name", 
                                          "route_desc"])

print(hindcast_out.columns)

Index(['window_start', 'trip_id', 'actual_occupancy_mode',
       'predicted_occupancy_mode', 'route_id', 'precision', 'recall',
       'accuracy', 'mae', 'f1_weighted', 'model_version'],
      dtype='object')


In [114]:
print(len(hindcast_out))

14957087


In [115]:
monitor_fg = fs.get_or_create_feature_group(
    name="monitor_fg",
    version=1,
    primary_key=["window_start", "trip_id"],
    description="Model monitoring and hindcast diagnostics",
    online_enabled=False
)

monitor_fg.insert(
    hindcast_out,
    write_options={"wait_for_job": True, "overwrite": False}   # append
)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1271989/fs/1258587/fg/1893848


Uploading Dataframe: 100.00% |██████████| Rows 14957087/14957087 | Elapsed Time: 05:55 | Remaining Time: 00:00


Launching job: monitor_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271989/jobs/named/monitor_fg_1_offline_fg_materialization/executions
2026-01-07 12:48:48,224 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-07 12:48:54,571 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-07 12:53:21,207 INFO: Waiting for execution to finish. Current state: FINISHED. Final status: SUCCEEDED
2026-01-07 12:53:21,627 INFO: Waiting for log aggregation to finish.
2026-01-07 12:53:21,627 INFO: Execution finished successfully.


(Job('monitor_fg_1_offline_fg_materialization', 'SPARK'), None)