# Tasty Bytes - Price Optimization

## Overview
**Tasty Bytes is one of the largest food truck networks** in the world with localized menu options spread across  **15 food truck brands** globally. **Tasty Bytes is aiming to achieve 25% YoY sales growth over 5 years.**

**Price optimization** enables Tasty Bytes to achieve this goal by **determining the right prices** for their menu items to **maximize profitability** while **maintaining customer satisfaction**.

As Tasty Bytes Data Scientists, **we will implement price optimization for their diversified food-truck brands to inform their pricing and promotions**. 

In this notebook, we will train and deploy an ML model that uses historical menu-item sales data to learn how menu-item demand changes with price. Using the trained model, we will recommend the optimal day-of-week prices for all menu items for the upcoming month to our food-truck brands.

**In this notebook, we will use Snowpark to:**

- Explore the data
- Perform feature engineering
- Train a model
- Deploy & utilize the model in Snowflake

**Benefits of using Snowpark:**

- No copies or movement of data
- Maintain governance
- Leverage Snowflake scalable compute

### Import Packages
After installing Snowpark in our Python environment, we import the Snowpark packages just like any other Python package.

In [None]:
#Import required libraries
import os
import pandas as pd
import numpy as np
import json
import cachetools
import xgboost
import ast
import warnings
warnings.filterwarnings("ignore")

# Import viz packages 
import matplotlib.pyplot as plt 
plt.rc("font", size=10)
import seaborn as sns
sns.set(style="white")
sns.set(style="whitegrid", color_codes=True)

# Import Snowflake modules
from snowflake.snowpark import Session
from snowflake.snowpark import Window
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.ml.registry.registry import Registry
from snowflake.ml.modeling.metrics import mean_squared_error, mean_absolute_error, r2_score
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection.grid_search_cv import GridSearchCV

from snowflake.snowpark.context import get_active_session

session = get_active_session()


In [None]:
# Current Environment Details
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))

# Data Exploration
We conduct all our data exploration with Snowpark DataFrames. They run every operation on Snowflake compute while offering a data representation familiar to Python users who work with pandas DataFrames.

### Snowpark DataFrame
Let's create a Snowpark DataFrame by reading the data in the daily menu-item sale view in our Snowflake account using Snowpark's session.table function.

In [None]:
# Read daily sales data from snowflake table into Snowpark dataframe
sales_daily_df = session.table("menu_item_aggregate_v").order_by(["TRUCK_BRAND_NAME","MENU_ITEM_ID","DATE"])

### Preview the Data
With our Snowpark DataFrame defined, we call the .show() function and ask it to print 5 rows. When no argument is provided, it prints 10 rows by default.

In [None]:
sales_daily_df.show(5)

### How many rows are in our data?

We use the Snowpark count function to see how much data we are working with. It is similar to the COUNT function in SQL. The rows are counted within Snowflake without any data transfer.

In [None]:
# Number of records in our data
sales_daily_df.count() 

### What are the columns in our data?
Let's access the columns object of our Snowpark Dataframe to look at all the columns present in our data.

In [None]:
# Columns in our data
sales_daily_df.columns

# Feature Engineering

In this section, we will build our feature engineering pipeline which leverages Snowflake compute to perform aggregation and transformation operations enabling us to utilize native Snowflake performance & scale.

### Create Aggregate Table

We create an aggregate table by using the Snowpark group_by function to group rows by the specified columns and the Snowpark agg function to compute the average of each aggregated column.

In [None]:
# Create year & month column
sales_daily_df = sales_daily_df.with_column("MONTH",F.date_part("MONTH", F.col("DATE"))) \
                .with_column("YEAR",F.date_part("YEAR", F.col("DATE")))

# Group by columns : TRUCK_BRAND_NAME, MONTH, YEAR, DAY_OF_WEEK, MENU_ITEM_ID
# Aggregate columns : AVG(TOTAL_QUANTITY_SOLD), AVG(PRICE), AVG(BASE_PRICE), AVG(COST_OF_GOODS_USD)
sales_agg_df = sales_daily_df.group_by(["TRUCK_BRAND_NAME", "MONTH", "YEAR", "DAY_OF_WEEK","MENU_ITEM_ID"]) \
                    .agg(F.avg("TOTAL_QUANTITY_SOLD").alias("TOTAL_QUANTITY_SOLD"), \
                         F.avg("PRICE").alias("PRICE"), \
                         F.avg("BASE_PRICE").alias("BASE_PRICE"), \
                         F.avg("COST_OF_GOODS_USD").alias("COST_OF_GOODS_USD"))

sales_agg_df.show(5)

### Create New Feature

We track price fluctuations by using the Snowpark with_column function to create a new column, PRICE_CHANGE, which holds the change in price relative to the base price.

In [None]:
# Price change column
sales_agg_df = sales_agg_df.with_column("PRICE_CHANGE",(F.col("PRICE") - F.col("BASE_PRICE"))/ F.col("BASE_PRICE"))
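
To make the new feature concrete, the plain-Python sketch below mirrors the PRICE_CHANGE expression: the difference between the charged price and the base price, divided by the base price. The function name `relative_price_change` and the sample prices are illustrative assumptions, not part of the pipeline.

```python
def relative_price_change(price: float, base_price: float) -> float:
    """Return (price - base_price) / base_price, the same ratio as PRICE_CHANGE."""
    return (price - base_price) / base_price


# A discounted price gives a negative change, a mark-up a positive one
discount_change = relative_price_change(price=9.0, base_price=10.0)
markup_change = relative_price_change(price=12.0, base_price=10.0)
print(discount_change, markup_change)
```

Because the value is a ratio, items with very different base prices end up on a comparable scale.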


### Rows for Future Month

Next we add rows to our Snowpark Dataframe for the upcoming month. We do so by utilizing Snowpark functions enabling us to push all our complex data transformations onto our Snowflake compute.  

In [None]:
## Future month
# Get latest month & year
max_year = sales_agg_df.select(F.max("YEAR")).collect()[0][0]
max_month = (
    sales_agg_df.filter(F.col("YEAR") == max_year)
    .select(F.max("MONTH"))
    .collect()[0][0]
)

# Get future month
new_year = max_year if max_month < 12 else max_year + 1
new_month = max_month + 1 if max_month < 12 else 1

# Create future dataframe
future_df = session.create_dataframe([0,1, 2, 3, 4, 5, 6]).to_df("DAY_OF_WEEK")
future_df = future_df.with_columns(
        ["MONTH", "YEAR", "TOTAL_QUANTITY_SOLD", "PRICE","PRICE_CHANGE"],
        [F.lit(new_month), F.lit(new_year), F.lit(None), F.lit(0),  F.lit(0)],
    )

# Get menu_item_id, truck_brand, base_price & cost_of_goods 
menu_df = session.table("menu_item_cogs_and_price_v") \
                 .with_column("YEAR",F.date_part("YEAR", F.col("START_DATE"))).filter(F.col("YEAR") == new_year) \
                 .select(["MENU_ITEM_ID","COST_OF_MENU_ITEM_USD","SALES_PRICE_USD"]).distinct() \
                 .with_column_renamed(F.col("SALES_PRICE_USD"),"BASE_PRICE") \
                 .with_column_renamed(F.col("COST_OF_MENU_ITEM_USD"),"COST_OF_GOODS_USD")

truck_brand_df = sales_agg_df.select(["MENU_ITEM_ID","TRUCK_BRAND_NAME"]).distinct()
menu_df = menu_df.join(truck_brand_df, (menu_df.col("MENU_ITEM_ID") == truck_brand_df.col("MENU_ITEM_ID")) , \
                                        how = "inner" , lsuffix="_left", _rsuffix="_right") \
                                  .drop(["MENU_ITEM_ID_left"])

# Cross-join with future dataframe 
future_df = future_df.cross_join(menu_df)

# Append to historical agg sales data
sales_agg_df = future_df.union_all_by_name(sales_agg_df)
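
The month rollover and the shape of the future rows can be sketched in plain Python. This is a minimal illustration, assuming hypothetical menu item IDs; the helper name `next_month` is not part of the notebook.

```python
from itertools import product
from typing import Tuple


def next_month(year: int, month: int) -> Tuple[int, int]:
    """Return the (year, month) that follows the given calendar month."""
    if month < 12:
        return year, month + 1
    return year + 1, 1  # December rolls over to January of the next year


# One future row per (day of week, menu item), like the cross join above
days_of_week = range(7)
menu_item_ids = [101, 102]  # hypothetical IDs, for illustration only
future_rows = list(product(days_of_week, menu_item_ids))

print(next_month(2022, 3))
print(next_month(2022, 12))
print(len(future_rows))
```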

### Rolling Aggregate Features

We will use Snowflake window functions to compute **rolling averages of the price & price change columns** over time. Window functions allow us to aggregate over a "moving" group of rows.

We create **two partitions** for rolling averages over **varied lags** (last month, last year, historical):
1. Menu-item & Day of Week 
2. Menu-item

Each partition yields 6 features (2 columns × 3 lags), so we create 12 rolling features in total.

In [None]:
# Define function to create Window Average Columns for Snowpark Dataframe based on given column & lag
def create_window_columns(df, col_name, agg_col, lag, partition_col, order_col, lag_param):
    # Every window ends at the row before the current one, so a row never sees its own value
    if lag == "HIST":
        # All preceding rows in the partition
        start = Window.UNBOUNDED_PRECEDING
    else:
        # Only the preceding lag_param[lag] rows
        start = Window.CURRENT_ROW - lag_param[lag]
    window = (
        Window.partition_by(partition_col)
        .order_by(order_col)
        .rows_between(start, Window.CURRENT_ROW - 1)
    )
    return df.with_column(col_name, F.avg(agg_col).over(window))


In [None]:
# Create Day of the Week Features Columns for Price & Price Change with varying lags 
lag_df = ["HIST","YEAR","MONTH"]
agg_feat_columns = [ "PRICE","PRICE_CHANGE"] 
order_columns = ["YEAR","MONTH"]
partition_columns = ["MENU_ITEM_ID","DAY_OF_WEEK"]
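# One row per month within a (menu item, day of week) partition: 12 rows = last year, 1 row = last month
lag_param = { "YEAR" : 12, "MONTH" : 1 }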
new_cols = []

for agg_col in agg_feat_columns :
    col_name = agg_col
    for lag in lag_df :
        new_col_name = col_name + "_" + lag + "_DOW"
        new_cols.append(new_col_name)
        sales_agg_df = create_window_columns(sales_agg_df, new_col_name, agg_col, lag, \
                                             partition_columns, order_columns, lag_param)
        

In [None]:
# Create Rolling Average Features Columns for Price & Price Change with varying lags 
lag_df = ["HIST","YEAR","MONTH"]
agg_feat_columns = [ "PRICE","PRICE_CHANGE"] 
order_columns = ["YEAR","MONTH","DAY_OF_WEEK"]
partition_columns = ['MENU_ITEM_ID']
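# Seven day-of-week rows per month within a menu-item partition: 84 rows = last year (12 x 7), 7 rows = last month
lag_param = { "YEAR" : 84, "MONTH" : 7 }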

for agg_col in agg_feat_columns :
    col_name = agg_col
    for lag in lag_df :
        new_col_name = col_name + "_" + lag + "_ROLL"
        new_cols.append(new_col_name)
        sales_agg_df = create_window_columns(sales_agg_df, new_col_name, agg_col, lag, \
                                             partition_columns, order_columns, lag_param)
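
To see what these window frames compute, here is a plain-Python sketch of an average over the rows that precede each position. It assumes a single, already ordered partition with no missing values; the function name `lagged_average` and the sample prices are illustrative.

```python
from typing import List, Optional


def lagged_average(values: List[float], lag: Optional[int]) -> List[Optional[float]]:
    """Average of the rows before each position, excluding the position itself.

    lag=None uses every preceding row (the "HIST" case); otherwise only the
    preceding `lag` rows are used. Positions with no preceding rows get None,
    matching the NULL that AVG returns over an empty window frame.
    """
    result = []
    for i in range(len(values)):
        start = 0 if lag is None else max(0, i - lag)
        frame = values[start:i]  # rows strictly before position i
        result.append(sum(frame) / len(frame) if frame else None)
    return result


prices = [1.0, 2.0, 3.0, 4.0]
two_row_window = lagged_average(prices, lag=2)   # the 2 preceding rows
full_history = lagged_average(prices, lag=None)  # all preceding rows
print(two_row_window)
print(full_history)
```

The first row of every partition has no history, which is one source of the null values handled in the next step.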


### Impute Missing Values

We check for columns with missing values and replace the nulls with 0 using Snowflake's built-in IFNULL function, called through Snowpark.

In [None]:
# Check for columns which have nulls
sales_agg_df_pandas = sales_agg_df.toPandas()
print(sales_agg_df_pandas[sales_agg_df_pandas.columns[sales_agg_df_pandas \
                                                              .isnull().any()]].isnull().sum())
nan_cols = [i for i in sales_agg_df_pandas.columns if sales_agg_df_pandas[i].isnull().any()]

# Replace null values by 0 using Snowpark built in function - ifnull
for col in nan_cols:
    sales_agg_df = sales_agg_df.with_column(col, F.call_builtin("ifnull", F.col(col), 0))

### Final features post feature engineering

In [None]:
sales_agg_df.columns

### Save data to Snowflake table

We split the prepped data into training and testing Snowpark DataFrames, holding out the last 2 months of historical data as testing data.

In [None]:
# Training data
train_df = sales_agg_df.filter((F.col("YEAR") < F.lit(max_year)) | \
                               ((F.col("YEAR") ==  F.lit(max_year)) & \
                                (F.col("MONTH") <  F.lit(max_month) - 1))) 

# Testing data
test_df = sales_agg_df.filter((F.col("YEAR") == F.lit(max_year)) & \
                              (F.col("MONTH") >= F.lit(max_month) - 1 ) & \
                              (F.col("MONTH") <= F.lit(max_month))  # bounding by max_month also works when max_month is 12
                             )
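
A time-based split is easier to reason about when each (year, month) pair is mapped to a single increasing number. The sketch below is one possible way to express the split, not the filter used above; the helper names `month_index` and `split_label` are hypothetical. It also handles a test window that spans a year boundary.

```python
def month_index(year: int, month: int) -> int:
    """Map a calendar month to a single increasing integer."""
    return year * 12 + (month - 1)


def split_label(year: int, month: int, max_year: int, max_month: int,
                test_months: int = 2) -> str:
    """Label a row 'train', 'test', or 'future' relative to the latest historical month."""
    latest = month_index(max_year, max_month)
    current = month_index(year, month)
    if current > latest:
        return "future"  # rows added for the upcoming month
    if current > latest - test_months:
        return "test"    # the last `test_months` months of history
    return "train"


# Latest historical month is January 2022, so the test window spans two years
for year, month in [(2021, 11), (2021, 12), (2022, 1), (2022, 2)]:
    print(year, month, split_label(year, month, max_year=2022, max_month=1))
```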

We save the training, test, and full prepped datasets to the analytics schema in our Snowflake account.

In [None]:
# Save training data
train_df.write.mode("overwrite").save_as_table(
    "demand_est_input_train"
)

# Save test data
test_df.write.mode("overwrite").save_as_table(
    "demand_est_input_test"
)

In [None]:
# Save full data
sales_agg_df.write.mode("overwrite").save_as_table(
    "demand_est_input_full"
)

# Model Training

## Hyperparameter tuning using Snowflake ML modeling API

We use the Snowflake ML API to perform **distributed hyperparameter tuning**, training multiple models in parallel to learn how menu-item demand is influenced by price. The `grid_search.fit()` call creates a temporary stored procedure in the background, which lets us train the models on Snowflake compute without moving the data outside the secure Snowflake environment.

**NOTE** - The cell below may take up to 10 minutes to complete.

In [None]:
# %%time
# Get training data
train_df = session.table("demand_est_input_train")

# Use price columns for features 
price_cols = [i for i in train_df.columns if "PRICE" in i]

# Grid Search for XGBRegressor using Snowpark ML
grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid={
        "n_estimators":[100, 200, 300, 400, 500],
        "learning_rate":[0.1, 0.2, 0.3, 0.4, 0.5],
    },
    cv=5,
    n_jobs = -1,
    scoring="neg_mean_absolute_percentage_error",
    input_cols=price_cols,
    label_cols="TOTAL_QUANTITY_SOLD",
    output_cols="DEMAND_ESTIMATION"
)

# Fit on training data
grid_search.fit(train_df)

We can look at the best parameters found by the grid search by accessing the `best_params_` attribute of the **fitted sklearn object**.

In [None]:
best_params_ = grid_search.to_sklearn().best_params_
best_params_
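
To get a feel for the size of this search, the sketch below enumerates the same parameter grid in plain Python. An exhaustive grid search evaluates every combination on every cross-validation fold, so the count here excludes any final refit on the full training set.

```python
from itertools import product

param_grid = {
    "n_estimators": [100, 200, 300, 400, 500],
    "learning_rate": [0.1, 0.2, 0.3, 0.4, 0.5],
}
cv_folds = 5

# Every combination of the listed values is one candidate model
candidates = [dict(zip(param_grid, values)) for values in product(*param_grid.values())]
cross_validation_fits = len(candidates) * cv_folds

print(len(candidates), cross_validation_fits)
```

The number of fits grows multiplicatively with each added parameter, which is why running them in parallel on Snowflake compute helps.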

## Model Evaluation

The model is evaluated on our prepped test data using metrics from Snowpark ML Modeling: mean_squared_error, mean_absolute_error, and r2_score. We compare the predictions against the actual historical sales.

In [None]:
# Test data
test_df = session.table("demand_est_input_test")

# Prediction
df_test_pred = grid_search.predict(test_df)

# Model metrics
mse = mean_squared_error(df=df_test_pred, y_true_col_names="TOTAL_QUANTITY_SOLD", y_pred_col_names="DEMAND_ESTIMATION")
mae = mean_absolute_error(df=df_test_pred, y_true_col_names="TOTAL_QUANTITY_SOLD", y_pred_col_names="DEMAND_ESTIMATION")
r2 = r2_score(df=df_test_pred, y_true_col_name="TOTAL_QUANTITY_SOLD", y_pred_col_name="DEMAND_ESTIMATION")
print(f'MSE: {mse}')
print(f'MAE: {mae}')
print(f'R2: {r2}')
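
For reference, the three metrics can be written out in a few lines of plain Python. This sketch uses the standard textbook definitions on made-up numbers and is independent of the Snowflake implementations.

```python
from typing import Sequence


def mse(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Mean of squared errors."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def mae(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Mean of absolute errors."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def r2(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """1 - (residual sum of squares / total sum of squares)."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


# Made-up actual and predicted quantities, for illustration only
actual = [10.0, 12.0, 14.0]
predicted = [11.0, 12.0, 13.0]
print(mse(actual, predicted), mae(actual, predicted), r2(actual, predicted))
```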

# Model Inference

## Deploy model to Model Registry

In [None]:
# Helper functions 

# Function to know if model with the specified name is logged on the given registry
def model_exists(reg, model_name):
    models = reg.show_models()
    if (models.empty) or (model_name not in models["name"].to_list()) :
        return False
    return True

# Function to get the latest version of a given model on the specified registry
def get_latest_version(reg, model_name):
    models = reg.show_models()
    if model_exists(reg, model_name):
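        versions = ast.literal_eval(models.loc[models["name"] == model_name, "versions"].values[0])
        # Compare the numeric parts; comparing the strings would rank "V9" above "V10"
        return max(int(version[1:]) for version in versions)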
    return 0
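
Version names such as V2 and V10 sort differently as strings than as numbers, so it helps to compare their numeric part. Here is a minimal sketch, assuming the versions field is a string holding a list of names in the form V followed by a number; the function name `latest_version_number` is hypothetical.

```python
import ast


def latest_version_number(versions_field: str) -> int:
    """Parse a string such as '["V1", "V2"]' and return the highest version number."""
    versions = ast.literal_eval(versions_field)
    # Strip the leading "V" and compare as integers; 0 means no versions yet
    return max((int(version[1:]) for version in versions), default=0)


sample_field = '["V1", "V2", "V10"]'
numeric_latest = latest_version_number(sample_field)
string_latest = max(ast.literal_eval(sample_field))  # lexicographic comparison

print(numeric_latest)
print(string_latest)
```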

### Opening Snowpark Model Registry
Before we can deploy our model, we must open the registry. Opening the registry returns a reference to it, which we use to add new models and obtain references to existing models. Below, we open a registry with our default database and schema. 

In [None]:
# Use the session default database and schema for model registry
reg = Registry(session=session) 

# Models logged on the registry
model_df = reg.show_models()
model_df

With the registry opened, we can check for pre-existing models using the `registry.show_models()` function, which returns a dataframe with details of the models available on the registry. Currently, we have no models deployed on the registry, so the dataframe is empty.

### Log model to the registry

Log a model by calling the `registry.log_model()` function. This function does the following:
- Serializes the model, a Python object, and creates a Snowflake model object from it.
- Adds metadata, such as a description, to the model as specified in the log_model call.

In [None]:
# Provide name for the model 
model_name = "DEMAND_ESTIMATION_MODEL"

# Log the model
mv = reg.log_model(model=grid_search, \
    model_name= model_name, \
    version_name= "V"+str(get_latest_version(reg, model_name) + 1), \
    metrics={"mean_squared_error":mse, "mean_absolute_error":mae, "r2_score":r2}, 
    comment="Demand estimation ML model based on price features")

Below, we can look at our newly logged model.  

In [None]:
# Models logged on the registry
model_df = reg.show_models()
model_df

Each model may have any number of versions. To log additional versions of the model, we can call log_model again with the same model_name but a different version_name.

We can also explore all the versions of a model with the `model.show_versions()` function. Currently, we have only one version logged for our model. If we were to retrain the model, we could log it under the same model name with a new version.

In [None]:
# Versions available for a given model on the registry
model_versions = reg.get_model(model_name).show_versions()
model_versions

### Accessing model prediction
With our model now logged onto the registry, we can access its associated functions, which can be executed to perform inference or other model operations.

To call a function of our model, we use `mv.run`, specifying the name of the function to call and passing a DataFrame containing the inference data. Below, we call the "predict" function of our model to get predictions on our test data.

In [None]:
prediction = mv.run(test_df.select(price_cols), function_name="predict")
prediction.show()

# Model Utilization

### Create a Stored Procedure which functions as Price Recommender: 
**Step 1. Create a function to get day-of-week price recommendations for all menu items for a given month**

Let's create a Snowpark stored procedure that uses the deployed model to predict demand at every candidate price for each menu item. The candidate prices for a menu item range from a 50% discount on its base sale price to a 20% mark-up on its base sale price. Once we have the predicted demand at each candidate price, we calculate the total profit at that demand by combining item profit (revenue minus cost of goods) and basket profit. The recommended price is the candidate price with the highest estimated total profit. Finally, these recommendations are stored in a Snowflake table.

In [None]:
def get_recommendations(
    session: Session,
    month: int,
    year: int,
    price_rec_table: str,
    interval: int, 
    model_name: str
) -> T.Variant :
        from snowflake.ml.registry.registry import Registry
        
        price_cols = ['PRICE', 'PRICE_CHANGE', 'BASE_PRICE', 'PRICE_HIST_DOW', 'PRICE_YEAR_DOW', 'PRICE_MONTH_DOW', 'PRICE_CHANGE_HIST_DOW', \
                        'PRICE_CHANGE_YEAR_DOW', 'PRICE_CHANGE_MONTH_DOW', 'PRICE_HIST_ROLL', 'PRICE_YEAR_ROLL', 'PRICE_MONTH_ROLL', 'PRICE_CHANGE_HIST_ROLL', \
                        'PRICE_CHANGE_YEAR_ROLL', 'PRICE_CHANGE_MONTH_ROLL']
                        
        # Create dataframe with feature columns using prepped table filtered to our input
        item_df = session.table("demand_est_input_full") \
                        .filter((F.col("YEAR") == F.lit(year)) \
                                & (F.col("MONTH") == F.lit(month))) \
                        .drop(["PRICE","PRICE_CHANGE"])


        # Possible price dataframe
        discount_df = session.create_dataframe(list(np.arange(50, -21, -interval)), \
                                                schema=T.StructType([T.StructField("DISCOUNT", T.IntegerType())]),)

        # Cross-join with item_df to get record to infer on
        item_df = item_df.cross_join(discount_df) \
                        .with_column("PRICE", F.round((1 - F.col("DISCOUNT") * 0.01) * F.col("BASE_PRICE"), 2)) \
                        .with_column("PRICE_CHANGE", (F.col("PRICE") - F.col("BASE_PRICE")) / F.col("BASE_PRICE"))  # relative change, same definition as the training feature

        # Get demand estimator model from registry
        reg = Registry(session=session) 
        demand_estimator = reg.get_model(model_name).default
 
        for col in price_cols :
                item_df = item_df.withColumn(col+"_NEW",F.col(col).cast(T.DoubleType())).drop(col).rename(col+"_NEW",col)
           
        # Estimate demand & corresponding item profit based on varying price using Demand Estimator model predict function
        item_df = demand_estimator.run(item_df, function_name="predict") \
                .with_column("ITEM_PROFIT", \
                                        (F.col("DEMAND_ESTIMATION") * F.col("PRICE")) - \
                                        (F.col("DEMAND_ESTIMATION") * F.round(F.col("COST_OF_GOODS_USD"), 2)),)
                                        
        # Get avg basket profit for previous month for the given item
        basket_profit = (session.table("order_item_cost_agg_v") \
                        .filter((F.col("YEAR") == F.lit(year)) \
                                & (F.col("MONTH") == F.lit(month))) \
                                .select("MENU_ITEM_ID", "PREV_AVG_PROFIT_WO_ITEM")).cache_result()

        # Recommend with highest profit
        window = Window.partition_by(["MENU_ITEM_ID", "DAY_OF_WEEK"]).order_by(F.col("TOTAL_PROFIT").desc())
        item_df = (item_df.join(basket_profit, "MENU_ITEM_ID") \
                .with_column("BASKET_PROFIT", F.col("DEMAND_ESTIMATION") * F.col("PREV_AVG_PROFIT_WO_ITEM"),) \
                .with_column("TOTAL_PROFIT", F.col("BASKET_PROFIT") + F.col("ITEM_PROFIT")) \
                .order_by(F.col("PRICE").asc()) \
                .with_column("MAX_PROFIT_IND", F.row_number().over(window))) \
        .filter(F.col("MAX_PROFIT_IND") == 1)

        # Save recommendation to Snowflake
        item_df.select("TRUCK_BRAND_NAME", "MONTH", "YEAR", "DAY_OF_WEEK","MENU_ITEM_ID", \
                                F.round(F.col("COST_OF_GOODS_USD"),2).alias("COST_OF_GOODS_USD"), \
                                "BASE_PRICE", "PRICE","DEMAND_ESTIMATION", \
                                F.round(F.col("ITEM_PROFIT"),2).alias("ITEM_PROFIT"), \
                                F.round(F.col("BASKET_PROFIT"),2).alias("BASKET_PROFIT"), \
                                F.col("TOTAL_PROFIT"),
                                ).write.mode("overwrite") \
        .save_as_table(price_rec_table)
        
        return "Recommendations complete"
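
The selection logic inside the stored procedure can be followed on a single menu item in plain Python. This is a simplified sketch under stated assumptions: `toy_demand` is a hypothetical linear demand curve standing in for the registered model, and the base price, cost, and basket profit values are made up.

```python
from typing import Callable, Dict, List


def candidate_discounts(interval: int) -> List[int]:
    """Discount percentages from 50 (half price) down to -20 (a 20% mark-up)."""
    return list(range(50, -21, -interval))


def recommend_price(base_price: float, cost: float, basket_profit_per_sale: float,
                    interval: int, estimate_demand: Callable[[float], float]) -> Dict[str, float]:
    """Return the candidate price with the highest estimated total profit."""
    best = None
    for discount in candidate_discounts(interval):
        price = round((1 - discount * 0.01) * base_price, 2)
        demand = estimate_demand(price)
        item_profit = demand * price - demand * round(cost, 2)
        basket_profit = demand * basket_profit_per_sale
        total_profit = item_profit + basket_profit
        if best is None or total_profit > best["total_profit"]:
            best = {"discount": discount, "price": price,
                    "demand": demand, "total_profit": total_profit}
    return best


def toy_demand(price: float) -> float:
    """Hypothetical demand curve: fewer units sold as the price rises."""
    return max(0.0, 100.0 - 8.0 * price)


recommendation = recommend_price(base_price=10.0, cost=4.0, basket_profit_per_sale=1.0,
                                 interval=10, estimate_demand=toy_demand)
print(recommendation)
```

A smaller interval produces a finer price grid: an interval of 10 gives 8 candidate prices per item, while an interval of 1 gives 71.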

In [None]:
# Create stage to host UDF & Stored Procedures
session.sql("CREATE STAGE IF NOT EXISTS PO_STAGE").collect()

**Step 2. Register the Function on Snowflake** 

To register the function on Snowflake as a stored procedure, we specify which Python packages the function requires.

In [None]:
# Register stored procedure
get_recommendations_sp = session.sproc.register(
    func=get_recommendations,
    name="sproc_get_recommendations",
    is_permanent=True,
    replace=True,
    stage_location="@PO_STAGE",
    packages=["snowflake-snowpark-python","snowflake-ml-python==1.4.0"]
)

# Data Driven Insights

The **Peking Truck** food-truck brand has seen **lower than average Wednesday sales** for **Wonton Soup** over the past few months. The Brand Manager proposed introducing a Wednesday Wonton promotion to drive up sales. To find the best promotion price, Tasty Bytes uses the deployed price recommender stored procedure.

We will start off by **visualizing Wonton Wednesday Sales & Price** for the last 6 months to understand whether there might be a relation between the two.

In [None]:
## Wonton Wednesday Sales for last 6 months
wonton_agg_df = sales_agg_df.filter(
                   (((F.col("YEAR") == F.lit(2021)) & \
                    (F.col("MONTH").in_(F.lit(10), F.lit(11), F.lit(12)))) |
                    ((F.col("YEAR") == F.lit(2022)) & (F.col("MONTH").in_(F.lit(1),F.lit(2),F.lit(3))))
                   ) & \
                   (F.col("DAY_OF_WEEK") == F.lit(3)) & \
                   (F.col("MENU_ITEM_ID") == F.lit(133))).order_by([F.col("YEAR"),F.col("MONTH")]) \
                .select(
                    F.col("TOTAL_QUANTITY_SOLD"), \
                    F.col("PRICE_CHANGE"), \
                    F.col("PRICE"), \
                    F.col("BASE_PRICE"), \
                    F.concat(F.col("YEAR"),F.lit('-'),F.col("MONTH")).alias("MONTH")) \
                 .toPandas()

avg_wonton_sale = sales_agg_df.filter(
                   (F.col("YEAR") == F.lit(2022)) & \
                   (F.col("DAY_OF_WEEK") == F.lit(3)) & \
                   (F.col("MENU_ITEM_ID") == F.lit(133))).order_by([F.col("YEAR"),F.col("MONTH")]) \
                    .agg(F.avg("TOTAL_QUANTITY_SOLD").alias("TOTAL_QUANTITY_SOLD")).collect()[0][0]


In [None]:
# Plot Average Monthly Wednesday Sales for Wonton Soup
plt.rcParams["figure.figsize"] = [6.00, 3.50]
lp = sns.lineplot(data=wonton_agg_df, x="MONTH", y="TOTAL_QUANTITY_SOLD")
plt.axhline(y=avg_wonton_sale, color="r", linestyle="-", label="Average Wednesday Wonton Sales")
plt.title("Wednesday Sales for Wonton Soup")
plt.legend( loc='lower right')
plt.xlabel("Month")
plt.ylabel("Quantity Sold")
plt.show()

In [None]:
#Plot Average Monthly Wednesday Prices for Wonton Soup
price = sns.lineplot(data=wonton_agg_df, x="MONTH", y="PRICE")
plt.title("Wednesday Prices for Wonton Soup")
plt.xticks(range(len(wonton_agg_df["MONTH"])), wonton_agg_df["MONTH"], rotation='vertical')
plt.xlabel("Month")
plt.ylabel("Price")
plt.show()

We can see that the drop in sales closely follows an increase in price.

**Scale up our Snowflake compute**

We will dynamically change to a larger compute warehouse without restarting our Python kernel or needing to redefine variables. Here we use a 2XL warehouse before calling price recommender stored procedure.

In [None]:
# Scale up the compute
session.sql(f"ALTER WAREHOUSE TB_PO_DS_WH SET WAREHOUSE_SIZE = X2LARGE").collect()

In [None]:
# Versions available for a given model on the registry
model_versions = reg.get_model(model_name).show_versions()
model_versions

Next up, **we call the price recommender to get the best promotion price** for April 2022. The stored procedure writes the recommendations to the Snowflake table promotion_recommendations.

NOTE - The cell below takes about 30 seconds to run.

In [None]:
# Input
month = 4 
year = 2022
price_rec_table = "promotion_recommendations" 

# Call stored procedure
print(get_recommendations_sp(session, month, year,price_rec_table, 10, "DEMAND_ESTIMATION_MODEL"))



**Scale Down Snowflake Compute**

Once complete, we will scale our compute warehouse back down. The larger warehouse size is not required for the remaining tasks.

In [None]:
# Scale down the compute
session.sql(f"ALTER WAREHOUSE TB_PO_DS_WH SET WAREHOUSE_SIZE = XSMALL").collect()
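
One way to make sure the warehouse is always scaled back down, even if the work in between fails, is to wrap the resize in a context manager. This is a sketch under assumptions: the helper name `scaled_warehouse` is hypothetical, and `session` is assumed to be any object exposing `.sql(text).collect()`, as a Snowpark Session does.

```python
from contextlib import contextmanager


@contextmanager
def scaled_warehouse(session, warehouse: str, size: str, restore_size: str = "XSMALL"):
    """Resize a warehouse for the duration of a block, then restore it."""
    session.sql(f"ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = {size}").collect()
    try:
        yield
    finally:
        # Runs even if the block raises, so the larger size is not left on
        session.sql(f"ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = {restore_size}").collect()


# Usage with the notebook's session and stored procedure (not executed here):
# with scaled_warehouse(session, "TB_PO_DS_WH", "X2LARGE"):
#     print(get_recommendations_sp(session, month, year, price_rec_table, 10, model_name))
```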

Let's look at the recommended price for Wonton Soup. We see a **recommendation of a lower price compared to its base sale price**. We are also provided with the estimated demand & profit at the recommended price. 

In [None]:
wonton_wed_recommendations = session.table(price_rec_table) \
                            .filter((F.col("DAY_OF_WEEK") == F.lit(3)) & \
                                    (F.col("MENU_ITEM_ID") == F.lit(133))) \
                            .with_column_renamed(F.col("PRICE"), "RECOMMENDED_PRICE").toPandas()
wonton_wed_recommendations

Peking Truck food trucks apply the recommended discounted price for Wonton Soup on Wednesdays in April 2022.

In [None]:
## Wonton Wednesday Sales for Jan - Apr 2022
wonton_agg_df = sales_agg_df.filter(
                                   (F.col("YEAR") == F.lit(2022)) & \
                                   (F.col("MONTH").in_(F.lit(1), F.lit(2), F.lit(3), F.lit(4))) & \
                                   (F.col("DAY_OF_WEEK") == F.lit(3)) & \
                                   (F.col("MENU_ITEM_ID") == F.lit(133))) \
                            .select(
                                    F.col("TOTAL_QUANTITY_SOLD"),  F.col("PRICE"), \
                                    F.concat(F.col("YEAR"),F.lit('-'),F.col("MONTH")).alias("MONTH")) \
                            .order_by(F.col("MONTH")).toPandas()

# Plot Average Monthly Wednesday Sales for Wonton Soup
plt.rcParams["figure.figsize"] = [6.00, 3.50]
lp = sns.lineplot(data=wonton_agg_df, x="MONTH", y="TOTAL_QUANTITY_SOLD")
plt.axhline(y=avg_wonton_sale, color="r", linestyle="-", label="Average Wednesday Wonton Sales")
plt.title("Wednesday Sales for Wonton Soup")
plt.legend( loc='lower right')
plt.xlabel("Month")
plt.ylabel("Quantity Sold")
plt.show()

We can see a spike in sales after the promotion was introduced for Peking Truck.

**Output for Price Optimization SiS App**

Tasty Bytes also uses the price recommender to get optimal prices for all menu items, not only special promotional prices. These price recommendations are used downstream by the SiS App to inform food-truck brand pricing strategy.

NOTE - The cell below takes about 30 seconds to run.

In [None]:
# Input
month = 4 
year = 2022
price_rec_table = "price_recommendations" 


# Scale up the compute
session.sql(f"ALTER WAREHOUSE TB_PO_DS_WH SET WAREHOUSE_SIZE = X2LARGE").collect()

# Call stored procedure
print(get_recommendations_sp(session, month, year,price_rec_table, 1, "DEMAND_ESTIMATION_MODEL"))

# Scale down the compute
session.sql(f"ALTER WAREHOUSE TB_PO_DS_WH SET WAREHOUSE_SIZE = XSMALL").collect()

# Summary
**- Securely connected to Snowflake via Snowpark**

**- Performed feature engineering by leveraging Snowpark DataFrames to query & transform Snowflake data**

**- Saved prepped data to Snowflake tables**

**- Trained the ML model with distributed hyperparameter tuning using the Snowflake ML modeling API**

**- Logged the model to the Snowflake Model Registry to perform inference**

**- Utilized the deployed model in a Snowpark Stored Procedure**


#### Demo: frostbyte Tasty Bytes
Version: v1

Vignette: Price Optimization

Script: tasty_bytes_price_optimization

Create Date:    2023-06-01

Author:         Shriya Rai

#### Summary of Changes:

| Date(yyyy-mm-dd) | Author         | Comments                      |
| :---             | :---           | :---                                                  |
| 2023-06-01       | Shriya Rai | Initial Release |
| 2024-03-07       | Shriya Rai | Update with Snowpark ML |


In [None]:
/***************************************************************************************************
  _______           _            ____          _
 |__   __|         | |          |  _ \        | |
    | |  __ _  ___ | |_  _   _  | |_) | _   _ | |_  ___  ___
    | | / _` |/ __|| __|| | | | |  _ < | | | || __|/ _ \/ __|
    | || (_| |\__ \| |_ | |_| | | |_) || |_| || |_|  __/\__ \
    |_| \__,_||___/ \__| \__, | |____/  \__, | \__|\___||___/
                          __/ |          __/ |
                         |___/          |___/
Demo:         Tasty Bytes - Price Optimization SiS
Version:      v1
Vignette:     2 - SiS with Snowpark        
Create Date:  2023-06-08
Author:       Marie Coolsaet
Copyright(c): 2023 Snowflake Inc. All rights reserved.
****************************************************************************************************
Description: 
   Create tables used in SiS Streamlit App for Setting Monthly Pricing
****************************************************************************************************
SUMMARY OF CHANGES
Date(yyyy-mm-dd)    Author              Comments
------------------- ------------------- ------------------------------------------------------------
2023-06-08        Marie Coolsaet      Initial Release
2024-03-07        Shriya Rai          Update with Snowpark ML 
***************************************************************************************************/

/*----------------------------------------------------------------------------------
Instructions: Run all of this script to create the required tables and roles for the SiS app.

Note: In order for these scripts to run you will need to have run the notebook in
tasty_bytes_price_optimization.ipynb in 1 - Machine Learning with Snowpark.

 ----------------------------------------------------------------------------------*/

USE ROLE tb_po_data_scientist;
USE WAREHOUSE tb_po_ds_wh;

ALTER WAREHOUSE tb_po_ds_wh SET WAREHOUSE_SIZE = 'LARGE';

-- create the table that the app will write back to
CREATE OR REPLACE TABLE tb_po_prod.analytics.pricing_final (
    brand VARCHAR(16777216),
    item VARCHAR(16777216),
    day_of_week VARCHAR(16777216),
    new_price FLOAT,
    current_price FLOAT,
    recommended_price FLOAT,
    profit_lift FLOAT,
    comment VARCHAR(16777216),
    timestamp TIMESTAMP_NTZ(9)
);

-- create the table with required pricing information for the app
CREATE OR REPLACE TABLE tb_po_prod.analytics.pricing_detail AS
SELECT 
    a.truck_brand_name AS brand,
    a.menu_item_name AS item,
    -- prefix each day name with a number so the labels sort Monday through Sunday
    CASE
        WHEN a.day_of_week = 0 THEN '7 - Sunday'
        WHEN a.day_of_week = 1 THEN '1 - Monday'
        WHEN a.day_of_week = 2 THEN '2 - Tuesday'
        WHEN a.day_of_week = 3 THEN '3 - Wednesday'
        WHEN a.day_of_week = 4 THEN '4 - Thursday'
        WHEN a.day_of_week = 5 THEN '5 - Friday'
        ELSE '6 - Saturday'
    END AS day_of_week,
    round(b.price::FLOAT,2) AS current_price,
    round(a.price::FLOAT,2) AS recommended_price,
    tb_po_prod.analytics.DEMAND_ESTIMATION_MODEL!PREDICT(
        current_price,
        current_price - c.base_price,
        c.base_price,
        c.price_hist_dow,
        c.price_year_dow,
        c.price_month_dow,
        c.price_change_hist_dow,
        c.price_change_year_dow,
        c.price_change_month_dow,
        c.price_hist_roll,
        c.price_year_roll,
        c.price_month_roll,
        c.price_change_hist_roll,
        c.price_change_year_roll,
        c.price_change_month_roll):DEMAND_ESTIMATION::INT AS current_price_demand,
    tb_po_prod.analytics.DEMAND_ESTIMATION_MODEL!PREDICT(
        recommended_price,
        recommended_price - c.base_price,
        c.base_price,
        c.price_hist_dow,
        c.price_year_dow,
        c.price_month_dow,
        c.price_change_hist_dow,
        c.price_change_year_dow,
        c.price_change_month_dow,
        c.price_hist_roll,
        c.price_year_roll,
        c.price_month_roll,
        c.price_change_hist_roll,
        c.price_change_year_roll,
        c.price_change_month_roll):DEMAND_ESTIMATION::INT AS recommended_price_demand,
    round(((recommended_price_demand
        * (d.prev_avg_profit_wo_item 
            + recommended_price 
            - round(a.cost_of_goods_usd,2))) 
            - (current_price_demand
        * (d.prev_avg_profit_wo_item 
            + current_price 
            - round(a.cost_of_goods_usd,2))))
            ,0) AS profit_lift,
    c.base_price,
    c.price_hist_dow,
    c.price_year_dow,
    c.price_month_dow,
    c.price_change_hist_dow,
    c.price_change_year_dow,
    c.price_change_month_dow,
    c.price_hist_roll,
    c.price_year_roll,
    c.price_month_roll,
    c.price_change_hist_roll,
    c.price_change_year_roll,
    c.price_change_month_roll,
    d.prev_avg_profit_wo_item AS average_basket_profit,
    round(a.cost_of_goods_usd,2) AS item_cost,
    recommended_price_demand * (average_basket_profit
            + recommended_price 
            - item_cost) AS recommended_price_profit,
    current_price_demand * (average_basket_profit
            + current_price
            - item_cost) AS current_price_profit
FROM (
    SELECT p.*, m.menu_item_name
    FROM tb_po_prod.analytics.price_recommendations p
    LEFT JOIN tb_po_prod.raw_pos.menu m
        ON p.menu_item_id = m.menu_item_id
) a
LEFT JOIN (
    SELECT * FROM tb_po_prod.analytics.demand_est_input_full
    WHERE month = 3 AND year = 2022
) b
    ON a.day_of_week = b.day_of_week AND a.menu_item_id = b.menu_item_id
LEFT JOIN (
    SELECT * FROM tb_po_prod.analytics.demand_est_input_full
    WHERE month = 4 AND year = 2022
) c
    ON a.day_of_week = c.day_of_week AND a.menu_item_id = c.menu_item_id
LEFT JOIN (
    SELECT * FROM tb_po_prod.analytics.order_item_cost_agg_v
    WHERE month = 4 AND year = 2022
) d
    ON a.menu_item_id = d.menu_item_id
ORDER BY brand, item, day_of_week;


-- create pricing table to be displayed in the app
CREATE OR REPLACE TABLE tb_po_prod.analytics.pricing 
AS SELECT
    brand,
    item,
    day_of_week,
    current_price AS new_price,
    current_price,
    recommended_price,
    profit_lift
FROM
    tb_po_prod.analytics.pricing_detail;

# Price Recommendations
Automated pipelines update ML-driven price recommendations for menu items by day-of-week on a monthly basis. Brand managers are responsible for adjusting pricing for menu items under their food truck brand. We will look at a spreadsheet provided by the Guac n' Roll brand manager for the upcoming month. We'll use the model to see the profit lift from the changes in pricing compared to last month's pricing.

## View Price Recommendations
Recommendations are for the upcoming month by day of the week for each menu item. By default, the new price is set to the current price unless overridden by the brand manager responsible for pricing of that specific food truck brand.

**Value**: Easy deployment of ML models to provide updated insight.

In [None]:
brand = "Guac n' Roll"
item = "Chicken Burrito"
session.table("pricing").filter((F.col("brand")==brand) & (F.col("item")==item)).show()
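
The default-and-override rule described above can be sketched in plain Python. This is only an illustration under stated assumptions, not how the app or pipeline is implemented: the `apply_overrides` helper, the row layout, and the prices are all hypothetical.

```python
# Hypothetical sketch: new_price defaults to current_price unless the
# brand manager supplies an override for that (item, day_of_week) pair.

def apply_overrides(rows, overrides):
    """Return copies of rows with new_price taken from overrides when present."""
    result = []
    for row in rows:
        key = (row["item"], row["day_of_week"])
        updated = dict(row)  # copy so the input rows are left untouched
        updated["new_price"] = overrides.get(key, row["current_price"])
        result.append(updated)
    return result


# Illustrative values only; not taken from the Tasty Bytes data.
rows = [
    {"item": "Chicken Burrito", "day_of_week": "1 - Monday", "current_price": 12.0},
    {"item": "Chicken Burrito", "day_of_week": "3 - Wednesday", "current_price": 12.0},
]
overrides = {("Chicken Burrito", "3 - Wednesday"): 10.0}

priced = apply_overrides(rows, overrides)
for row in priced:
    print(row["day_of_week"], row["new_price"])
```

Rows without an override keep the current price, which mirrors how the `pricing` table is seeded with `current_price AS new_price`.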

## Import New Pricing
Currently, brand managers submit pricing by spreadsheet. The Guac n' Roll brand manager provided their pricing so that we can demonstrate how the model gives insight into demand and profit changes at different prices.

**Value**: Snowflake provides an accessible, single version of the truth for data.

In [None]:
stage_name = 'TB_PO_PROD.PUBLIC.EXCEL_S3'
file_name = 'pricing_guac_roll_04_2023.xlsx'

# get_stream returns a file-like object that pandas can read directly
file_stream = session.file.get_stream(f'@{stage_name}/{file_name}')
set_prices = pd.read_excel(file_stream)
set_prices = set_prices[set_prices["ITEM"]==item]
set_prices

## Forecast Demand of New Prices
Here, we retrieve the deployed demand-estimation model from the Snowflake model registry and call it to predict demand for the new prices that the Guac n' Roll brand manager set in the spreadsheet.

**Value:** Access deployed models and apply them ad hoc to new data.

In [None]:
# Create Snowpark DataFrame
set_prices = session.create_dataframe(set_prices)

# Define model input features
feature_cols = [
    "price",
    "price_change",
    "base_price",
    "price_hist_dow",
    "price_year_dow",
    "price_month_dow",
    "price_change_hist_dow",
    "price_change_year_dow",
    "price_change_month_dow",
    "price_hist_roll",
    "price_year_roll",
    "price_month_roll",
    "price_change_hist_roll",
    "price_change_year_roll",
    "price_change_month_roll",
]

# Get demand estimation
df_demand = (
    set_prices.join(session.table("pricing_detail"), ["brand", "item", "day_of_week"])
    .withColumn("price", F.col("new_price"))
    .withColumn("price_change", F.col("price") - F.col("base_price"))
)

# Get demand estimator model from registry
reg = Registry(session=session) 
demand_estimator = reg.get_model("DEMAND_ESTIMATION_MODEL").default

# Cast each feature column to DOUBLE before scoring
for col in feature_cols:
    df_demand = (
        df_demand.withColumn(col + "_NEW", F.col(col).cast(T.DoubleType()))
        .drop(col)
        .rename(col + "_NEW", col)
    )

df_demand = demand_estimator.run(df_demand, function_name="predict")\
    .select(
    "day_of_week",
    "current_price_demand",
    "new_price",
    "item_cost",
    "average_basket_profit",
    "current_price_profit",
    F.col("demand_estimation").alias("new_price_demand"))
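
The two price-dependent inputs are derived from the submitted price before scoring: `price` is the new price and `price_change` is the new price minus the base price. A minimal plain-Python sketch of that step follows. The `build_features` helper and the sample values are hypothetical and stand in for the Snowpark column operations above.

```python
# Hypothetical sketch of how the two price-dependent features are derived
# before scoring; the other features pass through from pricing_detail.

def build_features(new_price, detail_row):
    """Return model inputs as floats, with price and price_change derived."""
    features = {name: float(value) for name, value in detail_row.items()}
    features["price"] = float(new_price)
    features["price_change"] = features["price"] - features["base_price"]
    return features


# Illustrative values only; a real row carries base_price plus the
# twelve price-history features listed in feature_cols.
detail_row = {"base_price": 12.5, "price_hist_dow": 12, "price_year_dow": 12.25}
features = build_features(10, detail_row)
print(features)
```

A price below the base price yields a negative `price_change`, which is how a promotion shows up in the model inputs.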

### Visualize Predicted Demand of New vs. Current Prices

In [None]:
df_demand.to_pandas().plot.bar(x="DAY_OF_WEEK", y=["NEW_PRICE_DEMAND","CURRENT_PRICE_DEMAND"])

## Calculate Profit and Demand Lift
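Total profit is demand multiplied by the profit per order, where profit per order is the average profit of the rest of the basket sold with a Chicken Burrito plus the burrito's own margin (price minus item cost). Lift is the percentage change in the weekly total relative to current pricing.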

**Value:** Push-down calculations in Snowflake of key metrics using model results.
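
To make the definitions concrete, the lift arithmetic can be checked by hand on a small example. The sketch below uses plain Python rather than Snowpark, and every number in it is made up for illustration. The `percent_lift` and `weekly_lifts` helpers are hypothetical names, not part of the notebook.

```python
# Hypothetical worked example of the demand-lift and profit-lift formulas.

def percent_lift(new_total, current_total):
    """Percentage change of new_total relative to current_total, 1 decimal."""
    return round((new_total - current_total) / current_total * 100, 1)


def weekly_lifts(rows):
    """Return (demand lift %, profit lift %) summed over all rows."""
    new_demand = sum(r["new_price_demand"] for r in rows)
    current_demand = sum(r["current_price_demand"] for r in rows)
    # profit = demand * (item margin + average profit of the rest of the basket)
    new_profit = sum(
        r["new_price_demand"]
        * (r["new_price"] - r["item_cost"] + r["average_basket_profit"])
        for r in rows
    )
    current_profit = sum(r["current_price_profit"] for r in rows)
    return (
        percent_lift(new_demand, current_demand),
        percent_lift(new_profit, current_profit),
    )


# Illustrative values only: an unchanged day and a discounted day.
rows = [
    {"new_price": 12, "item_cost": 4, "average_basket_profit": 2,
     "new_price_demand": 100, "current_price_demand": 100,
     "current_price_profit": 1000},
    {"new_price": 10, "item_cost": 4, "average_basket_profit": 2,
     "new_price_demand": 120, "current_price_demand": 100,
     "current_price_profit": 1000},
]

demand_lift_pct, profit_lift_pct = weekly_lifts(rows)
print("Demand lift (%):", demand_lift_pct)  # 10.0
print("Profit lift (%):", profit_lift_pct)  # -2.0
```

In this made-up case the discount raises demand by 10% while profit falls by 2%, which shows why both metrics need to be read together.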

In [None]:
# Demand lift
demand_lift = df_demand.select(
    F.round(((F.sum("new_price_demand") - F.sum("current_price_demand"))
            / F.sum("current_price_demand"))* 100, 1)).collect()[0][0]

# Profit lift
profit_lift = (
    df_demand.with_column(
        "new_price_profit",
        F.col("new_price_demand")
        * (F.col("new_price") - F.col("item_cost") + F.col("average_basket_profit")))
    .select(
        F.round(((F.sum("new_price_profit") - F.sum("current_price_profit"))
                  / F.sum("current_price_profit")) * 100, 1)).collect()[0][0]
)

print("Total Weekly Demand Lift (%)", demand_lift)
print("Total Weekly Profit Lift (%)", profit_lift)

We see that the Wednesday promotion increases demand, but has a minimal effect on profit. We need a better process for brand managers to submit their pricing and use the ML-driven pricing recommendations to drive their decisions around promotions.

**Value**: Streamlit in Snowflake (SiS) provides an easy way to build user interfaces for allowing business users to interact with ML models and data in Snowflake. Plus, deployment is seamless and governed.

**Demo:** Tasty Bytes - Price Optimization

**Script:** tasty_bytes_price_recommendations.ipynb

**Create Date:** 2023-06-08

**Author:** Marie Coolsaet

**Description:**

Deployed item-level demand forecasts are used to analyze monthly pricing set by Tasty Bytes's brand managers.

**Summary of Changes:**

| Date(yyyy-mm-dd) | Author         | Comments                        |
| :---             | :---           | :---                                                  |
| 2023-06-08       | Marie Coolsaet | Release 1 |
| 2024-03-07       | Shriya Rai | Update with Snowpark ML |