# End-to-End ML: Deep Learning Recommendation Model
This recommendation engine provides a recommendation score to each loyalty customer for every menu item sold by Tasty Bytes food trucks. The scores will be used for personalized outreach, to increase the number of truck brands each customer visits, and to increase traffic to underperforming trucks.

Model training for the recommendation engine leverages distributed training across GPU devices. End-to-end model development and deployment is simplified and streamlined using the following Snowflake features:

- Snowflake Notebooks with GPU Container Runtime (PuPr)
- Snowflake Feature Store (GA)
- Snowflake Modeling API (GA) - Preprocessing, Training (PyTorch API PuPr), Evaluation
- Snowflake Model Registry (GA)
- Model Deployment from Registry to SPCS (PuPr)

## Setup
Import the Snowflake libraries to support the end-to-end model development and deployment. 

In [None]:
!pip install torch==2.2.2

In [1]:
# Standard library imports
import os
import time
import math
#import sys

# Third-party library imports
import pandas as pd
import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
from torch.utils.data import DataLoader, TensorDataset
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_classif 

# Snowflake library imports
import streamlit as st
from snowflake.ml.modeling.preprocessing import LabelEncoder, MinMaxScaler
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.distributors.pytorch import (
    PyTorchDistributor,
    PyTorchScalingConfig,
    WorkerResourceConfig,
    get_context,
)
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
)
from snowflake.ml.registry import Registry
from snowflake.ml.modeling.metrics import (
    roc_auc_score,
    precision_score,
    recall_score,
    confusion_matrix,
)
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark.context import get_active_session
session = get_active_session()
import warnings
warnings.filterwarnings('ignore')

# Add a query tag to the session. This helps with debugging and performance monitoring.
session.query_tag = {"origin":"sf_sit", "name":"tasty_bytes_e2e_ml", "version":{"major":1, "minor":0}}

db = str(session.get_current_database().strip('"'))
solution_prefix = (db.upper()).split('_PROD')[0]



### GPU Device Info
View the number of available GPU devices in the notebook.

**Snowflake Feature**: Snowflake GPU Notebooks (PuPr) - easily access GPU compute from a Snowflake Notebook.

In [None]:
# Get device info
if torch.cuda.is_available():
    num_gpus = torch.cuda.device_count()
    print("Number of GPU devices available:", num_gpus)
    
    for i in range(num_gpus):
        print("Device", i, ":", torch.cuda.get_device_name(i))
    
    #Set a default device
    torch.cuda.set_device(0)
else:
    print("CUDA is not available. Check your installation or GPU setup.")

## Feature Store
The feature store contains feature views for customers, menu items, and purchases. Model features will be accessed from the feature store.

**Snowflake Feature:** Feature Store - Easily find features that work with your data

In [None]:
# Access feature views
FS = FeatureStore(
    session=session,
    database=f"{solution_prefix}_PROD",
    name="FS_SCHEMA",
    default_warehouse=f"{solution_prefix}_DS_WH",
)

customer_fv : FeatureView = FS.get_feature_view(
    name='CUSTOMER_FEATURES',
    version='V1'
)
print(customer_fv)

menu_fv : FeatureView = FS.get_feature_view(
    name='MENU_FEATURES',
    version='V1'
)
print(menu_fv)

purchase_fv : FeatureView = FS.get_feature_view(
   name='PURCHASE_FEATURES',
   version='V1'
)
print(purchase_fv)


### Build Datasets from Feature Store
The interaction dataset contains a purchase flag for each menu item/customer pair. This interaction dataset is split into train, validation, and test. The features are brought into the datasets from the feature store.
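To make the split concrete, here is a minimal plain-Python sketch of a weighted random split. It is only an illustration of the idea, not Snowpark's implementation of `random_split`; the helper name, the seed, and the synthetic customer/item pairs are hypothetical. Each row draws a uniform random number and is assigned to a split by comparing it against the cumulative normalized weights, so split sizes are approximate rather than exact.

```python
import random

def random_split(rows, weights, seed=0):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.1, 0.2, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point remainder
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

# Hypothetical spine: one row per (customer, menu item) pair
pairs = [(customer, item) for customer in range(100) for item in range(100)]
train, val, test = random_split(pairs, [0.1, 0.1, 0.8])
print(len(train), len(val), len(test))
```

With the weights used in this notebook, roughly 10% of the pairs go to training, 10% to validation, and the remaining 80% are held out as the test set.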

In [None]:
# Split the interaction dataset and get features from the feature store
def create_dataset(spine_df, name):
    data = FS.generate_dataset(
        name=name,
        spine_df=spine_df,
        features=[customer_fv, menu_fv, purchase_fv],
    )
    df = data.read.to_snowpark_dataframe().drop("BIRTHDAY_DATE")
    return df
    
interaction_df = session.table('analytics.loyalty_purchased_items')

# Split into train/validation/test (approximately 10% / 10% / 80%)
datasets = interaction_df.random_split([.1, .1, .8])

# Build training tables
train_df = create_dataset(datasets[0], "train")
val_df = create_dataset(datasets[1], "validation")
    
train_df.show()

In [None]:
train_df.count() + val_df.count()

## Feature Engineering
The model creates embeddings of categorical (sparse) features. It expects categorical values to be encoded as unique integers. The preprocessing applies label encoding to sparse features and min-max scaling to numeric (dense) features.

**Snowflake Feature:** Snowpark ML Modeling API - Feature engineering and preprocessing (GA) - Improve performance and scalability with distributed execution for frequently-used scikit-learn preprocessing functions.
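As a rough illustration of what the two preprocessing steps compute, here is a minimal plain-Python sketch. It is not the Snowflake implementation: the helper names `label_encode` and `min_max_scale` and the sample values are hypothetical, and the sorted ordering of categories is an assumption borrowed from scikit-learn's `LabelEncoder`.

```python
def label_encode(values):
    """Map each distinct category to an integer index (sorted order is an assumption)."""
    classes = sorted(set(values))
    index = {c: i for i, c in enumerate(classes)}
    return [index[v] for v in values], classes

def min_max_scale(values, feature_range=(0.0, 1.0)):
    """Linearly rescale values so the minimum maps to a and the maximum to b."""
    lo, hi = min(values), max(values)
    a, b = feature_range
    if hi == lo:
        # A constant column carries no information; map it to the lower bound
        return [a for _ in values]
    return [a + (v - lo) * (b - a) / (hi - lo) for v in values]

# Hypothetical sample values for one sparse and one dense feature
cities = ["Paris", "Denver", "Paris", "Tokyo"]
ages = [20, 30, 40, 60]

encoded, classes = label_encode(cities)
scaled = min_max_scale(ages)
print(encoded, classes)  # [1, 0, 1, 2] ['Denver', 'Paris', 'Tokyo']
print(scaled)            # [0.0, 0.25, 0.5, 1.0]
```

The integer codes are what an embedding layer can index into, and the scaled values keep all dense inputs on a comparable range.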

In [6]:
# Preprocess sparse and dense features
sparse_features = ['MENU_ITEM_NAME', 
                   'MENU_TYPE', 
                   'TRUCK_BRAND_NAME', 
                   'ITEM_CATEGORY', 
                   'ITEM_SUBCATEGORY',
                   'CITY',
                   'COUNTRY',
                   'GENDER',
                   'MARITAL_STATUS',]

dense_features = ['SALE_PRICE_USD',
                  'AGE',
                  'AVG_MONTHLY_PURCHASE_AMOUNT',
                  'AVG_WEEKLY_PURCHASE_AMOUNT',
                  'AVG_YEARLY_PURCHASE_AMOUNT',
                 ]

label_col = "PURCHASED"

# Create pipeline
pipeline_steps = []

# Label encode sparse features
for i, feat in enumerate(sparse_features):
    le_step = (
        f"LE{i+1}",
        LabelEncoder(input_cols=[feat], output_cols=[feat]),
    )
    pipeline_steps.append(le_step)

# Scale dense features
pipeline_steps.append(
    (
        "MMS",
        MinMaxScaler(
            feature_range=(0, 1),
            input_cols=dense_features,
            output_cols=dense_features
        )
    )
)

# Preprocessing pipeline
preprocessing_pipeline = Pipeline(steps=pipeline_steps)
train_data = preprocessing_pipeline.fit(train_df).transform(train_df)
val_data = preprocessing_pipeline.transform(val_df)

### Save the Pipeline
The saved pipeline will be used for feature transformations in inference.

In [None]:
# Save pipeline to a stage where it can be centrally accessed
pipeline_local_path = '/tmp/dlrm_preprocessor_v1.joblib'
# Passing the path lets joblib open and close the file itself
joblib.dump(preprocessing_pipeline, pipeline_local_path)
session.file.put(pipeline_local_path, 
                 '@ML.ML_STAGE/dlrm_preprocessor_v1.joblib', 
                 auto_compress=False, 
                 overwrite=True)


In [None]:
USE SCHEMA ML;
CREATE OR REPLACE STAGE UDF_STAGE;

In [None]:
import json

data = train_df[dense_features + sparse_features + [label_col]]
data = data.with_columns(sparse_features,
                        [F.col(c).cast(T.StringType()) for c in sparse_features])

def serialize_label_encoders(label_encoders):
    serialized_label_encoders = {}
    for feat, lbe in label_encoders.items():
        serialized_label_encoders[feat] = {
            'input_cols': lbe.input_cols,
            'output_cols': lbe.output_cols,
            'classes_': lbe.classes_.tolist()
        }
    return serialized_label_encoders

def save_label_encoders_to_stage(label_encoders, stage_name, dir_name):
    serialized_label_encoders = json.dumps(label_encoders)
    # Write serialized encoders to a local file first
    with open('/tmp/label_encoders.json', 'w') as f:
        f.write(serialized_label_encoders)
    # Upload the local file to the Snowflake stage
    session.file.put('/tmp/label_encoders.json', f'@{stage_name}/{dir_name}', auto_compress=False)
    return f'Uploaded to @{stage_name}/{dir_name}'
    
label_encoders = {}

# Iterate over each sparse feature
for feat in sparse_features:
    # Initialize LabelEncoder for the current feature
    lbe = LabelEncoder(input_cols=[feat], output_cols=[feat+'_ENCODED'],drop_input_cols=True)
    
    # Fit LabelEncoder to the data
    lbe.fit(data)
    
    # Store the LabelEncoder object for reference
    label_encoders[feat] = lbe
    data = lbe.transform(data)

# Serialize label encoders
serialized_label_encoders = serialize_label_encoders(label_encoders)
stage_name = "UDF_STAGE"
dir_name = "dlrm_label_encoders"
# Write the serialized label encoders to a local JSON file and upload it to the stage
save_label_encoders_to_stage(serialized_label_encoders, stage_name, dir_name)


## Model Definition
This PyTorch model is a deep learning recommendation model (DLRM). It is being used to provide a recommendation score for every menu item to each loyalty customer. 
- The embedding layer converts categorical features to dense vectors. 
- Numeric features are processed through Multi-Layer Perceptron (MLP) layers. 
- The feature interaction layer captures complex relationships between pairs of input features. 
- The final dense layers produce the recommendation score.
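The interaction layer in this notebook keeps the upper triangle (including the diagonal) of the outer product of the concatenated vector with itself. The plain-Python sketch below illustrates that computation on a single tiny vector and reproduces the arithmetic behind the top MLP input size. The helper names are hypothetical; the dimensions 128 and 128 are the `embed_dim` and final bottom MLP width used in the training cell.

```python
def pairwise_interactions(features):
    """Products features[i] * features[j] for all i <= j, in row-major order."""
    n = len(features)
    return [features[i] * features[j] for i in range(n) for j in range(i, n)]

def top_mlp_input_dim(embed_dim, bottom_out_dim):
    """Interaction terms plus the bottom MLP output that is concatenated back in."""
    d = embed_dim + bottom_out_dim
    return d * (d + 1) // 2 + bottom_out_dim

print(pairwise_interactions([1.0, 2.0, 3.0]))  # [1.0, 2.0, 3.0, 4.0, 6.0, 9.0]
print(top_mlp_input_dim(128, 128))             # 33024
```

A vector of length d yields d(d + 1)/2 interaction terms, so the width of the first top MLP layer grows quadratically with the embedding and bottom MLP sizes.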

In [None]:
# PyTorch DLRM
class FeatureInteraction(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, inputs):
        feature_dim = inputs.shape[1]
        concat_features = inputs.view(-1, feature_dim, 1)
        dot_products = torch.matmul(concat_features, concat_features.transpose(1, 2))
        ones = torch.ones_like(dot_products) 
        mask = torch.triu(ones)
        out_dim = feature_dim * (feature_dim + 1) // 2
        flat_result = dot_products[mask.bool()]
        reshape_result = flat_result.view(-1, out_dim)
        return reshape_result

class DLRM(nn.Module):
    
    def __init__(self, sparse_feature_number, dense_feature_number, num_embeddings, embed_dim, bottom_mlp_dims, top_mlp_dims):
        super(DLRM, self).__init__()
        
        self.embeddings = nn.EmbeddingBag(num_embeddings=num_embeddings, embedding_dim=embed_dim, mode='sum')
        self.layer_feature_interaction = FeatureInteraction()
        
        self.bottom_mlp = torch.nn.Sequential(
            torch.nn.Linear(dense_feature_number, bottom_mlp_dims[0]),
            torch.nn.ReLU(),
            torch.nn.Linear(bottom_mlp_dims[0], bottom_mlp_dims[1]),
            torch.nn.ReLU()
        )
        
        top_mlp_input_dim = (
            (embed_dim + bottom_mlp_dims[1])
            * ((embed_dim + bottom_mlp_dims[1]) + 1) // 2
            + bottom_mlp_dims[1]
        )

        self.top_mlp = nn.Sequential(
            nn.Linear(top_mlp_input_dim, top_mlp_dims[0]),
            nn.ReLU(),
            nn.Linear(top_mlp_dims[0], top_mlp_dims[1]),
            nn.ReLU(),
            nn.Linear(top_mlp_dims[1], 1)
        )

    def forward(self, x_sparse, x_dense):
        # Embedding layer for categorical inputs
        embed_x = self.embeddings(x_sparse)
        # MLPs for numeric inputs
        bottom_mlp_output = self.bottom_mlp(x_dense)
        # Combine categorical embeddings and MLP outputs
        concat_first = torch.cat([bottom_mlp_output, embed_x], dim=-1)
        # Get feature interactions
        interaction = self.layer_feature_interaction(concat_first)
        # Concat interaction outputs with MLP outputs
        concat_second = torch.cat([interaction, bottom_mlp_output], dim=-1)
        # MLP layers to output 
        output = self.top_mlp(concat_second)
        return output

## Model Training

The model training function places a copy of the model and a shard of the data on each device. Gradients are synchronized across all devices on every training batch. After each epoch, training and validation losses are averaged across all devices and the model weights are saved.

**Snowflake Feature:** Snowpark ML Modeling API - PyTorch (PuPr) - Perform distributed training across GPU devices from a Snowpark DataFrame.
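The per-epoch loss averaging can be pictured without any GPUs. The sketch below simulates it in plain Python: each device holds its own mean loss, a sum all-reduce (the default operation of `torch.distributed.all_reduce`) leaves every device with the total, and dividing by the world size gives the average. The function names and the loss values are hypothetical, and no process group is involved.

```python
def all_reduce_sum(per_rank_values):
    """Simulate a sum all-reduce: every rank ends up holding the global sum."""
    total = sum(per_rank_values)
    return [total for _ in per_rank_values]

def average_across_devices(per_rank_mean_loss):
    """Average the per-device mean losses, as seen from each rank."""
    world_size = len(per_rank_mean_loss)
    reduced = all_reduce_sum(per_rank_mean_loss)
    return [value / world_size for value in reduced]

# Hypothetical mean training loss on each of four devices
per_rank = [0.5, 0.25, 0.75, 0.5]
print(average_across_devices(per_rank))  # [0.5, 0.5, 0.5, 0.5]
```

Note that this is a mean of per-device means, so it equals the global per-batch mean only when every device processes the same number of batches.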



In [None]:
# Adjust number of epochs and records in the training data
num_epochs = 2
training_sample = 100000

In [None]:
# Model training function
def setup(rank, world_size):
    # Initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    torch.manual_seed(42)

def train_func():
    
    context = get_context()
    rank = context.get_rank()
    world_size = context.get_world_size()
    setup(rank, world_size)

    batch_size = 256

    # GET DATA FROM CONTEXT AND SET UP TENSORS
    dataset_map = context.get_dataset_map()
    training_data = dataset_map["train"].get_shard().to_torch_datapipe(
        batch_size=batch_size, shuffle=True
    )
    validation_data = dataset_map["val"].get_shard().to_torch_datapipe(
        batch_size=batch_size, shuffle=True
    )
    dataloader = DataLoader(training_data, batch_size=None)
    val_dataloader = DataLoader(validation_data, batch_size=None)

    # DEFINE MODEL
    model = DLRM(
        sparse_feature_number=len(sparse_features),
        dense_feature_number=len(dense_features),
        num_embeddings=142,
        embed_dim=128,
        bottom_mlp_dims=[256, 128],
        top_mlp_dims=[128, 128],
    )
        
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    criterion = torch.nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.001)
    
    # TRAIN
    for epoch in range(num_epochs):
        start_time = time.time()
        records_processed = 0
        running_loss = 0.0
        i = 0
        
        for batch_idx, batch_data in enumerate(dataloader):
            y = batch_data.pop(label_col).type(torch.float32).to(rank)
            
            x_sparse = torch.stack(
                [tensor.to(torch.int) for key, tensor in batch_data.items() if key in sparse_features],
                dim=1
            )
            x_dense = torch.stack(
                [tensor.to(torch.float32) for key, tensor in batch_data.items() if key in dense_features],
                dim=1
            )
                        
            optimizer.zero_grad()
            output = ddp_model(x_sparse, x_dense)
            loss = criterion(output, y.unsqueeze(1))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            records_processed += len(y)

            if (batch_idx + 1) % 500 == 0:
                print(
                    f"Epoch {epoch+1}/{num_epochs}, Batch {batch_idx + 1}: Device {rank} processed {records_processed} records, Epoch Time: {time.time() - start_time:.2f} seconds, Average Training loss: {running_loss / (batch_idx + 1):.4f}"
                )

        print(
            f"Epoch {epoch+1}/{num_epochs}, Batch {batch_idx + 1}: Device {rank} processed {records_processed} records, Epoch Time: {time.time() - start_time:.2f} seconds, Average Training loss: {running_loss / (batch_idx + 1):.4f}"
        )

        # Average loss across devices
        running_loss_tensor = torch.tensor(running_loss / (batch_idx + 1), device=rank)
        dist.all_reduce(running_loss_tensor)
        dist.barrier()
        running_loss = running_loss_tensor.item()
        running_loss /= world_size

        # GET VALIDATION LOSS
        ddp_model.eval()
        val_loss = 0.0
        for val_batch_idx, val_batch_data in enumerate(val_dataloader):
            y_val = val_batch_data.pop(label_col).type(torch.float32).to(rank)
            x_sparse_val = torch.stack(
                [tensor.to(torch.int) for key, tensor in val_batch_data.items() if key in sparse_features],
                dim=1
            )
            x_dense_val = torch.stack(
                [tensor.to(torch.float32) for key, tensor in val_batch_data.items() if key in dense_features],
                dim=1
            )
    
            with torch.no_grad():
                output_val = ddp_model(x_sparse_val, x_dense_val)
                loss_val = criterion(output_val, y_val.unsqueeze(1))
            
            val_loss += loss_val.item()
    
        # Average validation loss across devices
        val_loss_tensor = torch.tensor(val_loss / (val_batch_idx + 1), device=rank)
        dist.all_reduce(val_loss_tensor)
        dist.barrier()
        val_loss = val_loss_tensor.item()
        val_loss /= world_size
        ddp_model.train()
        
    
        # SAVE MODEL
        if rank == 0:
            print(f" Epoch {epoch+1}/{num_epochs}, Training Loss: {running_loss:.4f}, Validation Loss: {val_loss:.4f}, Epoch Time: {time.time() - start_time:.2f} seconds ")
            torch.save(model.state_dict(), '/tmp/latest_model.pth')
    
    dist.destroy_process_group()

In [None]:
# Train - Snowflake ML PyTorch API
pytorch_trainer = PyTorchDistributor(
    train_func=train_func,
    scaling_config=PyTorchScalingConfig(
        num_nodes=1,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
    ),
)

# Shard the preprocessed training and validation data across workers
data_train = ShardedDataConnector.from_dataframe(train_data.limit(training_sample))
data_val = ShardedDataConnector.from_dataframe(val_data)

out = pytorch_trainer.run(
    dataset_map=dict(
        train=data_train,
        val=data_val,
    )
)

## Model Deployment
The model will be logged to the Snowflake Model Registry. The logged model will then be deployed for inference on Snowpark Container Services (SPCS).

**Snowflake Feature**: Snowflake Model Registry (GA) with SPCS deployment (PuPr) - securely deploy and manage models and their metadata in Snowflake in a flexible compute environment.


In [None]:
# Load the model
def load_model(model_path):
    model = DLRM(sparse_feature_number=len(sparse_features),
                 dense_feature_number=len(dense_features),
                 num_embeddings=142,
                 embed_dim=128,
                 bottom_mlp_dims=[256, 128],
                 top_mlp_dims=[128, 128])
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model

# Load saved model
model = load_model('/tmp/latest_model.pth')

In [None]:
# Connect to the Snowflake Model Registry
registry = Registry(session=session, database_name=f"{solution_prefix}_PROD", schema_name="REGISTRY")


In [None]:
sample_input = train_data.limit(1).to_pandas()
x_sparse = torch.tensor(sample_input[sparse_features].values, dtype=torch.int)
x_dense = torch.tensor(sample_input[dense_features].values, dtype=torch.float32)

# Log model to registry
model_ref = registry.log_model(
    model,
    model_name="RecModelDemo",
    version_name="V1",
    sample_input_data=[x_sparse[0].unsqueeze(0), x_dense[0].unsqueeze(0)],
    options={'relax_version': True}
)


In [None]:
model_ref.create_service(
    service_name="TB_REC_SERVICE_DEMO_PREDICT",
    service_compute_pool="TASTYBYTESENDTOENDML_DEPLOY_POOL",
    image_repo="TASTYBYTESENDTOENDML_PROD.REGISTRY.IMAGE_REPO",
    build_external_access_integration="TASTYBYTESENDTOENDML_CONDA_ACCESS_INTEGRATION",
)

## Model Inference & Evaluation
Inference of the test data will be completed using the model deployed to SPCS running on a dedicated compute pool. Features for the test data will be accessed from the feature store and the preprocessing pipeline will transform the data as required.

**Model Output**: 
The model outputs a score based on the input features. The higher the score, the more strongly a menu item is recommended for that customer. To evaluate the model's performance, a binary prediction is derived from the score (1 if the score is at least 0.5, 0 otherwise).
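One point worth checking when interpreting the score: `DLRM.forward` returns the output of the final linear layer without a sigmoid, and training uses `BCEWithLogitsLoss`, so the served score is presumably a logit rather than a probability. Under that assumption, a probability cutoff of 0.5 corresponds to a logit cutoff of 0, not 0.5. The plain-Python sketch below shows the conversion; the helper names and sample logits are hypothetical.

```python
import math

def sigmoid(logit):
    """Convert a raw logit to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-logit))

def binary_prediction(logit, threshold=0.5):
    """Threshold the probability, not the raw logit."""
    return 1 if sigmoid(logit) >= threshold else 0

for logit in (-2.0, 0.0, 0.3, 2.0):
    print(logit, round(sigmoid(logit), 3), binary_prediction(logit))
```

A logit of 0.3, for example, maps to a probability above 0.5, yet it would be labeled 0 if the raw score were compared against 0.5 directly. Ranking metrics such as AUC are unaffected because the sigmoid preserves ordering.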

**Snowflake Features:** 
- Inference on SPCS (PuPr) - Run inference against the model deployed to a container environment with a dedicated compute pool.
- Snowpark ML Modeling API - Evaluation Metrics (GA) - Improve performance and scalability with distributed execution for frequently-used scikit-learn metrics functions.



In [None]:
ALTER FUNCTION IF EXISTS TB_REC_SERVICE_DEMO_PREDICT_FORWARD(ARRAY, ARRAY) SET MAX_BATCH_ROWS = 100;

In [None]:
# Get features from feature store
test_df = FS.retrieve_feature_values(
    spine_df=datasets[2],
    features=[customer_fv, menu_fv, purchase_fv],
)
#test_df_subset = test_df.sample(100000)
test_df_subset = test_df.sample(n=10000)

# Preprocess
test_data = preprocessing_pipeline.transform(test_df_subset)

# Predict
eval_df = test_data.select(
    "customer_id",
    "menu_item_name",
    "purchased",
    F.call_udf(
        "TB_REC_SERVICE_DEMO_PREDICT_FORWARD", 
        F.array_construct(*sparse_features),
        F.array_construct(*dense_features),
    )["output_feature_0"][0].alias("prediction"),
    F.iff(F.col("prediction") >= 0.5, 1, 0).alias("binary_prediction")
).order_by("prediction", ascending=False).cache_result()


In [None]:
eval_df.show()

### Evaluation

A strong recommendation model should recommend most of the purchased items (this was the training indicator that an item was of interest to the customer). On the other hand, unpurchased items are not always indications of disinterest. The goal of the recommendation engine is to identify unpurchased items that could be of interest to the customer, which would require "misclassifying" unpurchased items. Ideally, we want a high recall and a portion of unpurchased items to be recommended.
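The trade-off described above can be seen in a small worked example. The plain-Python sketch below computes precision and recall from two hypothetical label lists; it is only an illustration of the definitions, not the `snowflake.ml.modeling.metrics` implementation.

```python
def precision_recall(y_true, y_pred):
    """Compute precision and recall for binary labels (1 = positive)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

# Hypothetical customer: 3 purchased items, 5 recommended items
purchased   = [1, 1, 1, 0, 0, 0, 0, 0]
recommended = [1, 1, 1, 1, 1, 0, 0, 0]

precision, recall = precision_recall(purchased, recommended)
print(precision, recall)  # 0.6 1.0
```

Here every purchased item is recommended (recall of 1.0), while the two recommended but unpurchased items lower precision to 0.6. Those two items are exactly the candidates the engine is meant to surface, so a precision below 1.0 is expected rather than a defect.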

In [None]:
# Get Evaluation Metrics
cols = st.columns(3)
cols[0].metric("AUC", round(roc_auc_score(df=eval_df, y_true_col_names="PURCHASED", y_score_col_names="PREDICTION"),3))
cols[1].metric("Recall", round(recall_score(df=eval_df, y_true_col_names="PURCHASED", y_pred_col_names="BINARY_PREDICTION"),3))
cols[2].metric("Precision", round(precision_score(df=eval_df, y_true_col_names="PURCHASED", y_pred_col_names="BINARY_PREDICTION"),3))

In [None]:
m = registry.get_model("RecModelDemo")
mv=m.version("v1")
mv.set_metric("AUC", round(roc_auc_score(df=eval_df, y_true_col_names="PURCHASED", y_score_col_names="PREDICTION"),3))
mv.set_metric("Recall", round(recall_score(df=eval_df, y_true_col_names="PURCHASED", y_pred_col_names="BINARY_PREDICTION"),3))
mv.set_metric("Precision", round(precision_score(df=eval_df, y_true_col_names="PURCHASED", y_pred_col_names="BINARY_PREDICTION"),3))
#m.set_tag("live_version", "v1")
m.description = "Provides menu item recommendations for the Tasty Bytes business"


## Summary
This notebook used a GPU compute pool, which makes it practical to train deep learning models on data at scale. The end-to-end workflow operated on Snowpark DataFrames and used the following features to simplify development and deployment:
- Snowflake Notebooks with GPU Container Runtime (PuPr)
- Snowflake Feature Store (GA)
- Snowflake Modeling API (GA) - Preprocessing, Training (PyTorch API PuPr), Evaluation
- Snowflake Model Registry (GA)
- Model Deployment from Registry to SPCS (PuPr)

In [None]:
session.close()