In [1]:
#Step 1.1: Load and Merge All Monthly CSVs
import pandas as pd
import os
from glob import glob

# Correct folder path
data_path = '/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata'

# Recursively search for CSVs
csv_files = glob(os.path.join(data_path, '**', '*.csv'), recursive=True)

# Show what files were found
print(f"🔎 Found {len(csv_files)} CSV files.")
for file in csv_files:
    print(file)

# Continue only if found
if csv_files:
    df_list = [pd.read_csv(f) for f in csv_files]
    df = pd.concat(df_list, ignore_index=True)
    print("✅ Combined dataset shape:", df.shape)
else:
    print("⚠️ Still no CSVs found. Try checking extensions (.txt?) or share a screenshot of folder contents.")


🔎 Found 12 CSV files.
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/4_April/201404-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/12_December/201412-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/11_November/201411-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/7_July/201407-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/10_October/201410-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/9_September/201409-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/8_August/201408-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/6_June/201406-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/3_March/201403-citibike-tripdata_1.csv
/Users/sapthavarnidevineni/Downloads/2014-citibike-tripdata/1_January/201

In [3]:
#Step 1.2: Clean and Preprocess the Data
import pandas as pd

# ✅ Standardize column names (make lowercase, remove spaces)
df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

# ✅ Convert time columns to datetime (allowing failed parsing)
df['starttime'] = pd.to_datetime(df['starttime'], errors='coerce', format='mixed')
df['stoptime'] = pd.to_datetime(df['stoptime'], errors='coerce', format='mixed')

# ✅ Add trip duration where timestamps are valid
df['trip_duration_min'] = (df['stoptime'] - df['starttime']).dt.total_seconds() / 60

# 📊 Just show how many timestamps failed parsing (optional)
print("Invalid starttime values:", df['starttime'].isna().sum())
print("Invalid stoptime values:", df['stoptime'].isna().sum())

print("✅ Cleaned dataset shape (no rows dropped):", df.shape)


Invalid starttime values: 0
Invalid stoptime values: 0
✅ Cleaned dataset shape (no rows dropped): (8081216, 16)


In [5]:
#Step 1.3: Select Top 3 Start Stations by Ride Count
top_stations = df['start_station_name'].value_counts().head(3)
print("Top 3 Stations:")
print(top_stations)

# Filter dataset for top 3 stations only
top_station_names = top_stations.index.tolist()
df_top3 = df[df['start_station_name'].isin(top_station_names)]

print("Filtered dataset shape (Top 3 stations):", df_top3.shape)


Top 3 Stations:
start_station_name
8 Ave & W 31 St          100498
Lafayette St & E 8 St     86692
E 17 St & Broadway        80166
Name: count, dtype: int64
Filtered dataset shape (Top 3 stations): (267356, 16)


In [7]:
import hopsworks

# Step 1: Login
project = hopsworks.login(
    api_key_value="HB0zAW5eEzl4iuNq.KJX5bZAdAnGaRJrIFVFVB30exr8wMMql5TZUuNMVeMUbcOVqRXg0fW3OWz2aRzOi",
    project="CDA500FINAL"
)

fs = project.get_feature_store()

# Step 2: Clean object (string) columns
for col in df_top3.select_dtypes(include='object').columns:
    df_top3[col] = df_top3[col].astype(str).fillna('')

# Optional: Fill numerical NaNs (if you want)
# df_top3 = df_top3.fillna(0)

# Step 3: Create or Get Feature Group
from hsfs.feature_group import FeatureGroup

fg = fs.get_or_create_feature_group(
    name="citibike_2014_top3",
    version=1,
    description="Citi Bike 2014 rides for top 3 start stations",
    primary_key=["starttime", "start_station_name"],
    event_time="starttime"
)

# Step 4: Insert into Hopsworks
fg.insert(df_top3, write_options={"wait_for_job": True})

print("✅ Data successfully inserted into Hopsworks Feature Store.")


2025-05-09 15:22:25,641 INFO: Initializing external client
2025-05-09 15:22:25,642 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-09 15:22:26,688 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1225937


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user

Launching job: citibike_2014_top3_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1225937/jobs/named/citibike_2014_top3_1_offline_fg_materialization/executions
2025-05-09 15:26:05,662 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-09 15:26:08,747 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-09 15:28:34,642 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-09 15:28:34,733 INFO: Waiting for log aggregation to finish.
2025-05-09 15:28:43,045 INFO: Execution finished successfully.
✅ Data successfully inserted into Hopsworks Feature Store.


In [17]:
import os

netrc_path = os.path.expanduser("~/.netrc")
with open(netrc_path, "w") as f:
    f.write("""machine dagshub.com
login dsapthavarni
password af783d5cf1e2bc107ae9ce02b1af20612fc6c3c9
""")

os.chmod(netrc_path, 0o600)
print("✅ DagsHub token saved to ~/.netrc")


✅ DagsHub token saved to ~/.netrc


In [19]:
import mlflow

mlflow.set_tracking_uri("https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow")
mlflow.set_experiment("citi-bike-trip-prediction")


2025/05/09 23:39:13 INFO mlflow.tracking.fluent: Experiment with name 'citi-bike-trip-prediction' does not exist. Creating a new experiment.


<Experiment: artifact_location='mlflow-artifacts:/2add5aa706534654a26588936ea3bc2f', creation_time=1746848353832, experiment_id='0', last_update_time=1746848353832, lifecycle_stage='active', name='citi-bike-trip-prediction', tags={}>

In [25]:
import dagshub
dagshub.init(repo_owner='dsapthavarni', repo_name='CDA500BIKE', mlflow=True)

Output()



Open the following link in your browser to authorize the client:
https://dagshub.com/login/oauth/authorize?state=b2170373-262e-4f79-8890-b28ea0f498e7&client_id=32b60ba385aa7cecf24046d8195a71c07dd345d9657977863b52e7748e0f0f28&middleman_request_id=92b86de6a821f062bb441cb1dc4da79bb85a43f02f1d53b2424b5d879c46a28e


2025-05-09 23:41:23,081 INFO: HTTP Request: POST https://dagshub.com/login/oauth/middleman "HTTP/1.1 200 OK"


2025-05-09 23:41:23,230 INFO: HTTP Request: POST https://dagshub.com/login/oauth/access_token "HTTP/1.1 200 OK"
2025-05-09 23:41:23,377 INFO: HTTP Request: GET https://dagshub.com/api/v1/user "HTTP/1.1 200 OK"


2025-05-09 23:41:23,385 INFO: Accessing as dsapthavarni
2025-05-09 23:41:23,562 INFO: HTTP Request: GET https://dagshub.com/api/v1/repos/dsapthavarni/CDA500BIKE "HTTP/1.1 200 OK"
2025-05-09 23:41:23,714 INFO: HTTP Request: GET https://dagshub.com/api/v1/user "HTTP/1.1 200 OK"


2025-05-09 23:41:23,719 INFO: Initialized MLflow to track repo "dsapthavarni/CDA500BIKE"


2025-05-09 23:41:23,722 INFO: Repository dsapthavarni/CDA500BIKE initialized!


In [29]:
# -----------------------------
# Step 0: Import Required Modules
# -----------------------------
import pandas as pd
import mlflow
from dagshub import dagshub_logger
import dagshub
from sklearn.metrics import mean_absolute_error

# -----------------------------
# Step 1: Init DagsHub + MLflow
# -----------------------------
dagshub.init(repo_owner='dsapthavarni', repo_name='CDA500BIKE', mlflow=True)
mlflow.set_tracking_uri("https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow")
mlflow.set_experiment("citi-bike-trip-prediction")

# -----------------------------
# Step 2: Create Daily Lag Features (1 to 28)
# -----------------------------
df_top3['starttime'] = pd.to_datetime(df_top3['starttime'])
df_top3['date'] = df_top3['starttime'].dt.date

# Get daily trip counts per station
daily_counts = df_top3.groupby(['start_station_name', 'date']).size().reset_index(name='trip_count')

# Sort properly
daily_counts = daily_counts.sort_values(['start_station_name', 'date'])

# Create lag features
for lag in range(1, 29):
    daily_counts[f'lag_{lag}'] = (
        daily_counts.groupby('start_station_name')['trip_count'].shift(lag)
    )

# Drop rows with NaNs in lag features
daily_lagged = daily_counts.dropna().reset_index(drop=True)

# Ensure 'date' column is datetime for filtering
daily_lagged['date'] = pd.to_datetime(daily_lagged['date'])

# -----------------------------
# Step 3: Train-Test Split (last 14 days = test)
# -----------------------------
cutoff_date = daily_lagged['date'].max() - pd.Timedelta(days=14)
train_df = daily_lagged[daily_lagged['date'] <= cutoff_date]
test_df = daily_lagged[daily_lagged['date'] > cutoff_date]

# -----------------------------
# Step 4: Baseline Model (Naive Lag 1)
# -----------------------------
y_true = test_df['trip_count'].values
y_pred = test_df['lag_1'].values
mae = mean_absolute_error(y_true, y_pred)

# -----------------------------
# Step 5: Log to DagsHub MLflow
# -----------------------------
with mlflow.start_run(run_name="baseline_naive_lag"):
    mlflow.log_param("model_type", "Naive Lag-1")
    mlflow.log_param("lag_used", 1)
    mlflow.log_metric("mae", mae)
    mlflow.set_tag("step", "baseline")
    
    print(f"✅ Baseline MAE logged to MLflow: {mae:.3f}")


2025-05-09 23:43:58,154 INFO: HTTP Request: GET https://dagshub.com/api/v1/repos/dsapthavarni/CDA500BIKE "HTTP/1.1 200 OK"


2025-05-09 23:43:58,160 INFO: Initialized MLflow to track repo "dsapthavarni/CDA500BIKE"


2025-05-09 23:43:58,162 INFO: Repository dsapthavarni/CDA500BIKE initialized!


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


✅ Baseline MAE logged to MLflow: 53.293
🏃 View run baseline_naive_lag at: https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow/#/experiments/0/runs/b84d1a92823a41fa92549b1edf791834
🧪 View experiment at: https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow/#/experiments/0


In [31]:
pip install lightgbm --quiet


Note: you may need to restart the kernel to use updated packages.


In [33]:
# -----------------------------
# Step 1: Import Dependencies
# -----------------------------
import lightgbm as lgb
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
import mlflow
import dagshub

# -----------------------------
# Step 2: Init DagsHub + MLflow (if not already done)
# -----------------------------
dagshub.init(repo_owner='dsapthavarni', repo_name='CDA500BIKE', mlflow=True)
mlflow.set_tracking_uri("https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow")
mlflow.set_experiment("citi-bike-trip-prediction")

# -----------------------------
# Step 3: Define features and target
# -----------------------------
feature_cols = [f'lag_{i}' for i in range(1, 29)]
target_col = 'trip_count'

X = daily_lagged[feature_cols]
y = daily_lagged[target_col]

# Use the same 14-day holdout as before
X_train = X[daily_lagged['date'] <= cutoff_date]
X_test = X[daily_lagged['date'] > cutoff_date]
y_train = y[daily_lagged['date'] <= cutoff_date]
y_test = y[daily_lagged['date'] > cutoff_date]

# -----------------------------
# Step 4: Train LightGBM Model
# -----------------------------
with mlflow.start_run(run_name="lightgbm_28_lags"):

    model = lgb.LGBMRegressor(n_estimators=100, learning_rate=0.1, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)

    # Log parameters and metric
    mlflow.log_param("model_type", "LightGBM")
    mlflow.log_param("features_used", 28)
    mlflow.log_metric("mae", mae)
    mlflow.set_tag("step", "lightgbm_full")

    print(f"✅ LightGBM MAE logged: {mae:.3f}")


2025-05-09 23:47:46,125 INFO: HTTP Request: GET https://dagshub.com/api/v1/repos/dsapthavarni/CDA500BIKE "HTTP/1.1 200 OK"


2025-05-09 23:47:46,129 INFO: Initialized MLflow to track repo "dsapthavarni/CDA500BIKE"


2025-05-09 23:47:46,130 INFO: Repository dsapthavarni/CDA500BIKE initialized!
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000630 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 7140
[LightGBM] [Info] Number of data points in the train set: 969, number of used features: 28
[LightGBM] [Info] Start training from score 262.987616
✅ LightGBM MAE logged: 64.984
🏃 View run lightgbm_28_lags at: https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow/#/experiments/0/runs/905bb8792f6447abb778ed9f5a97d9be
🧪 View experiment at: https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow/#/experiments/0


In [35]:
# -----------------------------
# Step 1: Get Feature Importances from Full Model
# -----------------------------
import numpy as np

# Reuse previously trained model from Step 2B
importances = model.feature_importances_
feature_importance_df = pd.DataFrame({
    'feature': feature_cols,
    'importance': importances
}).sort_values(by='importance', ascending=False)

# Select top 10 most important lag features
top10_features = feature_importance_df['feature'].iloc[:10].tolist()
print("🔝 Top 10 Features:", top10_features)

# -----------------------------
# Step 2: Train-Test Split with Top 10 Features
# -----------------------------
X_top10 = daily_lagged[top10_features]

X_train_top10 = X_top10[daily_lagged['date'] <= cutoff_date]
X_test_top10 = X_top10[daily_lagged['date'] > cutoff_date]
y_train_top10 = y[daily_lagged['date'] <= cutoff_date]
y_test_top10 = y[daily_lagged['date'] > cutoff_date]

# -----------------------------
# Step 3: Train & Log to MLflow
# -----------------------------
with mlflow.start_run(run_name="lightgbm_top10_features"):

    model_top10 = lgb.LGBMRegressor(n_estimators=100, learning_rate=0.1, random_state=42)
    model_top10.fit(X_train_top10, y_train_top10)

    y_pred_top10 = model_top10.predict(X_test_top10)
    mae_top10 = mean_absolute_error(y_test_top10, y_pred_top10)

    mlflow.log_param("model_type", "LightGBM")
    mlflow.log_param("features_used", top10_features)
    mlflow.log_metric("mae", mae_top10)
    mlflow.set_tag("step", "lightgbm_top10")

    print(f"✅ LightGBM (Top 10 features) MAE: {mae_top10:.3f}")


🔝 Top 10 Features: ['lag_1', 'lag_11', 'lag_28', 'lag_20', 'lag_6', 'lag_14', 'lag_7', 'lag_18', 'lag_21', 'lag_17']
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000518 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 2550
[LightGBM] [Info] Number of data points in the train set: 969, number of used features: 10
[LightGBM] [Info] Start training from score 262.987616
✅ LightGBM (Top 10 features) MAE: 57.681
🏃 View run lightgbm_top10_features at: https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow/#/experiments/0/runs/d3e00ebf30c34b77a508852f0807e4dc
🧪 View experiment at: https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow/#/experiments/0


In [37]:
#feature_engineering.py
import pandas as pd
import hopsworks

# -----------------------------
# Step 1: Connect to Hopsworks
# -----------------------------
project = hopsworks.login(project="CDA500FINAL")
fs = project.get_feature_store()

# -----------------------------
# Step 2: Load top 3 station data
# -----------------------------
fg_raw = fs.get_feature_group("citibike_2014_top3", version=1)
df = fg_raw.read()

# -----------------------------
# Step 3: Prepare daily trip counts
# -----------------------------
df['starttime'] = pd.to_datetime(df['starttime'])
df['date'] = df['starttime'].dt.date

daily_counts = (
    df.groupby(['start_station_name', 'date'])
    .size()
    .reset_index(name='trip_count')
    .sort_values(['start_station_name', 'date'])
)

# -----------------------------
# Step 4: Create 28-day lag features
# -----------------------------
for lag in range(1, 29):
    daily_counts[f'lag_{lag}'] = (
        daily_counts.groupby('start_station_name')['trip_count'].shift(lag)
    )

# Drop rows with incomplete lag history
daily_lagged = daily_counts.dropna().reset_index(drop=True)

# Add timestamp column for Hopsworks event_time
daily_lagged['date'] = pd.to_datetime(daily_lagged['date'])

# -----------------------------
# Step 5: Write to Hopsworks Feature Store
# -----------------------------
fg_lagged = fs.get_or_create_feature_group(
    name="citibike_daily_lagged",
    version=1,
    primary_key=["date", "start_station_name"],
    event_time="date",
    description="Daily trip counts with 28 lag features for top 3 stations"
)

fg_lagged.insert(daily_lagged, write_options={"wait_for_job": True})

print("✅ Feature engineering complete and stored in Hopsworks.")


2025-05-10 00:28:01,613 INFO: Closing external client and cleaning up certificates.
Connection closed.
Copy your Api Key (first register/login): https://c.app.hopsworks.ai/account/api/generated



Paste it here:  ········


2025-05-10 00:28:27,772 INFO: Initializing external client
2025-05-10 00:28:27,773 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-10 00:28:29,041 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1225937
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (20.91s) 
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1225937/fs/1213520/fg/1454556


Uploading Dataframe: 100.00% |█| Rows 1010/1010 | Elapsed Time: 00:00 | Remainin


Launching job: citibike_daily_lagged_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1225937/jobs/named/citibike_daily_lagged_1_offline_fg_materialization/executions
2025-05-10 00:29:08,501 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-10 00:29:11,665 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-10 00:31:23,319 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-10 00:31:23,397 INFO: Waiting for log aggregation to finish.
2025-05-10 00:31:51,475 INFO: Execution finished successfully.
✅ Feature engineering complete and stored in Hopsworks.


In [47]:
#train_model.py
import pandas as pd
import lightgbm as lgb
from sklearn.metrics import mean_absolute_error
import joblib
import hopsworks
import mlflow
import dagshub

# -----------------------------
# Step 1: Connect to Hopsworks & MLflow
# -----------------------------
project = hopsworks.login(project="CDA500FINAL")
fs = project.get_feature_store()
mr = project.get_model_registry()

# DagsHub + MLflow
dagshub.init(repo_owner="dsapthavarni", repo_name="CDA500BIKE", mlflow=True)
mlflow.set_tracking_uri("https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow")
mlflow.set_experiment("citi-bike-trip-prediction")

# -----------------------------
# Step 2: Load lagged features
# -----------------------------
fg = fs.get_feature_group("citibike_daily_lagged", version=1)
df = fg.read()
df['date'] = pd.to_datetime(df['date'])

# -----------------------------
# Step 3: Prepare data
# -----------------------------
feature_cols = [f'lag_{i}' for i in range(1, 29)]
target_col = 'trip_count'

X = df[feature_cols]
y = df[target_col]
cutoff = df['date'].max() - pd.Timedelta(days=14)

X_train = X[df['date'] <= cutoff]
X_test = X[df['date'] > cutoff]
y_train = y[df['date'] <= cutoff]
y_test = y[df['date'] > cutoff]

# -----------------------------
# Step 4: Train and log model
# -----------------------------
with mlflow.start_run(run_name="lightgbm_28lags_final"):
    model = lgb.LGBMRegressor(n_estimators=100, learning_rate=0.1, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)

    # Log to MLflow
    mlflow.log_param("model_type", "LightGBM")
    mlflow.log_param("features_used", 28)
    mlflow.log_metric("mae", mae)

    print(f"✅ Model trained. MAE: {mae:.3f}")

    # Save model locally
    joblib.dump(model, "best_model.pkl")

    # -----------------------------
    # Step 5: Register model in Hopsworks
    # -----------------------------
    model_obj = mr.python.create_model(
        name="citibike_predictor",
        metrics={"mae": float(mae)},
        description="LightGBM with 28 lag features for Citi Bike trip prediction",
        input_example=X_test.head(1)
    )

    model_obj.save("best_model.pkl")
    print("✅ Model registered and saved to Hopsworks.")


2025-05-10 00:39:31,735 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-10 00:39:34,744 INFO: Initializing external client
2025-05-10 00:39:34,744 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-10 00:39:35,353 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1225937
2025-05-10 00:39:36,175 INFO: HTTP Request: GET https://dagshub.com/api/v1/repos/dsapthavarni/CDA500BIKE "HTTP/1.1 200 OK"


2025-05-10 00:39:36,180 INFO: Initialized MLflow to track repo "dsapthavarni/CDA500BIKE"


2025-05-10 00:39:36,182 INFO: Repository dsapthavarni/CDA500BIKE initialized!
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.52s) 
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000955 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 7114
[LightGBM] [Info] Number of data points in the train set: 969, number of used features: 28
[LightGBM] [Info] Start training from score 259.925697
✅ Model trained. MAE: 64.872




  0%|          | 0/6 [00:00<?, ?it/s]

Uploading /Users/sapthavarnidevineni/CDA 500 final project/best_model.pkl: 0.000%|          | 0/272149 elapsed…

Uploading /Users/sapthavarnidevineni/CDA 500 final project/input_example.json: 0.000%|          | 0/183 elapse…

Model created, explore it at https://c.app.hopsworks.ai:443/p/1225937/models/citibike_predictor/1
✅ Model registered and saved to Hopsworks.
🏃 View run lightgbm_28lags_final at: https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow/#/experiments/0/runs/6d8c1a5408fc44b0863f078f0bf4475b
🧪 View experiment at: https://dagshub.com/dsapthavarni/CDA500BIKE.mlflow/#/experiments/0


In [49]:
import pandas as pd
import hopsworks
import joblib
from datetime import timedelta

# -----------------------------
# Step 1: Connect to Hopsworks
# -----------------------------
project = hopsworks.login(project="CDA500FINAL")
fs = project.get_feature_store()
mr = project.get_model_registry()

# -----------------------------
# Step 2: Load latest lag features
# -----------------------------
fg_lagged = fs.get_feature_group("citibike_daily_lagged", version=1)
df = fg_lagged.read()
df['date'] = pd.to_datetime(df['date'])

# Get the most recent day's features
latest_date = df['date'].max()
today_data = df[df['date'] == latest_date]

# -----------------------------
# Step 3: Load registered model
# -----------------------------
model = mr.get_model("citibike_predictor", version=1)
model_dir = model.download()
model = joblib.load(f"{model_dir}/best_model.pkl")

# -----------------------------
# Step 4: Predict next-day trip counts
# -----------------------------
feature_cols = [f'lag_{i}' for i in range(1, 29)]
X_today = today_data[feature_cols]
predictions = model.predict(X_today)

# Create prediction DataFrame
next_day = latest_date + timedelta(days=1)
prediction_df = pd.DataFrame({
    "date": next_day,
    "start_station_name": today_data["start_station_name"].values,
    "predicted_trip_count": predictions
})

# -----------------------------
# Step 5: Insert predictions into Hopsworks
# -----------------------------
fg_pred = fs.get_or_create_feature_group(
    name="citibike_predictions",
    version=1,
    primary_key=["date", "start_station_name"],
    event_time="date",
    description="Predicted trip counts for next day using LightGBM"
)

fg_pred.insert(prediction_df, write_options={"wait_for_job": True})
print("✅ Inference complete. Predictions saved to Hopsworks.")


2025-05-10 00:43:10,148 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-10 00:43:13,157 INFO: Initializing external client
2025-05-10 00:43:13,157 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-10 00:43:14,129 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1225937
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.54s) 


Downloading: 0.000%|          | 0/272149 elapsed<00:00 remaining<?

Feature Group created successfully, explore it at NE
https://c.app.hopsworks.ai:443/p/1225937/fs/1213520/fg/1454560


Uploading Dataframe: 100.00% |█| Rows 3/3 | Elapsed Time: 00:00 | Remaining Time


Launching job: citibike_predictions_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1225937/jobs/named/citibike_predictions_1_offline_fg_materialization/executions
2025-05-10 00:43:30,473 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-10 00:43:33,639 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-10 00:45:07,526 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-10 00:45:07,703 INFO: Waiting for log aggregation to finish.
2025-05-10 00:45:19,446 INFO: Execution finished successfully.
✅ Inference complete. Predictions saved to Hopsworks.
