In [1]:
# push the model to the hopsworks
%load_ext autoreload
%autoreload 2

In [None]:
# Note:
# Lightgbm works, but xgboost not working
# get the feature group,
# create the feature view. 
# we select all the data from feature group and create the feature view based on the selected data

In [2]:
import lightgbm

In [3]:
import src.config as config

In [4]:
import hopsworks

# connect to the project
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY
)

# connect to the feature store
feature_store = project.get_feature_store()

# connect to the feature group
feature_group = feature_store.get_feature_group(
    name=config.FEATURE_GROUP_NAME,
    version=config.FEATURE_GROUP_VERSION,
)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/578708
Connected. Call `.close()` to terminate connection gracefully.


In [3]:

# import os
# import sys
# script_dir = os.getcwd()
# sys.path.append(os.path.abspath(os.path.join(script_dir, '..')))

In [4]:
# define feature view (get data from multple feature groups and merge it
 # to get the final data set

In [5]:
# FEATURE_VIEW_NAME = 'time_series_hourly_feature_view'
# FEATURE_VIEW_VERSION = 1

In [6]:
# # create feature view (if it doesn't exist yet)
# # This feature view only uses on feature group, so the query is trivial
# try:
#     # create feature view if it doesn't exist yet
#     feature_store.create_feature_view(
#         name=FEATURE_VIEW_NAME,
#         version=FEATURE_VIEW_VERSION,
#         query=feature_group.select_all()
#     )
# except:
#     print('Feature view already existed. Skip creation.')


# # get feature view
# feature_view = feature_store.get_feature_view(
#     name=FEATURE_VIEW_NAME,
#     version=FEATURE_VIEW_VERSION
# )

In [5]:
# create feature view (if it doesn't exist yet)
# This feature view only uses on feature group, so the query is trivial
try:
    # create feature view if it doesn't exist yet
    feature_store.create_feature_view(
        name=config.FEATURE_VIEW_NAME,
        version=config.FEATURE_VIEW_VERSION,
        query=feature_group.select_all()
    )
except:
    print('Feature view already existed. Skip creation.')


# get feature view
feature_view = feature_store.get_feature_view(
    name=config.FEATURE_VIEW_NAME,
    version=config.FEATURE_VIEW_VERSION
)

Feature view already existed. Skip creation.


AttributeError: module 'src.config' has no attribute 'FEATURE_VIEW_NAME'

In [8]:
ts_data, _ = feature_view.training_data(
    description='Time-series hourly taxi rides',
)

Finished: Reading data from Hopsworks, using ArrowFlight (27.52s) 




In [9]:
# drop `pickup_ts` column
# ts_data.drop('pickup_ts', axis=1, inplace=True)

# sort by `pickup_location_id` and `pickup_hour`
ts_data.sort_values(by=['pickup_location_id', 'pickup_hour'], inplace=True)
ts_data

Unnamed: 0,pickup_hour,rides,pickup_location_id
3355951,2022-01-01 00:00:00+00:00,0,1
238606,2022-01-01 01:00:00+00:00,0,1
1319982,2022-01-01 02:00:00+00:00,0,1
3644763,2022-01-01 03:00:00+00:00,0,1
3854116,2022-01-01 04:00:00+00:00,1,1
...,...,...,...
4208411,2024-03-30 14:00:00+00:00,0,265
4209348,2024-03-30 15:00:00+00:00,3,265
4208069,2024-03-30 16:00:00+00:00,4,265
4210440,2024-03-30 17:00:00+00:00,4,265


In [10]:
from src.data import transform_ts_data_into_features_and_target

features, targets = transform_ts_data_into_features_and_target(
    ts_data,
    input_seq_len=24*28, # one month
    step_size=23,
)

features_and_target = features.copy()
features_and_target['target_rides_next_hour'] = targets

print(f'{features_and_target.shape=}')

100%|██████████| 265/265 [01:01<00:00,  4.33it/s]


features_and_target.shape=(202195, 675)


In [11]:
from datetime import date, timedelta
from pytz import timezone
import pandas as pd
from src.data_split import train_test_split

# training data -> from January 2022 up until 2 months ago
# test data -> last 2 months
cutoff_date = pd.to_datetime(date.today() - timedelta(days=28*1), utc=True)

print(f'{cutoff_date=}')

X_train, y_train, X_test, y_test = train_test_split(
    features_and_target,
    cutoff_date,
    target_column_name='target_rides_next_hour'   
)

print(f'{X_train.shape=}')
print(f'{y_train.shape=}')
print(f'{X_test.shape=}')
print(f'{y_test.shape=}')

cutoff_date=Timestamp('2024-03-02 00:00:00+0000', tz='UTC')
X_train.shape=(194245, 674)
y_train.shape=(194245,)
X_test.shape=(7950, 674)
y_test.shape=(7950,)


In [12]:
# X_train[X_train.select_dtypes(include=['float32']).columns] = \
#     X_train.select_dtypes(include=['float32']).astype(float)

# y_train = y_train.astype(float)

In [14]:
import numpy as np
from sklearn.model_selection import KFold, TimeSeriesSplit
from sklearn.pipeline import make_pipeline
from sklearn.metrics import mean_absolute_error
import optuna

from src.model import get_pipeline

def objective(trial: optuna.trial.Trial) -> float:
    """
    Given a set of hyper-parameters, it trains a model and computes an average
    validation error based on a TimeSeriesSplit
    """
    # pick hyper-parameters
    hyperparams = {
        "metric": 'mae',
        "verbose": -1,
        "num_leaves": trial.suggest_int("num_leaves", 2, 256),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.2, 1.0),
        "bagging_fraction": trial.suggest_float("bagging_fraction", 0.2, 1.0),
        "min_child_samples": trial.suggest_int("min_child_samples", 3, 100),   
    }
    
    # sort X_train by `pikup_hour` inplace
    # so the TimeSeriesSplit will split the data in a consistent way
    X_train.sort_values('pickup_hour', inplace=True)

    tss = TimeSeriesSplit(n_splits=2)
    scores = []
    for train_index, val_index in tss.split(X_train):

        # split data for training and validation
        X_train_, X_val_ = X_train.iloc[train_index, :], X_train.iloc[val_index,:]
        y_train_, y_val_ = y_train.iloc[train_index], y_train.iloc[val_index]
        
        # train the model
        pipeline = get_pipeline(**hyperparams)
        pipeline.fit(X_train_, y_train_)
        
        # evaluate the model
        y_pred = pipeline.predict(X_val_)
        mae = mean_absolute_error(y_val_, y_pred)

        scores.append(mae)
   
    # Return the mean score
    return np.array(scores).mean()

In [15]:
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=1)

[I 2024-03-30 18:57:51,187] A new study created in memory with name: no-name-75fd1fe9-cacd-46bc-9868-3957ed140202
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
Parameters: { "bagging_fraction", "feature_fraction", "metric", "min_child_samples", "num_leaves", "verbose" } are not used.

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
A 

In [16]:
best_params = study.best_trial.params
print(f'{best_params=}')

best_params={'num_leaves': 116, 'feature_fraction': 0.30982326672613864, 'bagging_fraction': 0.351551055548769, 'min_child_samples': 71}


In [17]:
# retrain the model based on the best parameter

pipeline = get_pipeline(**best_params)
pipeline.fit(X_train, y_train)

See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
Parameters: { "bagging_fraction", "feature_fraction", "min_child_samples", "num_leaves" } are not used.



In [18]:
predictions = pipeline.predict(X_test)
test_mae = mean_absolute_error(y_test, predictions)
print(f'{test_mae=:.4f}')

test_mae=25.2864


See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)


In [19]:
import joblib
from src.paths import MODELS_DIR

joblib.dump(pipeline, MODELS_DIR / 'model.pkl')

['/Users/yenchunchen/Desktop/Project/machine_leraning/taxi_demand_predictor/models/model.pkl']

In [99]:
# X_train[X_train.select_dtypes(include=['float32']).columns] = \
#     X_train.select_dtypes(include=['float32']).astype(float)

# y_train = y_train.astype(float)

In [None]:
X_train.dtypes

rides_previous_672_hour                   float32
rides_previous_671_hour                   float32
rides_previous_670_hour                   float32
rides_previous_669_hour                   float32
rides_previous_668_hour                   float32
                                     ...         
rides_previous_2_hour                     float32
rides_previous_1_hour                     float32
pickup_hour                   datetime64[ns, UTC]
pickup_location_id                          int64
average_rides_last_4_weeks                float32
Length: 675, dtype: object

In [20]:
from hsml.schema import Schema
from hsml.model_schema import ModelSchema

input_schema = Schema(X_train)
output_schema = Schema(y_train)
model_schema = ModelSchema(input_schema=input_schema, output_schema=output_schema)

In [21]:
# push the model to registrymodel_registry = project.get_model_registry()

model_registry = project.get_model_registry()

model = model_registry.sklearn.create_model(
    name="taxi_demand_predictor_next_hour",
    metrics={"test_mae": test_mae},
    description="Xgboost regressor with a bit of hyper-parameter tuning",
    input_example= X_train.sample(),
    model_schema=model_schema
)


Connected. Call `.close()` to terminate connection gracefully.


In [22]:

model.save(MODELS_DIR / 'model.pkl')

  0%|          | 0/6 [00:00<?, ?it/s]

TypeError: Object of type float32 is not JSON serializable