In [1]:
%reload_ext autoreload
%autoreload 2
import sys
import os
import importlib
script_dir = os.getcwd()  
sys.path.append(os.path.abspath(os.path.join(script_dir, '..')))
# os.getcwd()


In [2]:
import src.config as config

In [3]:
import hopsworks

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/835736
Connected. Call `.close()` to terminate connection gracefully.


In [4]:
# Feature view is defined to read data from feature store, define how you want to fetch data from potentially many different feature groups and merge it to get the final dataset. 
try:
    feature_store.create_feature_view(
        name=config.FEATURE_VIEW_NAME,
        version=config.FEATURE_VIEW_VERSION,
        query=feature_group.select_all()
    )
except:
    print("Feaeture view already existed. Skip creation")

feature_view = feature_store.get_feature_view(
    name=config.FEATURE_VIEW_NAME,
    version=config.FEATURE_VIEW_VERSION
)

Feaeture view already existed. Skip creation


In [5]:
# feature-view-obj.func(): gets training data-frame from hopsworks
ts_data, _ = feature_view.training_data(
    description="Time-series hourly taxi rides"
)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (15.82s) 



In [6]:
# sort ts-data-frame and print few values to make sure its there
ts_data.sort_values(by=["pickup_location_id", "pickup_hour"], inplace=True)
ts_data

Unnamed: 0,pickup_hour,num_of_rides,pickup_location_id
3325035,2022-01-01 00:00:00+00:00,0,1
1294678,2022-01-01 01:00:00+00:00,0,1
4286720,2022-01-01 02:00:00+00:00,0,1
1164401,2022-01-01 03:00:00+00:00,0,1
4142466,2022-01-01 04:00:00+00:00,1,1
...,...,...,...
3125477,2024-07-04 13:00:00+00:00,4,265
3125899,2024-07-04 14:00:00+00:00,3,265
3126100,2024-07-04 15:00:00+00:00,10,265
3260149,2024-07-04 16:00:00+00:00,5,265


In [7]:
from src.data import transform_ts_data_into_features_and_target
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning, module="numpy")


# Transform time-series-data into training-data which is features/targets
features, targets = transform_ts_data_into_features_and_target(
    ts_data,              # pass iong ts-dataframe
    input_seq_len=24*28,  # input is rides of previous month
    step_size=23,         # 
) # returns feautres-df 

features_and_target = features.copy()  # combine feautres/targets, add new column target-rides-next-hour equal to targets-df
features_and_target["target_rides_next_hour"] = targets
print(f"{features_and_target.shape=}") # (examples, idk)
# keep clearing this cells output




 99%|█████████▉| 257/259 [01:01<00:00,  3.38it/s]




100%|█████████▉| 258/259 [01:01<00:00,  3.43it/s]




100%|██████████| 259/259 [01:01<00:00,  4.20it/s]


features_and_target.shape=(218135, 675)


In [8]:
from datetime import date, timedelta, datetime
from pytz import timezone
import pandas as pd
from src.data_split import train_test_split
from datetime import datetime, timedelta, timezone

# HAD SOME TROUBLE WITH THIS THE CUTOFF DATE NOT BEING BEING COMPARABLE TO STRING
# cutoff_date = str((datetime.now(timezone.utc) - timedelta(days=28)).replace(minute=0, second=0, microsecond=0))
# current date minus one month ago, one month ago is cut off date
cutoff_date=pd.to_datetime(date.today() - timedelta(days=28*1), utc=True)
features_and_target['pickup_hour'] = pd.to_datetime(features_and_target['pickup_hour'])  # added this myself

print(f"{cutoff_date=}")
X_train, y_train, X_test, y_test = train_test_split(
    features_and_target,
    cutoff_date,
    target_column_name='target_rides_next_hour'   
)


print(f"X_train: {X_train.shape}") # (examples-in-train, num-features)
print(f"y_train: {y_train.shape}") # (examples-in-train,) 1D beceause only one target
print(f"X_test: {X_test.shape}")   # (examples-in-test, num-features)
print(f"y_test: {y_test.shape}")   # (examples-in-test,) 1D beceause only one target



cutoff_date=Timestamp('2024-06-06 00:00:00+0000', tz='UTC')
X_train: (210747, 674)
y_train: (210747,)
X_test: (7388, 674)
y_test: (7388,)


In [9]:
import numpy as np
from sklearn.model_selection import KFold, TimeSeriesSplit
from sklearn.pipeline import make_pipeline
from sklearn.metrics import mean_absolute_error
import optuna # hyperparameter optimization framework designed to automate the process of hyperparameter tuning in machine learning models. It provides an efficient way to search for the best set of hyperparameters that maximize or minimize a given objective function, such as model accuracy or loss.
from src.model import get_pipeline

# given set of HP, it trains a model and computes a validation error based on a TimeSeriesSplit
def objective(trial): # Optuna-trial
    hyperparams = {
        "metric": "mae",
        "verbose": -1,
        "num_leaves": trial.suggest_int("num_leaves", 2, 256),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.2, 1.0),
        "bagging_fraction": trial.suggest_float("bagging_fraction", 0.2, 1.0),
        "min_child_samples": trial.suggest_int("min_child_samples", 3, 100), 
    }  # select HP

    tss = KFold(n_splits=3)  # define number of splitsm This initializes a KFold cross-validator with 3 splits, which means the data will be split into 3 parts for cross-validation.
    scores = []   # stores MAE-score of each split

    # iterate through each split in X-train, for each train-index and validation-data-index
    for train_index, val_index in tss.split(X_train):
        # split-train-data = split X-train-df from cur-train-index to end of dataframe
        # split-validation-data = split X-train-df from cur-validaiton-index to end of dataframe
        X_train_, X_val_ = X_train.iloc[train_index, :], X_train.iloc[val_index, :]
        # split-Ytrain-data = split by setting to cur-train-index of Y-train-df  
        # split-Yvalidation-data = split by setting to cur-val-index of Y-train-df  
        y_train_, y_val_ = y_train.iloc[train_index], y_train.iloc[val_index]

        
        pipeline = get_pipeline(**hyperparams)  # create pipeline-obj
        pipeline.fit(X_train_, y_train_)  # call fit on pipeline which performs defiend transformations in sequence and trains passed in model. Train on traing-split-cur-data

        y_pred = pipeline.predict(X_val_)  # compute predictions and error, on cur-validation-data for cur split
        mae = mean_absolute_error(y_val_, y_pred)
        scores.append(mae)

    return np.array(scores).mean()    # averagte the mae scores
        


In [10]:
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=1)

[I 2024-07-04 13:32:57,669] A new study created in memory with name: no-name-c15f034f-fc13-4efa-bbfb-b19152d45d72


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/u

[I 2024-07-04 13:33:45,224] Trial 0 finished with value: 2.7912554272299936 and parameters: {'num_leaves': 195, 'feature_fraction': 0.8560085629454792, 'bagging_fraction': 0.4497355832151372, 'min_child_samples': 81}. Best is trial 0 with value: 2.7912554272299936.


In [11]:
best_params = study.best_trial.params  # extract best-HP that optuna found
print(f'{best_params=}')

best_params={'num_leaves': 195, 'feature_fraction': 0.8560085629454792, 'bagging_fraction': 0.4497355832151372, 'min_child_samples': 81}


In [12]:
pipeline = get_pipeline(**best_params) # create pipeline-obj, passing in the new best found HP
pipeline.fit(X_train, y_train) # call fit on pipeline which performs defiend transformations in sequence and trains the original train-X, train-y dataframes


[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.074109 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 171893
[LightGBM] [Info] Number of data points in the train set: 210747, number of used features: 676
[LightGBM] [Info] Start training from score 18.395802


In [13]:
predictions = pipeline.predict(X_test)
test_mae = mean_absolute_error(y_test, predictions)
print(f'{test_mae=:.4f}')

test_mae=4.8037


In [14]:
import joblib
from src.paths import MODELS_DIR

# save model locally on disk
joblib.dump(pipeline, MODELS_DIR / "model.pkl")

['/Users/pravachanpatra/Documents/PYTHON/AI_ML_DL/Real World ML Tutorial/Taxi Demand Predictor Project/models/model.pkl']

In [15]:
from hsml.schema import Schema
from hsml.model_schema import ModelSchema

# before saving model in hopsworks need to define its schema
input_schema = Schema(X_train)
output_schema = Schema(y_train)
model_schema = ModelSchema(input_schema=input_schema, output_schema=output_schema)


In [17]:
# get point to model registry
model_registry = project.get_model_registry()
# model_registry-obj.create-model() pasing in schema and example
model = model_registry.sklearn.create_model(
    name="taxi_demand_predictor_next_hour",
    metrics={"test_mae":test_mae},
    description="LightGBM regressor with a bit of hyper-parameter tuning",
    input_example=X_train.sample(),
    model_schema=model_schema
)

# model-obj.save(): save model to hopsworks directory
model.save(str(MODELS_DIR / 'model.pkl'))


Connected. Call `.close()` to terminate connection gracefully.


  0%|          | 0/6 [00:00<?, ?it/s]

Uploading: 0.000%|          | 0/1783937 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/3399 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/58136 elapsed<00:00 remaining<?

Model created, explore it at https://c.app.hopsworks.ai:443/p/835736/models/taxi_demand_predictor_next_hour/1


Model(name: 'taxi_demand_predictor_next_hour', version: 1)