In [1]:
import pandas as pd
import numpy as np
import sklearn
import math
import pickle
import datetime

# Snowpark ML
from snowflake.ml.modeling.xgboost import XGBRegressor, XGBClassifier
from snowflake.ml._internal.utils import identifier
from snowflake.ml.registry import Registry

#Snowflake feature store
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode

# Snowpark session
from snowflake.snowpark import DataFrame
from snowflake.snowpark.functions import col, to_timestamp
from snowflake.snowpark.types import IntegerType



# Snowpark is available for our analyses too: reuse the session this notebook is already running in.
from snowflake.snowpark.context import get_active_session

session = get_active_session()
session  # show the session handle as the cell output


In [None]:
RAW_TABLE = "MORTGAGE_LENDING_RAW"

# Lazy Snowpark handle on the raw mortgage table; preview a few rows.
df = session.table(RAW_TABLE)
df.show(5)

In [None]:
# Import the module under an alias rather than `from ... import min, max`.
# The direct import would shadow Python's builtin min()/max() for every later
# cell in this notebook.
import snowflake.snowpark.functions as F

# Earliest and latest TIMESTAMP in the raw data.
df.select(F.min("TIMESTAMP"), F.max("TIMESTAMP"))

In [None]:
# Map each engineered feature name to the Snowpark expression that computes it.
# The keys become the output column names. Insertion order is preserved, and a
# later cell reuses it to pick the feature-view columns.
feature_eng_dict = dict()

feature_eng_dict["TIMESTAMP"] = to_timestamp("TIMESTAMP")
feature_eng_dict["LOAN_AMOUNT"] = col("LOAN_AMOUNT_000s") * 1000
feature_eng_dict["INCOME"] = col("APPLICANT_INCOME_000s") * 1000
# Compare the INCOME *expression*, not col("INCOME"). INCOME is created in this
# same with_columns call, so referring to it by name depends on resolving a
# column alias inside a single SELECT.
feature_eng_dict["HIGH_INCOME_FLAG"] = (
    feature_eng_dict["INCOME"] > col("HUD_MEDIAN_FAMILY_INCOME")
).astype(IntegerType())

# with_columns takes parallel lists of names and expressions.
df = df.with_columns(list(feature_eng_dict.keys()), list(feature_eng_dict.values()))

In [None]:
# Show the query plan Snowpark generated for the feature-engineering transforms on `df`.
df.explain()

In [None]:
# Open the feature store in the session's current database/schema, creating it if it does not exist yet.
current_db = session.get_current_database()
current_schema = session.get_current_schema()
current_wh = session.get_current_warehouse()

fs = FeatureStore(
    session=session,
    database=current_db,
    name=current_schema,
    default_warehouse=current_wh,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

In [None]:
# Entities already registered in this feature store.
fs.list_entities()

In [None]:
# Reuse the LOAN_ID entity if it is already registered; otherwise define and register it.
try:
    # Retrieve the existing entity definition.
    loan_id_entity = fs.get_entity('LOAN_ID_ENTITY')
    print('Retrieved existing entity')
except Exception:
    # `except Exception` instead of a bare `except:`, so KeyboardInterrupt and
    # SystemExit still propagate rather than being treated as "entity missing".
    loan_id_entity = Entity(
        name="LOAN_ID_ENTITY",
        join_keys=["LOAN_ID"],
        desc="Features defined on a per loan level",
    )
    fs.register_entity(loan_id_entity)
    print("Registered new entity")

In [None]:
# The feature view is built from the join key (LOAN_ID) plus every engineered
# column, including TIMESTAMP.
feature_view_cols = ["LOAN_ID", *feature_eng_dict]
feature_df = df.select(feature_view_cols)
feature_df.show(5)

In [None]:
# Feature view keyed on the LOAN_ID entity, with TIMESTAMP as its timestamp column.
# It is registered as version "1"; overwrite=True replaces an existing "1".
loan_fv = fs.register_feature_view(
    FeatureView(
        name="Mortgage_Feature_View",
        entities=[loan_id_entity],
        feature_df=feature_df,
        timestamp_col="TIMESTAMP",
    ),
    version="1",
    overwrite=True,
)

In [None]:
# Feature views registered in this feature store (should include Mortgage_Feature_View).
fs.list_feature_views()

In [None]:
# Spine = raw rows minus the engineered/derived columns. Those columns are fetched
# back from loan_fv through the LOAN_ID entity, using TIMESTAMP as the spine
# timestamp (presumably a point-in-time lookup; verify against the Feature Store docs).
spine_df = df.drop("LOAN_AMOUNT_000S", "LOAN_AMOUNT", "APPLICANT_INCOME_000S", "INCOME", "HIGH_INCOME_FLAG")

ds = fs.generate_dataset(
    name="MORTGAGE_DATASET_V1",
    spine_df=spine_df,
    features=[loan_fv],
    spine_timestamp_col="TIMESTAMP",
    spine_label_cols=["MORTGAGERESPONSE"],
)

In [None]:
# Read the generated dataset back as a Snowpark DataFrame and preview it.
ds_sp = ds.read.to_snowpark_dataframe()
ds_sp.show(n=5)

In [None]:
import snowflake.ml.modeling.preprocessing as snowml
from snowflake.snowpark.types import StringType

# One-hot encode every string-typed column. isinstance() matches StringType
# whatever its declared max length (e.g. VARCHAR(n)); an equality check against
# StringType() can miss length-qualified columns. The loop variable is `field`,
# not `col`, so it is not confused with the imported col() function.
OHE_COLS = [field.name for field in ds_sp.schema.fields if isinstance(field.datatype, StringType)]
OHE_POST_COLS = [i + "_OHE" for i in OHE_COLS]  # NOTE(review): unused; output_cols below reuses OHE_COLS


# Encode categoricals to numeric columns
snowml_ohe = snowml.OneHotEncoder(input_cols=OHE_COLS, output_cols=OHE_COLS, drop_input_cols=True)
ds_sp_ohe = snowml_ohe.fit(ds_sp).transform(ds_sp)
ds_sp_ohe.columns

In [None]:
# 70/30 train/test split with a fixed seed.
train, test = ds_sp_ohe.random_split([0.70, 0.30], seed=0)

In [None]:
# Replace missing values with 0 in both splits.
train, test = train.fillna(0), test.fillna(0)

In [None]:
from snowflake.ml.modeling.xgboost import XGBClassifier

# Columns that are never model inputs: identifiers, the timestamp, PRICE, and the label.
NON_FEATURE_COLS = ["PRICE", "TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]


def _make_xgb_classifier(booster):
    """Build an XGBClassifier on the shared feature/label columns using `booster`.

    Optional knobs tried earlier and left disabled: tree_method ("exact"/"hist"),
    n_estimators=10.
    """
    return XGBClassifier(
        input_cols=train.drop(NON_FEATURE_COLS).columns,
        label_cols=train.select("MORTGAGERESPONSE").columns,
        output_cols="MORTGAGE_PREDICTION",
        booster=booster,
    )


# Two candidate models for the A/B comparison: tree booster vs. linear booster.
snow_xgb_tree = _make_xgb_classifier('gbtree')
snow_xgb_linear = _make_xgb_classifier('gblinear')


In [None]:
# Train the tree-booster classifier on the training split.
snow_xgb_tree.fit(train)

In [None]:
# Train the linear-booster classifier on the same training split.
snow_xgb_linear.fit(train)

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score
# Pull test-set labels and predictions for each model into pandas for sklearn scoring.
test_preds_tree, test_preds_linear = (
    model.predict(test).select(["MORTGAGERESPONSE", "MORTGAGE_PREDICTION"]).to_pandas()
    for model in (snow_xgb_tree, snow_xgb_linear)
)


def _binary_scores(preds):
    """Return (f1, precision, recall) of MORTGAGE_PREDICTION against MORTGAGERESPONSE."""
    y_true, y_pred = preds.MORTGAGERESPONSE, preds.MORTGAGE_PREDICTION
    return (
        f1_score(y_true, y_pred),
        precision_score(y_true, y_pred),
        recall_score(y_true, y_pred),
    )


f1_tree, precision_tree, recall_tree = _binary_scores(test_preds_tree)
f1_linear, precision_linear, recall_linear = _binary_scores(test_preds_linear)

print(f'GB Tree: \n f1: {f1_tree} \n precision {precision_tree} \n recall: {recall_tree}')
print(f'GB Linear: \n f1: {f1_linear} \n precision {precision_linear} \n recall: {recall_linear}')


# Model Registry

In [None]:
#Create a snowflake model registry object 
from snowflake.ml.registry import Registry
from snowflake.ml._internal.utils import identifier
from snowflake.ml.model import model_signature

# Strip identifier quoting from the session's database/schema names before passing them to the Registry.
db, schema = (
    identifier._get_unescaped_name(name)
    for name in (session.get_current_database(), session.get_current_schema())
)

# Single registered model that both booster versions are logged under.
model_name = "MORTGAGE_LENDING_MLOPS_AB_TESTING_DEMO"

# Registry in the current database/schema, with model monitoring enabled.
model_registry = Registry(
    session=session,
    database_name=db,
    schema_name=schema,
    options={"enable_monitoring": True},
)

In [None]:
def _get_or_log_version(model, version_name, comment, f1, precision, recall):
    """Return `version_name` of `model_name`, logging `model` under it if it is not found.

    Metrics are attached only when the version is newly logged. An existing
    version keeps whatever metrics it already has.
    """
    try:
        mv = model_registry.get_model(model_name).version(version_name)
        print("Found existing model version!")
    except Exception:
        # `except Exception`, not a bare `except:`: KeyboardInterrupt and SystemExit
        # must propagate instead of being treated as "version missing".
        print("Logging new model version...")
        mv = model_registry.log_model(
            model_name=model_name,
            model=model,
            version_name=version_name,
            comment=comment,
        )
        mv.set_metric(metric_name="F1_score", value=f1)
        mv.set_metric(metric_name="Precision_score", value=precision)
        mv.set_metric(metric_name="Recall_score", value=recall)
    return mv


# Deploy the tree booster model to the model registry
tree_version_name = 'GB_TREE'
mv_tree = _get_or_log_version(
    snow_xgb_tree,
    tree_version_name,
    "snow ml model built off feature store using tree booster",
    f1_tree,
    precision_tree,
    recall_tree,
)

# Now the linear booster model
linear_version_name = 'GB_LINEAR'
mv_linear = _get_or_log_version(
    snow_xgb_linear,
    linear_version_name,
    "snow ml model built off feature store using linear booster",
    f1_linear,
    precision_linear,
    recall_linear,
)

In [None]:
# Models currently in this registry.
model_registry.show_models()

In [None]:
# Versions logged under the demo model (expected: GB_TREE and GB_LINEAR).
model_registry.get_model(model_name).show_versions()

In [None]:
# Print each model version handle, followed by the metrics stored on it.
for version_handle in (mv_tree, mv_linear):
    print(version_handle)
    print(version_handle.show_metrics())

In [None]:
# Functions the logged tree model exposes; "predict" and "explain" are used below.
mv_tree.show_functions()

In [None]:
# Score the test split through the registered tree-booster version.
reg_preds = mv_tree.run(test, function_name="predict")
reg_preds.show(n=10)  # preview the first 10 scored rows

In [None]:
# Per-row feature explanations from the model's "explain" function, on a 1,000-row sample of the test split.
explain_sample = test.sample(n=1000)
shap_vals = mv_tree.run(explain_sample, function_name="explain")

In [None]:
# Bring the explanation output (input columns plus "<feature>_explanation" columns) into pandas.
shap_pd = shap_vals.to_pandas()

In [None]:
import shap 
# Pair every "<feature>_explanation" column with its "<feature>" input column by
# name. Slicing at a hard-coded position (iloc[:, 10:] / iloc[:, :10]) silently
# misaligns SHAP values and features as soon as the column count changes.
EXPLANATION_SUFFIX = "_explanation"
explanation_cols = [c for c in shap_pd.columns if c.endswith(EXPLANATION_SUFFIX)]
explained_feature_cols = [c[: -len(EXPLANATION_SUFFIX)] for c in explanation_cols]

just_shap = shap_pd[explanation_cols]
just_input_vals = shap_pd[explained_feature_cols]

shap.summary_plot(np.array(just_shap), just_input_vals, feature_names=just_input_vals.columns)

In [None]:
# Global feature importance: mean |SHAP| per explanation column, largest first.
# The absolute value matters. A signed mean lets positive and negative
# contributions cancel, so a feature that moves predictions strongly in both
# directions would rank near zero. Columns are selected by suffix, not by a
# hard-coded position.
shap_pd.filter(regex="_explanation$").abs().mean(axis=0).sort_values(ascending=False)

In [None]:
import seaborn as sns

# Explanation value vs. feature value for the "Home purchase" loan-purpose indicator.
sns.scatterplot(
    data=shap_pd,
    x="LOAN_PURPOSE_NAME_Home purchase",
    y="LOAN_PURPOSE_NAME_Home purchase_explanation",
)

In [None]:
import seaborn as sns

# Keep incomes strictly between 0 and 1,000,000, then plot INCOME against its explanation value.
income_series = shap_pd["INCOME"]
income_0_to_1M = shap_pd[income_series.gt(0) & income_series.lt(1000000)]
sns.scatterplot(data=income_0_to_1M, x="INCOME", y="INCOME_explanation")

# Model Monitoring setup

In [None]:
# Persist both splits as tables; the inference sproc and the model monitors read them later.
for split_df, table in ((train, "DEMO_MORTGAGE_LENDING_TRAIN"), (test, "DEMO_MORTGAGE_LENDING_TEST")):
    split_df.write.save_as_table(table, mode="overwrite")

In [None]:
# Stage used as stage_location when registering the inference stored procedure.
create_stage_stmt = "CREATE stage IF NOT EXISTS ML_STAGE"
session.sql(create_stage_stmt).collect()

In [None]:
from snowflake import snowpark
from snowflake.ml.registry import Registry
import joblib
import os
import logging
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.ml.modeling.preprocessing as pp
from snowflake.snowpark.types import StringType, IntegerType
import snowflake.snowpark.functions as F


def demo_inference_sproc(session: snowpark.Session, table_name: str, modelname: str, modelversion: str) -> str:
    """Score `table_name` with a registered model version and write predictions back into it.

    Adds (or resets) a ``<modelversion>_PREDICTION`` column on ``table_name`` and fills
    it with the output of the version's ``predict`` function, matching rows on
    (LOAN_ID, TIMESTAMP). Rows with no matching prediction keep the placeholder 9999.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.
        table_name: Table to score. It is overwritten with the extra prediction column.
        modelname: Model name, looked up via ``Registry(session=session)``.
        modelversion: Version to run; also used as the prediction column prefix.

    Returns:
        "Success" once the UPDATE has executed.

    NOTE(review): ``table_name`` and ``modelversion`` are interpolated into SQL
    without quoting. Only pass trusted identifiers.
    """
    reg = Registry(session=session)
    mv = reg.get_model(modelname).version(modelversion)

    input_table_name = table_name
    pred_col = f'{modelversion}_PREDICTION'

    df = session.table(input_table_name)

    # Score the table. The model's output column is renamed per version so that
    # several versions can each keep their own prediction column in one table.
    results = mv.run(df, function_name="predict")
    results = results.withColumnRenamed("MORTGAGE_PREDICTION", pred_col)

    # Stage the predictions in a scratch table. save_as_table executes now,
    # before the input table is overwritten below.
    temp_results_table = "DEMO_TEMP_PREDICTION_RESULTS"
    results.write.save_as_table(temp_results_table, mode='overwrite')

    # Add/reset the prediction column with a 9999 placeholder and rewrite the table.
    df = df.with_column(pred_col, F.lit(9999))
    df.write.save_as_table(input_table_name, mode='overwrite')

    # Copy the staged predictions into the placeholder column. The scratch table is
    # referenced through `temp_results_table`, so the name cannot drift between the
    # write above and this UPDATE.
    update_sql = f"""
    UPDATE {input_table_name} t
    SET {pred_col} = r.{pred_col}
    FROM {temp_results_table} r
    WHERE t.LOAN_ID = r.LOAN_ID
    AND t.TIMESTAMP=r.TIMESTAMP ;
    """
    session.sql(update_sql).collect()

    return "Success"

# Register the stored procedure
# Register the inference function as a permanent stored procedure (code staged in
# @ML_STAGE) so the SQL CALL cells below can invoke it by name.
session.sproc.register(
    func=demo_inference_sproc,
    return_type=StringType(),
    name="demo_mortgage_lending_inference_sproc",
    is_permanent=True,
    stage_location="@ML_STAGE",
    packages=['joblib', 'snowflake-snowpark-python', 'snowflake-ml-python'],
    replace=True,
)


In [None]:
-- Write GB_TREE predictions into DEMO_MORTGAGE_LENDING_TRAIN (column GB_TREE_PREDICTION).
CALL demo_mortgage_lending_inference_sproc('DEMO_MORTGAGE_LENDING_TRAIN','MORTGAGE_LENDING_MLOPS_AB_TESTING_DEMO', 'GB_TREE');

In [None]:
-- Write GB_TREE predictions into DEMO_MORTGAGE_LENDING_TEST (column GB_TREE_PREDICTION).
CALL demo_mortgage_lending_inference_sproc('DEMO_MORTGAGE_LENDING_TEST','MORTGAGE_LENDING_MLOPS_AB_TESTING_DEMO', 'GB_TREE');

In [None]:
-- Write GB_LINEAR predictions into DEMO_MORTGAGE_LENDING_TRAIN (column GB_LINEAR_PREDICTION).
CALL demo_mortgage_lending_inference_sproc('DEMO_MORTGAGE_LENDING_TRAIN','MORTGAGE_LENDING_MLOPS_AB_TESTING_DEMO', 'GB_LINEAR');

In [None]:
-- Write GB_LINEAR predictions into DEMO_MORTGAGE_LENDING_TEST (column GB_LINEAR_PREDICTION).
CALL demo_mortgage_lending_inference_sproc('DEMO_MORTGAGE_LENDING_TEST','MORTGAGE_LENDING_MLOPS_AB_TESTING_DEMO', 'GB_LINEAR');

In [None]:
-- Spot-check the scored test table.
SELECT *
FROM DEMO_MORTGAGE_LENDING_TEST
LIMIT 5

In [None]:
# from snowflake.ml.monitoring.entities.model_monitor_config import ModelMonitorConfig, ModelMonitorSourceConfig


# # Set up source/baseline table config for tree booster
# tree_source_config = ModelMonitorSourceConfig(
#     baseline = "DEMO_MORTGAGE_LENDING_TRAIN",
#     source="DEMO_MORTGAGE_LENDING_TEST",
#     timestamp_column="TIMESTAMP",
#     prediction_score_columns=["GB_TREE_PREDICTION"],
#     actual_score_columns=["MORTGAGERESPONSE"],
#     id_columns=["LOAN_ID"]
# )

# # Set up model config for tree booster
# tree_monitor_config = ModelMonitorConfig(
#     model_version=mv_tree,
#     model_function_name="predict",
#     background_compute_warehouse_name="ML_WH"
# )

# # Set up source/baseline table config for linear booster
# linear_source_config = ModelMonitorSourceConfig(
#     baseline = "DEMO_MORTGAGE_LENDING_TRAIN",
#     source="DEMO_MORTGAGE_LENDING_TEST",
#     timestamp_column="TIMESTAMP",
#     prediction_score_columns=["GB_LINEAR_PREDICTION"],
#     actual_score_columns=["MORTGAGERESPONSE"],
#     id_columns=["LOAN_ID"]
# )

# # Set up model config for linear booster
# linear_monitor_config = ModelMonitorConfig(
#     model_version=mv_linear,
#     model_function_name="predict",
#     background_compute_warehouse_name="ML_WH"
# )

In [None]:
# # Add a new ModelMonitor
# model_monitor = model_registry.add_monitor(
#     name="GB_TREE_MORTGAGE_LENDING_MODEL_MONITOR", 
#     source_config=tree_source_config,
#     model_monitor_config=tree_monitor_config,
# )


# model_monitor = model_registry.add_monitor(
#     name="GB_MORTGAGE_LENDING_MODEL_MONITOR", 
#     source_config=linear_source_config,
#     model_monitor_config=linear_monitor_config,
# )

In [None]:
-- Monitor for the GB_TREE version. It tracks GB_TREE_PREDICTION against the
-- MORTGAGERESPONSE label in DEMO_MORTGAGE_LENDING_TEST (source), with
-- DEMO_MORTGAGE_LENDING_TRAIN as the baseline. Refresh interval is 1 minute;
-- aggregation window is 1 day.
-- NOTE(review): a 1-minute refresh keeps ML_WH busy; presumably fine for a demo.
CREATE OR REPLACE MODEL MONITOR GB_TREE_MORTGAGE_LENDING_MODEL_MONITOR_SQL
WITH
    MODEL=MORTGAGE_LENDING_MLOPS_AB_TESTING_DEMO
    VERSION=GB_TREE
    FUNCTION=predict
    SOURCE=DEMO_MORTGAGE_LENDING_TEST
    BASELINE=DEMO_MORTGAGE_LENDING_TRAIN
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(GB_TREE_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(MORTGAGERESPONSE)
    ID_COLUMNS=(LOAN_ID)
    WAREHOUSE=ML_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';

In [None]:
-- Monitor for the GB_LINEAR version. It tracks GB_LINEAR_PREDICTION against the
-- MORTGAGERESPONSE label, using the same source/baseline tables and schedule as
-- the GB_TREE monitor.
-- NOTE(review): the name does not follow the GB_TREE_..._MODEL_MONITOR_SQL pattern
-- (GB_LINEAR_... would). It is left unchanged because it is an external object name.
CREATE OR REPLACE MODEL MONITOR DEMO_MORTGAGE_LENDING_MODEL_MONITOR_SQL
WITH
    MODEL=MORTGAGE_LENDING_MLOPS_AB_TESTING_DEMO
    VERSION=GB_LINEAR
    FUNCTION=predict
    SOURCE=DEMO_MORTGAGE_LENDING_TEST
    BASELINE=DEMO_MORTGAGE_LENDING_TRAIN
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(GB_LINEAR_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(MORTGAGERESPONSE)
    ID_COLUMNS=(LOAN_ID)
    WAREHOUSE=ML_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';

# Debug Monitoring Findings [WIP]

In [None]:
# debug_df = session.table("DEMO_MORTGAGE_LENDING_TEST").to_pandas()
# low_accuracy_period= debug_df[(debug_df.TIMESTAMP>datetime.datetime(2024,9,1)) & (debug_df.TIMESTAMP<datetime.datetime(2024,9,30))]
# f1_score(low_accuracy_period.MORTGAGERESPONSE, low_accuracy_period.GB_TREE_PREDICTION)

## Conclusion

#### 🛠️ Snowflake Feature Store tracks feature definitions and maintains lineage of sources and destinations 🛠️
#### 🚀 Snowflake Model Registry gives users a secure and flexible framework to deploy, track, and monitor models 🚀
#### 🔮 All model versions logged in the Model Registry can be accessed for inference, explainability, lineage tracking, visibility and more 🔮
