In [None]:
# Import python packages
import streamlit as st
import pandas as pd

from snowflake.snowpark.types import DecimalType, FloatType, IntegerType, DoubleType, LongType
numeric_types = (DecimalType, FloatType, IntegerType, DoubleType, LongType)

import logging
logger = logging.getLogger("e2e-churn-log")

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()


## Setting Up the Environment

In [None]:
# Change the schema name to work in a different env

SCHEMA = 'TEST1'

In [None]:

session.sql(f'CREATE OR REPLACE SCHEMA {SCHEMA} ').collect()
logger.info(f'Created new schema {SCHEMA}.')



In [None]:
session.sql (f'USE SCHEMA {SCHEMA} ').collect()


In [None]:
session.sql ('CREATE OR REPLACE STAGE ML_STAGE').collect()

In [None]:
# Columns we are going to be using for training

oe_input_cols = ['GENDER', 'LOCATION', 'CUSTOMER_SEGMENT']

ss_input_numerical_cols = [
    "AGE",
    "SENTIMENT_MIN_2", "SENTIMENT_MIN_3", "SENTIMENT_MIN_4", "SENTIMENT_AVG_2",
    "SENTIMENT_AVG_3", "SENTIMENT_AVG_4",
    "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
    "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
]
columns = oe_input_cols + ss_input_numerical_cols

In [None]:
-- Sales transactions (target of the COPY INTO used for the monthly sales files).
CREATE OR REPLACE TABLE SALES (
    TRANSACTION_ID    VARCHAR,
    CUSTOMER_ID       VARCHAR,
    TRANSACTION_DATE  DATE,
    DISCOUNT_APPLIED  BOOLEAN,
    NUM_ITEMS         NUMBER,
    PAYMENT_METHOD    VARCHAR,
    TOTAL_AMOUNT      FLOAT
);

-- Customer master data.
CREATE OR REPLACE TABLE CUSTOMERS (
    CUSTOMER_ID       VARCHAR,
    AGE               BIGINT,
    CUSTOMER_SEGMENT  VARCHAR,
    GENDER            VARCHAR,
    LOCATION          VARCHAR,
    SIGNUP_DATE       DATE
);

-- Customer feedback exactly as received (free-text COMMENT).
CREATE OR REPLACE TABLE FEEDBACK_RAW (
    CHAT_DATE    DATE,
    COMMENT      VARCHAR,
    CUSTOMER_ID  VARCHAR,
    FEEDBACK_ID  VARCHAR,
    INTERNAL_ID  BIGINT
);

-- Append-only stream on FEEDBACK_RAW: it only tracks newly inserted rows.
CREATE OR REPLACE STREAM FEEDBACK_RAW_STREAM
    ON TABLE FEEDBACK_RAW
    APPEND_ONLY = TRUE;

-- Same columns as FEEDBACK_RAW plus the numeric SENTIMENT value.
CREATE OR REPLACE TABLE FEEDBACK_SENTIMENT (
    FEEDBACK_ID  VARCHAR,
    CHAT_DATE    DATE,
    CUSTOMER_ID  VARCHAR,
    INTERNAL_ID  BIGINT,
    COMMENT      VARCHAR,
    SENTIMENT    FLOAT
);


In [None]:
-- Use this table to track which monthly files we have processed

CREATE OR REPLACE TABLE FILES_INGESTED (
    YEAR INT,
    MONTH INT,
    FILE_TYPE VARCHAR,
    FILE_NAME VARCHAR,
    STAGE_NAME VARCHAR,
    INGESTED BOOLEAN
)

In [None]:
# Read from the staging area the files that we have

import re
from snowflake.snowpark import Session
from typing import List, Tuple

def get_year_month_files(session: Session, stage_name: str, file_prefix: str) -> List[Tuple[int, int, str, str]]:
    """List the monthly CSV files of one kind that are present in a stage.

    Runs ``LIST @<stage_name>`` and keeps the files whose base name starts with
    ``<file_prefix>_<year>_<month>.csv``.

    Args:
        session: Snowpark session used to run the LIST command.
        stage_name: Stage to inspect, without the leading ``@``.
        file_prefix: File name prefix that precedes ``_<year>_<month>.csv``.

    Returns:
        ``(year, month, file_name, stage_name)`` tuples, sorted ascending
        (so effectively by year, then month).
    """
    listed_files = session.sql(f"LIST @{stage_name}").collect()

    # The prefix is escaped so it is always matched literally.
    file_pattern = re.compile(rf"{re.escape(file_prefix)}_(\d+)_(\d+)\.csv")

    results = []
    for listed_file in listed_files:
        # LIST reports a path; only the last path component is the file name.
        file_name = listed_file["name"].split("/")[-1]
        match = file_pattern.match(file_name)
        if match:
            year, month = int(match.group(1)), int(match.group(2))
            results.append((year, month, file_name, stage_name))

    return sorted(results)

In [None]:
# Function to insert into the FILES_INGESTED table the files that we will process

stage_name = "DATASET.CSV"

db = session.get_current_database()
sc = session.get_current_schema()
print (f'database: {db}, schema: {sc}')


def insert_files(table, db, sc, files):
    """Register staged files in FILES_INGESTED as not yet ingested.

    One row is inserted per file, with INGESTED set to FALSE. The statement is
    executed through the notebook-level ``session``.

    Args:
        table: Value stored in FILE_TYPE (the notebook passes 'sales' and
            'feedback_raw').
        db: Database that holds FILES_INGESTED.
        sc: Schema that holds FILES_INGESTED.
        files: Iterable of ``(year, month, file_name, stage_name)`` tuples, as
            produced by ``get_year_month_files``.
    """
    insert_stmt = f"""
        insert into {db}.{sc}.FILES_INGESTED
            (YEAR, month, file_type, file_name, stage_name, ingested)
        values (?, ?, ?, ?, ?, ?)
    """
    for year, month, file_name, stage_name in files:
        # Bind the values instead of splicing them into the SQL text: a file
        # name containing a quote can no longer break (or alter) the statement,
        # and year/month are sent as numbers rather than quoted strings.
        session.sql(
            insert_stmt,
            params=[year, month, table, file_name, stage_name, False],
        ).collect()



In [None]:
sales_month_files = get_year_month_files(session, stage_name, 'sales')
insert_files ('sales', db, sc, sales_month_files)

feedback_sentiment_month_files = get_year_month_files(session, stage_name, 'feedback_raw')
insert_files ('feedback_raw',db, sc, feedback_sentiment_month_files)


In [None]:
select * from FILES_INGESTED;

In [None]:
# For this simple demo the customers are fixed; we do not ingest them on a monthly basis

sql_cmd = f"""
    COPY INTO {db}.{sc}.customers  
            FROM @{stage_name}/customers.csv  
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"')  
            ON_ERROR = 'CONTINUE'; 
"""

session.sql(sql_cmd).collect()

In [None]:
# Function to copy from the staging area the CSV file into sales and feedback_raw tables
# Copy the next file not yet ingested into the tables

import snowflake.snowpark.functions as F
from snowflake.snowpark import types as T

def load_into_table(session, table_name, file_name):
    """Load one staged CSV file into a table with COPY INTO.

    Args:
        session: Session used to execute the statement.
        table_name: Target table, inserted verbatim after COPY INTO.
        file_name: Source location, inserted verbatim after FROM
            (callers pass a stage path such as ``@stage/file.csv``).

    The statement uses ON_ERROR = 'ABORT_STATEMENT'.
    """
    copy_statement = f""" 
        COPY INTO {table_name}
            FROM {file_name}  
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"')  
            ON_ERROR = 'ABORT_STATEMENT';      

    """
    copy_job = session.sql(copy_statement)
    copy_job.collect()

def _next_pending_file(files_df, file_type, year=None, month=None):
    """Return the oldest not-yet-ingested FILES_INGESTED row of `file_type`, or None."""
    # `== False` is intentional: this builds a Snowpark Column expression.
    pending = (F.col("file_type") == file_type) & (F.col("ingested") == False)  # noqa: E712
    if year is not None:
        pending = pending & (F.col("YEAR") == F.lit(year))
    if month is not None:
        pending = pending & (F.col("MONTH") == F.lit(month))

    candidates = (
        files_df.filter(pending)
        .select(F.col("year"), F.col("month"), F.col("file_name"), F.col("stage_name"))
        .order_by(F.col("year").asc(), F.col("month").asc())
        .limit(1)
        .to_pandas()
    )
    return None if candidates.empty else candidates.iloc[0]


def _ingest_and_mark(session, db, sc, table, file_row):
    """COPY one staged file into `table` and flag its FILES_INGESTED row as ingested."""
    file_name = file_row.FILE_NAME
    load_into_table(session, f'{db}.{sc}.{table}', f'@{file_row.STAGE_NAME}/{file_name}')

    # Values are bound rather than spliced into the SQL text.
    session.sql(
        f"""
        UPDATE {db}.{sc}.FILES_INGESTED
        SET ingested = TRUE
        WHERE file_name = ? AND file_type = ?
        """,
        params=[file_name, table],
    ).collect()
    print(f"Updated FILES_INGESTED for file: {file_name}")


def copy_next_file( session: Session, db: str, sc: str) -> bool:
    """Ingest the next pending sales file and the feedback file of the same month.

    The oldest sales file not yet marked as ingested in FILES_INGESTED is loaded
    into ``<db>.<sc>.sales``. If a pending 'feedback_raw' file exists for the
    same year and month it is loaded into ``<db>.<sc>.feedback_raw``. Every
    loaded file is marked as ingested.

    Args:
        session: Snowpark session.
        db: Database holding FILES_INGESTED and the target tables.
        sc: Schema holding FILES_INGESTED and the target tables.

    Returns:
        False when there is no pending sales file, True otherwise.
    """
    files_df = session.table(f'{db}.{sc}.FILES_INGESTED')

    sales_row = _next_pending_file(files_df, 'sales')
    if sales_row is None:
        print("No unprocessed sales files found.")
        return False

    year, month = int(sales_row.YEAR), int(sales_row.MONTH)
    _ingest_and_mark(session, db, sc, 'sales', sales_row)

    print (year, month)

    # A single query both checks for and fetches the matching feedback file.
    feedback_row = _next_pending_file(files_df, 'feedback_raw', year, month)
    if feedback_row is not None:
        _ingest_and_mark(session, db, sc, 'feedback_raw', feedback_row)

    return True

#session.sproc.register(
#    func=copy_next_file,
#    name="copy_next_file_sproc",
#    replace=True,
#    is_permanent=True,
#    stage_location="@ML_STAGE",
#    packages=['snowflake-snowpark-python'],
#    return_type=T.BooleanType()
#)

#### Cortex AI to process Unstructured data

Take advantage of the power of LLMs to turn the free-text feedback received from customers into a number (a sentiment score). This score will be used as one of the features to train our ML models (and later to predict).

In [None]:
# Leverage sentiment function

from snowflake.cortex import sentiment

def process_sentiment():
    """Score the feedback exposed by the stream and append it to FEEDBACK_SENTIMENT.

    Reads the rows currently available in ``feedback_raw_stream`` (the stream
    on FEEDBACK_RAW, not the raw table), adds a ``sentiment`` column computed
    by the Cortex ``sentiment`` function on COMMENT, and appends the result to
    ``feedback_sentiment``. Uses the notebook-level ``session``.
    """
    source_columns = ['FEEDBACK_ID', 'CHAT_DATE', 'CUSTOMER_ID', 'INTERNAL_ID', 'COMMENT']

    # Only the rows exposed by the stream are read.
    new_feedback_df = session.table("feedback_raw_stream").select(source_columns)

    # Cortex AI sentiment analysis on the free-text comment.
    sentiment_score = sentiment(F.col("comment"))
    scored_feedback_df = new_feedback_df.with_columns(["sentiment"], [sentiment_score])

    scored_feedback_df.write.mode("append").save_as_table("feedback_sentiment")

In [None]:
# Tables that will contain the features. The second one contains the labels

stage_name = "DATASET.CSV"
churn_window = 30 ## This is the value we define as churn
table_features = 'churn_baseline'
table_features_labeled = 'churn_baseline_labeled'

session.sql(f'drop table if exists {table_features} ').collect() #fresh start for this demo
session.sql(f'drop table if exists {table_features_labeled} ').collect() #fresh start for this demo


In [None]:
# Loading first 4 months that will be used for training the first model
# Load the file for sales and feedback
# Calculate the features for the latest sales timestamp for each file
# Label the CHURN based on feature transactions

from snowflake.snowpark import functions as F
from datetime import datetime

db = session.get_current_database()
sc = session.get_current_schema()
print (f'database: {db}, schema: {sc}')

# Number of monthly files ingested before the first model is trained.
INITIAL_MONTHS = 4

# Ingest and process the first files, one month per iteration:
for _ in range(INITIAL_MONTHS):
    t1 = datetime.now()

    # STEP 1: COPY NEXT SALES AND FEEDBACK MONTH
    print ("#############################")
    print ("Ingest the next sales and feedback files")
    if not copy_next_file (session, db, sc):
        # copy_next_file returns False when no sales file is pending. Stop
        # here instead of recomputing features and labels on unchanged data.
        print ("No more files to ingest, stopping.")
        break

    # STEP 2: PROCESS THE FEEDBACK RECEIVED
    print ("Processing Sentiment")
    process_sentiment()

    t2 = datetime.now()
    print (t2 - t1)

    # STEP 3: CALCULATE FEATURES FOR LATEST TIMESTAMP
    sales_df = session.table("sales")

    # Features are calculated as of the most recent transaction date loaded so far
    latest_transaction = sales_df.select(F.max(F.col("transaction_date"))).collect()[0][0]

    print (f'Calculating features for timestamp : {latest_transaction} ')
    session.call('UTILS.uc01_feature_engineering_sproc', db, sc, latest_transaction, table_features)

    t3 = datetime.now()
    print (t3 - t2)

    # STEP 4: SET RIGHT CHURN LABEL BASED ON NEW SALES DATA

    print (f'Adding labels for window: {churn_window} ')
    # Pass the configured window (it used to be a hard-coded 30, which could
    # silently disagree with churn_window and with the message printed above).
    session.call('UTILS.uc_01_label_churn_sproc', db, sc, table_features, table_features_labeled, churn_window)

    t4 = datetime.now()
    print (t4 - t3)
    


In [None]:
# Look at the class balance per timestamp. We will use the second one for training and the third for validation.
# Note that the labels for the last one are incorrect, as we still need the next month's data to have real values

sql_cmd = f"""
            SELECT 
                TIMESTAMP,
                SUM(CASE WHEN churned = 0 THEN 1 ELSE 0 END) AS not_churned,
                SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) AS churned
            FROM {table_features_labeled}
            GROUP BY TIMESTAMP
            ORDER BY TIMESTAMP;
            """
session.sql(sql_cmd).collect()

In [None]:
-- Let's take a look at the records of one customer to see what they look like

select TIMESTAMP, CUSTOMER_ID, LAST_PURCHASE_DATE, DAYS_SINCE_LAST_PURCHASE,  NEXT_TRANSACTION_DATE
, NEXT_TRANSACTION_DATE - LAST_PURCHASE_DATE ,  CHURNED from churn_baseline_labeled  where customer_id = 'CUST-11'
ORDER BY TIMESTAMP;

## Feature Store and Model Registry

In [None]:
# Create the Feature Store and Model Registry. Their schema names are the SCHEMA used by this notebook plus a suffix

from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode)

# Snowflake Model Registry
from snowflake.ml.registry import Registry

db = session.get_current_database()
sc = session.get_current_schema()
print (sc)

# Double quotes are stripped from the derived schema names.
mr_schema = f'{sc}_MODEL_REGISTRY'.replace('"', '')
fs_schema = f'{sc}_FEATURE_STORE'.replace('"', '')

warehouse = 'COMPUTE_WH'  # modify as needed. This one is standard in quickstarts

print (mr_schema)

# Cleanup - when running this notebook, we are starting from scratch for the demo
session.sql(f'drop schema if exists {mr_schema}').collect()
session.sql(f'drop schema if exists {fs_schema}').collect()


# Create the Model Registry
try:
    session.sql(f''' create schema {mr_schema} ''').collect()
except Exception as exc:
    # Report the real reason instead of assuming "already exists"; the schema
    # is reused below if it is there.
    print(f"Model Registry ({mr_schema}) schema was not created: {exc}")
else:
    print(f"Model Registry ({mr_schema}) created")
finally:
    # Always go back to the working schema, whatever happened above
    # (the original only switched back on the success path).
    session.use_schema(sc)

mr = Registry(session=session, database_name= db, schema_name=mr_schema)


# Create the Feature Store (CREATE_IF_NOT_EXIST also covers an existing one)
fs = FeatureStore(session=session, database=db, name=fs_schema,
                  default_warehouse=warehouse,
                  creation_mode=CreationMode.CREATE_IF_NOT_EXIST)
print(f"Feature Store ({fs_schema}) ready")


session.use_schema(sc)

### Feature Store: Entity -> Feature View -> Dataset

In [None]:
# Define Entity

import json

if "CUSTOMER_ENT" not in json.loads(fs.list_entities().select(F.to_json(F.array_agg("NAME", True))).collect()[0][0]):
    customer_entity = Entity(
        name="CUSTOMER_ENT", 
        join_keys=["CUSTOMER_ID"],
        desc="Primary Key for CUSTOMER")
    fs.register_entity(customer_entity)
else:
    customer_entity = fs.get_entity("CUSTOMER_ENT")

fs.list_entities().show()

Define the Feature View based on the table where features are being calculated.

In [None]:
sc = session.get_current_schema()

# The labeled features table is the source of the Feature View
churn_df = session.table(f'{sc}.{table_features_labeled}')

# Per-feature descriptions (none provided for this demo)
preprocess_features_desc = {
}

ppd_fv_name    = "FV_UC01_PREPROCESS"
ppd_fv_version = "V_1"

try:
   # If FeatureView already exists just return the reference to it
   fv_uc01_preprocess = fs.get_feature_view(name=ppd_fv_name,version=ppd_fv_version)
except Exception:
   # Only regular errors are treated as "not found"; a bare `except:` would
   # also swallow KeyboardInterrupt / SystemExit.
   # Create the FeatureView instance
   fv_uc01_preprocess_instance = FeatureView(
      name=ppd_fv_name,
      entities=[customer_entity],
      feature_df=churn_df,      # <- We can use the snowpark dataframe as-is from our Python
      timestamp_col="TIMESTAMP",
      refresh_freq=None,  # The refresh will be external to feature view (maintained in the pipeline)
      desc="Features to support Churn Detection").attach_feature_desc(preprocess_features_desc)

   # Register the FeatureView instance.  Creates  object in Snowflake
   fv_uc01_preprocess = fs.register_feature_view(
      feature_view=fv_uc01_preprocess_instance,
      version=ppd_fv_version,
      block=True
   )
   print(f"Feature View : {ppd_fv_name}_{ppd_fv_version} created")
else:
   print(f"Feature View : {ppd_fv_name}_{ppd_fv_version} already created")
finally:
   fs.list_feature_views().show(5)

In [None]:
# We can take a look at what the features look like:

fv_uc01_preprocess.feature_df.filter(F.col("CUSTOMER_ID") == "CUST-1").show(5)
fv_uc01_preprocess.feature_df.show(5)


In [None]:
# Look at the class balance per timestamp. We will use the second one for training and the third for validation.
# Note that the labels for the last one are incorrect, as we still need the next month's data to have real values

sql_cmd = f"""
            SELECT 
                TIMESTAMP,
                SUM(CASE WHEN churned = 0 THEN 1 ELSE 0 END) AS not_churned,
                SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) AS churned
            FROM {table_features_labeled}
            GROUP BY TIMESTAMP
            ORDER BY TIMESTAMP;
            """
session.sql(sql_cmd).collect()


In [None]:
# Take the timestamps we are going to use for training and validation
# Note both of them should have real labels so we can measure performance

timestamps = session.table(table_features_labeled).select("TIMESTAMP").distinct().sort("TIMESTAMP").collect()

# The second timestamp is used for training and the third for validation, so
# at least three must exist; fail with a clear message instead of an IndexError.
if len(timestamps) < 3:
    raise ValueError(
        f"Expected at least 3 distinct timestamps in {table_features_labeled}, "
        f"found {len(timestamps)}"
    )

timestamp_training = timestamps[1]["TIMESTAMP"]
timestamp_testing = timestamps[2]["TIMESTAMP"]

print (f'Timestamp used for training: {timestamp_training} ')
print (f'Timestamp used for validation: {timestamp_testing} ')


In [None]:
# Common function to generate and return a dataset

def fs_generate_dataset (fs, fv, name, timestamp, version='v1'):
    """Generate a Feature Store dataset for one timestamp and return it as a DataFrame.

    Args:
        fs: Feature Store used to generate the dataset.
        fv: Feature view supplying the features; its ``feature_df`` is also used
            to build the spine (one row per CUSTOMER_ID at ``timestamp``).
        name: Name of the dataset to generate.
        timestamp: TIMESTAMP value selecting the rows to include.
        version: Dataset version; defaults to 'v1', the value that used to be
            hard-coded.

    Returns:
        Snowpark DataFrame read from the dataset, with every numeric column
        (see ``numeric_types``) cast to float.
    """
    spine_sdf = (
        fv.feature_df
        .filter(F.col("TIMESTAMP") == F.lit(timestamp))
        .group_by('CUSTOMER_ID')
        .agg(F.max('TIMESTAMP').as_('TIMESTAMP'))
    )

    # Use the feature view passed in; the previous code ignored `fv` here and
    # always read the global `fv_uc01_preprocess`.
    dataset = fs.generate_dataset(
        name=name,
        version=version,
        spine_df=spine_sdf,
        features=[fv],
        spine_timestamp_col='TIMESTAMP',
    )

    # Create a snowpark dataframe reference from the Dataset
    dataset_sdf = dataset.read.to_snowpark_dataframe()

    numeric_columns = [
        field.name
        for field in dataset_sdf.schema.fields
        if isinstance(field.datatype, numeric_types)
    ]
    for column_name in numeric_columns:
        dataset_sdf = dataset_sdf.with_column(column_name, F.col(column_name).cast("float"))

    return dataset_sdf

In [None]:
training_dataset_sdf = fs_generate_dataset(fs, fv_uc01_preprocess, 'UC01_TRAINING_INITIAL', timestamp_training)
testing_dataset_sdf = fs_generate_dataset(fs, fv_uc01_preprocess, 'UC01_VALIDATION', timestamp_testing)

## Training

We are going to explore first different ways to train models within Snowflake and the Container Runtime, which also offers new capabilities for:
- optimized data ingestion: https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-runtime-ml#optimized-data-loading
- Distributed Model Training: https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-runtime-ml#optimized-training

In [None]:
## Classic Snowflake ML using ml.modeling libraries

from snowflake.ml.modeling.preprocessing import StandardScaler as sml_StandardScaler
from snowflake.ml.modeling.preprocessing import OrdinalEncoder as sml_OrdinalEncoder
from snowflake.ml.modeling.pipeline import Pipeline as sml_Pipeline
from snowflake.ml.modeling import metrics as snowml_metrics
from snowflake.ml.modeling.xgboost import XGBClassifier

from snowflake.snowpark.types import DecimalType, FloatType, IntegerType, DoubleType, LongType
numeric_types = (DecimalType, FloatType, IntegerType, DoubleType, LongType)

def uc01_train(feature_df):
    """Fit the Snowflake ML preprocessing + XGBoost pipeline and report F1 scores.

    Numeric columns (see ``numeric_types``) are cast to float, the frame is
    split 80/20 with seed 111, and a pipeline of ordinal encoder -> standard
    scaler -> XGBClassifier is fitted on the 80% split. F1 is computed on both
    splits.

    Args:
        feature_df: Snowpark DataFrame with the feature columns listed below
            and the CHURNED label.

    Returns:
        dict with keys 'MODEL' (the fitted pipeline), 'train_f1_score' and
        'test_f1_score'.
    """
    numeric_names = []
    for field in feature_df.schema.fields:
        if isinstance(field.datatype, numeric_types):
            numeric_names.append(field.name)

    casted_df = feature_df
    for name in numeric_names:
        casted_df = casted_df.with_column(name, F.col(name).cast("float"))

    fit_split, holdout_split = casted_df.random_split(weights=[0.8, 0.2], seed=111)

    categorical_in = ['GENDER', 'LOCATION', 'CUSTOMER_SEGMENT']
    categorical_out = ['GENDER_OE', 'LOCATION_OE', 'CUSTOMER_SEGMENT_OE']

    numerical_in = [
        "AGE",
        "SENTIMENT_MIN_2", "SENTIMENT_MIN_3", "SENTIMENT_MIN_4", "SENTIMENT_AVG_2",
        "SENTIMENT_AVG_3", "SENTIMENT_AVG_4",
        "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
        "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
    ]
    numerical_out = [f"{name}_SS" for name in numerical_in]

    target = ['CHURNED']
    prediction = ['CHURNED_PRED']

    encoder = sml_OrdinalEncoder(input_cols=categorical_in, output_cols=categorical_out)
    scaler = sml_StandardScaler(input_cols=numerical_in, output_cols=numerical_out)
    # NOTE(review): XGBClassifier must be the snowflake.ml.modeling one here; a
    # later cell re-imports the same name from xgboost, so re-running this
    # function after that cell would pick up the wrong class.
    classifier = XGBClassifier(
        input_cols=categorical_out + numerical_out,
        label_cols=target,
        output_cols=prediction,
    )
    pipeline_purchases = sml_Pipeline(
        steps=[("OE", encoder), ("SS", scaler), ("XGB", classifier)]
    )

    # Only the raw inputs and the label are needed to fit.
    pipeline_purchases.fit(fit_split.select(categorical_in + numerical_in + target))

    def _f1(scored_df):
        # F1 of CHURNED_PRED against CHURNED on an already-scored frame.
        return snowml_metrics.f1_score(df=scored_df,
                                       y_true_col_names=['CHURNED'],
                                       y_pred_col_names=['CHURNED_PRED'])

    train_f1_score = _f1(pipeline_purchases.predict(fit_split))
    test_f1_score = _f1(pipeline_purchases.predict(holdout_split))

    return {
        'MODEL': pipeline_purchases,
        'train_f1_score': train_f1_score,
        'test_f1_score': test_f1_score,
    }
     

In [None]:
trained_model = uc01_train(training_dataset_sdf)


In [None]:
trained_model


In [None]:
# As we have included the preprocessing + model within the pipeline, we can call predict
# using a different dataset (this is for the next month that the model has not seen yet)
model= trained_model['MODEL']

predictions = model.predict(testing_dataset_sdf)

testing_f1_score = snowml_metrics.f1_score (df=predictions, 
                                                y_true_col_names=['CHURNED'],
                                                y_pred_col_names=['CHURNED_PRED'])   

print (f'testing_f1_score = {testing_f1_score}')

In [None]:
# As we are performing the training using CPUs or GPUs (depending on
# where we are running this notebook), we are going to use just OSS packages
# given that the datasets are not big and will fit well in our compute resources

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score
from xgboost import XGBClassifier

def uc01_oss_train(feature_df):
    """Train the churn model with open-source scikit-learn + XGBoost.

    Numeric columns (see ``numeric_types``) are cast to float, the data is
    pulled into pandas and split 80/20 (random_state=111). A pipeline of
    ColumnTransformer (ordinal encoder + standard scaler) -> XGBClassifier is
    fitted on the 80% split and F1 is computed on both splits.

    Args:
        feature_df: Snowpark DataFrame with the feature columns listed below
            and the CHURNED label.

    Returns:
        dict with keys 'MODEL' (the fitted sklearn pipeline), 'train_f1_score'
        and 'test_f1_score'.
    """
    numeric_columns = [
        field.name
        for field in feature_df.schema.fields
        if isinstance(field.datatype, numeric_types)
    ]
    for column_name in numeric_columns:
        feature_df = feature_df.with_column(column_name, F.col(column_name).cast("float"))

    features_pd = feature_df.to_pandas()

    # Split data into train and test sets
    train_df, testing_df = train_test_split(features_pd, test_size=0.2, random_state=111)

    oe_input_cols = ['GENDER', 'LOCATION', 'CUSTOMER_SEGMENT']

    ss_input_numerical_cols = [
        "AGE",
        "SENTIMENT_MIN_2", "SENTIMENT_MIN_3", "SENTIMENT_MIN_4", "SENTIMENT_AVG_2",
        "SENTIMENT_AVG_3", "SENTIMENT_AVG_4",
        "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
        "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
    ]
    feature_cols = oe_input_cols + ss_input_numerical_cols

    label = ['CHURNED']

    # The fitted pipeline is later applied to data from another month, which
    # may contain categories never seen while fitting. Encode those as -1
    # instead of letting OrdinalEncoder raise at predict time.
    preprocessor = ColumnTransformer(
        transformers=[
            ("ordinal",
             OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
             oe_input_cols),
            ("scaler", StandardScaler(), ss_input_numerical_cols)
        ]
    )

    pipeline_purchases = Pipeline(
        steps=[
            ("preprocessor", preprocessor),
            ("model", XGBClassifier())
        ]
    )

    # Features (X) and target (y) of each split
    X_train, y_train = train_df[feature_cols], train_df[label]
    X_test, y_test = testing_df[feature_cols], testing_df[label]

    pipeline_purchases.fit(X_train, y_train)

    train_f1_score = f1_score(y_train, pipeline_purchases.predict(X_train))
    test_f1_score = f1_score(y_test, pipeline_purchases.predict(X_test))

    return {
        'MODEL': pipeline_purchases,
        'train_f1_score': train_f1_score,
        'test_f1_score': test_f1_score
    }


In [None]:
trained_oss_model = uc01_oss_train(training_dataset_sdf)


In [None]:
trained_oss_model

In [None]:
# As we have included the preprocessing + model within the pipeline, we can call predict
# using a different dataset (this is for the next month that the model has not seen yet)
model= trained_oss_model['MODEL']

df = testing_dataset_sdf.to_pandas()

val_predictions = model.predict(df)

label = ['CHURNED']

y_val = df[label]
 
# Compute validation F1 score
testing_f1_score = f1_score(y_val, val_predictions)

print (f'testing_f1_score = {testing_f1_score}')

### Parallel Hyperparameter Optimization (HPO) on Container Runtime for ML

Let's explore how we can run HPO in the Container Runtime
https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-hpo


In [None]:
# Here we are going to separate the pipeline pre-processing from model training:

from snowflake.ml.modeling.preprocessing import StandardScaler as sml_StandardScaler
from snowflake.ml.modeling.preprocessing import OrdinalEncoder as sml_OrdinalEncoder
from snowflake.ml.modeling.pipeline import Pipeline as sml_Pipeline
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling import tune
from snowflake.ml.modeling.tune import get_tuner_context
import xgboost as xgb
from entities import search_algorithm
from sklearn.metrics import f1_score


import joblib

def uc01_hpo_train(feature_df):
    """Tune an XGBoost classifier with the Container Runtime HPO tuner.

    Numeric columns (see ``numeric_types``) are cast to float and the data is
    split 80/20 (seed 111). The preprocessing pipeline (ordinal encoder +
    standard scaler) is fitted on the training split only, applied to both
    splits, saved to /tmp and uploaded to @ML_STAGE. The tuner then trains one
    XGBClassifier per trial, maximizing ``test_f1_score``.

    Args:
        feature_df: Snowpark DataFrame with the feature columns listed below
            and the CHURNED label.

    Returns:
        dict with keys 'MODEL' (best model reported by the tuner),
        'train_f1_score' and 'test_f1_score' (taken from the tuner's best result).
    """
    numeric_columns = [
        field.name
        for field in feature_df.schema.fields
        if isinstance(field.datatype, numeric_types)
    ]
    for column_name in numeric_columns:
        feature_df = feature_df.with_column(column_name, F.col(column_name).cast("float"))

    oe_input_cols = ['GENDER', 'LOCATION', 'CUSTOMER_SEGMENT']
    oe_output_cols = ['GENDER_OE', 'LOCATION_OE', 'CUSTOMER_SEGMENT_OE']

    ss_input_numerical_cols = [
        "AGE",
        "SENTIMENT_MIN_2", "SENTIMENT_MIN_3", "SENTIMENT_MIN_4", "SENTIMENT_AVG_2",
        "SENTIMENT_AVG_3", "SENTIMENT_AVG_4",
        "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
        "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
    ]
    ss_output_numerical_cols = [col + "_SS" for col in ss_input_numerical_cols]

    label = ['CHURNED']
    input_columns = oe_input_cols + ss_input_numerical_cols + label

    preprocessing_pipeline = sml_Pipeline(
        steps=[ ("OE",
                sml_OrdinalEncoder(
                    input_cols=oe_input_cols,
                    output_cols=oe_output_cols,
                    drop_input_cols = True)),
            ("SS",
                sml_StandardScaler(
                    input_cols=ss_input_numerical_cols,
                    output_cols=ss_output_numerical_cols,
                    drop_input_cols = True))
            ])

    train_df, testing_df = feature_df.select(input_columns).random_split(weights=[0.8, 0.2], seed=111)

    # Fit on the training split only and reuse that fit for the test split.
    # Re-fitting on the test split (as was done before) encodes and scales the
    # two splits differently and leaks test statistics into the evaluation.
    training_spdf = preprocessing_pipeline.fit(train_df).transform(train_df)
    testing_spdf = preprocessing_pipeline.transform(testing_df)

    # Persist the pipeline AFTER fitting; dumping it earlier stored an
    # unfitted object. It is pickled locally first and then uploaded.
    PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib'
    joblib.dump(preprocessing_pipeline, PIPELINE_FILE)
    session.file.put(PIPELINE_FILE, "@ML_STAGE", overwrite=True)

    dataset_map = {
        "x_train": DataConnector.from_dataframe(training_spdf.drop("CHURNED")),
        "y_train": DataConnector.from_dataframe(training_spdf.select("CHURNED")),
        "x_test": DataConnector.from_dataframe(testing_spdf.drop("CHURNED")),
        "y_test": DataConnector.from_dataframe(testing_spdf.select("CHURNED")),
    }

    def train_func():
        """Train and score one trial with the hyper-parameters sampled by the tuner."""
        tuner_context = get_tuner_context()
        config = tuner_context.get_hyper_params()
        dm = tuner_context.get_dataset_map()

        # Actually apply the sampled values (they used to be ignored, so every
        # trial trained the same default model). The search space is sampled
        # with tune.uniform, i.e. as floats, so everything except the learning
        # rate is cast to int.
        hyper_params = {
            key: (value if key == "learning_rate" else int(value))
            for key, value in config.items()
        }
        model = xgb.XGBClassifier(**hyper_params, random_state=42)

        x_train, y_train = dm["x_train"].to_pandas(), dm["y_train"].to_pandas()
        x_test, y_test = dm["x_test"].to_pandas(), dm["y_test"].to_pandas()

        model.fit(x_train, y_train)
        test_f1_score = f1_score(y_test, model.predict(x_test))
        train_f1_score = f1_score(y_train, model.predict(x_train))

        tuner_context.report(metrics={"test_f1_score": test_f1_score,
                                     "train_f1_score": train_f1_score}, model=model)

    tuner = tune.Tuner(
        train_func=train_func,
        search_space={
            "n_estimators": tune.uniform(50, 200),
            "max_depth": tune.uniform(3, 10),
            "learning_rate": tune.uniform(0.01, 0.3),
        },
        tuner_config=tune.TunerConfig(
            metric="test_f1_score",
            mode="max",
            search_alg=search_algorithm.BayesOpt(),
            num_trials=2,
            max_concurrent_trials=1,
        ),
    )

    tuner_results = tuner.run(dataset_map=dataset_map)

    return {'MODEL': tuner_results.best_model,
            'train_f1_score': tuner_results.best_result['train_f1_score'] ,
            'test_f1_score': tuner_results.best_result['test_f1_score']
           }
    
  

In [None]:
hpo_trained_model_results = uc01_hpo_train(training_dataset_sdf)


In [None]:
hpo_trained_model_results
print (hpo_trained_model_results["test_f1_score"][0])

Let's see what the performance is on the validation data. As the preprocessing pipeline is kept separate from the model, we are going to define an inference function that performs the transformations first.

In [None]:
def inference_hpo (session, model, df):
    """Return the F1 score of `model` on the labeled Snowpark DataFrame `df`.

    The preprocessing pipeline staged at @ML_STAGE is downloaded, fitted on `df`
    and used to transform it; the transformed features are then passed to
    `model.predict` and compared against the CHURNED column.

    Args:
        session: Snowpark session used to download the staged pipeline.
        model: Trained classifier exposing `predict` on a pandas DataFrame.
        df: Snowpark DataFrame with the raw feature columns and the CHURNED label.

    Returns:
        The F1 score of the predictions against CHURNED.
    """
    oe_input_cols = ['GENDER', 'LOCATION', 'CUSTOMER_SEGMENT']

    ss_input_numerical_cols = [
        "AGE",
        "SENTIMENT_MIN_2", "SENTIMENT_MIN_3", "SENTIMENT_MIN_4", "SENTIMENT_AVG_2",
        "SENTIMENT_AVG_3", "SENTIMENT_AVG_4",
        "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
        "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
    ]

    label = ['CHURNED']
    input_columns = oe_input_cols + ss_input_numerical_cols + label

    # Download the staged (gzip-compressed) pipeline and load exactly that file.
    # Previously the download was ignored and whatever uncompressed copy happened
    # to be in /tmp was loaded instead. joblib.load detects the gzip compression.
    session.file.get("@ML_STAGE/preprocessing_pipeline.joblib.gz", '/tmp')
    pipeline_file_name = '/tmp/preprocessing_pipeline.joblib.gz'
    preprocessing_pipeline = joblib.load(pipeline_file_name)

    df = df.select(input_columns)

    # NOTE(review): the pipeline is (re)fitted on the data being scored, as before;
    # the staged artifact looks like it is pickled before fitting - confirm.
    preprocessed_df = preprocessing_pipeline.fit(df).transform(df)

    # Materialise features and label with a single query so their rows stay aligned;
    # two independent to_pandas() calls carry no ordering guarantee.
    val_pdf = DataConnector.from_dataframe(preprocessed_df).to_pandas()
    x_val = val_pdf.drop(columns=label)
    y_val = val_pdf[label[0]]

    val_f1_score = f1_score(y_val, model.predict(x_val))

    return val_f1_score


In [None]:
# F1 score of the HPO-selected model on testing_dataset_sdf (shown as the cell output).
inference_hpo (session, hpo_trained_model_results["MODEL"], testing_dataset_sdf)

## Distributed Model Training
Now let's explore how we can run Distributed Model Training for XGBoost

In [None]:
from snowflake.ml.modeling.preprocessing import StandardScaler as sml_StandardScaler
from snowflake.ml.modeling.preprocessing import OrdinalEncoder as sml_OrdinalEncoder
from snowflake.ml.modeling.pipeline import Pipeline as sml_Pipeline
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from sklearn.metrics import f1_score

import joblib


from snowflake.snowpark.types import DecimalType, FloatType, IntegerType, DoubleType, LongType
numeric_types = (DecimalType, FloatType, IntegerType, DoubleType, LongType)


def uc01_dist_train(feature_df):
    """Train a distributed XGBoost churn classifier and report train/test F1 scores.

    Numeric columns are cast to float, the data is split 80/20, categorical columns
    are ordinal-encoded and numeric columns standard-scaled, and an XGBEstimator is
    fitted on the transformed training split. Uses the notebook-level `session`
    and `F` (snowflake.snowpark.functions).

    Args:
        feature_df: Snowpark DataFrame with the feature columns and the CHURNED label.

    Returns:
        dict with keys 'MODEL' (the fitted XGBEstimator), 'train_f1_score' and
        'test_f1_score'.
    """
    # Cast every numeric column to float before preprocessing.
    decimal_columns = [field.name for field in feature_df.schema.fields
                       if isinstance(field.datatype, numeric_types)]

    for column_name in decimal_columns:
        feature_df = feature_df.with_column(
            column_name,
            F.col(column_name).cast("float")
        )

    oe_input_cols = ['GENDER', 'LOCATION', 'CUSTOMER_SEGMENT']
    oe_output_cols = ['GENDER_OE', 'LOCATION_OE', 'CUSTOMER_SEGMENT_OE']

    ss_input_numerical_cols = [
        "AGE",
        "SENTIMENT_MIN_2", "SENTIMENT_MIN_3", "SENTIMENT_MIN_4", "SENTIMENT_AVG_2",
        "SENTIMENT_AVG_3", "SENTIMENT_AVG_4",
        "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
        "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
    ]

    ss_output_numerical_cols = [col + "_SS" for col in ss_input_numerical_cols]

    label = ['CHURNED']
    label_col = label[0]

    input_columns = oe_input_cols + ss_input_numerical_cols + label
    # Model features: the encoded/scaled outputs only (the label is NOT a feature).
    output_columns = oe_output_cols + ss_output_numerical_cols

    preprocessing_pipeline = sml_Pipeline(
        steps=[ ("OE",
                sml_OrdinalEncoder(
                    input_cols=oe_input_cols,
                    output_cols=oe_output_cols,
                    drop_input_cols = True)),
            ("SS",
                sml_StandardScaler(
                    input_cols=ss_input_numerical_cols,
                    output_cols=ss_output_numerical_cols,
                    drop_input_cols = True))
            ])

    train_df, testing_df = feature_df.select(input_columns).random_split(weights=[0.8, 0.2], seed=111)

    PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib'
    # The pipeline is pickled locally before it is fitted (unchanged behaviour).
    joblib.dump(preprocessing_pipeline, PIPELINE_FILE)

    # Fit on the training split only and reuse those statistics for the test split;
    # re-fitting on the test split would leak test information into preprocessing.
    # NOTE(review): transform presumably raises if the test split holds a category
    # unseen in training (encoder default for unknown values) - confirm.
    training_spdf = preprocessing_pipeline.fit(train_df).transform(train_df)
    testing_spdf = preprocessing_pipeline.transform(testing_df)

    session.file.put(PIPELINE_FILE, "@ML_STAGE", overwrite=True)

    train_connector = DataConnector.from_dataframe(training_spdf)
    test_connector = DataConnector.from_dataframe(testing_spdf)

    dist_gxb = XGBEstimator(
        objective = 'binary:logistic'
    )

    # Train on the feature columns only; the label goes in label_col. Passing the
    # label in input_cols as well lets the model see the answer (label leakage).
    dist_gxb.fit(train_connector,
                 input_cols=output_columns,
                 label_col=label_col)

    # Materialise each split once so features and labels come from the same rows.
    train_pdf = train_connector.to_pandas()
    test_pdf = test_connector.to_pandas()

    predict_test = dist_gxb.predict(test_pdf[output_columns])
    predict_train = dist_gxb.predict(train_pdf[output_columns])

    # binary:logistic yields probabilities; threshold at 0.5 to get class labels.
    y_predict_test = (predict_test > 0.5).astype(int)
    y_predict_train = (predict_train > 0.5).astype(int)

    test_f1_score = f1_score(test_pdf[label_col], y_predict_test)
    train_f1_score = f1_score(train_pdf[label_col], y_predict_train)

    return {'MODEL': dist_gxb,
            'train_f1_score': train_f1_score ,
            'test_f1_score': test_f1_score
           }



In [None]:
# Run the distributed training on the training dataset; the result is a dict with
# the fitted estimator and its train/test F1 scores.
dist_model_trained = uc01_dist_train(training_dataset_sdf)



In [None]:
# Display the distributed-training results dict.
dist_model_trained

## Model Registry

Register some of the models we have been using

In [None]:
from snowflake.ml.model import type_hints

# Register the trained model as version "v0" of ChurnDetector, using 100 rows of the
# training dataset as sample input.
model_logged = mr.log_model(model= trained_model['MODEL'],
                model_name= "ChurnDetector",
                version_name= "v0",
                #conda_dependencies=["snowflake-ml-python"],
                sample_input_data = training_dataset_sdf.limit(100),
                #options={"relax_version": False, "enable_explainability": True},
                task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                comment="Model to detect what customers will not buy again"
                )

# Attach both F1 scores to the registered version. The test metric previously
# stored the *train* score by mistake.
model_logged.set_metric(metric_name="train_f1_score", value=trained_model['train_f1_score'])
model_logged.set_metric(metric_name="test_f1_score", value=trained_model['test_f1_score'])

In [None]:
from snowflake.ml.model import type_hints

# Feature columns the OSS model consumes (categorical + numerical).
oe_input_cols = ['GENDER', 'LOCATION', 'CUSTOMER_SEGMENT']

ss_input_numerical_cols = [
    "AGE",
    "SENTIMENT_MIN_2", "SENTIMENT_MIN_3", "SENTIMENT_MIN_4", "SENTIMENT_AVG_2",
    "SENTIMENT_AVG_3", "SENTIMENT_AVG_4",
    "SUM_TOTAL_AMOUNT_PAST_7D", "SUM_TOTAL_AMOUNT_PAST_1MM", "SUM_TOTAL_AMOUNT_PAST_2MM", "SUM_TOTAL_AMOUNT_PAST_3MM",
    "COUNT_ORDERS_PAST_7D", "COUNT_ORDERS_PAST_1MM", "COUNT_ORDERS_PAST_2MM", "COUNT_ORDERS_PAST_3MM"
]
columns = oe_input_cols + ss_input_numerical_cols

# Register the OSS-trained model as version "base" of ChurnDetector; the sample
# input is restricted to the feature columns above.
model_oss_logged = mr.log_model(model= trained_oss_model['MODEL'],
                model_name= "ChurnDetector",
                version_name= "base",
                conda_dependencies=["snowflake-ml-python", "xgboost", "scikit-learn"],
                sample_input_data = training_dataset_sdf.select(columns).limit(100),
                #options={"relax_version": False, "enable_explainability": True},
                task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                comment="Model to detect what customers will not buy again"
                )

# Attach both F1 scores to the registered version. The test metric previously
# stored the *train* score by mistake.
model_oss_logged.set_metric(metric_name="train_f1_score", value=trained_oss_model['train_f1_score'])
model_oss_logged.set_metric(metric_name="test_f1_score", value=trained_oss_model['test_f1_score'])



In [None]:
# Remember the current schema so it can be restored at the end of the cell.
sc = session.get_current_schema()

# The ALTER MODEL below uses an unqualified model name, so switch to the schema held
# in mr_schema first (presumably the model registry schema - defined earlier).
session.use_schema(mr_schema)

# Make the "base" version the default version of ChurnDetector.
session.sql('ALTER MODEL ChurnDetector SET DEFAULT_VERSION = base;').collect()

# Switch back to the schema that was active before.
session.use_schema(sc)

In [None]:
-- Cleanup to start fresh: drop the prediction tables used for model monitoring
-- that may be left over from a previous run.

drop table if exists customer_churn_baseline_predicted;
drop table if exists customer_churn_predicted;
drop table if exists customer_churn_predicted_prod;
drop table if exists customer_churn_predicted_proba;

In [None]:
# Create the predictions tables: one for the baseline and one where we keep adding predictions.
# Both tables get the same schema: identifiers, features, the actual CHURNED label,
# one prediction column per monitored model (PROD / BASE / RETRAIN) and the model version.

tables = ["customer_churn_baseline_predicted", "CUSTOMER_CHURN_PREDICTED_PROD2"]

for table in tables:
    sql_cmd = f"""       
        create or replace TABLE {table} (
        	CUSTOMER_ID VARCHAR(16777216),
        	TIMESTAMP TIMESTAMP_NTZ(9),
        	GENDER VARCHAR(16777216),
        	LOCATION VARCHAR(16777216),
        	CUSTOMER_SEGMENT VARCHAR(16777216),
        	LAST_PURCHASE_DATE DATE,
        	NEXT_TRANSACTION_DATE DATE,
        	AGE FLOAT,
        	SENTIMENT_MIN_2 FLOAT,
        	SENTIMENT_MIN_3 FLOAT,
        	SENTIMENT_MIN_4 FLOAT,
        	SENTIMENT_AVG_2 FLOAT,
        	SENTIMENT_AVG_3 FLOAT,
        	SENTIMENT_AVG_4 FLOAT,
        	SUM_TOTAL_AMOUNT_PAST_7D FLOAT,
        	SUM_TOTAL_AMOUNT_PAST_1MM FLOAT,
        	SUM_TOTAL_AMOUNT_PAST_2MM FLOAT,
        	SUM_TOTAL_AMOUNT_PAST_3MM FLOAT,
        	COUNT_ORDERS_PAST_7D FLOAT,
        	COUNT_ORDERS_PAST_1MM FLOAT,
        	COUNT_ORDERS_PAST_2MM FLOAT,
        	COUNT_ORDERS_PAST_3MM FLOAT,
        	DAYS_SINCE_LAST_PURCHASE FLOAT,
        	CHURNED FLOAT,
        	CHURNED_PRED_PROD FLOAT,
            CHURNED_PRED_BASE FLOAT,
            CHURNED_PRED_RETRAIN FLOAT,
            CHURNED_PRED_PROBABILITY FLOAT,
        	VERSION_NAME VARCHAR(50)
        );
        """
    session.sql(sql_cmd).collect()

In [None]:
def inference_oss (model, inference_df, output_table, col_name, is_prod):
    """Score `inference_df` with a registered model version and merge the result
    into `output_table`.

    The model's "predict" function is run, its output column `output_feature_0` is
    renamed to `col_name`, and the model's version name is added as `version_name`.
    The result is written to the table `temp_predictions` (overwritten on each call)
    and merged into `output_table` on (CUSTOMER_ID, TIMESTAMP): matching rows get
    `col_name` and `version_name` updated, new rows are inserted with NULL for any
    target column the predictions do not provide.

    Args:
        model: Registered model version exposing `run` and `version_name`.
        inference_df: Snowpark DataFrame to score.
        output_table: Name of the table receiving the predictions.
        col_name: Name of the prediction column to fill in `output_table`.
        is_prod: Not used by this function.
    """
    raw_predictions = model.run(inference_df, function_name="predict")

    # Strip double quotes from the column names returned by the model.
    unquoted_predictions = raw_predictions.select(
        [F.col(name).alias(name.replace('"', '')) for name in raw_predictions.columns]
    )

    predictions_out = (
        unquoted_predictions
        .rename("output_feature_0", col_name)
        .with_column("version_name", F.lit(model.version_name))
    )

    predictions_out.write.mode("overwrite").save_as_table('temp_predictions')

    # Columns of the target table, in schema order.
    target_columns = [field.name for field in session.table(output_table).schema]
    available_columns = predictions_out.columns

    # INSERT clause: take the value from temp_predictions when it has the column, else NULL.
    insert_columns = ", ".join(target_columns)
    value_expressions = []
    for target_column in target_columns:
        if target_column in available_columns:
            value_expressions.append(f"t.{target_column}")
        else:
            value_expressions.append("NULL")
    insert_values = ", ".join(value_expressions)

    # MERGE: update the prediction column when the record exists, insert otherwise.
    merge_statement = f"""
        MERGE INTO {output_table} o
        USING temp_predictions t
        ON o.CUSTOMER_ID = t.CUSTOMER_ID AND o.TIMESTAMP = t.TIMESTAMP
        WHEN MATCHED THEN
            UPDATE SET o.{col_name} = t.{col_name},
                       o.version_name = t.version_name
        WHEN NOT MATCHED THEN
            INSERT ({insert_columns})
            VALUES ({insert_values})
    """

    session.sql(merge_statement).collect()
    print (f'Predictions added to table {output_table} ')

    

In [None]:
-- Preview up to five rows of the predictions table (before it is seeded below).
select * from CUSTOMER_CHURN_PREDICTED_PROD2 limit 5;

In [None]:
## Feed the predictions tables with the data we already have.

base_model = mr.get_model("ChurnDetector").version("base")

# Baseline table: base-model predictions on the training dataset.
inference_oss(base_model, training_dataset_sdf, 'customer_churn_baseline_predicted', 'CHURNED_PRED_BASE', True)
# Source table: the same base model scores the testing dataset three times, filling
# the PROD, BASE and RETRAIN prediction columns in turn.
inference_oss(base_model, testing_dataset_sdf, 'CUSTOMER_CHURN_PREDICTED_PROD2', 'CHURNED_PRED_PROD', True)
inference_oss(base_model, testing_dataset_sdf, 'CUSTOMER_CHURN_PREDICTED_PROD2', 'CHURNED_PRED_BASE', True)
inference_oss(base_model, testing_dataset_sdf, 'CUSTOMER_CHURN_PREDICTED_PROD2', 'CHURNED_PRED_RETRAIN', True)


In [None]:
-- Preview up to five rows of the predictions table (after it has been seeded above).
select * from CUSTOMER_CHURN_PREDICTED_PROD2 limit 5;

## Model Monitors

In [None]:
## Monitor definition for the predictions stored in the CUSTOMER_CHURN_PREDICTED_PROD2 table,
## with customer_churn_baseline_predicted as the baseline.
## Associated to the first (base) model version; it reads the CHURNED_PRED_BASE column.

# Remember the working schema: it qualifies the source/baseline tables below and is
# restored at the end of the cell.
sc = session.get_current_schema()

# The monitor is created while the schema held in mr_schema is active.
session.use_schema(mr_schema)

cmd_sql = f"""
CREATE OR REPLACE MODEL MONITOR Monitor_ChurnDetector_Base
WITH
    MODEL=ChurnDetector
    VERSION=base
    FUNCTION=predict
    SOURCE={sc}.CUSTOMER_CHURN_PREDICTED_PROD2
    BASELINE={sc}.customer_churn_baseline_predicted
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(CHURNED_PRED_BASE)  
    ACTUAL_CLASS_COLUMNS=(CHURNED)
    ID_COLUMNS=(CUSTOMER_ID)
    WAREHOUSE=COMPUTE_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';
"""

session.sql(cmd_sql).collect()

session.use_schema(sc)


In [None]:
# Register a copy of the "base" version under the name PRODMONITOR so that a separate
# monitor can be attached to it.
# NOTE(review): a registry model version object is passed as `model` to log_model -
# confirm that log_model accepts it.
fake_prod_model = mr.get_model("ChurnDetector").version("base")

fake_prod_logged = mr.log_model(model= fake_prod_model,
                        model_name= "ChurnDetector",
                        version_name= "PRODMONITOR",
                        #conda_dependencies=["snowflake-ml-python"],
                        sample_input_data = training_dataset_sdf.select(columns).limit(100),
                        #options={"relax_version": False, "enable_explainability": True},
                        task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                        comment="Model to detect what customers will not buy again"
                        )


# Remember the working schema: it qualifies the source/baseline tables below and is
# restored at the end of the cell.
sc = session.get_current_schema()

session.use_schema(mr_schema)

# Monitor for the PRODMONITOR version; it reads the CHURNED_PRED_PROD column.
cmd_sql = f"""
CREATE OR REPLACE MODEL MONITOR Monitor_ChurnDetector_Prod
WITH
    MODEL=ChurnDetector
    VERSION=PRODMONITOR
    FUNCTION=predict
    SOURCE={sc}.CUSTOMER_CHURN_PREDICTED_PROD2
    BASELINE={sc}.customer_churn_baseline_predicted
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(CHURNED_PRED_PROD)  
    ACTUAL_CLASS_COLUMNS=(CHURNED)
    ID_COLUMNS=(CUSTOMER_ID)
    WAREHOUSE=COMPUTE_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';
"""

session.sql(cmd_sql).collect()

session.use_schema(sc)

In [None]:
# Register a copy of the "base" version under the name RETRAIN so that a separate
# monitor can be attached to it.
# NOTE(review): a registry model version object is passed as `model` to log_model -
# confirm that log_model accepts it.
fake_retrain_model = mr.get_model("ChurnDetector").version("base")

fake_retrain_logged = mr.log_model(model= fake_retrain_model,
                        model_name= "ChurnDetector",
                        version_name= "RETRAIN",
                        #conda_dependencies=["snowflake-ml-python"],
                        sample_input_data = training_dataset_sdf.select(columns).limit(100),
                        #options={"relax_version": False, "enable_explainability": True},
                        task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                        comment="Model to detect what customers will not buy again"
                        )

# Remember the working schema: it qualifies the source/baseline tables below and is
# restored at the end of the cell.
sc = session.get_current_schema()

session.use_schema(mr_schema)

# Monitor for the RETRAIN version; it reads the CHURNED_PRED_RETRAIN column.
cmd_sql = f"""
CREATE OR REPLACE MODEL MONITOR Monitor_ChurnDetector_Retrain
WITH
    MODEL=ChurnDetector
    VERSION=RETRAIN
    FUNCTION=predict
    SOURCE={sc}.CUSTOMER_CHURN_PREDICTED_PROD2
    BASELINE={sc}.customer_churn_baseline_predicted
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(CHURNED_PRED_RETRAIN)  
    ACTUAL_CLASS_COLUMNS=(CHURNED)
    ID_COLUMNS=(CUSTOMER_ID)
    WAREHOUSE=COMPUTE_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';
"""

session.sql(cmd_sql).collect()

session.use_schema(sc)

In [None]:
def get_last_labeled_f1_score(model_monitor):
    """Return the F1 score a model monitor reports for the second-to-last timestamp.

    The second-to-last distinct TIMESTAMP of `table_features_labeled` is looked up in
    schema `SCHEMA`, then MODEL_MONITOR_PERFORMANCE_METRIC is queried in `mr_schema`
    for that single timestamp. The session is always switched back to `SCHEMA`.

    Args:
        model_monitor: Name of the model monitor to query.

    Returns:
        The METRIC_VALUE of the first returned row, or 1 when there is nothing to
        evaluate yet (fewer than two timestamps, no rows, or a NULL metric) so that
        callers do not trigger a re-train on missing information.
    """
    session.sql(f'use schema {SCHEMA} ').collect()

    timestamps = session.table(table_features_labeled).select("TIMESTAMP").distinct().sort("TIMESTAMP").collect()

    # With fewer than two timestamps there is no "previous period"; previously this
    # raised IndexError. Treat it like the "no metric yet" case below.
    if len(timestamps) < 2:
        print("Fewer than two timestamps available. Probably too early.")
        return 1

    timestamp_1 = timestamps[-2]["TIMESTAMP"]

    sql_cmd = f"""
        SELECT * FROM TABLE(MODEL_MONITOR_PERFORMANCE_METRIC(
        '{model_monitor}', 'F1_SCORE', '1 DAY', '{timestamp_1}', '{timestamp_1}'));
    """

    session.sql(f'use schema {mr_schema} ').collect()
    try:
        output = session.sql(sql_cmd).collect()
    finally:
        # Restore the working schema even if the metric query fails.
        session.sql(f'use schema {SCHEMA} ').collect()

    if not output or output[0]["METRIC_VALUE"] is None:
        print("No metric data returned for given timestamp. Probably too early.")
        return 1  # If we have no info, do not force a re-train for now.

    return output[0]["METRIC_VALUE"]



In [None]:
from snowflake.ml import dataset

def get_inference_dataset():
    """Load the dataset generated for the second-to-last labeled timestamp.

    The dataset name is built as `{fs_schema}.UC01_DATASET_v_YYYY_MM_DD` from the
    second-to-last distinct TIMESTAMP of `table_features_labeled`, and its version
    'v1' is loaded.

    Returns:
        The dataset contents as a Snowpark DataFrame.
    """
    distinct_ts_rows = (
        session.table(table_features_labeled)
        .select("TIMESTAMP")
        .distinct()
        .sort("TIMESTAMP")
        .collect()
    )

    # Second-to-last timestamp, i.e. the period before the most recent one.
    previous_ts = distinct_ts_rows[-2]["TIMESTAMP"]

    date_name = "v_" + previous_ts.date().isoformat().replace("-", "_")
    ds_name = f'{fs_schema}.UC01_DATASET_{date_name}'

    return dataset.load_dataset(session, ds_name, 'v1').read.to_snowpark_dataframe()


In [None]:
def set_default_model(name):
    """Make `name` the default version of the ChurnDetector model.

    The statement runs with `mr_schema` as the current schema; the previously active
    schema is restored afterwards, even if the ALTER MODEL statement fails.

    Args:
        name: Version name to set as DEFAULT_VERSION.
    """
    sc = session.get_current_schema()
    session.use_schema(mr_schema)

    try:
        session.sql(f'ALTER MODEL ChurnDetector SET DEFAULT_VERSION = {name};').collect()
    finally:
        # Never leave the session pointing at the registry schema.
        session.use_schema(sc)

    print (f"Setting default PROD Model Version to {name} ")

In [None]:
# Load all the other months, predict using the first model and re-train the model

from snowflake.snowpark import functions as F
from datetime import datetime
import time

# Stage the CSV files are copied from (used in the COPY INTO / file listing cells).
stage_name = "DATASET.CSV"
churn_window = 30 ## This is the value we define as churn
# Table receiving the engineered features, and its counterpart with churn labels added.
table_features = 'churn_baseline'
table_features_labeled = 'churn_baseline_labeled'


def process_one_month (db, sc, fs, fv, mr, columns):
    """Ingest the next month of data, score it and re-train/promote a model if needed.

    Steps, in order:
      1. Copy the next sales/feedback file (`copy_next_file`); stop if nothing is left.
      2. Process the feedback received (`process_sentiment`).
      3. Compute features for the latest transaction date (stored procedure).
      4. Label churn using `churn_window` (stored procedure).
      5. Generate a dataset for the latest feature timestamp (`fs_generate_dataset`).
      6. Run inference with the "base" version and refresh the labels of
         CUSTOMER_CHURN_PREDICTED_PROD2, then wait for the monitors to refresh.
      7. Read the last-period F1 of the Base and Prod monitors; if Prod is below 0.8,
         train, register and evaluate a new version named after the dataset date.
      8. Set the default version to base or to the new version when it strictly beats
         the other two, then run inference with the default version (PROD column).

    Relies on notebook globals: `session`, `churn_window`, `table_features`,
    `table_features_labeled`, `type_hints` and the helper functions called below.

    Args:
        db: Database name passed to the stored procedures and `copy_next_file`.
        sc: Schema name passed to the stored procedures and `copy_next_file`.
        fs: Feature store handle passed to `fs_generate_dataset`.
        fv: Feature view passed to `fs_generate_dataset`.
        mr: Model registry used to fetch and log model versions.
        columns: Feature columns used as sample input when logging a new model.

    Returns:
        False when there is no more data to ingest, True otherwise.
    """
    t1 = datetime.now()

    # STEP 1: COPY NEXT SALES AND FEEDBACK MONTH
    print ("######################################################################")
    if (copy_next_file (session, db, sc) == False):
        print ('No more data to be ingested')
        return False

    # STEP 2: PROCESS THE FEEDBACK RECEIVED
    print ("Processing Sentiment")
    process_sentiment()

    t2 = datetime.now()
    print (f'STEP 1, 2: File ingested and sentiment calculated: {t2 - t1}')

    # STEP 3: CALCULATE FEATURES FOR LATEST TIMESTAMP (DATA JUST INGESTED)
    sales_df = session.table("sales")

    # Calculate features for the latest transaction timestamp
    latest_transaction = sales_df.select(F.max(F.col("transaction_date"))).collect()[0][0]

    session.call('UTILS.uc01_feature_engineering_sproc', db, sc, latest_transaction, table_features)

    t3 = datetime.now()
    print (f'STEP 3: Calculated features for timestamp : {latest_transaction}, {t3 - t2} ')

    # STEP 4: SET RIGHT CHURN LABEL BASED ON NEW SALES DATA
    # Use the configured churn window (was a hard-coded 30 that could silently drift
    # from the value reported in the message below and used in STEP 6).
    session.call('UTILS.uc_01_label_churn_sproc', db, sc, table_features, table_features_labeled, churn_window )

    t4 = datetime.now()
    print (f'STEP 4: Added real labels for churn  window: {churn_window}, {t4 - t3}  ')

    # STEP 5: GENERATE THE DATASET FOR THE NEW DATA

    # Get the latest timestamp from the labeled features table
    churn_baseline_labeled_df = session.table(table_features_labeled)

    latest_feature_timestamp = churn_baseline_labeled_df.select(F.max(F.col("timestamp"))).collect()[0][0]

    print (f'latest feature timestamp:{latest_feature_timestamp} ')

    # date_name doubles as the version name of a model re-trained in this run.
    date_name = "v_" + str(latest_feature_timestamp.date()).replace("-", "_")
    ds_name = f'UC01_DATASET_{date_name}'

    new_dataset_sdf = fs_generate_dataset(fs, fv, ds_name, latest_feature_timestamp)

    t5 = datetime.now()
    print (f'STEP 5: Data set generated for timestamp: {latest_feature_timestamp}, {t5 - t4} ')

    # STEP 6: RUN INFERENCE USING THE BASE MODEL ON NEW DATA
    ## Run inference using the first model training just for demo purposes so we can compare using the Monitors

    base_model = mr.get_model("ChurnDetector").version("base")

    inference_oss(base_model, new_dataset_sdf, 'CUSTOMER_CHURN_PREDICTED_PROD2', 'CHURNED_PRED_BASE', False)

    # After inference, update the previous labels (now we know if there were more
    # sales) so the Model Monitors see them.
    session.call('UTILS.uc01_update_label_churn_sproc', db, sc, 'CUSTOMER_CHURN_PREDICTED_PROD2', churn_window )

    t6 = datetime.now()
    print (f'STEP 6: Inference run using Base model {t6 - t5} ')

    # Wait at least 1 min for the monitor to refresh statistics
    print ('--- Waiting 70 seconds for Monitors to be updated')
    time.sleep(70)

    # Restart the clock so the wait is not counted in the STEP 7 timing.
    t6 = datetime.now()

    # STEP 7: NOW THAT WE HAVE THE LABELS OF THE PREVIOUS PERIOD, WE CAN CHECK THE MODEL MONITOR FOR PERFORMANCE

    f1_score_last_period_base = get_last_labeled_f1_score('Monitor_ChurnDetector_Base')
    f1_score_last_period_prod = get_last_labeled_f1_score('Monitor_ChurnDetector_Prod')
    f1_score_last_period_retrain = 0 # do not use unless we have to retrain

    t7 = datetime.now()
    print (f'Got Monitor Statistics,   {t7 - t6}')

    print (f"BASE Model: f1_score for previous period is: {f1_score_last_period_base} ")
    print (f"PROD Model: f1_score for previous period is: {f1_score_last_period_prod} ")

    # STEP 7 (cont.): DECIDE IF WE TRAIN ANOTHER MODEL

    # Re-train when the Prod monitor's F1 for the previous period is below 0.8
    if (f1_score_last_period_prod < 0.8):

        print ('Training new model because performance drop')

        new_trained_model = uc01_oss_train(new_dataset_sdf)

        print ("Inferencing with the re-trained model")

        model_logged = mr.log_model(model= new_trained_model['MODEL'],
                        model_name= "ChurnDetector",
                        version_name= date_name,
                        conda_dependencies=["snowflake-ml-python", "xgboost", "scikit-learn"],
                        sample_input_data = new_dataset_sdf.select(columns).limit(100),
                        #options={"relax_version": False, "enable_explainability": True},
                        task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                        comment="Model to detect what customers will not buy again"
                        )

        model_logged.set_metric(metric_name="train_f1_score", value=new_trained_model['train_f1_score'])
        model_logged.set_metric(metric_name="test_f1_score", value=new_trained_model['test_f1_score'])

        t8 = datetime.now()

        print (f'STEP 7(Op): Trained new model {date_name},  {t8 - t7} ')
        print (f"train_f1_score: {new_trained_model['train_f1_score']} ")
        print (f"test_f1_score: {new_trained_model['test_f1_score']} ")

        # See how the new model performs on the previous period's dataset

        prev_inference_dataset_sdf = get_inference_dataset()

        trained_model = mr.get_model("ChurnDetector").version(date_name)

        inference_oss(trained_model, prev_inference_dataset_sdf, 'CUSTOMER_CHURN_PREDICTED_PROD2', 'CHURNED_PRED_RETRAIN', True)
        print ('--- Waiting 70 seconds for Monitors to be updated')
        time.sleep(70)

        f1_score_last_period_retrain = get_last_labeled_f1_score('Monitor_ChurnDetector_Retrain')
        print (f"NEW Model {date_name} : f1_score for previous period is: {f1_score_last_period_retrain} ")

    else:
        print ("STEP 7: No need for training a new model")

    ## STEP 8: Decide which model performed best on the previous period

    # If the base model is better than both the Prod and the re-trained model, set it as default
    if f1_score_last_period_base > f1_score_last_period_prod and f1_score_last_period_base > f1_score_last_period_retrain:
        set_default_model ('base')
    ## If the new model is better than both the current and the base model, set it as default
    elif f1_score_last_period_retrain > f1_score_last_period_base and f1_score_last_period_retrain > f1_score_last_period_prod:
        set_default_model (date_name)

    t9 = datetime.now()

    # Get whatever is the PROD default model to run next inference on the new arrived data

    prod_model = mr.get_model("ChurnDetector").default

    print (f"Running Inference for Prod Label with model {prod_model.version_name}")

    inference_oss(prod_model, new_dataset_sdf, 'CUSTOMER_CHURN_PREDICTED_PROD2', 'CHURNED_PRED_PROD', True)
    #After inference, let's update the previous labels as now we know if there were more sales
    #session.call('UTILS.uc01_update_label_churn_sproc', db, sc, 'CUSTOMER_CHURN_PREDICTED_PROD2', churn_window )

    t10 = datetime.now()
    print (f'Inference done using {prod_model.version_name} Model, {t10 - t9}')

    return True


In [None]:
# `columns` was defined at the very beginning and holds the columns we are using for training.
# Process a single month of data as a first check.

db = session.get_current_database()
sc = session.get_current_schema()

print (f'database: {db}, schema: {sc}')
process_one_month(db, sc, fs, fv_uc01_preprocess, mr,  columns)


In [None]:

db = session.get_current_database()
sc = session.get_current_schema()

print (f'database: {db}, schema: {sc}')

# Keep processing one month at a time; process_one_month returns False once there
# is no more data to ingest.
ret = True

while (ret == True):
    ret = process_one_month(db, sc, fs, fv_uc01_preprocess, mr,  columns)


In [None]:
-- List each prediction timestamp together with the model version recorded for it.
select DISTINCT(TIMESTAMP) as TS, VERSION_NAME from CUSTOMER_CHURN_PREDICTED_PROD2
order by TS ASC;

### Add Data Drift
We are going to add new sales where we have changes in locations and customer segments


In [None]:

# Load new_customers.csv from the stage into the customers table.
# Rows that fail to load are skipped (ON_ERROR = 'CONTINUE').
sql_cmd = f"""
    COPY INTO {db}.{sc}.customers  
            FROM @{stage_name}/new_customers.csv  
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"')  
            ON_ERROR = 'CONTINUE'; 
"""

session.sql(sql_cmd).collect()

In [None]:
# Collect the files on the stage matching 'new_sales' and register them for the sales table.
new_sales_month_files = get_year_month_files(session, stage_name, 'new_sales')
insert_files ('sales', db, sc, new_sales_month_files)

# Same for the files matching 'new_feedback_raw2', registered for the feedback_raw table.
new_feedback_sentiment_month_files = get_year_month_files(session, stage_name, 'new_feedback_raw2')
insert_files ('feedback_raw',db, sc, new_feedback_sentiment_month_files)


In [None]:
-- Show the files that have not been ingested yet.
select * from FILES_INGESTED where INGESTED = False;

In [None]:
## Add the new files:
db = session.get_current_database()
sc = session.get_current_schema()

print (f'database: {db}, schema: {sc}')

# Process the newly registered months one at a time; process_one_month returns
# False once there is no more data to ingest.
ret = True

while (ret == True):
    ret = process_one_month(db, sc, fs, fv_uc01_preprocess, mr,  columns)

In [None]:
-- List each prediction timestamp together with the model version recorded for it,
-- now including the months added above.
select DISTINCT(TIMESTAMP) as TS, VERSION_NAME from CUSTOMER_CHURN_PREDICTED_PROD2
order by TS ASC;

In [None]:
# NOTE: despite the "_base" suffix of the variable, this queries the Prod monitor.
f1_score_last_period_base = get_last_labeled_f1_score('Monitor_ChurnDetector_Prod')
print (f1_score_last_period_base)


## Observability Metrics

We have used observability above to detect performance drops but let's take a look here too. Also, you can use Snowsight to see the model performance and compare with others


In [None]:
def get_model_metric(model_monitor, metric):
    """Query a model monitor metric over the full range of stored predictions.

    The range runs from the earliest to the latest TIMESTAMP found in
    CUSTOMER_CHURN_PREDICTED_PROD2 (read in schema `SCHEMA`); the metric is then
    queried in `mr_schema` with a '1 DAY' granularity. The session is always
    switched back to `SCHEMA`, even if the metric query fails.

    Args:
        model_monitor: Name of the model monitor to query.
        metric: Metric name passed to MODEL_MONITOR_PERFORMANCE_METRIC.

    Returns:
        The rows returned by the metric query.
    """
    session.sql(f'use schema {SCHEMA} ').collect()

    # Fetch both bounds with a single query instead of scanning the table twice.
    bounds = session.table('CUSTOMER_CHURN_PREDICTED_PROD2').select(
        F.min(F.col("TIMESTAMP")), F.max(F.col("TIMESTAMP"))
    ).collect()[0]
    first_transaction = bounds[0]
    last_transaction = bounds[1]

    print (f'Calculating {metric} from model {model_monitor} from timestamp: {first_transaction} to {last_transaction} ')

    sql_cmd = f"""
        SELECT * FROM TABLE(MODEL_MONITOR_PERFORMANCE_METRIC(
        '{model_monitor}', '{metric}', '1 DAY', '{first_transaction}', '{last_transaction}'));
    """

    session.sql(f'use schema {mr_schema} ').collect()
    try:
        output = session.sql(sql_cmd).collect()
    finally:
        # Restore the working schema even if the metric query fails.
        session.sql(f'use schema {SCHEMA} ').collect()

    return output

In [None]:
# F1 score of the Prod monitor over the whole prediction history (shown as the cell output).
out = get_model_metric('Monitor_ChurnDetector_Prod', 'F1_SCORE')
out

In [None]:

# Classification accuracy of the Prod monitor over the whole prediction history.
out = get_model_metric('Monitor_ChurnDetector_Prod', 'CLASSIFICATION_ACCURACY')
out


In [None]:
-- Check which schema the session is currently using.
select current_schema()

In [None]:

-- Table that receives one row (message + creation time) each time an alert fires.
CREATE or replace TABLE ALERTS_NOTIFICATION(
    notification varchar (100),
    created_at timestamp);

In [None]:

def create_alert(model_monitor, metric, value):
    """Create, resume and execute an alert on a model monitor metric.

    The alert `{model_monitor}_{metric}_alert` is created in `mr_schema`, scheduled
    every minute. Its condition queries the monitor metric for the second most
    recent distinct TIMESTAMP of `{SCHEMA}.CUSTOMER_CHURN_PREDICTED_PROD2` (used as
    both start and end of the range) and fires when METRIC_VALUE is below `value`,
    inserting a row into `{SCHEMA}.ALERTS_NOTIFICATION`. The session is always
    switched back to `SCHEMA`, even if one of the alert statements fails.

    Args:
        model_monitor: Name of the model monitor to watch.
        metric: Metric name passed to MODEL_MONITOR_PERFORMANCE_METRIC.
        value: Threshold; the alert fires when the metric is below it.

    Returns:
        The name of the alert that was created.
    """
    alert_name = f'{model_monitor}_{metric}_alert'

    sql_cmd = f"""
        create or replace ALERT {alert_name}
        WAREHOUSE = COMPUTE_WH
        SCHEDULE = '1 minute' --- Note this is just for this demo so I can see the alert live
        IF (EXISTS (
            SELECT METRIC_VALUE
            FROM TABLE(
                MODEL_MONITOR_PERFORMANCE_METRIC(
                    '{model_monitor}', 
                    '{metric}', 
                    '1 DAY', 
                    (
                        SELECT ts 
                        FROM (
                            SELECT DISTINCT TIMESTAMP AS ts 
                            FROM {SCHEMA}.CUSTOMER_CHURN_PREDICTED_PROD2 
                            ORDER BY ts DESC 
                            LIMIT 2
                        ) AS sub 
                        ORDER BY ts 
                        LIMIT 1
                    ) , 
                    (
                        SELECT ts 
                        FROM (
                            SELECT DISTINCT TIMESTAMP AS ts 
                            FROM {SCHEMA}.CUSTOMER_CHURN_PREDICTED_PROD2 
                            ORDER BY ts DESC 
                            LIMIT 2
                        ) AS sub 
                        ORDER BY ts 
                        LIMIT 1
                    ) 
                )
            ) where METRIC_VALUE < {value})
         )
         THEN
            INSERT INTO {SCHEMA}.ALERTS_NOTIFICATION (notification, created_at) VALUES
            ('{metric} performance dropped less than {value} on monitor {model_monitor} ', (SELECT CURRENT_TIMESTAMP));
    """

    session.sql(f'use schema {mr_schema}').collect()
    try:
        session.sql(sql_cmd).collect()

        # A newly created alert must be resumed before it runs on schedule;
        # execute it once right away as well.
        session.sql(f'alter ALERT {alert_name} RESUME').collect()
        session.sql(f'execute ALERT {alert_name}').collect()
    finally:
        # Restore the working schema even if one of the statements above fails.
        session.sql(f'use schema {SCHEMA} ').collect()

    print (f'Created and Executed Alert {alert_name} ')

    return alert_name


In [None]:

# Alert when the Prod monitor's F1 score is below 0.99.
alert_name = create_alert ('Monitor_ChurnDetector_Prod', 'F1_SCORE', 0.99)


In [None]:
-- Show the notifications written by the alert so far.
select * from ALERTS_NOTIFICATION;
        

In [None]:
# The alert was created in mr_schema, so switch there to suspend it.
session.sql(f'use schema {mr_schema}').collect()

# Suspend the alert so it stops running on its schedule.
session.sql(f'alter alert {alert_name} suspend ').collect()

# Switch back to the working schema.
session.sql(f'use schema {SCHEMA} ').collect()


## Registering a Model within Snowpark Container Services for Online Inference

In [None]:

 -- Compute pool (1-2 CPU_X64_S nodes, auto-resume) used by the inference service below.
 CREATE COMPUTE POOL IF NOT EXISTS CPU_XS_INFERENCE2
    MIN_NODES = 1
    MAX_NODES = 2
    INSTANCE_FAMILY = 'CPU_X64_S'
    AUTO_RESUME = TRUE;

In [None]:
-- Image repository passed as image_repo when the inference service is created.
CREATE IMAGE REPOSITORY IF NOT EXISTS inference_images


In [None]:
db = session.get_current_database()
sc = session.get_current_schema()
print (f'database: {db}, schema: {sc}')

# Take the current default version of ChurnDetector and build the fully qualified
# name of the image repository in the current database/schema.
default_model = mr.get_model("ChurnDetector").default
img_repo = f'{db}.{sc}.inference_images'

# Register it again as version "SPCS_INF".
# NOTE(review): a registry model version object is passed as `model` to log_model -
# confirm that log_model accepts it.
reg_prod_model = mr.log_model(model= default_model,
                model_name= "ChurnDetector",
                version_name= "SPCS_INF",
                conda_dependencies=["snowflake-ml-python", "scikit-learn","xgboost"],
                sample_input_data = training_dataset_sdf.select(columns).limit(100),
                #options={"relax_version": False, "enable_explainability": True},
                task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION,
                comment="Model to detect what customers will not buy again"
                )


# Deploy the version as a service on the CPU_XS_INFERENCE2 compute pool, with
# ingress enabled and no GPU requested.
reg_prod_model.create_service(service_name="ChurnDetector_Prod",
                  service_compute_pool="CPU_XS_INFERENCE2",
                  image_repo=img_repo,
                  ingress_enabled=True,
                  gpu_requests=None)