# GPU-Based XGBoost Training
## In the following notebook we will leverage Snowpark Container Services (SPCS) to run a notebook on a GPU cluster within Snowflake

### *Workflow*
- Inspect GPU resources available - for this exercise we will use four NVIDIA A10G GPUs
- Load in data from Snowflake table
- Set up data for modeling
- Train two XGBoost models - one trained with open source XGBoost (single GPU) and one distributed across the full GPU cluster
- Log the model into Snowflake's model registry then test out built-in inference and explainability capabilities on the model object

### *Key Takeaways*
- SPCS allows users to run notebook workloads that execute on containers, rather than virtual warehouses in Snowflake
- While Open Source XGBoost is compatible with GPUs, by default it is restricted to a single GPU. Snowflake enables users to easily distribute their training jobs across all available GPUs which can greatly speed up execution time 🔥
- Snowflake's model registry provides a secure and flexible framework for users to deploy, track, and access models
- Bringing in third-party Python libraries offers the flexibility to leverage great contributions from the OSS ecosystem


### Note - In order to successfully run `!pip install` commands, make sure you have enabled the external access integration with PyPI
- Do so by clicking on the drop down of the 🟢 Active kernel settings button, clicking Edit Compute Settings, then turning on the PYPI_ACCESS_INTEGRATION radio button in the external access tab

In [None]:
!pip install plotnine

In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import sys
import seaborn as sns
import matplotlib.pyplot as plt

# xgboost libraries
import xgboost
from xgboost import XGBRegressor

# Snowpark libraries & session
from snowflake.snowpark import DataFrame
from snowflake.snowpark.functions import col
from snowflake.snowpark.context import get_active_session
session = get_active_session()
session

In [None]:
import torch

# Get the list of GPUs
if torch.cuda.is_available():
    # Get the number of GPUs
    num_gpus = torch.cuda.device_count()

    print(f'{num_gpus} GPU Device(s) Found')
    # Print the list of GPUs
    for i in range(num_gpus):
        print("Name:", torch.cuda.get_device_name(i), "  Index:", i)
else:
    print("No GPU available")


In [None]:
#Load in data from Snowflake table into a Snowpark dataframe
table = "XGB_GPU_DATABASE.XGB_GPU_SCHEMA.VEHICLES_TABLE"
df = session.table(table)
df.count(), len(df.columns)

In [None]:
#Note the maximum price - a $3B car must be quite a spectacle, but we don't want to use that for our model
df.select('PRICE').describe()

In [None]:
#Let's filter down to cars priced under $100k - note that we only filter out ~1% of our data here
df = df.filter(col('PRICE')<100000)
df.select('PRICE').describe()

In [None]:
#View data schema
list(df.schema)

In [None]:
#Drop some columns that won't be helpful for modeling
drop_cols = ["ID","URL", "REGION_URL", "IMAGE_URL", "DESCRIPTION", "VIN", "POSTING_DATE", 'COUNTY']
df = df.drop(drop_cols)

In [None]:
#Fill NULL values with "NA" for string columns and 0 for numerical columns
from snowflake.snowpark.types import StringType
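# Use a loop variable that does not reuse the name of the imported `col` function,
# and match string columns with isinstance so a StringType with a length still counts
string_cols = df.select([field.name for field in df.schema.fields if isinstance(field.datatype, StringType)]).columns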
non_string_cols = df.drop(string_cols).columns

df = df.fillna("NA", subset=string_cols)
df = df.fillna(0, subset= non_string_cols)

In [None]:
#Use pandas to find the top 100 car models and top 100 regions and cast any extra values to 'INFREQUENT' to avoid excessive dimensionality
df_pd = df.to_pandas()
top_n_models = df_pd.MODEL.value_counts().keys()[0:100]
top_n_regions = df_pd.REGION.value_counts().keys()[0:100]
df_pd['MODEL'] = df_pd.MODEL.apply(lambda x: x if x in top_n_models else 'INFREQUENT')
df_pd['REGION'] = df_pd.REGION.apply(lambda x: x if x in top_n_regions else 'INFREQUENT')

df = session.create_dataframe(df_pd)
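
The capping step above keeps only the most frequent categories and folds everything else into a single 'INFREQUENT' bucket. A minimal plain-Python sketch of the same idea follows; `cap_categories` and the toy values are hypothetical and only illustrate the logic, they are not part of the notebook's pipeline.

```python
from collections import Counter


def cap_categories(values, top_n, other="INFREQUENT"):
    """Keep the top_n most frequent values and replace the rest with `other`."""
    keep = {value for value, _ in Counter(values).most_common(top_n)}
    return [v if v in keep else other for v in values]


# Toy column standing in for MODEL (hypothetical values)
models = ["f-150", "civic", "f-150", "camry", "civic", "f-150", "rare-kit-car"]
capped = cap_categories(models, top_n=2)
print(capped)
```

With `top_n=2`, only the two most common values survive, so the one-hot encoding later produces three columns for this feature instead of four.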

In [None]:
#Union the data to itself twice to go from roughly 400k rows to roughly 1.7M rows (each pass doubles the row count, so 4x overall). This lab's purpose is to test performance so we want to have a decently large dataset!
for _ in range(2):
    df = df.unionAll(df)

df.count()
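
Because each pass unions the dataframe with itself, the row count doubles per pass rather than growing by a fixed amount. The sketch below just works through that arithmetic with an illustrative round starting count of 400,000; the real table's count will differ somewhat, and `rows_after_self_unions` is a hypothetical helper.

```python
def rows_after_self_unions(start_rows, n_unions):
    """Row count after repeatedly unioning a dataframe with itself."""
    rows = start_rows
    for _ in range(n_unions):
        rows += rows  # a self-union appends a full copy of the current rows
    return rows


# Two passes, matching the loop above
print(rows_after_self_unions(400_000, 2))  # 1600000
```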

In [None]:
import snowflake.ml.modeling.preprocessing as snowml

OHE_COLS = string_cols
OHE_POST_COLS = [i+"_OHE" for i in OHE_COLS]


# Encode categoricals to numeric columns
snowml_ohe = snowml.OneHotEncoder(input_cols=OHE_COLS, output_cols = OHE_COLS, drop_input_cols=True)
transformed_df = snowml_ohe.fit(df).transform(df)
transformed_df.columns

In [None]:
#Rename columns to avoid issues with " characters later on

#Create dict replacing bad column names
renaming_dict = {}
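# Loop variable is named col_name so it does not overwrite the imported `col` function
for n, col_name in enumerate(transformed_df.columns):
    double_quote_spot = col_name.find('"')
    if double_quote_spot==0:
        # Keep the text between the opening quote and the first underscore, plus a unique index
        renaming_dict[col_name] = col_name[double_quote_spot+1:col_name.find("_")]+f"__{n}"
    else:
        renaming_dict[col_name] = col_name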


#Create new df with renamed and sorted columns
df_renamed = transformed_df.rename(renaming_dict)
df_renamed = df_renamed.select(sorted(df_renamed.columns))
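
To make the renaming rule concrete, here is a small standalone sketch of the same logic applied to made-up column names. The sample names are assumptions for illustration; the actual names produced by the encoder may look different.

```python
def build_renaming_dict(columns):
    """Map quoted column names to PREFIX__<position>; leave other names unchanged."""
    renaming = {}
    for n, name in enumerate(columns):
        if name.startswith('"'):
            # Text between the opening quote and the first underscore
            prefix = name[1:name.find("_")]
            renaming[name] = f"{prefix}__{n}"
        else:
            renaming[name] = name
    return renaming


# Hypothetical column names, two plain and two quoted
sample = ["PRICE", "YEAR", '"FUEL_gas"', '"FUEL_diesel"']
renamed = build_renaming_dict(sample)
print(renamed)
```

The positional suffix keeps the new names unique even when several encoded columns share the same prefix.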

In [None]:
# Split the data into train and test sets
train, test = df_renamed.random_split(weights=[0.95, 0.05], seed=0)

In [None]:
#Convert snowpark tables to pandas for use later on
train_pd = train.to_pandas()
test_pd = test.to_pandas()

## Model Training

### Now that our data is all set up, we will train GPU-powered XGBoost models: first on a single GPU with open source XGBoost, then distributed across all four GPUs
#### The parameter that instructs our model to leverage GPUs is *tree_method*.
- When *tree_method* is set to *hist*, the model will not attempt to use GPUs
- When *tree_method* is set to *gpu_hist*, the model will leverage available GPUs
- Snowflake offers the ability to leverage multi-GPU training (i.e. using all 4 of the A10G GPUs we have available) for optimized performance
- By default, open source XGBoost will only use a single GPU
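
One caveat on the settings above: *gpu_hist* is the XGBoost 1.x spelling. XGBoost releases from 2.0 onward deprecate it in favor of `tree_method="hist"` combined with `device="cuda"`. The helper below is a hypothetical sketch for picking parameters by version (`gpu_training_params` is not part of XGBoost or Snowflake ML), assuming a simple check on the major version number is enough for your environment.

```python
def gpu_training_params(xgboost_version: str) -> dict:
    """Return GPU training parameters suited to the given XGBoost version string."""
    major = int(xgboost_version.split(".")[0])
    if major >= 2:
        # 2.x style: histogram tree method, with the device chosen separately
        return {"tree_method": "hist", "device": "cuda"}
    # 1.x style, as used in this notebook
    return {"tree_method": "gpu_hist"}


print(gpu_training_params("1.7.6"))
print(gpu_training_params("2.0.3"))
```

In a notebook cell, the result could be unpacked into the estimator, for example `XGBRegressor(**gpu_training_params(xgboost.__version__), n_estimators=2000)`.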

In [None]:
#Define oss xgboost model
oss_xgb_gpu = XGBRegressor(
    tree_method="gpu_hist",
    n_estimators=2000,
)

In [None]:
#train oss xgboost model
oss_xgb_gpu.fit(
    X=train_pd.drop("PRICE", axis=1),
    y=train_pd["PRICE"],)

In [None]:
#compute predictions and performance metrics
from sklearn.metrics import r2_score
xgb_yhat = oss_xgb_gpu.predict(test_pd.drop("PRICE",axis=1))
print(r2_score(test_pd.PRICE, xgb_yhat))
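
For readers less familiar with the metric printed above, R² compares the model's squared error against that of always predicting the mean price. A minimal hand-rolled sketch with made-up prices follows; it is for intuition only, and `r2_score` from scikit-learn remains the function used in this notebook.

```python
def r2(y_true, y_pred):
    """Coefficient of determination: 1 - (residual sum of squares / total sum of squares)."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


# Hypothetical actual and predicted prices
actual = [10_000, 20_000, 30_000]
predicted = [12_000, 19_000, 29_000]
print(round(r2(actual, predicted), 4))  # 0.97
```

A value of 1.0 means perfect predictions, while 0 means the model does no better than predicting the mean.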

In [None]:
#Clear cache to make sure we have as much free memory as possible for modeling

import gc

gc.collect()

torch.cuda.empty_cache()

In [None]:
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
dc = DataConnector.from_dataframe(train)

#Specify GPU usage 
gpu_scaling_config = XGBScalingConfig(use_gpu=True)

#Define distributed xgb estimator
dist_gpu_xgb = XGBEstimator(
    params = {"tree_method": "gpu_hist",
              "n_estimators":2000,},
    scaling_config = gpu_scaling_config)

In [None]:
#Train distributed xgb estimator
dist_gpu_xgb.fit(dc,
                 input_cols = train.drop("PRICE").columns,
                 label_col = "PRICE")

## While the model is training, you can get a live look at resource utilization by hovering your mouse over the 🟢 Active button that controls the kernel settings for your notebook.
### Notice the memory, CPU utilization and GPU utilization while the model training executes

## While results aren't entirely deterministic, you should have seen a 3-4x speedup in model training time from OSS (single GPU) to Snowflake-Distributed (four GPUs) Training. 
### Note that while the wall time difference is not as pronounced, the pure training time is the key piece to consider here. 
## You can inspect the train time of your distributed training job by reading through the cell output of the above cell labeled *train_distributed_gpu_model*. Note the run time of the first and last iteration of the training job.  
### The pure training time of the distributed model should be ~25-30 seconds (compared to ~90s for single GPU training)

#### For a more comprehensive performance comparison please see this [engineering blog](https://www.snowflake.com/en/engineering-blog/machine-learning-container-runtime/) comparing the performance of various Snowflake Container Runtime functions including Distributed Multi-GPU Model Training
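
The speedup figure quoted above follows directly from the approximate timings. The sketch below simply divides the single-GPU time by the distributed time, using the rough numbers from this section (~90s versus ~25-30s); your own runs will vary, and `speedup` is a hypothetical helper.

```python
def speedup(baseline_seconds, accelerated_seconds):
    """How many times faster the accelerated run is than the baseline."""
    return baseline_seconds / accelerated_seconds


single_gpu_seconds = 90  # approximate single-GPU training time quoted above
for distributed_seconds in (25, 30):
    ratio = speedup(single_gpu_seconds, distributed_seconds)
    print(f"{distributed_seconds}s distributed -> {ratio:.1f}x speedup")
```

These timings give a speedup between 3.0x and 3.6x, which is below the 4x that perfect scaling across four GPUs would give.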

In [None]:
#Compute predictions and performance metrics
dist_xgb_yhat = dist_gpu_xgb.predict(test_pd.drop("PRICE",axis=1))
print(r2_score(test_pd.PRICE, dist_xgb_yhat))

In [None]:
#In our visualization below we can see that there is a reasonably tight correlation between predicted and actual prices for cars 
test_vis = test_pd[test_pd.PRICE>0]
sns.scatterplot(x=test_vis.PRICE, y = dist_gpu_xgb.predict(test_vis.drop("PRICE",axis=1)))

In [None]:
#Extract xgb booster object from Snowflake optimized XGB model
gpu_booster = dist_gpu_xgb.get_booster()
gpu_booster.predict(xgboost.DMatrix(test_pd.drop("PRICE", axis=1)))[0:5]

## Now we will log our model to Snowflake's Model Registry
### To learn more about the Model Registry please see our [documentation](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/overview)

In [None]:
from snowflake.ml.registry import Registry
from snowflake.ml.model import model_signature

# Define model name
model_name = "DISTRIBUTED_XGB_ON_GPU_QUICKSTART"
version_name = "DIST_XGB"

# Create a registry and log the model
model_registry = Registry(session=session, 
                          database_name=session.get_current_database(), 
                          schema_name=session.get_current_schema())

In [None]:
#Log model to Model Registry  (or retrieve model if already registered)
try: 
    logged_model = model_registry.log_model(
        model_name=model_name,
        version_name = version_name,
        model=gpu_booster,
        sample_input_data = test.drop("PRICE"),
        target_platforms= {"WAREHOUSE"},
        # options={"cuda_version": torch.version.cuda}, #Can add this line in for GPU inference support (see conclusion section of notebook for more info!)
    )
    print("Logged new model...")
except ValueError:
    logged_model = model_registry.get_model(model_name).version(version_name)
    print("Retrieved existing model!")
    

In [None]:
#Run inference against model in model registry
registry_preds = logged_model.run(test.drop("PRICE"), function_name="PREDICT")

In [None]:
#Show a few predictions
registry_preds.select([registry_preds.columns[-1]]).show(10)

In [None]:
#Use built in model registry functionality to compute Shapley values (row-level explanations)
exps = logged_model.run(test_pd.drop("PRICE",axis=1)[-1000:], function_name="EXPLAIN")

In [None]:
#Plot feature values and shapley values for YEAR
#Note that cars from recent years cost significantly more than older cars which makes sense!
from plotnine import ggplot, aes, geom_point, labs, theme_minimal, scale_x_continuous

exps["YEAR"] = test_pd.YEAR[-1000:].values

p = (ggplot(exps, aes(x='YEAR', y='YEAR_explanation')) +
     geom_point(size=3, color="#1E90FF", alpha=0.25) +
     labs(title="Influence of Year on Car Sales Price",
          x="Year",
          y="Year Influence")+
     scale_x_continuous(limits=(1980, 2022)))

# Show the plot
p.draw()

# Conclusion
## We have now completed our workflow which involved the below stages - 
### 👩‍💻 Read in data from an s3 bucket into a Snowflake Table and into our Notebook👩‍💻

### 🛠️ Filled null values, identified & selected relevant columns, performed one-hot encoding and more to get our data model-ready 🛠️

### 🔮 Trained an XGBoost model on a single GPU 🔮

### 🔥 Easily accelerated the model training process using Snowflake's distributed model training framework to train our XGBoost model on multiple GPUs! 🔥
### 🚀 Logged our model to the Snowflake Model Registry where our model will be securely stored, versioned, and maintained 🚀
### 🔎 Finally we performed model inference and explainability on our model from the Model Registry 🔎
#### Here in our [docs](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/container) you can read up on how we could take this a step further and deploy this model to a GPU-container-based inference service!