# Snowflake ML Model Registry

In this worfklow we will work through the following elements of a typical tabular machine learning pipeline.

### 1. Use Feature Store to track engineered features
* Store feature defintions in feature store for reproducible computation of ML features
      
### 2. Train two Models using the Snowflake ML APIs
* Baseline XGboost
* XGboost with optimal hyper-parameters identified via Snowflake ML distributed HPO methods

### 3. Register both models in Snowflake model registry
* Explore model registry capabilities such as **metadata tracking, inference, and explainability**
* Compare model metrics on train/test set to identify any issues of model performance or overfitting
* Tag the best performing model version as 'default' version

In [None]:
!pip install shap

In [None]:
#Update this VERSION_NUM to version your features, models etc!
VERSION_NUM = '0'
DB = "E2E_SNOW_MLOPS_DB" 
SCHEMA = "MLOPS_SCHEMA" 
COMPUTE_WAREHOUSE = "E2E_SNOW_MLOPS_WH" 

In [1]:
import pandas as pd
import numpy as np
import sklearn
import math
import pickle
import shap
from datetime import datetime
import streamlit as st
from xgboost import XGBClassifier

# Snowpark ML
from snowflake.ml.registry import Registry
from snowflake.ml.modeling.tune import get_tuner_context
from snowflake.ml.modeling import tune
from entities import search_algorithm

#Snowflake feature store
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode

# Snowpark session
from snowflake.snowpark import DataFrame
from snowflake.snowpark.functions import col, to_timestamp, min, max, month, dayofweek, dayofyear, avg, date_add, sql_expr
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark import Window

#setup snowpark session
from snowflake.snowpark.context import get_active_session
session = get_active_session()
session

In [None]:
try:
    print("Reading table data...")
    df = session.table("MORTGAGE_LENDING_DEMO_DATA")
    df.show(5)
except:
    print("Table not found! Uploading data to snowflake table")
    df_pandas = pd.read_csv("data/MORTGAGE_LENDING_DEMO_DATA.csv.zip")
    session.write_pandas(df_pandas, "data/MORTGAGE_LENDING_DEMO_DATA", auto_create_table=True)
    df = session.table("MORTGAGE_LENDING_DEMO_DATA")
    df.show(5)

## Observe Snowflake Snowpark table properties

In [None]:
df.select(min('TS'), max('TS'))

In [None]:
#Get current date and time
current_time = datetime.now()
df_max_time = datetime.strptime(str(df.select(max("TS")).collect()[0][0]), "%Y-%m-%d %H:%M:%S.%f")

#Find delta between latest existing timestamp and today's date
timedelta = current_time- df_max_time

#Update timestamps to represent last ~1 year from today's date
df.select(min(date_add(to_timestamp("TS"), timedelta.days-1)), max(date_add(to_timestamp("TS"), timedelta.days-1)))

## Feature Engineering with Snowpark APIs

In [None]:
#Create a dict with keys for feature names and values containing transform code

feature_eng_dict = dict()

#Timstamp features
feature_eng_dict["TIMESTAMP"] = date_add(to_timestamp("TS"), timedelta.days-1)
feature_eng_dict["MONTH"] = month("TIMESTAMP")
feature_eng_dict["DAY_OF_YEAR"] = dayofyear("TIMESTAMP") 
feature_eng_dict["DOTW"] = dayofweek("TIMESTAMP")

# df= df.with_columns(feature_eng_dict.keys(), feature_eng_dict.values())

#Income and loan features
feature_eng_dict["LOAN_AMOUNT"] = col("LOAN_AMOUNT_000s")*1000
feature_eng_dict["INCOME"] = col("APPLICANT_INCOME_000s")*1000
feature_eng_dict["INCOME_LOAN_RATIO"] = col("INCOME")/col("LOAN_AMOUNT")

county_window_spec = Window.partition_by("COUNTY_NAME")
feature_eng_dict["MEAN_COUNTY_INCOME"] = avg("INCOME").over(county_window_spec)
feature_eng_dict["HIGH_INCOME_FLAG"] = (col("INCOME")>col("MEAN_COUNTY_INCOME")).astype(IntegerType())

feature_eng_dict["AVG_THIRTY_DAY_LOAN_AMOUNT"] =  sql_expr("""AVG(LOAN_AMOUNT) OVER (PARTITION BY COUNTY_NAME ORDER BY TIMESTAMP  
                                                            RANGE BETWEEN INTERVAL '30 DAYS' PRECEDING AND CURRENT ROW)""")

df = df.with_columns(feature_eng_dict.keys(), feature_eng_dict.values())
df.show(3)

In [None]:
df.explain()

## Create a Snowflake Feature Store

In [None]:
fs = FeatureStore(
    session=session, 
    database=DB, 
    name=SCHEMA, 
    default_warehouse=COMPUTE_WAREHOUSE,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

In [None]:
fs.list_entities()

## Feature Store configuration
- create/register entities of interest

In [None]:
#First try to retrieve an existing entity definition, if not define a new one and register
try:
    #retrieve existing entity
    loan_id_entity = fs.get_entity('LOAN_ENTITY') 
    print('Retrieved existing entity')
except:
#define new entity
    loan_id_entity = Entity(
        name = "LOAN_ENTITY",
        join_keys = ["LOAN_ID"],
        desc = "Features defined on a per loan level")
    #register
    fs.register_entity(loan_id_entity)
    print("Registered new entity")

In [None]:
#Create a dataframe with just the ID, timestamp, and engineered features. We will use this to define our feature view
feature_df = df.select(["LOAN_ID"]+list(feature_eng_dict.keys()))
feature_df.show(5)

Here, the feature store references an existing table. 

We could also define the dataframe via the use of Snowpark APIs, and use that dataframe (or a function that returns a dataframe) as the feature view definition, below.

In [None]:
#define and register feature view
loan_fv = FeatureView(
    name="Mortgage_Feature_View",
    entities=[loan_id_entity],
    feature_df=feature_df,
    timestamp_col="TIMESTAMP",
    refresh_freq="1 day")

#add feature level descriptions

loan_fv = loan_fv.attach_feature_desc(
    {
        "MONTH": "Month of loan",
        "DAY_OF_YEAR": "Day of calendar year of loan",
        "DOTW": "Day of the week of loan",
        "LOAN_AMOUNT": "Loan amount in $USD",
        "INCOME": "Household income in $USD",
        "INCOME_LOAN_RATIO": "Ratio of LOAN_AMOUNT/INCOME",
        "MEAN_COUNTY_INCOME": "Average household income aggregated at county level",
        "HIGH_INCOME_FLAG": "Binary flag to indicate whether household income is higher than MEAN_COUNTY_INCOME",
        "AVG_THIRTY_DAY_LOAN_AMOUNT": "Rolling 30 day average of LOAN_AMOUNT"
    }
)

loan_fv = fs.register_feature_view(loan_fv, version=VERSION_NUM, overwrite=True)

In [None]:
fs.list_feature_views()

In [None]:
#Create link to feature store UI to inspect newly created feature view!
org_name = session.sql('SELECT CURRENT_ORGANIZATION_NAME()').collect()[0][0]
account_name = session.sql('SELECT CURRENT_ACCOUNT_NAME()').collect()[0][0]

st.write(f'https://app.snowflake.com/{org_name}/{account_name}/#/features/database/{DB}/store/{SCHEMA}')

## Retrieve a Dataset from the featureview

Snowflake Datasets are immutable, file-based objects that exist within your Snowpark session. 

They can be written to persistent Snowflake objects as needed. 

In [None]:
ds = fs.generate_dataset(
    name=f"MORTGAGE_DATASET_EXTENDED_FEATURES_{VERSION_NUM}",
    spine_df=df.select("LOAN_ID", "TIMESTAMP", "LOAN_PURPOSE_NAME","MORTGAGERESPONSE"), #only need the features used to fetch rest of feature view
    features=[loan_fv],
    spine_timestamp_col="TIMESTAMP",
    spine_label_cols=["MORTGAGERESPONSE"]
)

In [None]:
ds_sp = ds.read.to_snowpark_dataframe()
ds_sp.show(5)

In [None]:
import snowflake.ml.modeling.preprocessing as snowml
from snowflake.snowpark.types import StringType

OHE_COLS = ds_sp.select([col.name for col in ds_sp.schema if col.datatype ==StringType()]).columns
OHE_POST_COLS = [i+"_OHE" for i in OHE_COLS]


# Encode categoricals to numeric columns
snowml_ohe = snowml.OneHotEncoder(input_cols=OHE_COLS, output_cols = OHE_COLS, drop_input_cols=True)
ds_sp_ohe = snowml_ohe.fit(ds_sp).transform(ds_sp)

#Rename columns to avoid double nested quotes and white space chars
rename_dict = {}
for i in ds_sp_ohe.columns:
    if '"' in i:
        rename_dict[i] = i.replace('"','').replace(' ', '_')

ds_sp_ohe = ds_sp_ohe.rename(rename_dict)
ds_sp_ohe.columns

In [None]:
train, test = ds_sp_ohe.random_split(weights=[0.70, 0.30], seed=0)

In [None]:
train = train.fillna(0)
test = test.fillna(0)

In [None]:
train_pd = train.to_pandas()
test_pd = test.to_pandas()

## Model Training
### Below we will define and fit an xgboost classifier as our baseline model and evaluate the performance
##### Note this is all done with OSS frameworks

In [None]:
#Define model config
xgb_base = XGBClassifier(
    max_depth=50,
    n_estimators=3,
    learning_rate = 0.75,
    booster = 'gbtree')

In [None]:
#Split train data into X, y
X_train_pd = train_pd.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"],axis=1) #remove
y_train_pd = train_pd.MORTGAGERESPONSE

#train model
xgb_base.fit(X_train_pd,y_train_pd)

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score
train_preds_base = xgb_base.predict(X_train_pd) #update this line with correct ata

f1_base_train = round(f1_score(y_train_pd, train_preds_base),4)
precision_base_train = round(precision_score(y_train_pd, train_preds_base),4)
recall_base_train = round(recall_score(y_train_pd, train_preds_base),4)

print(f'F1: {f1_base_train} \nPrecision {precision_base_train} \nRecall: {recall_base_train}')

# Model Registry

- Log models with important metadata
- Manage model lifecycles
- Serve models from Snowflake runtimes

In [None]:
#Create a snowflake model registry object 
from snowflake.ml.registry import Registry

# Define model name
model_name = f"MORTGAGE_LENDING_MLOPS_{VERSION_NUM}"

# Create a registry to log the model to
model_registry = Registry(session=session, 
                          database_name=DB, 
                          schema_name=SCHEMA,
                          options={"enable_monitoring": True})

In [None]:
#Log the base model to the model registry (if not already there)
base_version_name = 'XGB_BASE'

try:
    #Check for existing model
    mv_base = model_registry.get_model(model_name).version(base_version_name)
    print("Found existing model version!")
except:
    print("Logging new model version...")
    #Log model to registry
    mv_base = model_registry.log_model(
        model_name=model_name,
        model=xgb_base, 
        version_name=base_version_name,
        sample_input_data = train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).limit(100), #using snowpark df to maintain lineage
        comment = f"""ML model for predicting loan approval likelihood.
                    This model was trained using XGBoost classifier.
                    Hyperparameters used were:
                    max_depth={xgb_base.max_depth}, 
                    n_estimators={xgb_base.n_estimators}, 
                    learning_rate = {xgb_base.learning_rate}, 
                    algorithm = {xgb_base.booster}
                    """,
        target_platforms= ["WAREHOUSE", "SNOWPARK_CONTAINER_SERVICES"],
        options= {"enable_explainability": True}

    )
    
    #set metrics
    mv_base.set_metric(metric_name="Train_F1_Score", value=f1_base_train)
    mv_base.set_metric(metric_name="Train_Precision_Score", value=precision_base_train)
    mv_base.set_metric(metric_name="Train_Recall_score", value=recall_base_train)

In [None]:
#Create tag for PROD model
session.sql("CREATE OR REPLACE TAG PROD")

In [None]:
#Apply prod tag 
m = model_registry.get_model(model_name)
m.comment = "Loan approval prediction models" #set model level comment
m.set_tag("PROD", base_version_name)
m.show_tags()

In [None]:
model_registry.show_models()

In [None]:
model_registry.get_model(model_name).show_versions()

In [None]:
print(mv_base)
print(mv_base.show_metrics())

In [None]:
mv_base.show_functions()

In [None]:
reg_preds = mv_base.run(test, function_name = "predict").rename(col('"output_feature_0"'), "MORTGAGE_PREDICTION")
reg_preds.show(10)

In [None]:
#ds_sp_ohe = ds_sp_ohe.rename(col('"LOAN_PURPOSE_NAME_Home improvement"'), "LOAN_PURPOSE_NAME_Home_improvement")

preds_pd = reg_preds.select(["MORTGAGERESPONSE", "MORTGAGE_PREDICTION"]).to_pandas()
f1_base_test = round(f1_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)
precision_base_test = round(precision_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)
recall_base_test = round(recall_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)

#log metrics to model registry model
mv_base.set_metric(metric_name="Test_F1_Score", value=f1_base_test)
mv_base.set_metric(metric_name="Test_Precision_Score", value=precision_base_test)
mv_base.set_metric(metric_name="Test_Recall_score", value=recall_base_test)

print(f'F1: {f1_base_test} \nPrecision {precision_base_test} \nRecall: {recall_base_test}')