![](https://www.snowflake.com/wp-content/themes/snowflake/assets/img/brand-guidelines/logo-sno-blue-example.svg)

# Build, Deploy and Monitor your Model in Snowflake

In this demo we will show what a complete model life cycle looks like in Snowflake. We will use the following Snowflake capabilities:

* Snowflake ML Python SDK
* Model Registry
* ML Observability
* Alerts + Stored Procedures

![](https://drive.google.com/file/d/1jWryVEAjyetHMRgTTMo_bnx_BZRdeNuC/view?usp=sharing)

>**Use case:** A bank has been losing customers to competitors. It wants to understand how likely each customer is to churn so that it can take action for customers with a high probability of churning.

### **Features**

* **CREDITSCORE:** Credit score of the customer based on their historical credit behavior and management  
* **GEOGRAPHY:** Country of residence
* **GENDER:** Gender of the customer
* **AGE:** Age of the customer
* **TENURE:** Duration in years that they have been a customer
* **BALANCE:** Current balance of their bank account
* **NUMOFPRODUCTS:** Number of products purchased from the bank
* **HASCRCARD:** Does the customer have a credit card? - 1 if they do, 0 if they don't
* **ISACTIVEMEMBER:** Has the customer used their bank account in the last 3 months? - 1 if they did, 0 if they didn't
* **ESTIMATEDSALARY:** Estimated salary of the customer
* **DEBTTOINCOME:** Debt-to-income ratio
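For readers who want a typed view of a single row, the feature list above can be sketched as a Python dataclass. `CustomerRecord` is a hypothetical helper and is not part of Snowflake ML. The field names follow the feature list. The only validation is what the descriptions state: the two flags are 0 or 1, and tenure in years cannot be negative. The sample values are those of customer 1 in the data shown later in this notebook.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CustomerRecord:
    """Hypothetical typed view of one customer row (feature columns only)."""
    creditscore: int
    geography: str
    gender: str
    age: int
    tenure: int            # years as a customer
    balance: float
    numofproducts: int
    hascrcard: int         # 1 if the customer has a credit card, else 0
    isactivemember: int    # 1 if the account was used in the last 3 months, else 0
    estimatedsalary: float
    debttoincome: float

    def __post_init__(self) -> None:
        # The two flag columns are documented as binary, so reject anything else
        for name in ("hascrcard", "isactivemember"):
            if getattr(self, name) not in (0, 1):
                raise ValueError(f"{name} must be 0 or 1")
        if self.tenure < 0:
            raise ValueError("tenure must be non-negative")


# Customer 1 from the sample data
first = CustomerRecord(402, "France", "Male", 55, 9, 91944.03, 1, 1, 1, 36899.18, 23.0)
```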

### **Model**

We will build a classification model using the XGBoost framework within Snowflake ML.

```python
# Import Python packages
import joblib
import pandas as pd
import streamlit as st
from datetime import datetime, timedelta
from IPython.display import Markdown, display

import snowflake.snowpark.functions as F
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, current_date, dateadd, datediff, floor, random
from snowflake.snowpark.types import IntegerType, StringType

import snowflake.ml.modeling.preprocessing as pp
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.registry import Registry

# Reuse the session provided by the Snowflake notebook
session = get_active_session()

# Tag every query issued from this notebook so it can be found in the query history
session.query_tag = {"origin": "sf_sit-is", "name": "mlops_customerchurn", "version": {"major": 1, "minor": 0}}

# Warehouse the notebook session is currently running on
solution_prefix = session.get_current_warehouse()
solution_prefix
```

### Create a CSV file format and a stage that points at the synthetic data

```sql
-- Create a CSV file format that skips the header row
CREATE FILE FORMAT IF NOT EXISTS CSVFORMAT
    TYPE = 'CSV'
    SKIP_HEADER = 1;

-- Create an external stage that points at the synthetic customer churn file in S3
CREATE OR REPLACE STAGE data_stage
    FILE_FORMAT = (TYPE = 'CSV')
    URL = 's3://sfquickstarts/sfguide_getting_started_with_ml_observability_in_snowflake/mlops_customerchurn.csv';

-- Inspect the contents of the stage
LS @data_stage;
```


Total exited customers: 1714 (Target: ~2000)

| CustomerId | Surname | CreditScore | Geography | Gender | Age | Tenure | Balance | NumOfProducts | HasCrCard | IsActiveMember | EstimatedSalary | Exited |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | Johns | 402 | France | Male | 55 | 9 | 91944.03 | 1 | 1 | 1 | 36899.18 | 0 |
| 2 | Schultz | 735 | Spain | Male | 59 | 8 | 126536.56 | 2 | 0 | 0 | 33120.74 | 0 |
| 3 | Jones | 570 | Spain | Male | 54 | 7 | 191357.66 | 2 | 1 | 1 | 34751.09 | 1 |
| 4 | Baker | 406 | France | Female | 73 | 3 | 125263.00 | 4 | 0 | 0 | 169844.77 | 0 |
| 5 | Aguirre | 371 | Spain | Male | 88 | 9 | 195626.75 | 4 | 0 | 1 | 13787.72 | 0 |

### Read the CSV file from the stage into a Snowpark DataFrame

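```python
# Read the staged CSV into a Snowpark DataFrame, taking column names from the header row
# and letting Snowflake infer the column types
spdf = session.read.options({
    "field_delimiter": ",",
    "field_optionally_enclosed_by": '"',
    "infer_schema": True,
    "parse_header": True,
}).csv("@data_stage")
```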



```python
from snowflake.snowpark.functions import col, dateadd, lit

# Step 1: Get today's date
todays_date = datetime.now()

# Step 2: Find the most recent transaction timestamp (computed in Snowflake, not client-side)
latest_date = spdf.select(F.max("TRANSACTIONTIMESTAMP")).collect()[0][0]

# Step 3: Number of days to shift so the newest record lands about one day before today
diff_days = (todays_date - latest_date).days - 1

# Step 4: Shift every timestamp by the same number of days
df = spdf.with_column(
    "TRANSACTIONTIMESTAMP",
    dateadd("day", lit(diff_days), col("TRANSACTIONTIMESTAMP"))
)

df.show()

# Persist the shifted data as the CUSTOMERS table (used later as the monitor baseline)
df.write.mode("overwrite").save_as_table("CUSTOMERS")
```


```python
# Drop the ROWNUMBER column so it is not picked up as a model feature
spdf = df.drop("ROWNUMBER")
```



| CUSTOMERID | CREDITSCORE | GEOGRAPHY | GENDER | AGE | TENURE | BALANCE | NUMOFPRODUCTS | HASCRCARD | ISACTIVEMEMBER | ESTIMATEDSALARY | EXITED | TRANSACTIONTIMESTAMP | DEBTTOINCOME |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 402 | France | Male | 55 | 9 | 91944.03 | 1 | 1 | 1 | 36899.18 | 0 | 2022-01-09 14:08:54 | 23 |
| 2 | 735 | Spain | Male | 59 | 8 | 126536.56 | 2 | 0 | 0 | 33120.74 | | | |

#### Define the preprocessing pipeline

The pipeline has two steps: ordinal encoding for the categorical string columns and min-max scaling for the numerical columns. The cell also does three things: it splits the data into training and testing sets, fits the pipeline on the training data and transforms it, and saves the pipeline as a joblib file (`preprocessing_pipeline.joblib`).

```python
# Numerical columns to scale, and the names of the scaled output columns
num_cols = ['ESTIMATEDSALARY', 'BALANCE', 'CREDITSCORE', 'AGE', 'TENURE', 'DEBTTOINCOME']
output_cols = ['EstimatedSalary_SS', 'Balance_SS', 'CreditScore_SS', 'Age_SS', 'Tenure_SS', 'debttoincome_SS']

# Categorical columns (kept for reference; not used by the pipeline below)
cat_cols = ['HasCrCard', 'IsActiveMember', 'Geography', 'Gender', 'NumOfProducts']
# String columns to ordinal-encode in place
string_columns = ['GEOGRAPHY', 'GENDER']

preprocessing_pipeline = Pipeline(
    steps=[
        (
            "OE",
            pp.OrdinalEncoder(
                input_cols=string_columns,
                output_cols=string_columns,
            ),
        ),
        (
            "MMS",
            pp.MinMaxScaler(
                clip=True,
                input_cols=num_cols,
                output_cols=output_cols,
                drop_input_cols=True,
            ),
        ),
    ]
)

# Split 80/20 with a fixed seed, then fit the pipeline on the training split and transform it
training, testing = spdf.random_split(weights=[0.8, 0.2], seed=111)
training_spdf = preprocessing_pipeline.fit(training).transform(training)

# Pickle the pipeline locally (after fitting) so it can be uploaded to a stage
PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib'
joblib.dump(preprocessing_pipeline, PIPELINE_FILE)
```
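The two preprocessing steps work in a simple way. A min-max scaler learns each column's minimum and maximum during `fit` and maps values to [0, 1] during `transform`. With `clip=True`, values outside the range seen during fitting are clamped to 0 or 1. An ordinal encoder maps each category to an integer code. Below is a minimal pure-Python sketch of both. `MinMaxScalerSketch` and `fit_ordinal` are hypothetical stand-ins and not the Snowflake ML classes. The sketch assigns codes in sorted category order, which is scikit-learn's default. The rows are made up.

```python
class MinMaxScalerSketch:
    """Hypothetical min-max scaler: learns per-column min/max, scales to [0, 1], optionally clips."""

    def __init__(self, clip=True):
        self.clip = clip
        self.mins = {}
        self.maxs = {}

    def fit(self, rows, cols):
        for c in cols:
            values = [r[c] for r in rows]
            self.mins[c], self.maxs[c] = min(values), max(values)
        return self

    def transform(self, rows):
        out = []
        for r in rows:
            new = dict(r)
            for c, lo in self.mins.items():
                span = self.maxs[c] - lo
                scaled = (r[c] - lo) / span if span else 0.0
                if self.clip:
                    # Clamp values outside the fitted range into [0, 1]
                    scaled = min(max(scaled, 0.0), 1.0)
                new[c] = scaled
            out.append(new)
        return out


def fit_ordinal(rows, col):
    """Assign each distinct category an integer code, in sorted order."""
    return {cat: i for i, cat in enumerate(sorted({r[col] for r in rows}))}


train = [{"AGE": 30, "GEOGRAPHY": "Spain"}, {"AGE": 50, "GEOGRAPHY": "France"}, {"AGE": 70, "GEOGRAPHY": "Germany"}]
scaler = MinMaxScalerSketch(clip=True).fit(train, ["AGE"])
print([r["AGE"] for r in scaler.transform(train)])  # [0.0, 0.5, 1.0]
print(scaler.transform([{"AGE": 90}]))              # [{'AGE': 1.0}]
print(fit_ordinal(train, "GEOGRAPHY"))              # {'France': 0, 'Germany': 1, 'Spain': 2}
```

The learned minimum and maximum are the fitted state. Reusing them at inference time keeps a given raw value mapped to the same scaled value. Calling `fit` again on new data, as the stored procedure later in this notebook does with `preprocessing_pipeline.fit(df)`, replaces that state with statistics from the new data.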

#### Store the pipeline file in a stage

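```python
# Create a stage for ML artifacts and upload the pipeline file.
# PUT gzip-compresses files by default, so the file is stored as preprocessing_pipeline.joblib.gz
session.sql("CREATE OR REPLACE STAGE ML_STAGE").collect()
session.file.put(PIPELINE_FILE, "@ML_STAGE", overwrite=True)
```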

```sql
LS @ML_STAGE;
```
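`session.file.put` compresses with gzip by default (`auto_compress=True`) and appends `.gz` to the file name. That is why the stored procedure later downloads `preprocessing_pipeline.joblib.gz`, and it relies on `joblib.load` detecting the gzip compression itself. The round trip can be sketched locally with the standard library. `put_like` and `load_pickled` are hypothetical helpers that only imitate the naming and compression behavior. They do not talk to Snowflake, and they use `pickle` in place of joblib.

```python
import gzip
import os
import pickle
import shutil
import tempfile


def put_like(local_path, stage_dir):
    """Hypothetical local stand-in for PUT with auto-compression: copy `local_path`
    into `stage_dir` as a gzip file named <original name>.gz."""
    target = os.path.join(stage_dir, os.path.basename(local_path) + ".gz")
    with open(local_path, "rb") as src, gzip.open(target, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return target


def load_pickled(path):
    """Unpickle a file, decompressing it first if its name ends in .gz."""
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    local = os.path.join(tmp, "pipeline_params.pkl")
    with open(local, "wb") as f:
        pickle.dump({"AGE": (30, 70)}, f)
    stage = os.path.join(tmp, "stage")
    os.makedirs(stage)
    staged = put_like(local, stage)
    print(os.path.basename(staged))  # pipeline_params.pkl.gz
    print(load_pickled(staged))      # {'AGE': (30, 70)}
```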

## Build the XGBClassifier model and train it on the training data

```python
# Label column the classifier learns to predict
Target = ["EXITED"]

# Use every preprocessed column except the label, the timestamp and the customer ID as a feature
feature_names_input = [c for c in training_spdf.columns if c not in ["EXITED", "TRANSACTIONTIMESTAMP", "CUSTOMERID"]]

# Output column name for the predicted label
output_label = ["PREDICTED_CHURN"]

# Initialize an XGBClassifier with input, label, and output column names
model = XGBClassifier(
    input_cols=feature_names_input,
    label_cols=Target,
    output_cols=output_label,
)

# Train the classifier on the preprocessed training set
_ = model.fit(training_spdf)
```




### Initialize the Snowflake Model Registry and log the model

```python
from snowflake.ml.model import type_hints
from snowflake.ml.registry import Registry

# Connect to the model registry in the current database and schema
reg = Registry(session=session)

MODEL_NAME = "QS_CustomerChurn_classifier"
MODEL_VERSION = "v1"

# Log the trained model as a new version, tagged as a binary classification task
mv = reg.log_model(
    model,
    model_name=MODEL_NAME,
    version_name=MODEL_VERSION,
    options={'relax_version': False},
    task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
)
reg.show_models()
```
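Later cells refer to the model as `QS_CUSTOMERCHURN_CLASSIFIER` even though it was logged as `QS_CustomerChurn_classifier`. This is consistent with Snowflake's identifier rules, assuming the registry treats the name as an ordinary unquoted identifier. Unquoted identifiers are resolved in uppercase. Double-quoted identifiers keep their exact case, and `""` inside them stands for a literal quote. `resolve_identifier` is a hypothetical helper that sketches those rules. It does not check whether a name is otherwise valid.

```python
def resolve_identifier(name: str) -> str:
    """Return the stored form of a Snowflake identifier: unquoted names are upper-cased,
    double-quoted names keep their case with "" unescaped to a single quote character."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name[1:-1].replace('""', '"')
    return name.upper()


print(resolve_identifier("QS_CustomerChurn_classifier"))    # QS_CUSTOMERCHURN_CLASSIFIER
print(resolve_identifier('"QS_CustomerChurn_classifier"'))  # QS_CustomerChurn_classifier
```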


## Stored procedure for ongoing inference

```python
import os

import joblib
import snowflake.snowpark.functions as F
from snowflake import snowpark
from snowflake.ml.registry import Registry
from snowflake.snowpark.types import StringType


def inference_sp(session: snowpark.Session, table_name: str, modelname: str, modelversion: str) -> str:
    # Fetch the requested model version from the registry
    reg = Registry(session=session)
    mv = reg.get_model(modelname).version(modelversion)
    input_table_name = table_name

    # Download the preprocessing pipeline from the stage (PUT stored it gzip-compressed)
    session.file.get('@ML_STAGE/preprocessing_pipeline.joblib.gz', '/tmp')
    PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib.gz'
    if not os.path.exists(PIPELINE_FILE):
        raise FileNotFoundError('Preprocessing pipeline not found in /tmp directory')

    # Load the preprocessing pipeline; joblib detects the gzip compression
    preprocessing_pipeline = joblib.load(PIPELINE_FILE)

    # Add a PREDICTED_CHURN column to the input table only if it does not exist yet
    if "PREDICTED_CHURN" not in session.table(input_table_name).columns:
        session.sql(f'ALTER TABLE {input_table_name} ADD COLUMN PREDICTED_CHURN INT').collect()

    # Read the input table and append its rows, with a 9999 placeholder prediction, to the output table
    df = session.table(input_table_name)
    df = df.with_column("PREDICTED_CHURN", F.lit(9999))
    df.write.save_as_table("CUSTOMERCHURN_PREDICTIONS_OUTPUT", mode='append')

    # Apply the preprocessing pipeline to the input rows (note: this refits it on the input data)
    testing_spdf = preprocessing_pipeline.fit(df).transform(df)

    # Score the rows; 'results' keeps the input columns and adds the model's prediction column
    results = mv.run(testing_spdf, function_name="predict")

    # Materialize the predictions so they can be joined in the UPDATE statements below
    temp_results_table = "TEMP_PREDICTION_RESULTS"
    results.write.save_as_table(temp_results_table, mode='overwrite')

    # Replace the placeholder predictions in the output table (matched on customer ID only)
    session.sql(f"""
    UPDATE CUSTOMERCHURN_PREDICTIONS_OUTPUT t
    SET PREDICTED_CHURN = r.PREDICTED_CHURN
    FROM {temp_results_table} r
    WHERE t.CUSTOMERID = r.CUSTOMERID;
    """).collect()

    # Write the predictions back to the input table (matched on customer ID and timestamp)
    session.sql(f"""
    UPDATE {input_table_name} t
    SET PREDICTED_CHURN = r.PREDICTED_CHURN
    FROM {temp_results_table} r
    WHERE t.CUSTOMERID = r.CUSTOMERID
    AND t.TRANSACTIONTIMESTAMP = r.TRANSACTIONTIMESTAMP;
    """).collect()

    return "Success"


# Register the function as a permanent stored procedure whose code is stored in @ML_STAGE
session.sproc.register(
    func=inference_sp,
    name="inference_sp",
    replace=True,
    is_permanent=True,
    stage_location="@ML_STAGE",
    packages=['joblib', 'snowflake-snowpark-python', 'snowflake-ml-python'],
    return_type=StringType(),
)
```
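The two `UPDATE ... FROM` statements in the stored procedure match rows on different keys. The output table is matched on `CUSTOMERID` alone; the input table is matched on `CUSTOMERID` and `TRANSACTIONTIMESTAMP`. Because every call appends to `CUSTOMERCHURN_PREDICTIONS_OUTPUT`, the choice of key decides whether earlier runs' rows are overwritten. Below is a pure-Python sketch of the matching rule. `update_from` is a hypothetical helper, and the rows are made up. When several result rows share a key, this sketch simply keeps the last one.

```python
def update_from(target, results, keys, column):
    """Imitate UPDATE target SET column = r.column FROM results r WHERE <keys match>:
    every target row whose key tuple appears in `results` takes that row's value.
    Returns the number of target rows updated."""
    lookup = {tuple(r[k] for k in keys): r[column] for r in results}
    updated = 0
    for row in target:
        key = tuple(row[k] for k in keys)
        if key in lookup:
            row[column] = lookup[key]
            updated += 1
    return updated


def sample_output():
    # Two appended runs for the same customer: an old prediction and a new 9999 placeholder
    return [
        {"CUSTOMERID": 1, "TRANSACTIONTIMESTAMP": "2024-05-01", "PREDICTED_CHURN": 0},
        {"CUSTOMERID": 1, "TRANSACTIONTIMESTAMP": "2024-05-30", "PREDICTED_CHURN": 9999},
    ]


results = [{"CUSTOMERID": 1, "TRANSACTIONTIMESTAMP": "2024-05-30", "PREDICTED_CHURN": 1}]

by_id = sample_output()
update_from(by_id, results, ["CUSTOMERID"], "PREDICTED_CHURN")
print([r["PREDICTED_CHURN"] for r in by_id])    # [1, 1]: the older row is overwritten too

by_both = sample_output()
update_from(by_both, results, ["CUSTOMERID", "TRANSACTIONTIMESTAMP"], "PREDICTED_CHURN")
print([r["PREDICTED_CHURN"] for r in by_both])  # [0, 1]: only the matching run changes
```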


```python
# Save the held-out test split as CUSTOMER_CHURN, the table the monitor uses as its source data
testing.write.save_as_table("customer_churn", mode="overwrite")
```

```sql
-- Score the baseline table and the source table with version v1 of the registered model
CALL inference_sp('CUSTOMERS', 'QS_CUSTOMERCHURN_CLASSIFIER', 'v1');
CALL inference_sp('CUSTOMER_CHURN', 'QS_CUSTOMERCHURN_CLASSIFIER', 'v1');
```

## Enable monitoring
Create a model monitor using the CREATE MODEL MONITOR command. The monitor object automatically refreshes the monitor logs by querying source data and updates the monitoring reports based on the logs.

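```python
# Create a monitor that tracks predictions on CUSTOMER_CHURN (source) against CUSTOMERS (baseline),
# refreshing every minute and aggregating metrics into 1-day windows.
# Assumes a warehouse named ML_WH exists.
query = """
CREATE OR REPLACE MODEL MONITOR CHURN_MODEL_MONITOR
WITH
    MODEL=QS_CustomerChurn_classifier
    VERSION=v1
    FUNCTION=predict
    SOURCE=CUSTOMER_CHURN
    BASELINE=CUSTOMERS
    TIMESTAMP_COLUMN=TRANSACTIONTIMESTAMP
    PREDICTION_CLASS_COLUMNS=(PREDICTED_CHURN)
    ACTUAL_CLASS_COLUMNS=(EXITED)
    ID_COLUMNS=(CUSTOMERID)
    WAREHOUSE=ML_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';
"""
session.sql(query).collect()
```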
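`AGGREGATION_WINDOW='1 day'` groups the monitored rows into daily buckets by `TIMESTAMP_COLUMN`, and metrics are reported per bucket. The monitor computes its own set of metrics inside Snowflake. The sketch below only illustrates the windowing idea and is not its implementation. It uses the monitor's column names for predictions (`PREDICTED_CHURN`) and actuals (`EXITED`). `daily_metrics` is a hypothetical helper, and the sample rows are made up.

```python
from collections import defaultdict
from datetime import datetime


def daily_metrics(rows, ts_col="TRANSACTIONTIMESTAMP", pred_col="PREDICTED_CHURN", actual_col="EXITED"):
    """Group rows into 1-day windows by timestamp and compute, per day, the row count,
    the predicted churn rate, the actual churn rate, and accuracy."""
    buckets = defaultdict(list)
    for r in rows:
        buckets[r[ts_col].date()].append(r)
    report = {}
    for day in sorted(buckets):
        group = buckets[day]
        n = len(group)
        report[day] = {
            "count": n,
            "predicted_rate": sum(r[pred_col] for r in group) / n,
            "actual_rate": sum(r[actual_col] for r in group) / n,
            "accuracy": sum(r[pred_col] == r[actual_col] for r in group) / n,
        }
    return report


rows = [
    {"TRANSACTIONTIMESTAMP": datetime(2024, 5, 29, 9), "PREDICTED_CHURN": 1, "EXITED": 1},
    {"TRANSACTIONTIMESTAMP": datetime(2024, 5, 29, 17), "PREDICTED_CHURN": 0, "EXITED": 1},
    {"TRANSACTIONTIMESTAMP": datetime(2024, 5, 30, 8), "PREDICTED_CHURN": 0, "EXITED": 0},
]
for day, metrics in daily_metrics(rows).items():
    print(day, metrics)
```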

Open the dashboard by navigating to **Studio > Models**.

Click your model and choose the monitor you just created. Change the date range to "Last 12 months".