## <span style="color:#ff5f27">📝 Imports </span>

In [1]:
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.metrics import mean_squared_error
import os
import joblib
import plotly.io as pio
from features.price import plot_prediction_test
from functions import predict_id

## <span style="color:#ff5f27">🔮 Connect to Hopsworks Feature Store </span>

In [2]:
import hopsworks
from getpass import getpass

# Never hardcode credentials in a notebook: read them from the environment and
# fall back to an interactive prompt. The API key that was previously committed
# in this cell should be treated as leaked — revoke and rotate it.
project = hopsworks.login(
    host=os.environ.get("HOPSWORKS_HOST", "34.34.32.154"),
    project=os.environ.get("HOPSWORKS_PROJECT", "lynx"),
    api_key_value=os.environ.get("HOPSWORKS_API_KEY") or getpass("Hopsworks API key: "),
)

fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://34.34.32.154:443/p/119
Connected. Call `.close()` to terminate connection gracefully.


In [3]:
# Handles to the two source feature groups (both at version 1), fetched in
# the order averages -> prices.
averages_fg, prices_fg = (
    fs.get_or_create_feature_group(name=fg_name, version=1)
    for fg_name in ('averages', 'prices')
)

## <span style="color:#ff5f27">🔪 Feature Selection </span>

In [5]:
# All features of the `prices` group joined with the `averages` group's
# features, excluding the averages group's own `date` column.
query = (
    prices_fg.select_all()
    .join(averages_fg.select_except(['date']))
)

## <span style="color:#ff5f27">🤖 Transformation Functions </span>

In [8]:
# Load transformation function
min_max_scaler = fs.get_transformation_function(name="min_max_scaler")

feature_names = [
    'ma_7', 'ma_14', 'ma_30', 'daily_rate_of_change', 'volatility_30_day', 'ema_02', 'ema_05', 'rsi'
]

# Map features to transformations
transformation_functions = {
    feature_name: min_max_scaler
    for feature_name in feature_names
}
transformation_functions

{'ma_7': <hsfs.transformation_function.TransformationFunction at 0x7fb39e341100>,
 'ma_14': <hsfs.transformation_function.TransformationFunction at 0x7fb39e341100>,
 'ma_30': <hsfs.transformation_function.TransformationFunction at 0x7fb39e341100>,
 'daily_rate_of_change': <hsfs.transformation_function.TransformationFunction at 0x7fb39e341100>,
 'volatility_30_day': <hsfs.transformation_function.TransformationFunction at 0x7fb39e341100>,
 'ema_02': <hsfs.transformation_function.TransformationFunction at 0x7fb39e341100>,
 'ema_05': <hsfs.transformation_function.TransformationFunction at 0x7fb39e341100>,
 'rsi': <hsfs.transformation_function.TransformationFunction at 0x7fb39e341100>}

## <span style="color:#ff5f27">⚙️ Feature View Creation </span>

In [10]:
# NOTE(review): the saved run of this cell failed with
# "Feature name 'price' could not found be found in query" — confirm the
# label column's actual name in the `prices` feature group before re-running.
feature_view = fs.get_or_create_feature_view(
    name='price_fv',
    version=1,
    labels=["price"],
    query=query,
    transformation_functions=transformation_functions,
)

FeatureStoreException: Feature name 'price' could not found be found in query.

## <span style="color:#ff5f27">🏋️ Training Dataset Creation </span>

In [None]:
# Chronological split: train on TRAIN_START..SPLIT_DATE, test on SPLIT_DATE..today.
TRAIN_START = '2022-09-01'
SPLIT_DATE = '2023-07-01'  # end of the training window and start of the test window
# Today's date as YYYY-MM-DD. pandas is imported at the top of the notebook;
# plain `datetime` is not, which made the previous version raise NameError.
TEST_END = pd.Timestamp.today().strftime("%Y-%m-%d")

X_train, X_test, y_train, y_test = feature_view.train_test_split(
    description='Prices Dataset',
    train_start=TRAIN_START,
    train_end=SPLIT_DATE,
    test_start=SPLIT_DATE,
    test_end=TEST_END,
)

In [None]:
X_train.head(n=5)  # preview the first rows of the training features

In [None]:
y_train.head(n=5)  # preview the first rows of the training labels

In [None]:
# Put both splits in chronological order, then realign each label frame to
# its reordered feature rows.
X_train = X_train.sort_values("date")
X_test = X_test.sort_values("date")
y_train, y_test = y_train.reindex(X_train.index), y_test.reindex(X_test.index)

# Keep the dates aside for plotting; they are removed from the features the
# model is fitted on.
train_date, test_date = (pd.DataFrame(split.pop("date")) for split in (X_train, X_test))

## <span style="color:#ff5f27">🧬 Modeling </span>

We will use the XGBoost Regressor. XGBoost regressor is a powerful and highly effective machine learning algorithm for regression problems. XGBoost is known for its ability to handle complex relationships in the data, handle missing values, and provide accurate predictions. It's a popular choice in the data science community due to its robustness and excellent predictive performance, making it well-suited for our specific problem.

In [None]:
# Initialize the XGBoost regressor
model = xgb.XGBRegressor()

# Train the model on the training data
model.fit(X_train, y_train)

# Make predictions on the held-out test set
y_test_pred = model.predict(X_test)

# Evaluate on the test set. The previous `squared=False` call returned RMSE
# while labelling it MSE, and that argument is deprecated in scikit-learn 1.4
# and removed in 1.6. Compute MSE, then derive RMSE explicitly.
mse = mean_squared_error(y_test, y_test_pred)
rmse = float(np.sqrt(mse))
print(f"Mean Squared Error (MSE): {mse}")
print(f"Root Mean Squared Error (RMSE): {rmse}")

In [None]:
PLOT_ID = 1  # entity id passed to the prediction and plotting helpers

prediction_for_id = predict_id(PLOT_ID, X_test, model)
fig = plot_prediction_test(
    PLOT_ID, X_train, X_test, y_train, y_test, train_date, test_date, prediction_for_id
)
fig.show()

## <span style="color:#ff5f27">⚙️ Model Schema </span>

In [None]:
from hsml.schema import Schema
from hsml.model_schema import ModelSchema

# Build the input schema from the DataFrame itself rather than `.values`, so
# the registered schema keeps the feature column names and dtypes instead of
# an anonymous array description.
input_schema = Schema(X_train)
output_schema = Schema(y_train)
model_schema = ModelSchema(input_schema=input_schema, output_schema=output_schema)

model_schema.to_dict()

## <span style="color:#ff5f27">📝 Register model </span>

In [None]:
model_dir = "price_model"
# exist_ok makes the cell safe to re-run (replaces the `isdir(...) == False` check).
os.makedirs(model_dir, exist_ok=True)

# Persist the trained model and the prediction plot into the artifact directory.
joblib.dump(model, os.path.join(model_dir, 'xgboost_price_model.pkl'))
fig.write_image(os.path.join(model_dir, 'model_prediction.png'))

In [None]:
mr = project.get_model_registry()

# Register the model with its evaluation metric, schema and an example input row.
price_model = mr.python.create_model(
    name="xgboost_price_model",
    description="Price Predictor",
    metrics={"MSE": mse},
    model_schema=model_schema,
    input_example=X_train.sample(),
)

# Upload the local artifact directory to the registered model.
price_model.save(model_dir)

## <span style="color:#ff5f27">🚀 Model Deployment</span>

**About Model Serving**

Models can be served via KFServing or "default" serving, where the model runs in a Docker container exposing a Flask server. For KFServing models, or models written in TensorFlow, you do not need to write a prediction file (see the section below). However, scikit-learn and XGBoost models served as Python models do require a prediction file.

In order to use KFServing, you must have Kubernetes installed and enabled on your cluster.

## <span style="color:#ff5f27">📎 Predictor script for Python models</span>

Scikit-learn and XGBoost models are deployed as Python models, in which case you need to provide a Predict class that implements the predict method. The `predict()` method invokes the model on the inputs and returns the prediction as a list.

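The `__init__()` method runs when the predictor is loaded into memory. Here it connects to the feature store, initializes feature-view serving, and loads the model from the local directory it is materialized to, given by the `ARTIFACT_FILES_PATH` environment variable.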

The **%%writefile** cell magic writes the contents of the cell it starts to the given Python file. We will use the resulting **predict_example.py** file to create a deployment for our model.

In [None]:
%%writefile predict_example.py
import os
import numpy as np
import hsfs
import joblib


class Predict(object):
    """Predictor for the XGBoost price model, used by the Hopsworks deployment.

    `__init__` sets up feature-view serving and loads the model artifact;
    `predict` looks up the online feature vector for a requested id and scores it.
    """

    def __init__(self):
        """Initializes the serving state and reads the trained model."""
        # get feature store handle
        fs_conn = hsfs.connection()
        self.fs = fs_conn.get_feature_store()

        # get feature view whose online features form the model input
        self.fv = self.fs.get_feature_view("price_fv", 1)

        # initialize serving (1 is presumably the training dataset version — confirm)
        self.fv.init_serving(1)

        # load the trained model from the directory the artifacts are materialized to
        model_path = os.path.join(os.environ["ARTIFACT_FILES_PATH"], "xgboost_price_model.pkl")
        self.model = joblib.load(model_path)
        print("Initialization Complete")

    def predict(self, inputs):
        """Serves a prediction request using the trained model.

        Args:
            inputs: request instances; the first value of the first instance
                is used as the `id` lookup key, e.g. ``[[1]]``.

        Returns:
            list: the model's prediction(s) for that id's feature vector.
        """
        entity_id = inputs[0][0]
        # NOTE(review): the training notebook popped "date" from X_train before
        # fitting; if the served feature vector still contains it, its width will
        # not match the model's expected input — confirm.
        feature_vector = self.fv.get_feature_vector({"id": entity_id})
        return self.model.predict(np.asarray(feature_vector).reshape(1, -1)).tolist()

This script needs to be put into a known location in the Hopsworks file system. Let's call the file predict_example.py and put it in the Models directory.

In [None]:
dataset_api = project.get_dataset_api()

# Upload the predictor script into the project's "Models" dataset, overwriting any previous copy.
uploaded_file_path = dataset_api.upload("predict_example.py", "Models", overwrite=True)

# Absolute path of the uploaded script inside the project's file system.
project_root = os.path.join("/Projects", project.name)
predictor_script_path = os.path.join(project_root, uploaded_file_path)

## <span style="color:#ff5f27">🚀 Create the deployment</span>

Here, you deploy the model registered above together with the uploaded predictor script. A deployment configuration can also specify the serving type (default or KFServing); this example does not set one and relies on the library's default.

In [None]:
# Deploy the registered model, served by the uploaded predictor script.
deployment_config = dict(
    name="priceonlinemodeldeployment",
    script_file=predictor_script_path,
)
deployment = price_model.deploy(**deployment_config)

In [None]:
DEPLOYMENT_START_TIMEOUT = 180  # wait budget passed as `await_running` (presumably seconds)
deployment.start(await_running=DEPLOYMENT_START_TIMEOUT)

In [None]:
deployment_state = deployment.get_state()
deployment_state.describe()

---