# 3. ML Modeling

In this notebook, we will illustrate how to train an XGBoost model on the Scooby-Doo dataset using the Snowpark ML Model API. <br>
We also show how to do inference and deploy the model as a UDF.

The Snowpark ML Model API currently supports sklearn, xgboost, and lightgbm models.

## Import libraries

In [None]:
# Snowpark for Python
from snowflake.snowpark import Session
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import FloatType
import snowflake.snowpark.functions as F

# Snowpark ML
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error

# data science libs
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# misc
import json
import joblib
import cachetools

# warning suppression
import warnings; warnings.simplefilter('ignore')

## 1. Establish secure connection to Snowflake

In [None]:
# Connection parameters live in a local JSON file so no credentials are hardcoded here
with open('creds.json') as creds_file:
    connection_parameters = json.load(creds_file)

session = Session.builder.configs(connection_parameters).create()

# Single-row result: (current user, Snowflake version)
snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details, as (line template, value) pairs printed in order
environment_details = [
    ('User                        : {}', snowflake_environment[0][0]),
    ('Role                        : {}', session.get_current_role()),
    ('Database                    : {}', session.get_current_database()),
    ('Schema                      : {}', session.get_current_schema()),
    ('Warehouse                   : {}', session.get_current_warehouse()),
    ('Snowflake version           : {}', snowflake_environment[0][1]),
]

print('\nConnection Established with the following parameters:')
for line_template, value in environment_details:
    print(line_template.format(value))
# Only the first three components of the version are reported
print('Snowpark for Python version : {}.{}.{}'.format(*snowpark_version[:3]))

In [None]:
# Fully qualified name (database.schema.table) of the table holding the scooby_doo dataset
DEMO_TABLE = 'scooby_clean'
input_tbl = "{}.{}.{}".format(
    session.get_current_database(),
    session.get_current_schema(),
    DEMO_TABLE,
)

## 2. Data loading

In [None]:
# Read the Snowflake table into a Snowpark DataFrame, keeping only the rows
# where IMDB (the label we model later) is not NULL
scooby_df = session.table(input_tbl).filter(F.col("IMDB").is_not_null())

print(scooby_df.count())
scooby_df.show()

In [None]:
# Group the columns by the role they play in modeling

# Raw categorical features
CATEGORICAL_COLUMNS = [
    "FORMAT",
    "NETWORK",
    "SETTING_TERRAIN",
    "MOTIVE",
    "MONSTER_GENDER",
    "CULPRIT_GENDER",
]
# "_OHE"-suffixed counterparts of the categorical features
# (not referenced elsewhere in this notebook)
CATEGORICAL_COLUMNS_OE = [
    "FORMAT_OHE",
    "NETWORK_OHE",
    "SETTING_TERRAIN_OHE",
    "MOTIVE_OHE",
    "MONSTER_GENDER_OHE",
    "CULPRIT_GENDER_OHE",
]

# Raw numerical features
NUMERICAL_COLUMNS = [
    "ENGAGEMENT",
    "RUN_TIME",
    "ZOINKS",
    "GROOVY",
    "SCOOBY_DOO_WHERE_ARE_YOU",
    "ROOBY_ROOBY_ROO",
]
# "_NORM"-suffixed counterparts of the numerical features
# (not referenced elsewhere in this notebook)
NUMERICAL_COLUMNS_NORM = [
    "ENGAGEMENT_NORM",
    "RUN_TIME_NORM",
    "ZOINKS_NORM",
    "GROOVY_NORM",
    "SCOOBY_DOO_WHERE_ARE_YOU_NORM",
    "ROOBY_ROOBY_ROO_NORM",
]

# Label to predict and the column the model writes its predictions to
LABEL_COLUMNS = ['IMDB']
OUTPUT_COLUMNS = ['IMDB_PRICE']

# Keep only the label plus the raw features
modeling_columns = LABEL_COLUMNS + CATEGORICAL_COLUMNS + NUMERICAL_COLUMNS
scooby_ml_df = scooby_df.select(modeling_columns)
scooby_ml_df.show()

In [None]:
# Pull the CULPRIT_GENDER column to the client and print its distinct rows
culprit_gender_rows = scooby_ml_df.select(F.col("CULPRIT_GENDER")).collect()
print(set(culprit_gender_rows))

In [None]:
# NOTE: This cell previously re-read the raw CSV from @SCOOBY_ASSETS using a
# `scooby_schema` object that is never defined in this notebook, so it raised a
# NameError on "Restart Kernel -> Run All". It also reassigned `scooby_df`, which
# no later cell reads. The modeling data is already loaded from the cleaned table
# above, so the raw CSV read has been removed from this notebook.

In [None]:
# Load the preprocessing pipeline object
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary code;
# only load pipeline files that come from a trusted source.
PIPELINE_FILE = 'preprocessing_pipeline.joblib'
preprocessing_pipeline = joblib.load(PIPELINE_FILE)

## 3. Build a simple XGBoost Regression model


- The `model.fit()` function creates a temporary stored procedure in the background. This means that the model training is a single-node operation. If more memory is needed, Snowflake offers Snowpark-optimized warehouses, which have 16x more memory than a standard warehouse. For now we are just using the standard x-small warehouse created in the setup step.
- The `model.predict()` function creates a temporary vectorized UDF in the background, which means the input DataFrame is batched as Pandas DataFrames and inference is parallelized across the batches of data.

In [None]:
# 90/10 train/test split; the fixed seed keeps the split the same across runs
SPLIT_WEIGHTS = [0.9, 0.1]
scooby_train_df, scooby_test_df = scooby_ml_df.random_split(weights=SPLIT_WEIGHTS, seed=0)

# Fit the loaded Pipeline on the training split only, then apply it to both
# splits so the test set is transformed with training-set statistics
fitted_pipeline = preprocessing_pipeline.fit(scooby_train_df)
train_df = fitted_pipeline.transform(scooby_train_df)
test_df = preprocessing_pipeline.transform(scooby_test_df)

In [None]:
# Preview the transformed training data
train_df.show()

In [None]:
# Obtain the transformed feature column names for the Regressor, excluding any
# column whose name contains 'None'.
# The '_OHE' columns are the encoded categorical features and the '_NORM' columns
# are the normalized numerical features (the two filters were previously swapped).
# Only the concatenation CAT_COLS + NUM_COLS is used downstream, so the fix changes
# the feature order consistently for training, prediction, and the UDF.

CAT_COLS = [k for k in train_df.columns if '_OHE' in k and 'None' not in k]
NUM_COLS = [k for k in train_df.columns if '_NORM' in k and 'None' not in k]

In [None]:
# Baseline XGBRegressor with default hyperparameters
baseline_feature_cols = CAT_COLS + NUM_COLS
regressor = XGBRegressor(
    input_cols=baseline_feature_cols,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Train on the preprocessed training split
regressor.fit(train_df)

# Predict on the held-out test split; predictions land in OUTPUT_COLUMNS
result = regressor.predict(test_df)

In [None]:
# Score the test-set predictions with Snowpark ML's MAPE
mape = mean_absolute_percentage_error(
    df=result,
    y_true_col_names="IMDB",
    y_pred_col_names="IMDB_PRICE",
)

# Show actual next to predicted values, then report the metric
baseline_actual_vs_predicted = result.select("IMDB", "IMDB_PRICE")
baseline_actual_vs_predicted.show()
print(f"Mean absolute percentage error: {mape}")

In [None]:
# Plot actual vs predicted: bring the two columns to pandas as floats first
baseline_plot_data = result["IMDB", "IMDB_PRICE"].to_pandas().astype("float64")
g = sns.relplot(
    data=baseline_plot_data,
    x="IMDB",
    y="IMDB_PRICE",
    kind="scatter",
)
# Red identity line: points on it are perfect predictions
g.ax.axline((0,0), slope=1, color="r")

plt.show()

### Now, let's use Snowpark ML's `GridSearchCV()` function to find optimal model parameters

In [None]:
# Hyperparameter candidates: 5 x 5 = 25 combinations
xgb_param_grid = {
    "n_estimators":[100, 200, 300, 400, 500],
    "learning_rate":[0.1, 0.2, 0.3, 0.4, 0.5],
}

grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid=xgb_param_grid,
    n_jobs=-1,
    # Scores are negated MAPE, so higher (closer to zero) is better
    scoring="neg_mean_absolute_percentage_error",
    input_cols=CAT_COLS + NUM_COLS,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Train
grid_search.fit(train_df)

We see that the best estimator has the following parameters: `n_estimators=100` & `learning_rate=0.1`.

We can use `to_sklearn()` in order to get the actual xgboost model object, which gives us access to all its attributes.

In [None]:
# Display the estimator refit with the best parameter combination found by the search
grid_search.to_sklearn().best_estimator_

In [None]:
# Analyze grid search results: one row per evaluated parameter combination
gs_results = grid_search.to_sklearn().cv_results_
evaluated_params = gs_results["params"]

gs_results_df = pd.DataFrame(
    data={
        "n_estimators": [params["n_estimators"] for params in evaluated_params],
        "learning_rate": [params["learning_rate"] for params in evaluated_params],
        # Scoring was negated MAPE, so flip the sign to get MAPE back
        "mape": -gs_results["mean_test_score"],
    }
)

# One line per n_estimators value: MAPE as a function of learning_rate
sns.relplot(
    data=gs_results_df,
    x="learning_rate",
    y="mape",
    hue="n_estimators",
    kind="line",
)

plt.show()

This is consistent with the `learning_rate=0.1` and `n_estimators=100` chosen as the best estimator with the lowest MAPE.
<br>
Next we predict and analyse the results using the best estimator <br>

The previous mape was: 0.05170313160722504 <br>
With the best estimator it is: 0.050110345648975284


In [None]:
# Predict on the test split with the tuned grid search model
result = grid_search.predict(test_df)

# Analyze results with the same MAPE metric used for the baseline model
mape = mean_absolute_percentage_error(
    df=result,
    y_true_col_names="IMDB",
    y_pred_col_names="IMDB_PRICE",
)

tuned_actual_vs_predicted = result.select("IMDB", "IMDB_PRICE")
tuned_actual_vs_predicted.show()
print(f"Mean absolute percentage error: {mape}")

In [None]:
# Plot actual vs predicted for the tuned model
tuned_plot_data = result["IMDB", "IMDB_PRICE"].to_pandas().astype("float64")
g = sns.relplot(
    data=tuned_plot_data,
    x="IMDB",
    y="IMDB_PRICE",
    kind="scatter",
)
# Red identity line: points on it are perfect predictions
g.ax.axline((0,0), slope=1, color="r")

plt.show()

## 4. Model deployment using permanent Vectorised User-Defined Function (UDF)

One thing to keep in mind is that Snowpark ML's `model.predict()` function creates a ***temporary*** UDF, so in order to persist as a permanent UDF, we'll need to define our own UDF. 

***Note: Once Snowpark ML's native model registry is available, this will be the more streamlined approach to deploy your model.***

We will save the model as an sklearn object so it can be used externally. This is how we'll deploy the model as a UDF in the next step.


In [None]:
# Let's save our optimal model first
MODEL_FILE = 'model.joblib'
optimal_model = grid_search.to_sklearn()

# Pickle it locally first
joblib.dump(optimal_model, MODEL_FILE)

# Then upload the pickled object into the stage we created earlier.
# put() compresses uploads by default, so the staged file ends up as
# 'model.joblib.gz' -- the name the UDF below imports.
session.file.put(MODEL_FILE, "@SCOOBY_ASSETS", overwrite=True)

In [None]:
# Get all relevant column names to pass into the UDF call.
# The order (CAT_COLS + NUM_COLS) matches the input_cols order used to train the
# model; the UDF relies on it because it receives the columns positionally.
feature_cols = test_df[CAT_COLS + NUM_COLS].columns

Create the vectorised User Defined Function (UDF)

In [None]:
# Cache the model load to optimize inference
@cachetools.cached(cache={})
def load_model(filename):
    """Load a joblib-serialized model from the UDF import directory.

    The result is cached per `filename`, so repeated calls with the same name
    reuse the loaded model instead of reading the file again.

    Args:
        filename: Name of the model file inside the import directory.

    Returns:
        The object deserialized by joblib, or None when the import directory
        setting is empty.

    Raises:
        KeyError: If the interpreter has no "snowflake_import_directory" option.
    """
    # Imports live inside the function so they travel with it when used from the UDF
    import joblib
    import os
    import sys

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    # Nothing to load from: fall out with None
    if not import_dir:
        return None

    model_path = os.path.join(import_dir, filename)
    with open(model_path, 'rb') as model_file:
        return joblib.load(model_file)

# Register the UDF via decorator.
# FloatType comes from snowflake.snowpark.types (its public home) rather than from
# the functions module, which only exposes it as an internal import.
@udf(name='batch_predict_imdb', 
     session=session, 
     replace=True, 
     is_permanent=True, 
     stage_location='@SCOOBY_ASSETS',
     input_types=[FloatType()]*len(feature_cols),
     return_type=FloatType(),
     imports=['@SCOOBY_ASSETS/model.joblib.gz'],
     packages=['pandas','joblib','cachetools','xgboost'])
def batch_predict_imdb(test_df: pd.DataFrame) -> pd.Series:
    """Predict IMDB scores for a batch of rows inside Snowflake.

    Args:
        test_df: Batch of feature values as a pandas DataFrame, one column per
            feature, in the same order as CAT_COLS + NUM_COLS.

    Returns:
        One prediction per input row.
    """
    # Need to name the columns because column names aren't passed in to this function
    test_df.columns = CAT_COLS + NUM_COLS
    # load_model is cached, so the staged model file is only read once per process
    model = load_model('model.joblib.gz')
    return model.predict(test_df) # This is using the XGBoost library's model.predict(), not Snowpark ML's

Call Vectorized User-Defined Function (UDF) on test data.

In [None]:
# Build the UDF call over the feature columns (passed positionally, in training order)
predicted_imdb = batch_predict_imdb(*feature_cols)

# Append the predictions as a new column and preview the result
test_df_w_preds = test_df.with_column('PREDICTED_IMDB', predicted_imdb)
test_df_w_preds.show()

In [None]:
# Write predictions to a Snowflake table.
# 'overwrite' mode replaces any existing 'scooby_w_predictions' table, so re-running this cell is safe.
test_df_w_preds.write.mode('overwrite').save_as_table('scooby_w_predictions')

In [None]:
# Close the Snowflake session now that all work is done
session.close()