In [1]:
import sys
import subprocess

# Upgrade related dependencies.
# pip is invoked through sys.executable so the packages are installed for the
# interpreter that is running this cell.
# check=True makes a non-zero pip exit code raise CalledProcessError, so a
# failed upgrade stops the notebook here instead of being silently ignored.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "--upgrade", "hsfs", "hopsworks", "protobuf"],
    check=True,
)

CompletedProcess(args=['c:\\Users\\MD\\AppData\\Local\\Programs\\Python\\Python312\\python.exe', '-m', 'pip', 'install', '--upgrade', 'hsfs', 'hopsworks', 'protobuf'], returncode=0)

In [1]:
import sys
import os
from pathlib import Path

# Add the parent directory of the current working directory to sys.path so
# that modules located there can be imported from this notebook.
# The membership check keeps the cell idempotent: re-running it does not pile
# up duplicate entries in sys.path.
parent_dir = str(Path(os.getcwd()).resolve().parent)
if parent_dir not in sys.path:
    sys.path.append(parent_dir)


In [2]:
import pandas as pd
from pathlib import Path
import hopsworks
# Not used in this cell; kept so the names available in the kernel are unchanged.
from hsfs.feature_group import FeatureGroup
import src.config as config

# ========================
# 📂 Load CitiBike Final Data
# ========================
print("📂 Loading rides_citibike_final_2024_with_lags.parquet...")

local_file_path = Path("../data/processed/final_features/rides_citibike_final_2024_with_lags.parquet")
real_df = pd.read_parquet(local_file_path)

print(f"✅ Loaded data shape: {real_df.shape}")

# ========================
# 🔗 Connect to Hopsworks
# ========================
print("🔗 Connecting to Hopsworks...")

# Project name and API key are read from src.config rather than hardcoded here.
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)
feature_store = project.get_feature_store()

print("✅ Connected to Hopsworks!")

# ========================
# 🆕 Create New Feature Group
# ========================
new_feature_group_name = "citibike_hourly_data_v2"

print(f"⚙️ Creating new Feature Group: {new_feature_group_name}")

# get_or_create_feature_group reuses the feature group when it already exists,
# so this cell can be re-run; create_feature_group would attempt to register
# the same name/version a second time.
new_feature_group = feature_store.get_or_create_feature_group(
    name=new_feature_group_name,
    version=1,
    primary_key=["start_station_id", "hour_ts"],
    event_time="hour_ts",
    description="Full CitiBike 2024 features with 672 lags, time features, no missing columns",
    online_enabled=False
)

print("✅ New Feature Group created successfully!")

# ========================
# 📦 Insert Full Data
# ========================
print(f"📦 Inserting full CitiBike data into {new_feature_group_name}...")

# Wait for the offline materialization job to finish before continuing, so the
# success message below is only printed once the data has been written.
# NOTE(review): presumably this also avoids a follow-up insert into the same
# feature group colliding with a still-running materialization job — confirm.
new_feature_group.insert(real_df, write_options={"wait_for_job": True})

print("🏁 SUCCESS! Full CitiBike 2024 data inserted into the new Feature Group!")


📂 Loading rides_citibike_final_2024_with_lags.parquet...
✅ Loaded data shape: (43020, 687)
🔗 Connecting to Hopsworks...
2025-05-11 00:56:28,691 INFO: Initializing external client
2025-05-11 00:56:28,691 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-11 00:56:29,420 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1231002
✅ Connected to Hopsworks!
⚙️ Creating new Feature Group: citibike_hourly_data_v2
✅ New Feature Group created successfully!
📦 Inserting full CitiBike data into citibike_hourly_data_v2...
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1231002/fs/1213537/fg/1458581


Uploading Dataframe: 100.00% |██████████| Rows 43020/43020 | Elapsed Time: 02:56 | Remaining Time: 00:00


Launching job: citibike_hourly_data_v2_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1231002/jobs/named/citibike_hourly_data_v2_1_offline_fg_materialization/executions
🏁 SUCCESS! Full CitiBike 2024 data inserted into the new Feature Group!


In [3]:
import pandas as pd
from pathlib import Path
import hopsworks
import src.config as config

# ============================
# 📂 Load CitiBike 2025 Final Data
# ============================

print("📂 Loading rides_citibike_final_2025_with_lags.parquet...")

file_path_2025 = Path("../data/processed/final_features/rides_citibike_final_2025_with_lags.parquet")
data_2025 = pd.read_parquet(file_path_2025)

print(f"✅ Loaded 2025 data shape: {data_2025.shape}")

# ============================
# 🔗 Connect to Hopsworks
# ============================

print("🔗 Connecting to Hopsworks...")

# Project name and API key are read from src.config rather than hardcoded here.
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)
feature_store = project.get_feature_store()

print("✅ Connected to Hopsworks!")

# ============================
# 📦 Fetch Existing Feature Group
# ============================

feature_group_name = "citibike_hourly_data_v2"   # ✅ SAME as before

print(f"📦 Fetching existing Feature Group: {feature_group_name}")

feature_group = feature_store.get_feature_group(
    name=feature_group_name,
    version=1
)

print(f"✅ Found Feature Group: {feature_group.name}")

# ============================
# 📦 Insert 2025 Data into Feature Group
# ============================

print("📦 Inserting 2025 data into Feature Store...")

# Wait for the offline materialization job to finish before continuing, so the
# success message below is only printed once the data has been written.
# NOTE(review): if a materialization job for this feature group is still
# running from an earlier insert, a new one may not be started — confirm the
# job state before relying on the offline data.
feature_group.insert(data_2025, write_options={"wait_for_job": True})

print("🏁 SUCCESS! 2025 data inserted into Feature Group!")


📂 Loading rides_citibike_final_2025_with_lags.parquet...
✅ Loaded 2025 data shape: (6370, 687)
🔗 Connecting to Hopsworks...
2025-05-11 01:00:04,472 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-11 01:00:04,475 INFO: Initializing external client
2025-05-11 01:00:04,477 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-11 01:00:05,053 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1231002
✅ Connected to Hopsworks!
📦 Fetching existing Feature Group: citibike_hourly_data_v2
✅ Found Feature Group: citibike_hourly_data_v2
📦 Inserting 2025 data into Feature Store...


Uploading Dataframe: 100.00% |██████████| Rows 6370/6370 | Elapsed Time: 00:24 | Remaining Time: 00:00


Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/citi/Resources/jobs/citibike_hourly_data_v2_1_offline_fg_materialization/config_1746939393364) to trigger the materialization job again.

🏁 SUCCESS! 2025 data inserted into Feature Group!


In [4]:
import pandas as pd
import pytz
from datetime import datetime

# Read the 2024 feature table from disk
df = pd.read_parquet("../data/processed/final_features/rides_citibike_final_2024_with_lags.parquet")
print(f"✅ Loaded data shape: {df.shape}")

# Parse hour_ts as UTC, then express it in the America/New_York timezone
hour_ts_ny = pd.to_datetime(df["hour_ts"], utc=True).dt.tz_convert("America/New_York")
df["hour_ts"] = hour_ts_ny

# First instant that belongs to the test period
cutoff_date = pd.Timestamp("2024-11-01 00:00:00", tz="America/New_York")

# Rows strictly before the cutoff form the train set; rows at or after it
# form the test set
is_train = df["hour_ts"] < cutoff_date
is_test = df["hour_ts"] >= cutoff_date
train_df = df.loc[is_train]
test_df = df.loc[is_test]

print(f"✅ Train shape: {train_df.shape}")
print(f"✅ Test shape: {test_df.shape}")


✅ Loaded data shape: (43020, 687)
✅ Train shape: (32046, 687)
✅ Test shape: (10974, 687)


In [21]:
# ==============================
# 📦 Imports
# ==============================
import pandas as pd
import numpy as np
from pathlib import Path
import os
import sys

import hopsworks
from hsfs.feature_group import FeatureGroup
from datetime import datetime

# Make the parent of the working directory importable so `src` can be found.
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from src.utils import split_time_series_data
import src.config as config

# ==============================
# 📂 Load the Final Data
# ==============================

print("📂 Loading rides_citibike_final_2024_with_lags.parquet...")

# Relative to the working directory, like the other cells that read from
# ../data/processed/final_features, instead of a machine-specific absolute path.
final_features_path = Path("../data/processed/final_features")
df = pd.read_parquet(final_features_path / "rides_citibike_final_2024_with_lags.parquet")

print(f"✅ Loaded data shape: {df.shape}")

# ==============================
# ✂️ Train-Test Split
# ==============================

# split_time_series_data comes from src.utils; it returns features and target
# separately for the train and test portions.
X_train, y_train, X_test, y_test = split_time_series_data(
    df,
    cutoff_date=datetime(2025, 1, 1),
    target_column="ride_count"
)

# Merge features + target together for Hopsworks upload
train_df = X_train.copy()
train_df["ride_count"] = y_train

test_df = X_test.copy()
test_df["ride_count"] = y_test

print(f"✅ Train set shape: {train_df.shape}")
print(f"✅ Test set shape: {test_df.shape}")

# ==============================
# 🔗 Connect to Hopsworks
# ==============================

print("🔗 Connecting to Hopsworks...")

# Project name and API key are read from src.config rather than hardcoded here.
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)
feature_store = project.get_feature_store()

print("✅ Connected to Hopsworks!")

# ==============================
# 📦 Create or Fetch Feature Groups
# ==============================

# 1. Train Feature Group
print("⚙️ Creating Train Feature Group...")

train_fg = feature_store.get_or_create_feature_group(
    name="citibike_train_data_v2",
    version=1,
    primary_key=["start_station_id", "hour_ts"],
    description="Train set for CitiBike 8-hour ride count prediction (v2)",
    event_time="hour_ts",
    online_enabled=False
)

print("✅ Train Feature Group ready!")

# 2. Test Feature Group
print("⚙️ Creating Test Feature Group...")

test_fg = feature_store.get_or_create_feature_group(
    name="citibike_test_data_v2",
    version=1,
    primary_key=["start_station_id", "hour_ts"],
    description="Test set for CitiBike 8-hour ride count prediction (v2)",
    event_time="hour_ts",
    online_enabled=False
)

print("✅ Test Feature Group ready!")

# ==============================
# 📥 Insert Data into Feature Groups
# ==============================

# overwrite=True is passed on both inserts, as in the original cell.
print("📥 Inserting Train data into Hopsworks...")
train_fg.insert(train_df, overwrite=True)
print("✅ Train data inserted!")

print("📥 Inserting Test data into Hopsworks...")
test_fg.insert(test_df, overwrite=True)
print("✅ Test data inserted!")

print("🏁 Done! Train and Test datasets successfully uploaded to Hopsworks!")


📂 Loading rides_citibike_final_2024_with_lags.parquet...
✅ Loaded data shape: (43020, 687)
✅ Split complete: 39333 train samples, 3687 test samples
✅ Train set shape: (39333, 687)
✅ Test set shape: (3687, 687)
🔗 Connecting to Hopsworks...
2025-05-11 01:51:13,887 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-11 01:51:13,892 INFO: Initializing external client
2025-05-11 01:51:13,892 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-11 01:51:17,094 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1231002
✅ Connected to Hopsworks!
⚙️ Creating Train Feature Group...
✅ Train Feature Group ready!
⚙️ Creating Test Feature Group...
✅ Test Feature Group ready!
📥 Inserting Train data into Hopsworks...
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1231002/fs/1213537/fg/1458585


Uploading Dataframe: 100.00% |██████████| Rows 39333/39333 | Elapsed Time: 04:34 | Remaining Time: 00:00


Launching job: citibike_train_data_v2_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1231002/jobs/named/citibike_train_data_v2_1_offline_fg_materialization/executions
✅ Train data inserted!
📥 Inserting Test data into Hopsworks...
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1231002/fs/1213537/fg/1458587


Uploading Dataframe: 100.00% |██████████| Rows 3687/3687 | Elapsed Time: 00:20 | Remaining Time: 00:00


Launching job: citibike_test_data_v2_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1231002/jobs/named/citibike_test_data_v2_1_offline_fg_materialization/executions
✅ Test data inserted!
🏁 Done! Train and Test datasets successfully uploaded to Hopsworks!


In [5]:
import joblib
from pathlib import Path

# Path where the model was saved earlier.
# Expressed relative to the working directory instead of a machine-specific
# absolute path; resolve() turns it into an absolute path at runtime.
# NOTE(review): assumes the notebook runs from a folder directly under the
# project root, as the `../data/...` paths used elsewhere in this notebook
# do — confirm that `models/` sits next to `data/`.
model_load_path = Path("../models/lgbmhyper.pkl").resolve()

# Load model.
# joblib.load unpickles the file, so only load model files from a trusted source.
lgbmhyper = joblib.load(model_load_path)

print("✅ lgbmhyper model loaded!")


✅ lgbmhyper model loaded!


In [12]:
# ========================
# 📚 Imports
# ========================
import hopsworks
import joblib
from pathlib import Path
import src.config as config  # ✅ Your config.py

# ========================
# 🔗 Connect to Hopsworks
# ========================
print("🔗 Connecting to Hopsworks...")

# Project name and API key are read from src.config rather than hardcoded here.
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)
model_registry = project.get_model_registry()

print("✅ Connected to Hopsworks Model Registry!")

# ========================
# 📂 Load Model
# ========================
print("📂 Loading trained LightGBM model...")

# Expressed relative to the working directory instead of a machine-specific
# absolute path; resolve() turns it into an absolute path at runtime.
# NOTE(review): assumes the notebook runs from a folder directly under the
# project root, as the `../data/...` paths used elsewhere in this notebook
# do — confirm that `models/` sits next to `data/`.
model_local_path = Path("../models/lgbmhyper.pkl").resolve()  # ✅ Your trained model file
# Loading the file here checks that it can be unpickled before it is uploaded;
# the upload below sends the file itself, not this in-memory object.
model = joblib.load(model_local_path)

print("✅ Model loaded!")

# ========================
# 🆕 Register Model
# ========================
print("🚀 Registering model into Hopsworks...")

model_registry_model = model_registry.python.create_model(
    name="citibike_lgbm_model_v2",  # ✅ Your model name
    metrics={"mae": 3.18, "rmse": 4.59},  # (Optional - you can update later)
    description="Optimized LightGBM model for CitiBike ride prediction with full lag features."
)

# Save/upload model
model_registry_model.save(str(model_local_path))

print("🏁 SUCCESS! Model registered in Hopsworks at:")
print("👉 https://c.app.hopsworks.ai/p/1231002/models")


🔗 Connecting to Hopsworks...
2025-05-11 01:28:16,655 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-11 01:28:16,668 INFO: Initializing external client
2025-05-11 01:28:16,669 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-11 01:28:17,490 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1231002
✅ Connected to Hopsworks Model Registry!
📂 Loading trained LightGBM model...
✅ Model loaded!
🚀 Registering model into Hopsworks...


  0%|          | 0/6 [00:00<?, ?it/s]

Uploading C:\Users\MD\Desktop\citi\models\lgbmhyper.pkl: 0.000%|          | 0/11826319 elapsed<00:00 remaining…

Model created, explore it at https://c.app.hopsworks.ai:443/p/1231002/models/citibike_lgbm_model_v2/2
🏁 SUCCESS! Model registered in Hopsworks at:
👉 https://c.app.hopsworks.ai/p/1231002/models


In [23]:
# ==============================
# 📦 Imports
# ==============================
import hopsworks
import src.config as config

# ==============================
# 🔗 Connect to Hopsworks
# ==============================

print("🔗 Connecting to Hopsworks...")

# Project name and API key are read from src.config rather than hardcoded here.
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

feature_store = project.get_feature_store()

print("✅ Connected to Hopsworks!")

# ==============================
# 📋 Fetch Feature Group
# ==============================

print("📦 Fetching Feature Group 'citibike_hourly_data_v2'...")

feature_group = feature_store.get_feature_group(
    name="citibike_hourly_data_v2",
    version=1
)

print(f"✅ Feature Group fetched: {feature_group.name}")

# ==============================
# 👀 Create Feature View
# ==============================

print("⚙️ Creating Feature View for full CitiBike data...")

# get_or_create_feature_view returns the existing feature view when this
# name/version is already registered, so the cell can be re-run;
# create_feature_view would try to register the same name/version again.
feature_view = feature_store.get_or_create_feature_view(
    name="citibike_hourly_feature_view_v2",    # 🚀 Feature View name (new)
    version=1,
    description="Feature View for CitiBike 2024 data with full 8-hour lag features and time features",
    labels=["ride_count"],                     # 🎯 Prediction target
    query=feature_group.select_all()
)

print(f"🏁 SUCCESS! Feature View created: {feature_view.name}")


🔗 Connecting to Hopsworks...
2025-05-11 02:00:46,272 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-11 02:00:46,289 INFO: Initializing external client
2025-05-11 02:00:46,289 INFO: Base URL: https://c.app.hopsworks.ai:443


2025-05-11 02:00:46,872 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1231002
✅ Connected to Hopsworks!
📦 Fetching Feature Group 'citibike_hourly_data_v2'...
✅ Feature Group fetched: citibike_hourly_data_v2
⚙️ Creating Feature View for full CitiBike data...
Feature view created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1231002/fs/1213537/fv/citibike_hourly_feature_view_v2/version/1
🏁 SUCCESS! Feature View created: citibike_hourly_feature_view_v2


In [27]:
# ==============================
# 📚 Import required
# ==============================
import hopsworks
import src.config as config

# ==============================
# 🔗 Connect to Hopsworks
# ==============================
print("🔗 Connecting to Hopsworks...")

# Credentials and project name live in src.config
login_kwargs = {
    "project": config.HOPSWORKS_PROJECT_NAME,
    "api_key_value": config.HOPSWORKS_API_KEY,
}
project = hopsworks.login(**login_kwargs)
feature_store = project.get_feature_store()

print("✅ Connected to Hopsworks!")

# ==============================
# 📋 Fetch the Feature View
# ==============================
print("📦 Fetching Feature View 'citibike_hourly_feature_view_v2'...")

feature_view = feature_store.get_feature_view(name="citibike_hourly_feature_view_v2", version=1)

print(f"✅ Feature View loaded: {feature_view.name}")

# ==============================
# 📈 Create Training Dataset
# ==============================
print("🎯 Creating training dataset...")

# Options for the materialized training dataset; wait_for_job makes the call
# return only after the materialization job has finished.
training_data_options = {
    "description": "Training dataset for CitiBike LGBM prediction (2024 full with lags)",
    "data_format": "parquet",
    "coalesce": True,
    "write_options": {"wait_for_job": True},
}

# NOTE(review): the first returned value is what gets printed as the dataset
# ID below; the second one is presumably a job handle rather than label data,
# despite its name — confirm against the hsfs documentation.
training_dataset, label = feature_view.create_training_data(**training_data_options)

print(f"🏁 SUCCESS! Training dataset created with ID: {training_dataset}")


🔗 Connecting to Hopsworks...
2025-05-11 02:07:03,709 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-11 02:07:03,725 INFO: Initializing external client
2025-05-11 02:07:03,725 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-11 02:07:04,534 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1231002
✅ Connected to Hopsworks!
📦 Fetching Feature View 'citibike_hourly_feature_view_v2'...
✅ Feature View loaded: citibike_hourly_feature_view_v2
🎯 Creating training dataset...
Finished: Materializing data to Hopsworks, using Hopsworks Feature Query Service (8.75s) 

🏁 SUCCESS! Training dataset created with ID: 1


In [29]:
# Pull the feature group's contents into a DataFrame. `feature_group` must
# already exist in the kernel from an earlier cell.
df = feature_group.read()

# Show the dtype of every column
column_dtypes = df.dtypes
print(column_dtypes)


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (35.67s) 
hour_ts               datetime64[us, Etc/UTC]
start_station_name                     object
start_station_id                        int64
ride_count                              int32
hour                                    int32
                               ...           
ride_count_lag_668                    float64
ride_count_lag_669                    float64
ride_count_lag_670                    float64
ride_count_lag_671                    float64
ride_count_lag_672                    float64
Length: 687, dtype: object
