In [1]:
import snowflake.snowpark
from snowflake.snowpark import functions as F
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import IntegerType, StringType, StructType, FloatType, StructField, DateType, Variant
from snowflake.snowpark.functions import udf, sum, col,array_construct,month,year,call_udf,lit,count
from snowflake.snowpark.version import VERSION
# Misc
import json
import pandas as pd
import numpy as np
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

In [2]:
# Create Snowflake Session object
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('select current_user(), current_role(), current_database(), current_schema(), current_version(), current_warehouse()').collect()
snowpark_version = VERSION

# # Current Environment Details
# print('User                        : {}'.format(snowflake_environment[0][0]))
# print('Role                        : {}'.format(snowflake_environment[0][1]))
# print('Database                    : {}'.format(snowflake_environment[0][2]))
# print('Schema                      : {}'.format(snowflake_environment[0][3]))
# print('Warehouse                   : {}'.format(snowflake_environment[0][5]))
# print('Snowflake version           : {}'.format(snowflake_environment[0][4]))
# print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

## Environment Setup

In [3]:
session.sql('''create database if not exists snowflake_sample_data from share sfc_samples.sample_data''').collect()

[Row(status='SNOWFLAKE_SAMPLE_DATA already exists, statement succeeded.')]

In [4]:
session.sql('CREATE DATABASE IF NOT EXISTS tpcds_xgboost').collect()
session.sql('CREATE SCHEMA IF NOT EXISTS tpcds_xgboost.demo').collect()
session.sql("create or replace warehouse FE_AND_INFERENCE_WH with warehouse_size='3X-LARGE'").collect()
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'").collect()
session.sql("alter warehouse snowpark_opt_wh set max_concurrency_level = 1").collect()
session.use_warehouse('FE_AND_INFERENCE_WH')

Select either 100 or 10 for the TPC-DS Dataset size to use below. See (https://docs.snowflake.com/en/user-guide/sample-data-tpcds.html)[here] for more information If you choose 100, I recommend >= 3XL warehouse. 

In [5]:
TPCDS_SIZE_PARAM = 10
SNOWFLAKE_SAMPLE_DB = 'SNOWFLAKE_SAMPLE_DATA' # Name of Snowflake Sample Database might be different...

if TPCDS_SIZE_PARAM == 100: 
    TPCDS_SCHEMA = 'TPCDS_SF100TCL'
elif TPCDS_SIZE_PARAM == 10:
    TPCDS_SCHEMA = 'TPCDS_SF10TCL'
else:
    raise ValueError("Invalid TPCDS_SIZE_PARAM selection")
    
store_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store_sales')
catalog_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.catalog_sales') 
web_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.web_sales') 
date = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.date_dim')
dim_stores = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store')
customer = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer')
address = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_address')
demo = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_demographics')

In [6]:
print((store_sales.count(), len(store_sales.columns)))


(28800239865, 23)


## Feature Engineering
We will aggregate sales by customer across all channels(web, store, catalogue) and join that to customer demographic data. 

In [7]:
store_sales_agged = store_sales.group_by('ss_customer_sk').agg(F.sum('ss_sales_price').as_('total_sales'))
web_sales_agged = web_sales.group_by('ws_bill_customer_sk').agg(F.sum('ws_sales_price').as_('total_sales'))
catalog_sales_agged = catalog_sales.group_by('cs_bill_customer_sk').agg(F.sum('cs_sales_price').as_('total_sales'))
store_sales_agged = store_sales_agged.rename('ss_customer_sk', 'customer_sk')
web_sales_agged = web_sales_agged.rename('ws_bill_customer_sk', 'customer_sk')
catalog_sales_agged = catalog_sales_agged.rename('cs_bill_customer_sk', 'customer_sk')

In [8]:
total_sales = store_sales_agged.union_all(web_sales_agged)
total_sales = total_sales.union_all(catalog_sales_agged)

In [9]:
total_sales = total_sales.group_by('customer_sk').agg(F.sum('total_sales').as_('total_sales'))

In [10]:
customer = customer.select('c_customer_sk','c_current_hdemo_sk', 'c_current_addr_sk', 'c_customer_id', 'c_birth_year')

In [11]:
customer = customer.join(address.select('ca_address_sk', 'ca_zip'), customer['c_current_addr_sk'] == address['ca_address_sk'] )
customer = customer.join(demo.select('cd_demo_sk', 'cd_gender', 'cd_marital_status', 'cd_credit_rating', 'cd_education_status', 'cd_dep_count'),
                                customer['c_current_hdemo_sk'] == demo['cd_demo_sk'] )
customer = customer.rename('c_customer_sk', 'customer_sk')
#customer.show()

In [12]:
final_df = total_sales.join(customer, on='customer_sk')

In [13]:
session.use_database('tpcds_xgboost')
session.use_schema('demo')
final_df.write.mode('overwrite').save_as_table('feature_store')

In [14]:
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools', 'xgboost', 'joblib')

In [15]:
session.sql('CREATE OR REPLACE STAGE ml_models_10T ').collect()

[Row(status='Stage area ML_MODELS_10T successfully created.')]

In [16]:
#Create stage for LR

session.sql('CREATE OR REPLACE STAGE ml_models_LR_10T ').collect()

[Row(status='Stage area ML_MODELS_LR_10T successfully created.')]

## Training Model
In this part, we will training a XGBoost model and upload to snowflake stage. This will create `stored procedure` on snowflake, and we can call this model to predict.

### XGBoost

In [17]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.compose import ColumnTransformer
from xgboost import XGBRegressor
import joblib
import os

def train_model(session: snowflake.snowpark.Session) -> float:
    snowdf = session.table("feature_store")
    snowdf = snowdf.drop(['CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])
    snowdf_train, snowdf_test = snowdf.random_split([0.8, 0.2], seed=82) 

    # save the train and test sets as time stamped tables in Snowflake 
    snowdf_train.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TRAIN")
    snowdf_test.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TEST")
    train_x = snowdf_train.drop("TOTAL_SALES").to_pandas() # drop labels for training set
    train_y = snowdf_train.select("TOTAL_SALES").to_pandas()
    test_x = snowdf_test.drop("TOTAL_SALES").to_pandas()
    test_y = snowdf_test.select("TOTAL_SALES").to_pandas()
    cat_cols = ['CA_ZIP', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS']
    num_cols = ['C_BIRTH_YEAR', 'CD_DEP_COUNT']

    num_pipeline = Pipeline([
            ('imputer', SimpleImputer(strategy="median")),
            ('std_scaler', StandardScaler()),
        ])

    preprocessor = ColumnTransformer(
    transformers=[('num', num_pipeline, num_cols),
                  ('encoder', OneHotEncoder(handle_unknown="ignore"), cat_cols) ])

    pipe = Pipeline([('preprocessor', preprocessor), 
                        ('xgboost', XGBRegressor())])
    pipe.fit(train_x, train_y)

    test_preds = pipe.predict(test_x)
    rmse = mean_squared_error(test_y, test_preds)
    model_file = os.path.join('/tmp', 'model.joblib')
    joblib.dump(pipe, model_file)
    session.file.put(model_file, "@ml_models_10T",overwrite=True)
    print('successes')
    return rmse

In [18]:
session.use_warehouse('snowpark_opt_wh')
train_model_sp = F.sproc(train_model, session=session, replace=True, is_permanent=True, name="xgboost_sproc", stage_location="@ml_models_10T")
# Switch to Snowpark Optimized Warehouse for training and to run the stored proc
train_model_sp(session=session)

34220644.16519068

### Linear Regression

In [19]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LinearRegression
import joblib
import os

def train_model_LR(session: snowflake.snowpark.Session) -> float:
    
#---------Preprocess Start ---------
    snowdf = session.table("feature_store")
    snowdf = snowdf.drop(['CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])
    snowdf_train, snowdf_test = snowdf.random_split([0.8, 0.2], seed=82) 

    # save the train and test sets as time stamped tables in Snowflake 
    snowdf_train.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TRAIN")
    snowdf_test.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TEST")
    train_x = snowdf_train.drop("TOTAL_SALES").to_pandas() # drop labels for training set
    train_y = snowdf_train.select("TOTAL_SALES").to_pandas()
    test_x = snowdf_test.drop("TOTAL_SALES").to_pandas()
    test_y = snowdf_test.select("TOTAL_SALES").to_pandas()
    cat_cols = ['CA_ZIP', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS']
    num_cols = ['C_BIRTH_YEAR', 'CD_DEP_COUNT']

    num_pipeline = Pipeline([
            ('imputer', SimpleImputer(strategy="median")),
            ('std_scaler', StandardScaler()),
        ])

    preprocessor = ColumnTransformer(
    transformers=[('num', num_pipeline, num_cols),
                  ('encoder', OneHotEncoder(handle_unknown="ignore"), cat_cols) ])
#---------Preprocess End---------

    pipe = Pipeline([('preprocessor', preprocessor), 
                        ('LinearRegression', XGBRegressor())])
    pipe.fit(train_x, train_y)

    test_preds = pipe.predict(test_x)
    rmse = mean_squared_error(test_y, test_preds)
    model_file = os.path.join('/tmp', 'model.joblib')
    joblib.dump(pipe, model_file)
    session.file.put(model_file, "@ml_models_LR_10T",overwrite=True)
    print('successes')
    return rmse

In [20]:
#----USE DATABASE AND SESSION
session.use_database('tpcds_xgboost')
session.use_schema('demo')
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools', 'xgboost', 'joblib')
#Create stage for LR
session.sql('CREATE OR REPLACE STAGE ml_models_LR_10T ').collect()

session.use_warehouse('snowpark_opt_wh')
train_model_sp = F.sproc(train_model_LR, session=session, replace=True, is_permanent=True, name="LinearRegresson_sproc", stage_location="@ml_models_LR_10T")
# Switch to Snowpark Optimized Warehouse for training and to run the stored proc
train_model_sp(session=session)

34194350.95222402

## Inference
Call `stored procedure` or say our `model` is billed by predict times, \$0.52 for 10T model, \\$1.3 for 100T model.
- `@ml_models` is 100T model
- `@ml_models_10T` is 10T model
- `@ml_models_LR_10T` is Linear Regression model on 10T dataset

In [21]:
# Switch back to feature engineering/inference warehouse
session.use_warehouse('FE_AND_INFERENCE_WH')
session.use_database('tpcds_xgboost')
session.use_schema('demo')
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools', 'xgboost', 'joblib')



In [33]:
import sys
import pandas as pd
import cachetools
import joblib
from snowflake.snowpark import types as T
# choose model here
model_name = 'ml_models_10T'
#model_name = 'ml_models_LR_10T'
session.add_import(f"@{model_name}/model.joblib")  

features = [ 'C_BIRTH_YEAR', 'CA_ZIP', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS', 'CD_DEP_COUNT']

@cachetools.cached(cache={})
def read_file(filename):
    import os, joblib
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

@F.pandas_udf(session=session, max_batch_size=10000, is_permanent=True, stage_location=f'@{model_name}', replace=True, name="clv_xgboost_udf")
def predict(df:  T.PandasDataFrame[int, str, str, str, str, str, int]) -> T.PandasSeries[float]:
    m = read_file('model.joblib')       
    df.columns = features
    return m.predict(df)

Failed to execute query [queryID: 01aac57a-0004-57f1-004c-6687000130d6] 
CREATE OR REPLACE 
  FUNCTION clv_xgboost_udf(arg1 BIGINT,arg2 STRING,arg3 STRING,arg4 STRING,arg5 STRING,arg6 STRING,arg7 BIGINT)
RETURNS FLOAT
LANGUAGE PYTHON 
RUNTIME_VERSION=3.8
IMPORTS=('@ml_models_10T/model.joblib','@ml_models_LR_10T/model.joblib','@ml_models_10T/clv_xgboost_udf/udf_py_457487603.zip')
PACKAGES=('snowflake-snowpark-python','scikit-learn','pandas','numpy','joblib','cachetools','xgboost','cloudpickle==2.0.0')
HANDLER='udf_py_457487603.compute'


391500 (42601): SQL compilation error: Duplicate file name model.joblib in IMPORTS clause.


SnowparkSQLException: (1304): 01aac57a-0004-57f1-004c-6687000130d6: 391500 (42601): SQL compilation error: Duplicate file name model.joblib in IMPORTS clause.

#### Inference on data from feature_store.

In [23]:
# this will take all training dataset as input, let use smaller dataset in the following
# inference_df = session.table('feature_store').limit(100)
# inference_df = inference_df.drop(['CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])
# inputs = inference_df.drop("TOTAL_SALES")
# snowdf_results = inference_df.select(*inputs,
#                     predict(*inputs).alias('PREDICTION'), 
#                     (F.col('TOTAL_SALES')).alias('ACTUAL_SALES')
#                     )
# snowdf_results.write.mode('overwrite').save_as_table('predictions')

#### Inference on typed data

In [24]:
# assume this is from strealit UI
typed_input = [[1969, '66060','M','U','Low Risk','2 yr Degree', 1]]

In [25]:
input_df = session.create_dataframe(typed_input, schema=features)

In [26]:
typed_output = input_df.select(*input_df,
                    predict(*input_df).alias('PREDICTION'))


In [27]:
typed_output.show()

-----------------------------------------------------------------------------------------------------------------------------------------------
|"C_BIRTH_YEAR"  |"CA_ZIP"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_CREDIT_RATING"  |"CD_EDUCATION_STATUS"  |"CD_DEP_COUNT"  |"PREDICTION"   |
-----------------------------------------------------------------------------------------------------------------------------------------------
|1969            |66060     |M            |U                    |Low Risk            |2 yr Degree            |1               |32338.9609375  |
-----------------------------------------------------------------------------------------------------------------------------------------------



In [28]:
%%writefile Customer_Lifetime_Value.py
import pandas as pd
import streamlit as st

st.write('## Introduction')

Overwriting Customer_Lifetime_Value.py


In [29]:
%%writefile pages/CLV_prediction.py
import pandas as pd
import streamlit as st

st.write('## XGBoost model to predict CLV')

st.write('Type inputs here......')


Overwriting pages/CLV_prediction.py
