## Building a Predictive Model with Snowpark ML

## 1. Importing Required Libraries:

Imports the libraries needed for data processing, machine learning, and Snowflake Snowpark operations, and suppresses `UserWarning` messages for better readability.

In [None]:
# Import of Libraries
import warnings

import pandas as pd
import plotly.express as px
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.metrics import accuracy_score
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.preprocessing import LabelEncoder
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark import types as T
from snowflake.snowpark.functions import col
from snowflake.snowpark.functions import col, when, lit, floor, to_date, dayofweek, dayofmonth, month, hour
from snowflake.snowpark.functions import col, when, lit, concat_ws, round as round_, log
from snowflake.ml.modeling.metrics import accuracy_score, roc_auc_score, f1_score

from snowflake.ml.modeling.pipeline import Pipeline
import io, joblib

# Silence UserWarning messages so that cell output stays readable.
warnings.simplefilter(action="ignore", category=UserWarning)

## 2. Importing the Snowflake ML Registry

Loads the Registry module, which enables storing, managing, and retrieving trained ML models in Snowflake.

In [4]:
from snowflake.ml.registry import Registry

## 3. Getting the Active Snowflake Session

Establishes an active Snowflake session, which is required for executing Snowpark operations.

In [6]:
# Obtain the active Snowpark session; later cells use `session` for table reads,
# stage file transfers and the model registry.
session = get_active_session()

## 4. Loading Credit Limit features from Snowflake Table

Retrieves data from the CREDIT_LIMIT_MODEL_FEATURES table in Snowflake and loads it into a Snowpark DataFrame. The goal is to predict whether the credit limit for a customer can be increased or not. The show() function displays a sample of the dataset.

In [7]:
# NOTE(review): the accompanying text names the table CREDIT_LIMIT_MODEL_FEATURES, while this
# cell reads "CREDIT_LIMIT" — confirm which table holds the training features.
credit_limit_df = session.table("CREDIT_LIMIT")
credit_limit_df.show()

----------------------------------------------------------------------------------------------------------------------------------------
|"NAME"   |"MARITAL_STATUS"  |"DAYS_ACT_OPEN"  |"AGE"  |"INCOME"  |"ANY_PREVIOUS_DEFAULT"  |"GENDER"  |"OCCUPATION"  |"LOAN_APPROVAL"  |
----------------------------------------------------------------------------------------------------------------------------------------
|John     |Single            |2058             |28     |3000      |False                   |M         |Engineer      |1                |
|Mary     |Married           |2487             |34     |5000      |True                    |F         |Teacher       |0                |
|David    |Single            |1577             |22     |2000      |False                   |M         |Doctor        |1                |
|Sarah    |Married           |2896             |40     |8000      |False                   |F         |Engineer      |1                |
|Mike     |Single            |2349       

## 5. Separating Categorical and Continuous Variables

Group the categorical and continuous feature columns separately to facilitate preprocessing.

In [10]:
# Categorical features: imputed and label-encoded in the cells below.
cat_cols = [
    "GENDER",
    "EMPLOYMENT_STATUS",
    "HAS_AUTO_PAY_SETUP",
]

# Continuous (numeric) features.
cont_cols = [
    "AGE",
    "ANNUAL_INCOME",
    "YEARS_WITH_BANK",
    "CREDIT_SCORE",
    "CURRENT_CREDIT_LIMIT",
    "UTILIZATION_RATE",
    "MISSED_PAYMENTS_LAST_12_MONTHS",
    "AVG_PAYMENT_DELAY_DAYS",
    "DEBT_TO_INCOME_RATIO",
    "MONTHLY_SPENDING",
]

## 6. Handling Missing Values using Imputation

Uses SimpleImputer to replace missing values in categorical columns with the most frequently occurring value.

In [11]:
# Fill missing values in the categorical features with each column's most frequent value.
# Input and output column names are identical, so the imputed columns keep their original names.
impute_cat = SimpleImputer(
    strategy="most_frequent",
    input_cols=cat_cols,
    output_cols=cat_cols,
    drop_input_cols=True,
)

# Learn the fill values from the data, then apply them to the same DataFrame.
impute_cat.fit(credit_limit_df)
credit_limit_df = impute_cat.transform(credit_limit_df)
credit_limit_df.show()

Input value type doesn't match the target column data type, this replacement was skipped. Column Name: "ANY_PREVIOUS_DEFAULT", Type: StringType(16777216), Input Value: False, Type: <class 'bool'>


----------------------------------------------------------------------------------------------------------------------------------------
|"GENDER"  |"MARITAL_STATUS"  |"ANY_PREVIOUS_DEFAULT"  |"OCCUPATION"  |"NAME"   |"DAYS_ACT_OPEN"  |"AGE"  |"INCOME"  |"LOAN_APPROVAL"  |
----------------------------------------------------------------------------------------------------------------------------------------
|M         |Single            |false                   |Engineer      |John     |2058             |28     |3000      |1                |
|F         |Married           |true                    |Teacher       |Mary     |2487             |34     |5000      |0                |
|M         |Single            |false                   |Doctor        |David    |1577             |22     |2000      |1                |
|F         |Married           |false                   |Engineer      |Sarah    |2896             |40     |8000      |1                |
|M         |Single            |true      

## 7. Label Encoding Categorical Variables

Converts categorical values into numerical representations using label encoding.

In [None]:
# Names of the label-encoded output columns, listed in the same order as cat_cols
# (the pipeline cell pairs the two lists positionally with zip).
output_cat_cols = [
    "GENDER_LE",
    "EMPLOYMENT_STATUS_LE",
    "HAS_AUTO_PAY_SETUP_LE",
]
output_cat_cols

## 8. Saving the Preprocessing Pipeline for Inference

This step is needed to ensure consistency during inference, so the same transformations applied during training are reused later without retraining. It creates multiple label encoding steps for categorical columns, wraps them in a Snowpark Pipeline, fits and transforms the DataFrame, serializes the trained pipeline in memory, and uploads it to a Snowflake stage so it can be applied exactly the same way during future predictions.
 

In [None]:
# Build the preprocessing pipeline: one LabelEncoder step per categorical column.
# Each step is named "<column>_LE" and writes its result to the matching name in output_cat_cols.
pipeline_steps = [
    (
        f"{source_col}_LE",
        LabelEncoder(input_cols=[source_col], output_cols=[encoded_col]),
    )
    for source_col, encoded_col in zip(cat_cols, output_cat_cols)
]
preprocessing_pipeline = Pipeline(steps=pipeline_steps)

# Fit the encoders on the training data, then add the encoded columns to it.
preprocessing_pipeline.fit(credit_limit_df)
transformed_df = preprocessing_pipeline.transform(credit_limit_df)

# Serialize the fitted pipeline into an in-memory buffer and rewind it,
# so the upload cell can stream it to a stage from the first byte.
buffer = io.BytesIO()
joblib.dump(preprocessing_pipeline, buffer)
buffer.seek(0)

## 9. Creating stage to store the joblib file created above for future use

In [None]:
-- Stage that stores the serialized preprocessing pipeline.
-- Fully qualified so it is created where the upload and download cells look for it
-- (@SNOWPARK_ML_DEMO.PUBLIC.ML_STG), regardless of the notebook's current database/schema.
CREATE OR REPLACE STAGE SNOWPARK_ML_DEMO.PUBLIC.ML_STG;

In [None]:
# Stream the serialized pipeline from memory to the stage, replacing any earlier copy.
# NOTE(review): the retrieval cell later fetches 'preprocessing_pipeline.joblib.gz', so the
# upload is presumably compressed by default — confirm against the put_stream documentation.
session.file.put_stream(
    buffer,
    "@SNOWPARK_ML_DEMO.PUBLIC.ML_STG/preprocessing_pipeline.joblib",
    overwrite=True,
)

In [None]:
# Preview the DataFrame after label encoding.
transformed_df.show()

## 10. Dropping Unused Columns

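Remove the original categorical columns now that their label-encoded counterparts exist. The CUSTOMER_ID column is also irrelevant for predictive modeling; it is excluded from the model inputs when the classifier is defined in step 12.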

In [None]:
# Drop the original categorical columns; their *_LE encoded versions remain in the frame.
transformed_df = transformed_df.drop(cat_cols)

## 11. Splitting Data into Training and Testing Sets

Split the dataset into an 80% training set and a 20% test set, using a fixed seed for reproducibility.

In [14]:
# 80/20 train/test split with a fixed seed.
train_df, test_df = transformed_df.random_split(weights=[0.8, 0.2], seed=8)

## 12. Initializing an XGB Classifier and fitting it on training data
 
Create an XGBClassifier model and define the input features, the target label (from the table), and the desired output column name. The model is then fitted on the training dataset.

In [None]:
# Columns kept out of the model's input features: the target label and the customer identifier.
unused_cols = ['ELIGIBLE_FOR_INCREASE', 'CUSTOMER_ID']

In [None]:
# Configure the XGBoost classifier:
#   - label:    ELIGIBLE_FOR_INCREASE
#   - features: every training column except those listed in unused_cols
#   - output:   predictions are written to PRED_APPROVED
classifier = XGBClassifier(
    label_cols="ELIGIBLE_FOR_INCREASE",
    output_cols="PRED_APPROVED",
    input_cols=train_df.drop(unused_cols).columns,
)

# Train on the training split.
classifier.fit(train_df)

## 13. Compute Predictions against test data

In [None]:
# Score the held-out test split; predictions land in the PRED_APPROVED column
# configured on the classifier.
class_preds = classifier.predict(test_df)

In [None]:
# Echo the predictions DataFrame object as the cell output.
class_preds

##  14. Compute Model Metrics

In [None]:
# Evaluate the test-set predictions against the true label.
# NOTE(review): the AUC below is given the hard class predictions (PRED_APPROVED) as its
# score column rather than predicted probabilities — confirm this is intended.
acc = accuracy_score(
    df=class_preds,
    y_true_col_names="ELIGIBLE_FOR_INCREASE",
    y_pred_col_names="PRED_APPROVED",
)
auc = roc_auc_score(
    df=class_preds,
    y_true_col_names="ELIGIBLE_FOR_INCREASE",
    y_score_col_names="PRED_APPROVED",
)
f1 = f1_score(
    df=class_preds,
    y_true_col_names="ELIGIBLE_FOR_INCREASE",
    y_pred_col_names="PRED_APPROVED",
)

# Show the three metrics together as the cell output.
acc, auc, f1

## 15. Setting up the Model Registry

To use a trained machine learning model for predictions within Snowflake, you must first register it in the Snowflake Model Registry. This secure repository allows you to manage your models and their associated information within Snowflake, regardless of the model's origin or type. Once registered, running inference on the model becomes straightforward.
The command below initializes a Snowflake ML model registry and prepares it to log the trained model.

In [17]:
# Open the model registry in SNOWPARK_ML_DEMO.PUBLIC; the trained model is logged to it below.
reg = Registry(
    session=session,
    database_name="SNOWPARK_ML_DEMO",
    schema_name="PUBLIC",
)

## 16. Logging the Model into Registry
Store the trained model in the Snowflake model registry as version V1.

In [None]:
# Log the trained classifier in the registry and attach its accuracy as a metric.

# Model name and version (use uppercase for the name)
model_name = "CREDIT_LIMIT_MODEL"
model_version = 'V1'

# Sample input for the registry. Drop the same columns that were excluded from the
# classifier's input_cols (label and CUSTOMER_ID), so the sample schema matches the
# features the model was actually fitted on.
X = train_df.drop(unused_cols)

# Register the model under the name/version defined above
model_ver = reg.log_model(
    model_name=model_name,
    version_name=model_version,
    model=classifier,
    sample_input_data=X,  # to provide the feature schema
)

# Record the accuracy computed on the test split as a metric of this model version
model_ver.set_metric(
    metric_name="ACC",
    value=acc,
)

## 17. Feature Importance of the Trained Model
Identify which features most strongly influence the predicted variable.


In [None]:
# Step 1: Get the underlying XGBoost object from the fitted Snowpark ML classifier
model = classifier.to_xgboost()

# Step 2: Read the per-feature importance scores from it
feature_importances = model.feature_importances_

# Step 3: Pair each score with its feature name (same column selection as the
# classifier's input_cols) and order from most to least important
importance_df = pd.DataFrame(
    {
        "Features": train_df.drop(unused_cols).columns,
        "Importance": feature_importances,
    }
)
importance_df = importance_df.sort_values(by="Importance", ascending=False)

# Step 4: Persist the ranking to the TBL_FEATURE_IMP table, replacing earlier contents
importance_spdf = session.create_dataframe(importance_df)
importance_spdf.write.mode("overwrite").save_as_table("TBL_FEATURE_IMP")

## 18. Visualizing Feature Importance

In [None]:
# Strip the "_LE" suffix so encoded features are shown under their original column names
importance_df["Features"] = importance_df["Features"].str.replace(r"_LE$", "", regex=True)

# Interactive bar chart of the importances with Plotly Express
fig = px.bar(
    importance_df,
    x="Importance",
    y="Features",
    title="Feature Importance",
    orientation="h",                   # horizontal bars
    color="Importance",                # shade each bar by its importance value
    color_continuous_scale="Viridis",
)

# Order the bars so the most important feature appears at the top
fig.update_layout(height=500, yaxis=dict(categoryorder="total ascending"))

## 19. Listing Registered Models
Display the models stored in the model registry.



In [18]:
# List the models currently stored in the registry
reg.show_models()

Unnamed: 0,created_on,name,database_name,schema_name,comment,owner,default_version_name,versions
0,2024-03-06 00:02:15.909000-08:00,LOAN,LOAN,DEPARTMENT,,ACCOUNTADMIN,V0,"[""V0""]"


## 20. Listing Model Versions
Retrieve and display the versions of the CREDIT_LIMIT_MODEL. We have just the one version (V1).

In [31]:
# Look up the model by name and list its versions
reg.get_model(model_name).show_versions()

Unnamed: 0,created_on,name,comment,database_name,schema_name,module_name,is_default_version,functions,metadata,user_data
0,2024-03-06 00:02:15.942000-08:00,V0,,LOAN,DEPARTMENT,LOAN,True,"[""PREDICT_PROBA"",""PREDICT"",""PREDICT_LOG_PROBA""]",{},"{""snowpark_ml_data"":{""functions"":[{""name"":""PRE..."
1,2024-03-06 00:22:13.240000-08:00,V1,,LOAN,DEPARTMENT,LOAN,False,"[""PREDICT_PROBA"",""PREDICT"",""PREDICT_LOG_PROBA""]","{""metrics"": {""accuracy"": 1.0}, ""snowpark_ml_sc...","{""snowpark_ml_data"":{""functions"":[{""name"":""PRE..."


## 21. Setting a Default Model Version
Assign V1 as the default model version for deployment and predictions.

In [None]:
# Make the version logged above the model's default, then read it back.
# Uses model_version instead of repeating the literal, so the default always follows
# the version that was actually logged.
m = reg.get_model(model_name)
m.default = model_version

# mv is the default model version; the inference cells call mv.run(...) on it
mv = m.default
mv

## 22. Load the Inference Table into memory

In [None]:
# Re-acquire the active session and load the table holding the records to score.
session = get_active_session()
df = session.table("SNOWPARK_ML_DEMO.PUBLIC.CREDIT_LIMIT_MODEL_FEATURES_NEW")
df.show()

## 23. Retrieve the saved Preprocessing Pipeline from Stage and apply it to the inference dataset

In [None]:
# Download the serialized preprocessing pipeline from the stage into /tmp
session.file.get('@SNOWPARK_ML_DEMO.PUBLIC.ML_STG/preprocessing_pipeline.joblib.gz', '/tmp')
pipeline_file = '/tmp/preprocessing_pipeline.joblib.gz'

# NOTE: joblib.load unpickles the file, which can execute arbitrary code.
# Only load artifacts from a stage you control.
preprocessing_pipeline = joblib.load(pipeline_file)

# Apply the already-fitted pipeline to the inference data.
# Do NOT call fit() here: re-fitting would relearn the label encodings from the inference
# data, and they might then no longer match the encodings the model was trained on.
testing_spdf = preprocessing_pipeline.transform(df)
testing_spdf.show()

## 24. Run the Predictions on Inference Dataset

In [None]:
# Run the default model version's "predict" function on the preprocessed inference data
results = mv.run(testing_spdf, function_name="predict")

## 25. Displaying Predictions

In [None]:
# Preview the prediction results
results.show()

## 26. Storing Inference data in a Snowflake table

In [39]:
# Persist the predictions to CREDIT_LIMIT_APPROVAL_INFERENCE; "overwrite" mode replaces existing contents
results.write.mode("overwrite").save_as_table("CREDIT_LIMIT_APPROVAL_INFERENCE")

In [None]:
-- Inspect the stored inference results
SELECT * FROM CREDIT_LIMIT_APPROVAL_INFERENCE;