# Retail Customers Churn Prediction

This notebook uses a fictitious dataset that contains customer data, the purchases made by each customer, and any feedback provided about the service. The goal is to understand customer purchase behavior, analyze the feedback received, and automatically identify which customers are unlikely to make further purchases in the shop (churn).

The demo shows how to combine both structured and unstructured data and process it all securely within Snowflake. It will show how to:

- Use Snowflake Analytical transformation capabilities to understand customer behavior
- Analyze unstructured data (feedback text, comments, emails, etc.) to understand customer sentiment
- Use Snowflake Feature Store to serve features for training, testing and inference
- Train Machine Learning models and store them in Snowflake Model Registry
- Serve models for inference from the Model Registry
- Use Model Monitoring to detect data drift and track model performance
- Lastly, review Lineage to understand the whole process end-to-end

It shows a fictitious scenario where sales data is ingested at a given frequency (daily, weekly, monthly) and customer churn is predicted so that action can be taken.

A Streamlit app shows, for the latest data ingestion, the top customers we should engage with and review because they have the greatest probability of not buying again.

In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()


This notebook uses the DEV schema. Different environments can be separated using schemas, with RBAC granting access to different roles. This also simplifies all CI/CD and MLOps steps.

sales_churn_baseline will be the table where features describing customer behavior are created. For a given timestamp, it holds each customer's profile of previous sales and feedback provided.

This will be used to feed the ML models and to run inference predicting whether a customer will buy again.

In [None]:
session.sql('use schema DEV').collect()
session.sql('drop table if exists sales_churn_baseline').collect()

sales is the table with all transactions. Let's look at the first and last transactions we have:

In [None]:
from snowflake.snowpark import functions as F

sales_df = session.table("sales")

first_sale_timestamp = sales_df.select(F.min(F.col("transaction_date"))).collect()[0][0]

last_sale_timestamp = sales_df.select(F.max(F.col("transaction_date"))).collect()[0][0]

days_between = (last_sale_timestamp - first_sale_timestamp).days

print(f'First sale: {first_sale_timestamp}')
print(f'Last sale: {last_sale_timestamp}')
print(f'Days between first and last sale: {days_between}')


### Training and Testing

We pick a training timestamp and a testing timestamp, and use each one to create the customer profiles used to train and test the model. The testing timestamp comes after the training one.

Features are created using the stored procedure uc01_feature_engineering_sproc, which was defined in the previous notebook. It creates the customer profile for a given timestamp. With that customer profile, we will run predict to determine whether a customer is going to churn or not.

After creating the profile, we look at the subsequent transactions to determine whether the customer really churned.

Labeling creates the correct CHURN label so we can continuously monitor the model performance.
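
As a rough illustration of how the two timestamps relate, the sketch below reproduces the date arithmetic used in the next cell in plain Python. The function name `split_timestamps` and the sample date are hypothetical, and the real values depend on the contents of the sales table.

```python
from datetime import datetime, timedelta

def split_timestamps(latest_transaction, churn_window=30, windows_back=6):
    """Return (timestamp_training, timestamp_testing) derived from the latest sale."""
    # Testing profile: step back one churn window so it can be labeled,
    # plus windows_back further windows that stay available as later data.
    timestamp_testing = latest_transaction - timedelta(
        days=churn_window + churn_window * windows_back
    )
    # Training profile: one churn window before the testing profile.
    timestamp_training = timestamp_testing - timedelta(days=churn_window)
    return timestamp_training, timestamp_testing

latest = datetime(2024, 12, 31)  # hypothetical latest transaction date
timestamp_training, timestamp_testing = split_timestamps(latest)
print(f"training: {timestamp_training.date()}, testing: {timestamp_testing.date()}")
```

With the default values, the testing timestamp sits 210 days before the latest transaction and the training timestamp 30 days before that.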




In [None]:
from datetime import datetime, timedelta
from snowflake.snowpark import functions as F

db = session.get_current_database()
sc = session.get_current_schema()
print (f'database: {db}, schema: {sc}')

churn_window = 30 ## This is the value we define as churn

windows_back =  6 # until streams and tasks are implemented I just add all data from now, so we look back

table_features = 'churn_baseline'

session.sql(f'drop table if exists {table_features} ').collect()

sales_df = session.table("sales")

#Find the most recent sales transaction to take it as starting point
latest_transaction = sales_df.select(F.max(F.col("transaction_date"))).collect()[0][0]

#Place the testing timestamp (windows_back + 1) churn windows before the last transaction so we can label it correctly later
timestamp_testing = latest_transaction - timedelta(days=churn_window + churn_window * windows_back)

print (f'timestamp for testing: {timestamp_testing}')

# This will create the profile (feature engineering) for each customer at that timestamp
session.call('uc01_feature_engineering_sproc', db, sc, timestamp_testing, table_features)

#Now create the baseline data for training (this happens churn_window days before testing)
timestamp_training = timestamp_testing - timedelta(days=churn_window)

print (f'timestamp for training: {timestamp_training}')

#Perform feature engineering for training data
session.call('uc01_feature_engineering_sproc', db, sc, timestamp_training, table_features)


### Label Training and Testing dataset

We have to label the training and testing data using the function defined in the previous notebook. We call the stored procedure uc_01_label_churn_sproc, pass the baseline table we have created (table_features), and create a new table churn_baseline_labeled with the correct CHURN label. The last argument is the number of days used to define churn (30 here).
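
The labeling logic itself lives in the stored procedure from the previous notebook and is not shown here. As a minimal sketch of the idea, assuming that a customer counts as churned when no purchase falls within the churn window after the profile timestamp, the rule could look like this (the function `label_churn` and the sample dates are hypothetical):

```python
from datetime import datetime, timedelta

def label_churn(profile_ts, purchase_dates, churn_window_days=30):
    """Return 1 (churned) if no purchase falls in (profile_ts, profile_ts + window], else 0."""
    window_end = profile_ts + timedelta(days=churn_window_days)
    bought_again = any(profile_ts < d <= window_end for d in purchase_dates)
    return 0 if bought_again else 1

profile_ts = datetime(2024, 5, 5)
returning_customer = [datetime(2024, 4, 20), datetime(2024, 5, 20)]
lapsed_customer = [datetime(2024, 4, 20), datetime(2024, 6, 30)]

print(label_churn(profile_ts, returning_customer))
print(label_churn(profile_ts, lapsed_customer))
```

The first customer buys again 15 days after the profile timestamp and is labeled 0; the second only returns after the window has closed and is labeled 1.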

In [None]:
# Reuse churn_window (30 days) instead of repeating the literal value
session.call('uc_01_label_churn_sproc', table_features, 'churn_baseline_labeled', churn_window)

Let's review how many churned customers we have in the training and testing datasets.

In [None]:
SELECT 
    TIMESTAMP,
    SUM(CASE WHEN churned = 0 THEN 1 ELSE 0 END) AS not_churned,
    SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) AS churned
FROM churn_baseline_labeled
GROUP BY TIMESTAMP
ORDER BY TIMESTAMP;

One of the key things is understanding customer feedback. As part of the transformation pipeline, we have already used Snowflake Cortex AI to extract sentiment from customer interactions. Let's take a look:

In [None]:
select * from FEEDBACK_SENTIMENT where customer_id = 'CUST-96';

## Feature Store & Model Registry

Here we create a new Feature Store and Model Registry

In [None]:
from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode)

# Snowflake Model Registry
from snowflake.ml.registry import Registry

db = session.get_current_database()
sc = session.get_current_schema()

mr_schema = 'MODEL_REGISTRY'
fs_schema = 'FEATURE_STORE'
warehouse = 'COMPUTE_WH'  #modify as needed.This one is standard in quickstarts

#cleanup - When running this notebook, we are starting from scratch for the demo

session.sql(f"drop schema if exists {mr_schema}").collect()
session.sql(f"drop schema if exists {fs_schema}").collect()


# Create the Model Registry
try:
    cs = session.get_current_schema()
    session.sql(f''' create schema {mr_schema} ''').collect()
    mr = Registry(session=session, database_name= db, schema_name=mr_schema)
    session.sql(f''' use schema {cs}''').collect()
except Exception:
    print(f"Model Registry ({mr_schema}) already exists")   
    mr = Registry(session=session, database_name= db, schema_name=mr_schema)
else:
    print(f"Model Registry ({mr_schema}) created")


# Create the Feature Store
try:
    fs = FeatureStore(session=session, database=db, name=fs_schema, 
                          default_warehouse=warehouse, 
                          creation_mode=CreationMode.FAIL_IF_NOT_EXIST)
    print(f"Feature Store ({fs_schema}) already exists") 
except Exception:
    fs = FeatureStore(session=session, database=db, name=fs_schema, 
                          default_warehouse=warehouse, 
                          creation_mode=CreationMode.CREATE_IF_NOT_EXIST)
    print(f"Feature Store ({fs_schema}) created")   


#### CUSTOMER Entity
Now that we have a Feature Store, the first step is to define the entities we are going to use. In this demo everything revolves around the customer, and CUSTOMER_ID is used to join customer, sales and feedback data. Features will be selected by customer ID.

In [None]:
import json

if "CUSTOMER_ENT" not in json.loads(fs.list_entities().select(F.to_json(F.array_agg("NAME", True))).collect()[0][0]):
    customer_entity = Entity(
        name="CUSTOMER_ENT", 
        join_keys=["CUSTOMER_ID"],
        desc="Primary Key for CUSTOMER")
    fs.register_entity(customer_entity)
else:
    customer_entity = fs.get_entity("CUSTOMER_ENT")

fs.list_entities().show()

#### Feature View

For the previous Entity we create a Feature View. Feature transformations can happen within the Feature View or externally. The feature_df parameter points to either a dataframe with transformations or a SQL query. In that case, the Feature Store is in charge of maintaining those features.

Another option (used here) is to perform the transformations outside the Feature Store, for example with dbt or with Streams and Tasks within Snowflake. That is what this demo does: the Feature View points directly to the churn_baseline_labeled table, where we store the features for a given customer and timestamp.

In [None]:
session.sql ('use schema dev').collect()

churn_df = session.table("dev.churn_baseline_labeled")

preprocess_features_desc = {  
}

ppd_fv_name    = "FV_UC01_PREPROCESS"
ppd_fv_version = "V_1"

try:
   # If FeatureView already exists just return the reference to it
   fv_uc01_preprocess = fs.get_feature_view(name=ppd_fv_name,version=ppd_fv_version)
except Exception:
   # Create the FeatureView instance
   fv_uc01_preprocess_instance = FeatureView(
      name=ppd_fv_name, 
      entities=[customer_entity], 
      feature_df=churn_df,      # <- We can use the snowpark dataframe as-is from our Python
      timestamp_col="TIMESTAMP",
      refresh_freq=None,  # The refresh will be external to feature view (maintained in the pipeline)
      desc="Features to support Churn Detection").attach_feature_desc(preprocess_features_desc)

   # Register the FeatureView instance.  Creates  object in Snowflake
   fv_uc01_preprocess = fs.register_feature_view(
      feature_view=fv_uc01_preprocess_instance, 
      version=ppd_fv_version, 
      block=True
   )
   print(f"Feature View : {ppd_fv_name}_{ppd_fv_version} created")   
else:
   print(f"Feature View : {ppd_fv_name}_{ppd_fv_version} already created")
finally:
   fs.list_feature_views().show(5)

In [None]:
fv_uc01_preprocess.feature_df.show(5)


The Feature Store uses an ASOF join in order to quickly extract features. Here we define the spine for the training and testing datasets using their timestamps:
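
To make the idea of an ASOF (point-in-time) join concrete, here is a small pure-Python sketch. It is only an illustration and not how the Feature Store implements it: for each spine row, it picks the latest feature row for that customer whose timestamp is not later than the spine timestamp. The function `asof_lookup` and the sample rows are hypothetical.

```python
from bisect import bisect_right

def asof_lookup(feature_rows, spine):
    """For each (customer_id, ts) in spine, return the latest features with feature ts <= ts."""
    by_customer = {}
    # Group feature rows per customer, ordered by timestamp
    for customer_id, ts, features in sorted(feature_rows, key=lambda r: (r[0], r[1])):
        by_customer.setdefault(customer_id, []).append((ts, features))

    result = []
    for customer_id, spine_ts in spine:
        history = by_customer.get(customer_id, [])
        timestamps = [ts for ts, _ in history]
        idx = bisect_right(timestamps, spine_ts)  # number of rows at or before spine_ts
        features = history[idx - 1][1] if idx > 0 else None
        result.append((customer_id, spine_ts, features))
    return result

# ISO date strings sort chronologically, so they work as timestamps here
feature_rows = [
    ("CUST-1", "2024-05-05", {"COUNT_ORDERS_PAST_1MM": 3}),
    ("CUST-1", "2024-06-04", {"COUNT_ORDERS_PAST_1MM": 1}),
    ("CUST-2", "2024-06-04", {"COUNT_ORDERS_PAST_1MM": 2}),
]
spine = [("CUST-1", "2024-05-20"), ("CUST-2", "2024-05-20")]

matched = asof_lookup(feature_rows, spine)
for row in matched:
    print(row)
```

CUST-1 receives the features computed on 2024-05-05, because the 2024-06-04 row lies in the future relative to the spine timestamp. CUST-2 has no feature row at or before the spine timestamp, so it gets no features.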

In [None]:
# Create Spine for training
# Timestamp for training was set before when we did the labeling.

training_spine_sdf =  fv_uc01_preprocess.feature_df.filter(F.col("TIMESTAMP") == F.lit(timestamp_training)) \
                                    .group_by('CUSTOMER_ID').agg(F.max('TIMESTAMP').as_('TIMESTAMP'))


training_spine_sdf.sort('CUSTOMER_ID').show(5)

#### Training & Testing Dataset

This is extracted from the Feature View using the spines defined for training and testing. Once created, the dataset is kept. This is important so we can understand the lineage of our models and review the dataset used for a specific model later if needed.

In [None]:
# Generate_Dataset
training_dataset = fs.generate_dataset( name = 'UC01_TRAINING',
                                        spine_df = training_spine_sdf, features = [fv_uc01_preprocess], 
                                        spine_timestamp_col = 'TIMESTAMP'
                                        )                                     
# Create a snowpark dataframe reference from the Dataset
training_dataset_sdf = training_dataset.read.to_snowpark_dataframe()
# Display some sample data
training_dataset_sdf.sort('CUSTOMER_ID').show(5)

In [None]:
# Create spine for testing; its timestamp comes after the training one
testing_spine_sdf =  fv_uc01_preprocess.feature_df.filter(F.col("TIMESTAMP") == F.lit(timestamp_testing)) \
                                    .group_by('CUSTOMER_ID').agg(F.max('TIMESTAMP').as_('TIMESTAMP'))


testing_spine_sdf.sort('CUSTOMER_ID').show(5)

In [None]:
# Generate_Dataset for Testing
testing_dataset = fs.generate_dataset( name = 'UC01_TESTING',
                                        spine_df = testing_spine_sdf, features = [fv_uc01_preprocess], 
                                        spine_timestamp_col = 'TIMESTAMP'
                                        )                                     
# Create a snowpark dataframe reference from the Dataset
testing_dataset_sdf = testing_dataset.read.to_snowpark_dataframe()
# Display some sample data
testing_dataset_sdf.sort('CUSTOMER_ID').show(5)

In [None]:
session.sql('use schema DEV').collect()

## Training the Model

This defines our training function. Using snowflake.ml.modeling for preprocessing and training brings the advantage of using Snowflake compute resources: the data does not leave the Snowflake platform, and we take advantage of Snowflake scalability.

A pipeline is defined that performs some transformations and trains the model. Using the pipeline means the features do not have to be transformed separately for inference, as that is done automatically.
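
The benefit of fitting transformations once and reusing them at inference can be sketched in plain Python. This toy class only mirrors the idea behind an ordinal encoder followed by a standard scaler. It is not the snowflake.ml implementation, and the class name, the segment values, and the handling of unseen categories (mapped to -1) are assumptions made for illustration.

```python
from statistics import mean, pstdev

class TinyPipeline:
    """Learns an ordinal encoding and scaling statistics once, then reuses them."""

    def fit(self, categories, values):
        # Ordinal encoding: one integer code per distinct category seen in training
        self.codes = {c: i for i, c in enumerate(sorted(set(categories)))}
        # Standard scaling: remember the training mean and standard deviation
        self.mean = mean(values)
        self.std = pstdev(values) or 1.0  # guard against zero variance
        return self

    def transform(self, category, value):
        # Unseen categories map to -1 (an assumption of this sketch)
        return self.codes.get(category, -1), (value - self.mean) / self.std

# Hypothetical CUSTOMER_SEGMENT and AGE values used for training
pipe = TinyPipeline().fit(["Gold", "Silver", "Gold", "Bronze"], [20, 30, 40, 50])

# At inference time the same fitted statistics are applied to new rows
print(pipe.transform("Silver", 35))
print(pipe.transform("Platinum", 35))
```

Because the statistics are stored with the pipeline, new rows are encoded and scaled exactly as the training rows were, without repeating the fitting step.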

Snowflake compute is also used to calculate metrics from the model predictions.
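
For reference, the four metrics reported by the training function follow the standard definitions for binary classification. The sketch below computes them in plain Python on a small made-up sample. The helper `binary_metrics` is hypothetical and is only meant to show what the numbers mean, not to replace snowflake.ml.modeling.metrics.

```python
def binary_metrics(y_true, y_pred):
    """Accuracy, precision, recall and F1 for labels where 1 means churned."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)  # churners correctly flagged
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)  # loyal customers flagged by mistake
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)  # churners the model missed
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)  # loyal customers correctly ignored

    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

y_true = [1, 0, 1, 1, 0, 0, 1, 0]  # made-up CHURNED values
y_pred = [1, 0, 0, 1, 0, 1, 1, 0]  # made-up CHURNED_PRED values

metrics = binary_metrics(y_true, y_pred)
print(metrics)
```

For churn use cases, recall indicates how many real churners the model catches, while precision indicates how many of the flagged customers were really at risk.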

In [None]:
from snowflake.ml.modeling.preprocessing import StandardScaler as sml_StandardScaler
from snowflake.ml.modeling.preprocessing import OrdinalEncoder as sml_OrdinalEncoder
from snowflake.ml.modeling.pipeline import Pipeline as sml_Pipeline
from snowflake.ml.modeling.ensemble import RandomForestClassifier
from snowflake.ml.modeling import metrics as snowml_metrics
from snowflake.ml.modeling.xgboost import XGBClassifier


def uc01_train(feature_df):

    train_df, testing_df = feature_df.random_split(weights=[0.8, 0.2], seed=111)
    
    oe_input_cols = ['GENDER', 'LOCATION', 'CUSTOMER_SEGMENT']
    oe_output_cols = ['GENDER_OE', 'LOCATION_OE', 'CUSTOMER_SEGMENT_OE']

    ss_input_numerical_cols = [
        "AGE",
        "COUNT_SENTIMENT_PAST_7D", "COUNT_SENTIMENT_PAST_1MM", "COUNT_SENTIMENT_PAST_2MM", "COUNT_SENTIMENT_PAST_3MM",
        "AVG_SENTIMENT_PAST_7D", "AVG_SENTIMENT_PAST_1MM", "AVG_SENTIMENT_PAST_2MM", "AVG_SENTIMENT_PAST_3MM",
        "MIN_SENTIMENT_PAST_7D", "MIN_SENTIMENT_PAST_1MM", "MIN_SENTIMENT_PAST_2MM", "MIN_SENTIMENT_PAST_3MM",
        "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
        "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
    ]
    
    ss_output_numerical_cols = [col + "_SS" for col in ss_input_numerical_cols]
    
    label = ['CHURNED']
    output_label = ['CHURNED_PRED']

    input_columns = oe_input_cols + ss_input_numerical_cols + label
    model_input_cols = oe_output_cols + ss_output_numerical_cols
    
    pipeline_purchases = sml_Pipeline(
        steps=[ ("OE",
                    sml_OrdinalEncoder(
                        input_cols=oe_input_cols,
                        output_cols=oe_output_cols)),
                ("SS",
                    sml_StandardScaler(
                        input_cols=ss_input_numerical_cols,
                        output_cols=ss_output_numerical_cols)),
                ("XGB",
                    XGBClassifier(
                        input_cols=model_input_cols,
                        label_cols=label,
                        output_cols=output_label))
                ])
 
    pipeline_purchases.fit(train_df.select(input_columns))

    predictions = pipeline_purchases.predict(train_df)

    accuracy_score = snowml_metrics.accuracy_score (df=predictions, 
                                                    y_true_col_names=['CHURNED'],
                                                    y_pred_col_names=['CHURNED_PRED'])
                                                        
    precision_score = snowml_metrics.precision_score (df=predictions, 
                                                    y_true_col_names=['CHURNED'],
                                                    y_pred_col_names=['CHURNED_PRED'])

    recall_score = snowml_metrics.recall_score (df=predictions, 
                                                    y_true_col_names=['CHURNED'],
                                                    y_pred_col_names=['CHURNED_PRED'])   
    
    f1_score = snowml_metrics.f1_score (df=predictions, 
                                                    y_true_col_names=['CHURNED'],
                                                    y_pred_col_names=['CHURNED_PRED'])   


    predictions = pipeline_purchases.predict(testing_df)

    t_accuracy_score = snowml_metrics.accuracy_score (df=predictions, 
                                                    y_true_col_names=['CHURNED'],
                                                    y_pred_col_names=['CHURNED_PRED'])
                                                        
    t_precision_score = snowml_metrics.precision_score (df=predictions, 
                                                    y_true_col_names=['CHURNED'],
                                                    y_pred_col_names=['CHURNED_PRED'])

    t_recall_score = snowml_metrics.recall_score (df=predictions, 
                                                    y_true_col_names=['CHURNED'],
                                                    y_pred_col_names=['CHURNED_PRED'])   
    
    t_f1_score = snowml_metrics.f1_score (df=predictions, 
                                                    y_true_col_names=['CHURNED'],
                                                    y_pred_col_names=['CHURNED_PRED'])   


    
    return {'MODEL': pipeline_purchases,
            'accuracy_score': accuracy_score,
            'precision_score': precision_score,
            'recall_score': recall_score,
            'f1_score': f1_score ,

            't_accuracy_score': t_accuracy_score,
            't_precision_score': t_precision_score,
            't_recall_score': t_recall_score,
            't_f1_score': t_f1_score 
           }

In [None]:
# Train the model
trained_model = uc01_train(training_dataset_sdf)

In [None]:
# Lets see the results
trained_model

We created a dataset for the testing data earlier. Remember that the testing data comes from transactions that happened after the training data.

In [None]:
model= trained_model['MODEL']

predictions = model.predict(testing_dataset_sdf)

testing_accuracy_score = snowml_metrics.accuracy_score (df=predictions, 
                                                y_true_col_names=['CHURNED'],
                                                y_pred_col_names=['CHURNED_PRED'])
                                                    
testing_precision_score = snowml_metrics.precision_score (df=predictions, 
                                                y_true_col_names=['CHURNED'],
                                                y_pred_col_names=['CHURNED_PRED'])

testing_recall_score = snowml_metrics.recall_score (df=predictions, 
                                                y_true_col_names=['CHURNED'],
                                                y_pred_col_names=['CHURNED_PRED'])   

testing_f1_score = snowml_metrics.f1_score (df=predictions, 
                                                y_true_col_names=['CHURNED'],
                                                y_pred_col_names=['CHURNED_PRED'])   

print (f'testing_accuracy_score = {testing_accuracy_score}')
print (f'testing_precision_score = {testing_precision_score}')
print (f'testing_recall_score = {testing_recall_score}')
print (f'testing_f1_score = {testing_f1_score}')


In [None]:
predictions.limit(5)

### Logging the Model

We created the Model Registry earlier. Now we use it to log the model we have just trained. We also store the metrics we have calculated for the model.

The combination of model name and version is unique.

In [None]:
from snowflake.ml.model import type_hints

model_logged = mr.log_model(model= trained_model['MODEL'],
                model_name= "ChurnDetector",
                version_name= "v0",
                #conda_dependencies=["snowflake-ml-python"],
                sample_input_data = training_dataset_sdf.limit(100),
                #options={"relax_version": False, "enable_explainability": True},
                task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                comment="Model to detect what customers will not buy again"
                )

model_logged.set_metric(metric_name="accuracy_score", value=trained_model['accuracy_score'])
model_logged.set_metric(metric_name="precision_score", value=trained_model['precision_score'])
model_logged.set_metric(metric_name="recall_score", value=trained_model['recall_score'])
model_logged.set_metric(metric_name="f1_score", value=trained_model['f1_score'])

model_logged.set_metric(metric_name="testing_accuracy_score", value=testing_accuracy_score)
model_logged.set_metric(metric_name="testing_precision_score", value=testing_precision_score)
model_logged.set_metric(metric_name="testing_recall_score", value=testing_recall_score)
model_logged.set_metric(metric_name="testing_f1_score", value=testing_f1_score)

### Inference

We define two functions that run inference for a model and a dataframe. We use predict to calculate performance metrics, and predict_proba, which the Streamlit app uses to find the customers with the highest probability of not buying again.

Results are appended to a table, keeping only the relevant columns.
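
As a small illustration of how the probability output can be used, the sketch below ranks customers by PREDICT_PROBA_1 and keeps the top N. It assumes that PREDICT_PROBA_1 is the predicted probability of class 1 (churned). The helper `top_at_risk` and the sample rows are hypothetical.

```python
def top_at_risk(rows, n=3):
    """Return the n rows with the highest predicted churn probability."""
    return sorted(rows, key=lambda r: r["PREDICT_PROBA_1"], reverse=True)[:n]

# Made-up rows shaped like the output of the predict_proba inference
scored = [
    {"CUSTOMER_ID": "CUST-1", "PREDICT_PROBA_1": 0.12},
    {"CUSTOMER_ID": "CUST-2", "PREDICT_PROBA_1": 0.91},
    {"CUSTOMER_ID": "CUST-3", "PREDICT_PROBA_1": 0.47},
    {"CUSTOMER_ID": "CUST-4", "PREDICT_PROBA_1": 0.78},
]

at_risk = top_at_risk(scored, n=2)
for row in at_risk:
    print(row["CUSTOMER_ID"], row["PREDICT_PROBA_1"])
```

Ranking by probability instead of using the 0/1 prediction lets us prioritize the customers to contact first.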

In [None]:
def inference(model, inference_df, output_table):

    inference_result_sdf = model.run(inference_df, function_name="predict")

    columns = [
        "CUSTOMER_ID", "AGE", "GENDER", "LOCATION", "CUSTOMER_SEGMENT", 
        "LAST_PURCHASE_DATE", "DAYS_SINCE_LAST_PURCHASE", 
        "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "AVG_SENTIMENT_PAST_1MM", "AVG_SENTIMENT_PAST_2MM", 
        "COUNT_SENTIMENT_PAST_1MM", "COUNT_SENTIMENT_PAST_2MM",
        "TIMESTAMP", "CHURNED", "CHURNED_PRED"
    ]
        
    inference_result_sdf_final = inference_result_sdf.select(columns)

    inference_result_sdf_final.write.mode("append").save_as_table(output_table)

In [None]:
def inference_proba(model, inference_df, output_table):

    inference_result_sdf = model.run(inference_df, function_name="predict_proba")

    
    columns = [
        "CUSTOMER_ID", "AGE", "GENDER", "LOCATION", "CUSTOMER_SEGMENT", 
        "LAST_PURCHASE_DATE", "DAYS_SINCE_LAST_PURCHASE", 
        "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "AVG_SENTIMENT_PAST_1MM", "AVG_SENTIMENT_PAST_2MM", 
        "COUNT_SENTIMENT_PAST_1MM", "COUNT_SENTIMENT_PAST_2MM",
        "TIMESTAMP", "CHURNED", "PREDICT_PROBA_1"
    ]
          
    inference_result_sdf_final = inference_result_sdf.select(columns)

    inference_result_sdf_final.write.mode("append").save_as_table(output_table)

In [None]:
---These are the tables that will be used for model monitoring:
--- Cleanup to start fresh

drop table if exists dev.customer_churn_baseline_predicted;
drop table if exists dev.customer_churn_predicted;
drop table if exists dev.customer_churn_predicted_proba;

#### Baselines for Model Monitoring

customer_churn_baseline_predicted is the table that will be used as the baseline for Model Monitoring. It holds the predictions on the data we used for training, so the Model Monitor can detect any drift in the data, which can then trigger alerts or retraining. It also provides the baseline performance metrics.
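
To give an intuition of what comparing new data against a baseline means, the sketch below computes a Population Stability Index (PSI) for one numeric feature in plain Python. This is a generic drift measure chosen for illustration. It does not claim to reproduce what the Model Monitor computes, and the function name, bin edges, and sample values are assumptions.

```python
import math

def psi(baseline, current, edges):
    """Population Stability Index between two samples, using bins defined by edges."""
    def shares(values):
        counts = [0] * (len(edges) + 1)
        for v in values:
            idx = sum(1 for e in edges if v >= e)  # index of the bin containing v
            counts[idx] += 1
        # A small floor avoids log(0) when a bin is empty
        return [max(c / len(values), 1e-6) for c in counts]

    b, c = shares(baseline), shares(current)
    return sum((ci - bi) * math.log(ci / bi) for bi, ci in zip(b, c))

# Made-up values for a feature such as COUNT_ORDERS_PAST_1MM
baseline_values = [1, 2, 3, 4, 5, 6, 7, 8]
shifted_values = [6, 7, 8, 9, 6, 7, 8, 9]
edges = [3, 6]

print(psi(baseline_values, baseline_values, edges))
print(psi(baseline_values, shifted_values, edges))
```

Identical distributions give a PSI of zero, and the value grows as the new data moves away from the baseline.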

In [None]:
model = mr.get_model("ChurnDetector").version("v0")

inference(model, training_dataset_sdf, 'dev.customer_churn_baseline_predicted')
inference(model, testing_dataset_sdf, 'dev.customer_churn_predicted')

### Simulate generating new data on a monthly basis

Here we simulate that every month (every churn_window days) we generate new features and run inference to detect customers who will not buy again in the near future.

At this point we just create the data; this will later be changed to use Streams and Tasks. The goal is to go step by step and show the demo.

In [None]:
select count(*) from dev.sales;

In [None]:
-- Reminder of the features we have calculated:
SELECT 
    TIMESTAMP,
    SUM(CASE WHEN churned = 0 THEN 1 ELSE 0 END) AS not_churned,
    SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) AS churned
FROM churn_baseline_labeled
GROUP BY TIMESTAMP
ORDER BY TIMESTAMP;

In [None]:

churn_baseline_labeled_df = session.table('churn_baseline_labeled')
                      
latest_feature_timestamp = churn_baseline_labeled_df.select(F.max(F.col("timestamp"))).collect()[0][0]

print (f'latest feature timestamp:{latest_feature_timestamp} ')

latest_sales_transaction = session.table('sales').select(F.max(F.col("transaction_date"))).collect()[0][0]

print (f'latest transaction in sales table:{latest_sales_transaction} ')


Get a pointer to the Feature View to be used

In [None]:
ppd_fv_name    = "FV_UC01_PREPROCESS"
ppd_fv_version = "V_1"

try:
   # If FeatureView already exists just return the reference to it
   fv_uc01_preprocess = fs.get_feature_view(name=ppd_fv_name,version=ppd_fv_version)
except Exception:
    print ('Check this error as the feature view should be already created')


This is a simulation of adding new transactions. These are the steps:

- Add new transactions
- Predict on those new transactions
- Correct the churn label on the previous transactions

The results will be used by the Model Monitor later.
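
The schedule followed by the simulation can be sketched in plain Python. The sketch assumes that a churn label is only final once a full churn window of sales exists after the prediction timestamp, which is why labels of earlier timestamps are corrected on later runs. The function `simulation_schedule` and the sample dates are hypothetical.

```python
from datetime import datetime, timedelta

def simulation_schedule(latest_feature_ts, latest_sale_ts, churn_window=30, steps=5):
    """List (prediction timestamp, label_is_final) for each simulated ingestion."""
    schedule = []
    ts = latest_feature_ts
    for _ in range(steps):
        ts = ts + timedelta(days=churn_window)
        # The label is final only if a full churn window of sales follows ts
        label_is_final = ts + timedelta(days=churn_window) <= latest_sale_ts
        schedule.append((ts, label_is_final))
    return schedule

schedule = simulation_schedule(
    latest_feature_ts=datetime(2024, 6, 4),   # hypothetical latest feature timestamp
    latest_sale_ts=datetime(2024, 11, 15),    # hypothetical latest sale
)
for ts, is_final in schedule:
    print(ts.date(), "label final" if is_final else "label may still change")
```

In this example the first four timestamps already have a complete window of later sales, while the last one does not, so its label may still change.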


In [None]:
session.sql('use schema DEV').collect()

db = session.get_current_database()
sc = 'DEV'

print (f'latest feature timestamp:{latest_feature_timestamp} ')

new_predict_timestamp = latest_feature_timestamp

churn_window = 30

for i in range(5):
    # Move the prediction timestamp forward by one churn window (churn_window days)
   
    new_predict_timestamp = new_predict_timestamp + timedelta(days=churn_window)

    # Build features
    print (f'Building features for timestamp {new_predict_timestamp} ')
    session.call('uc01_feature_engineering_sproc', db, sc, new_predict_timestamp, table_features)

    
    # Update the table used by the feature store (label for latest will not be correct)
    print ('Updating known labels ')
    session.call('uc_01_label_churn_sproc', table_features, 'churn_baseline_labeled', churn_window)

    #features_to_inference = session.table(churn_baseline_labeled)
    
    # Create a dataset with those features

    inference_spine_sdf =  fv_uc01_preprocess.feature_df.filter(F.col("TIMESTAMP") == F.lit(new_predict_timestamp)) \
                                    .group_by('CUSTOMER_ID').agg(F.max('TIMESTAMP').as_('TIMESTAMP'))

    formatted_date = str(new_predict_timestamp).replace("-", "_")

    print ('generating dataset for inference')
    name_dataset = f'UC01_INFERENCE_{i}'
    inference_dataset = fs.generate_dataset( name = name_dataset,
                                            version = 'v1',
                                            spine_df = inference_spine_sdf, features = [fv_uc01_preprocess], 
                                            spine_timestamp_col = 'TIMESTAMP'
                                            )                                     
    # Create a snowpark dataframe reference from the Dataset
    inference_dataset_sdf = inference_dataset.read.to_snowpark_dataframe()

    #Run the inference for that dataset
    inference(model, inference_dataset_sdf, 'dev.customer_churn_predicted')

    inference_proba(model, inference_dataset_sdf, 'dev.customer_churn_predicted_proba')





In [None]:
-- Check there is only one combination for customer_id and timestamp:

with t as
(select customer_id, timestamp,
    count(*) as num_comb 
from dev.customer_churn_predicted 
group by customer_id, timestamp)

select num_comb, count(*) as num from t
group by num_comb;

In [None]:
use schema DEV;
-- Reminder of the features we have calculated:
SELECT 
    TIMESTAMP,
    SUM(CASE WHEN churned = 0 THEN 1 ELSE 0 END) AS not_churned,
    SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) AS churned
FROM churn_baseline_labeled
GROUP BY TIMESTAMP
ORDER BY TIMESTAMP;

In [None]:
select timestamp, sum(churned), sum(CHURNED_PRED) from dev.customer_churn_predicted
group by timestamp
order by timestamp;

In [None]:
select min(timestamp), max(timestamp) from dev.customer_churn_predicted;

### Model Monitor

With a single SQL statement (the Python API can also be used) we define the Model Monitor. We provide the table we created with predictions on the training data and the table where we are storing all new predictions. The Model Monitor is created for the model we previously trained and tested.

def inference(model, inference_df, output_table):

    inference_result_sdf = model.run(inference_df, function_name="predict")

    mms_input_numerical_cols = [
            "AGE",
            "COUNT_SENTIMENT_PAST_7D", "COUNT_SENTIMENT_PAST_1MM", "COUNT_SENTIMENT_PAST_2MM", "COUNT_SENTIMENT_PAST_3MM",
            "AVG_SENTIMENT_PAST_7D", "AVG_SENTIMENT_PAST_1MM", "AVG_SENTIMENT_PAST_2MM", "AVG_SENTIMENT_PAST_3MM",
            "MIN_SENTIMENT_PAST_7D", "MIN_SENTIMENT_PAST_1MM", "MIN_SENTIMENT_PAST_2MM", "MIN_SENTIMENT_PAST_3MM",
            "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
            "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
        ]
        
    mms_output_numerical_cols = [col + "_MMS" for col in mms_input_numerical_cols]
    
    oe_output_cols = ['GENDER_OE', 'LOCATION_OE', 'CUSTOMER_SEGMENT_OE']
    
    drop_columns = mms_output_numerical_cols + oe_output_cols
    
    inference_result_sdf_final = inference_result_sdf
    
    for column in drop_columns: 
        inference_result_sdf_final = inference_result_sdf_final.drop(F.col(column))

    inference_result_sdf_final.write.mode("append").save_as_table(output_table)

In [None]:
use schema MODEL_REGISTRY;

CREATE OR REPLACE MODEL MONITOR Monitor_ChurnDetector
WITH
    MODEL=ChurnDetector
    VERSION=v0
    FUNCTION=predict
    SOURCE=dev.customer_churn_predicted
    BASELINE=dev.customer_churn_baseline_predicted
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(CHURNED_PRED)  
    ACTUAL_CLASS_COLUMNS=(CHURNED)
    ID_COLUMNS=(CUSTOMER_ID)
    WAREHOUSE=COMPUTE_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';

In [None]:
use schema MODEL_REGISTRY;

DESC MODEL MONITOR Monitor_ChurnDetector;

In [None]:
select DAYS_SINCE_LAST_PURCHASE, CHURNED, PREDICT_PROBA_1 from dev.customer_churn_predicted_proba
order by PREDICT_PROBA_1 desc
limit 50;


In [None]:
select * from dev.customer_churn_baseline_predicted limit 2;

In [None]:
select * from dev.customer_churn_predicted limit 2;

In [None]:
describe table dev.customer_churn_predicted;

In [None]:
#retrain model with more recent data

# Create Spine for training
# Timestamp for training was set before when we did the labeling.

new_training_spine_sdf =  fv_uc01_preprocess.feature_df.filter(F.col("TIMESTAMP") == F.lit(timestamp_training)) \
                                    .group_by('CUSTOMER_ID').agg(F.max('TIMESTAMP').as_('TIMESTAMP'))


new_training_spine_sdf.sort('CUSTOMER_ID').show(5)


In [None]:
SELECT 
    TIMESTAMP,
    SUM(CASE WHEN churned = 0 THEN 1 ELSE 0 END) AS not_churned,
    SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) AS churned
FROM dev.churn_baseline_labeled
GROUP BY TIMESTAMP
ORDER BY TIMESTAMP;

### Training a 2nd Model Version

In [None]:
churn_baseline_labeled_df = session.table('dev.churn_baseline_labeled')
                      
distinct_timestamps = (
    churn_baseline_labeled_df.select(F.col("timestamp"))
    .distinct()
    .sort(F.col("timestamp"))
)

# Fetch the second minimum timestamp
second_min_timestamp = distinct_timestamps.limit(2).collect()[1][0]

print(second_min_timestamp)

In [None]:
# Create spine for training based on a more recent month
# second_min_timestamp is the second-oldest feature timestamp, computed in the previous cell

training_spine_sdf2 =  fv_uc01_preprocess.feature_df.filter(F.col("TIMESTAMP") == F.lit(second_min_timestamp)) \
                                    .group_by('CUSTOMER_ID').agg(F.max('TIMESTAMP').as_('TIMESTAMP'))



# Generate_Dataset
training_dataset = fs.generate_dataset( name = 'UC02_TRAINING',
                                        spine_df = training_spine_sdf2, features = [fv_uc01_preprocess], 
                                        spine_timestamp_col = 'TIMESTAMP'
                                        )                                     
# Create a snowpark dataframe reference from the Dataset
training_dataset_sdf = training_dataset.read.to_snowpark_dataframe()
# Display some sample data
training_dataset_sdf.sort('CUSTOMER_ID').show(5)

In [None]:
trained_model = uc01_train(training_dataset_sdf)

In [None]:
trained_model

In [None]:
from snowflake.ml.model import type_hints

model_logged = mr.log_model(model= trained_model['MODEL'],
                model_name= "ChurnDetector",
                version_name= "v1",
                #conda_dependencies=["snowflake-ml-python"],
                sample_input_data = training_dataset_sdf.limit(100),
                #options={"relax_version": False, "enable_explainability": True},
                task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                comment="Model to detect what customers will not buy again"
                )

model_logged.set_metric(metric_name="accuracy_score", value=trained_model['accuracy_score'])
model_logged.set_metric(metric_name="precision_score", value=trained_model['precision_score'])
model_logged.set_metric(metric_name="recall_score", value=trained_model['recall_score'])
model_logged.set_metric(metric_name="f1_score", value=trained_model['f1_score'])
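
The four metrics attached to the model version are standard binary-classification scores. As a reminder of what each one measures, here is a minimal plain-Python sketch that derives them from actual and predicted labels. `binary_metrics` and the sample labels are hypothetical, and `uc01_train` may compute its scores differently.

```python
def binary_metrics(actual, predicted):
    """Compute accuracy, precision, recall and F1 for 0/1 labels (1 = churned)."""
    pairs = list(zip(actual, predicted))
    tp = sum(1 for a, p in pairs if a == 1 and p == 1)  # churners caught
    tn = sum(1 for a, p in pairs if a == 0 and p == 0)  # loyal customers left alone
    fp = sum(1 for a, p in pairs if a == 0 and p == 1)  # false alarms
    fn = sum(1 for a, p in pairs if a == 1 and p == 0)  # churners missed

    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (
        2 * precision * recall / (precision + recall)
        if (precision + recall)
        else 0.0
    )
    return {
        "accuracy_score": accuracy,
        "precision_score": precision,
        "recall_score": recall,
        "f1_score": f1,
    }


# Hypothetical labels for eight customers
actual = [1, 0, 1, 1, 0, 0, 1, 0]
predicted = [1, 0, 0, 1, 0, 1, 1, 0]

metrics = binary_metrics(actual, predicted)
print(metrics)
```

For churn, recall is often the metric to watch, since a missed churner is a customer nobody tries to retain.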

In [None]:
-- Tables used for model monitoring.
-- Clean up to start fresh.

drop table if exists dev.customer_churn_predicted2;
drop table if exists dev.customer_churn_baseline_predicted2;

In [None]:
model = mr.get_model("ChurnDetector").version("v1")

inference(model, training_dataset_sdf, 'dev.customer_churn_baseline_predicted2')
#inference(model, testing_dataset_sdf, 'dev.customer_churn_predicted')

In [None]:
from snowflake.ml import dataset

session.sql('use schema DEV').collect()

db = session.get_current_database()
sc = 'DEV'

print (f'latest feature timestamp:{latest_feature_timestamp} ')

new_predict_timestamp = latest_feature_timestamp

churn_window = 30

for i in range(5):
   
    print ('Read dataset for inference')
    name_dataset = f'FEATURE_STORE.UC01_INFERENCE_{i}'

    inference_dataset = dataset.load_dataset(session, name_dataset, 'v1')
    # Create a snowpark dataframe reference from the Dataset
    inference_dataset_sdf = inference_dataset.read.to_snowpark_dataframe()

    #Run the inference for that dataset
    inference(model, inference_dataset_sdf, 'dev.customer_churn_predicted2')

    inference_proba(model, inference_dataset_sdf, 'dev.customer_churn_predicted_proba2')




In [None]:
use schema MODEL_REGISTRY;

CREATE OR REPLACE MODEL MONITOR Monitor_ChurnDetector_v1
WITH
    MODEL=ChurnDetector
    VERSION=v1
    FUNCTION=predict
    SOURCE=dev.customer_churn_predicted2
    BASELINE=dev.customer_churn_baseline_predicted2
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(CHURNED_PRED)  
    ACTUAL_CLASS_COLUMNS=(CHURNED)
    ID_COLUMNS=(CUSTOMER_ID)
    WAREHOUSE=COMPUTE_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';

### Training a 3rd Model Version

In [None]:
churn_baseline_labeled_df = session.table('dev.churn_baseline_labeled')
                      
distinct_timestamps = (
    churn_baseline_labeled_df.select(F.col("timestamp"))
    .distinct()
    .sort(F.col("timestamp"))
)

# Fetch the third minimum timestamp
third_min_timestamp = distinct_timestamps.limit(3).collect()[2][0]

print(third_min_timestamp)

In [None]:
# Create the training spine from a more recent month:
# the third-earliest labeling timestamp fetched in the previous cell.

training_spine_sdf3 =  fv_uc01_preprocess.feature_df.filter(F.col("TIMESTAMP") == F.lit(third_min_timestamp)) \
                                    .group_by('CUSTOMER_ID').agg(F.max('TIMESTAMP').as_('TIMESTAMP'))



# Generate_Dataset
training_dataset = fs.generate_dataset( name = 'UC03_TRAINING',
                                        spine_df = training_spine_sdf3, features = [fv_uc01_preprocess], 
                                        spine_timestamp_col = 'TIMESTAMP'
                                        )                                     
# Create a snowpark dataframe reference from the Dataset
training_dataset_sdf = training_dataset.read.to_snowpark_dataframe()
# Display some sample data
training_dataset_sdf.sort('CUSTOMER_ID').show(5)

In [None]:
trained_model = uc01_train(training_dataset_sdf)

In [None]:
trained_model

In [None]:
from snowflake.ml.model import type_hints

model_logged = mr.log_model(model= trained_model['MODEL'],
                model_name= "ChurnDetector",
                version_name= "v2",
                #conda_dependencies=["snowflake-ml-python"],
                sample_input_data = training_dataset_sdf.limit(100),
                #options={"relax_version": False, "enable_explainability": True},
                task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                comment="Model to detect what customers will not buy again"
                )

model_logged.set_metric(metric_name="accuracy_score", value=trained_model['accuracy_score'])
model_logged.set_metric(metric_name="precision_score", value=trained_model['precision_score'])
model_logged.set_metric(metric_name="recall_score", value=trained_model['recall_score'])
model_logged.set_metric(metric_name="f1_score", value=trained_model['f1_score'])

In [None]:

drop table if exists dev.customer_churn_predicted3;
drop table if exists dev.customer_churn_baseline_predicted3;

In [None]:
model = mr.get_model("ChurnDetector").version("v2")

inference(model, training_dataset_sdf, 'dev.customer_churn_baseline_predicted3')
#inference(model, testing_dataset_sdf, 'dev.customer_churn_predicted')

In [None]:
from snowflake.ml import dataset

session.sql('use schema DEV').collect()

for i in range(1, 5):

    
    print ('Read dataset for inference')
    name_dataset = f'FEATURE_STORE.UC01_INFERENCE_{i}'

    inference_dataset = dataset.load_dataset(session, name_dataset, 'v1')
    # Create a snowpark dataframe reference from the Dataset
    inference_dataset_sdf = inference_dataset.read.to_snowpark_dataframe()

    #Run the inference for that dataset
    inference(model, inference_dataset_sdf, 'dev.customer_churn_predicted3')

    inference_proba(model, inference_dataset_sdf, 'dev.customer_churn_predicted_proba3')

In [None]:
use schema MODEL_REGISTRY;

CREATE OR REPLACE MODEL MONITOR Monitor_ChurnDetector_v2
WITH
    MODEL=ChurnDetector
    VERSION=v2
    FUNCTION=predict
    SOURCE=dev.customer_churn_predicted3
    BASELINE=dev.customer_churn_baseline_predicted3
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(CHURNED_PRED)  
    ACTUAL_CLASS_COLUMNS=(CHURNED)
    ID_COLUMNS=(CUSTOMER_ID)
    WAREHOUSE=COMPUTE_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';

In [None]:
SELECT 
    TIMESTAMP,
    SUM(CASE WHEN churned = 0 THEN 1 ELSE 0 END) AS not_churned,
    SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) AS churned
FROM dev.customer_churn_predicted3
GROUP BY TIMESTAMP
ORDER BY TIMESTAMP;

### Simulate predictions while updating the default model

In [None]:
drop table if exists dev.customer_churn_prod_predicted;


In [None]:
model = mr.get_model("ChurnDetector")
vers = model.show_versions()

num_model_versions = vers['name'].count()

In [None]:
from snowflake.ml import dataset

session.sql('use schema DEV').collect()

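# Hard-coded for the demo (versions V0, V1 and V2).
# Note: this overrides the count computed from show_versions() in the previous cell.
num_model_versions = 3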

for i in range(5):

    if i < num_model_versions:
        version=f'V{i}'
    else:
        version=f'V{num_model_versions - 1}'

    print (f'Using model version: {version} ')

    model = mr.get_model("ChurnDetector").version(version)
    
    print ('Read dataset for inference')
    name_dataset = f'FEATURE_STORE.UC01_INFERENCE_{i}'

    inference_dataset = dataset.load_dataset(session, name_dataset, 'v1')
    # Create a snowpark dataframe reference from the Dataset
    inference_dataset_sdf = inference_dataset.read.to_snowpark_dataframe()

    #Run the inference for that dataset
    inference(model, inference_dataset_sdf, 'dev.customer_churn_prod_predicted')

    inference_proba(model, inference_dataset_sdf, 'dev.customer_churn_prod_predicted_proba')
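
The loop above simulates a rolling deployment: each inference batch is scored with the model version that matches its position, and once the versions run out the latest one is reused. A minimal sketch of just that selection rule follows, with `version_for_batch` as a hypothetical helper name that does not exist in the notebook.

```python
def version_for_batch(batch_index, num_model_versions):
    """Pick the model version for a batch, capping at the latest version."""
    if num_model_versions < 1:
        raise ValueError("at least one model version is required")
    latest = num_model_versions - 1
    return f"V{min(batch_index, latest)}"


# Five inference batches scored against three model versions (V0, V1, V2)
schedule = [version_for_batch(i, 3) for i in range(5)]
print(schedule)  # ['V0', 'V1', 'V2', 'V2', 'V2']
```

Isolating the rule this way makes it easy to check the schedule before running the more expensive inference calls.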



In [None]:
from snowflake.ml.model import type_hints

model_logged = mr.log_model(model= trained_model['MODEL'],
                model_name= "ChurnDetector",
                version_name= "v3",
                #conda_dependencies=["snowflake-ml-python"],
                sample_input_data = training_dataset_sdf.limit(100),
                #options={"relax_version": False, "enable_explainability": True},
                task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                comment="Model to detect what customers will not buy again"
                )

In [None]:
use schema MODEL_REGISTRY;

CREATE OR REPLACE MODEL MONITOR Monitor_ChurnDetector_prod
WITH
    MODEL=ChurnDetector
    VERSION=v3
    FUNCTION=predict
    SOURCE=dev.customer_churn_prod_predicted
    BASELINE=dev.customer_churn_baseline_predicted
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(CHURNED_PRED)  
    ACTUAL_CLASS_COLUMNS=(CHURNED)
    ID_COLUMNS=(CUSTOMER_ID)
    WAREHOUSE=COMPUTE_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';