# Demand Feature Pipeline Notebook

This notebook processes demand data and uploads it to the Hopsworks feature store. It replicates the functionality of the feature_pipeline.py script in an interactive format.

In [1]:
import pandas as pd
import hopsworks
import os
from datetime import datetime

  from .autonotebook import tqdm as notebook_tqdm


## Load Environment Variables

We'll load environment variables for Hopsworks connection credentials.

## Connect to Hopsworks Feature Store

Establish connection to the Hopsworks Feature Store using credentials from environment variables.

In [2]:
project = hopsworks.login()
fs = project.get_feature_store()

2025-05-22 23:40:52,868 INFO: Initializing external client
2025-05-22 23:40:52,870 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-22 23:40:54,573 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/398


## Load Source Data

Load the demand data from CSV file and prepare it for the feature store.

In [3]:
print("Loading source data")
demand_df = pd.read_csv('../data/demand_qty_item_loc.csv')

# Display first few rows to inspect the data
display(demand_df.head())

Loading source data


Unnamed: 0,sp_id,loc_id,time_bucket,repetitive_demand_quantity
0,9684698,3,202104,55.0
1,9684698,3,202105,117.0
2,9684698,3,202106,62.0
3,9684698,3,202107,45.0
4,9684698,3,202108,77.0


In [None]:

# Add datetime column for feature store
# demand_df['datetime'] = datetime.now()

demand_df = demand_df.sort_values(by='time_bucket')

# Create lagged features
demand_df['lagged_1day_demand_quantity'] = demand_df['repetitive_demand_quantity'].shift(1)
demand_df['lagged_2day_demand_quantity'] = demand_df['repetitive_demand_quantity'].shift(2)

# Remove rows with NaN values
demand_df = demand_df.dropna()

# Display the transformed dataframe
display(demand_df.head())

Unnamed: 0,sp_id,loc_id,time_bucket,repetitive_demand_quantity,datetime,lagged_1day_demand_quantity,lagged_2day_demand_quantity
1824,9052071,3,202104,37.0,2025-05-22 23:41:01.569913,516.0,55.0
1776,10087345,3,202104,56.0,2025-05-22 23:41:01.569913,37.0,516.0
7104,8830337,3,202104,19.0,2025-05-22 23:41:01.569913,56.0,37.0
1728,8302117,3,202104,184.0,2025-05-22 23:41:01.569913,19.0,56.0
7152,8033728,3,202104,845.0,2025-05-22 23:41:01.569913,184.0,19.0


## Create Feature Group and Upload Data

Define the feature group schema and upload the prepared data to the feature store.

In [5]:
print("⬆ Creating/getting feature group")
# Define the feature group
# Configure parameters (these can be modified as needed)
feature_group_name = 'demand_features'
version = 1
demand_fg = fs.get_or_create_feature_group(
    name=feature_group_name,
    version=version,
    description="Item demand by location and time",
    primary_key=['sp_id', 'loc_id', 'time_bucket'],
    event_time='time_bucket',
)

⬆ Creating/getting feature group


In [6]:
print("⬆ Uploading data to the Feature Store")
# Upload data to the feature store
demand_fg.insert(demand_df)
print("Feature pipeline completed successfully")

⬆ Uploading data to the Feature Store
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/398/fs/335/fg/1480010


Uploading Dataframe: 100.00% |██████████| Rows 9598/9598 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: demand_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/398/jobs/named/demand_features_1_offline_fg_materialization/executions
2025-05-22 23:41:53,970 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-22 23:42:00,345 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-22 23:44:23,412 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-22 23:44:23,570 INFO: Waiting for log aggregation to finish.
2025-05-22 23:44:42,230 INFO: Execution finished successfully.
Feature pipeline completed successfully
