In [12]:
# Base dir for this code
import os
base_dir = os.getcwd()
print(base_dir)

/Users/baluooj/INFO7374/Assignment-2/tpcds-customer-lifetime-value


In [20]:
import json
import pandas as pd

from snowflake.snowpark import functions as F
from snowflake.snowpark import version as v
from snowflake.snowpark.session import Session

from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.preprocessing import KBinsDiscretizer, OneHotEncoder
from snowflake.ml.modeling.impute import SimpleImputer

In [21]:
import warnings
warnings.filterwarnings("ignore")

# 1.0 Snowflake Setup

In [22]:
# Ensure that your credentials are stored in creds.json
with open('creds.json') as f:
    data = json.load(f)
    USERNAME = data['user']
    PASSWORD = data['password']
    SF_ACCOUNT = data['account']
    SF_WH = data['warehouse']

CONNECTION_PARAMETERS = {
   "account": SF_ACCOUNT,
   "user": USERNAME,
   "password": PASSWORD,
}

session = Session.builder.configs(CONNECTION_PARAMETERS).create()

#### Ensure that TPC-DS dataset is available in your environment.

In [23]:
session.sql('CREATE DATABASE IF NOT EXISTS tpcds_xgboost').collect()
session.sql('CREATE SCHEMA IF NOT EXISTS tpcds_xgboost.demo').collect()
session.sql("create or replace warehouse FE_AND_INFERENCE_WH with warehouse_size='3X-LARGE'").collect()
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'").collect()
session.sql("alter warehouse snowpark_opt_wh set max_concurrency_level = 1").collect()
session.sql("CREATE OR REPLACE STAGE TPCDS_XGBOOST.DEMO.ML_MODELS").collect()
session.use_warehouse('FE_AND_INFERENCE_WH')
session.use_database('tpcds_xgboost')
session.use_schema('demo')

Select either 100 or 10 for the TPC-DS Dataset size to use below. See (https://docs.snowflake.com/en/user-guide/sample-data-tpcds.html)[here] for more information If you choose 100, I recommend >= 3XL warehouse. 

In [26]:
TPCDS_SIZE_PARAM = 10
SNOWFLAKE_SAMPLE_DB = 'SNOWFLAKE_SAMPLE_DATA' # Name of Snowflake Sample Database might be different...

if TPCDS_SIZE_PARAM == 100: 
    TPCDS_SCHEMA = 'TPCDS_SF100TCL'
elif TPCDS_SIZE_PARAM == 10:
    TPCDS_SCHEMA = 'TPCDS_SF10TCL'
else:
    raise ValueError("Invalid TPCDS_SIZE_PARAM selection")
    
store_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store_sales')
catalog_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.catalog_sales') 
web_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.web_sales') 
date = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.date_dim')
dim_stores = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store')
customer = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer')
address = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_address')
demo = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_demographics')

# 2.0 Data Engineering
We will aggregate sales by customer across all channels(web, store, catalogue) and join that to customer demographic data. 

In [27]:
store_sales_agged = store_sales.group_by('ss_customer_sk').agg(F.sum('ss_sales_price').as_('total_sales'))
web_sales_agged = web_sales.group_by('ws_bill_customer_sk').agg(F.sum('ws_sales_price').as_('total_sales'))
catalog_sales_agged = catalog_sales.group_by('cs_bill_customer_sk').agg(F.sum('cs_sales_price').as_('total_sales'))
store_sales_agged = store_sales_agged.rename('ss_customer_sk', 'customer_sk')
web_sales_agged = web_sales_agged.rename('ws_bill_customer_sk', 'customer_sk')
catalog_sales_agged = catalog_sales_agged.rename('cs_bill_customer_sk', 'customer_sk')

In [28]:
total_sales = store_sales_agged.union_all(web_sales_agged)
total_sales = total_sales.union_all(catalog_sales_agged)

In [29]:
total_sales = total_sales.group_by('customer_sk').agg(F.sum('total_sales').as_('total_sales'))

In [30]:
customer = customer.select('c_customer_sk','c_current_hdemo_sk', 'c_current_addr_sk', 'c_customer_id', 'c_birth_year')

In [31]:
customer = customer.join(address.select('ca_address_sk', 'ca_zip'), customer['c_current_addr_sk'] == address['ca_address_sk'] )
customer = customer.join(demo.select('cd_demo_sk', 'cd_gender', 'cd_marital_status', 'cd_credit_rating', 'cd_education_status', 'cd_dep_count'),
                                customer['c_current_hdemo_sk'] == demo['cd_demo_sk'] )
customer = customer.rename('c_customer_sk', 'customer_sk')

In [32]:
customer.limit(5).to_pandas()

Unnamed: 0,CUSTOMER_SK,C_CURRENT_HDEMO_SK,C_CURRENT_ADDR_SK,C_CUSTOMER_ID,C_BIRTH_YEAR,CA_ADDRESS_SK,CA_ZIP,CD_DEMO_SK,CD_GENDER,CD_MARITAL_STATUS,CD_CREDIT_RATING,CD_EDUCATION_STATUS,CD_DEP_COUNT
0,47565134,2903,11432972,AAAAAAAAOEJMFNCA,1990,11432972,68371,2903,M,S,High Risk,2 yr Degree,0
1,47565135,2457,29478386,AAAAAAAAPEJMFNCA,1966,29478386,70499,2457,M,W,Low Risk,Primary,0
2,47565136,450,23602579,AAAAAAAAAFJMFNCA,1934,23602579,11952,450,F,U,Good,College,0
3,47565137,1315,616770,AAAAAAAABFJMFNCA,1969,616770,54593,1315,M,D,Good,Advanced Degree,0
4,47565138,2064,3437061,AAAAAAAACFJMFNCA,1974,3437061,99310,2064,F,S,Low Risk,2 yr Degree,0


In [33]:
final_df = total_sales.join(customer, on='customer_sk')

In [34]:
# Size of the final DF is around 95 Million.
final_df.count()

62726989

In [35]:
session.use_database('tpcds_xgboost')
session.use_schema('demo')
final_df.write.mode('overwrite').save_as_table('feature_store')

# 3.0 Feature Engineering

In [36]:
session.use_warehouse('snowpark_opt_wh')
session.use_database('tpcds_xgboost')
session.use_schema('demo')

In [37]:
snowdf = session.table("feature_store")
snowdf = snowdf.drop(['CA_ZIP','CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])

In [38]:
snowdf.limit(5).to_pandas()

Unnamed: 0,TOTAL_SALES,C_BIRTH_YEAR,CD_GENDER,CD_MARITAL_STATUS,CD_CREDIT_RATING,CD_EDUCATION_STATUS,CD_DEP_COUNT
0,34091.42,1967,M,U,Good,College,0
1,33092.14,1933,M,U,Good,College,0
2,31775.91,1958,M,U,Good,College,0
3,27624.84,1961,M,U,Good,College,0
4,29109.72,1944,M,U,Good,College,0


In [39]:
cat_cols = ['CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS']
num_cols = ['C_BIRTH_YEAR', 'CD_DEP_COUNT']

### 3.1 Missing Value Imputation

We can use the SimpleImputer in snowflake.ml.preprocessing to replace missing values with the most frequent.

```python
# SimpleImputer in snowflake.ml.preprocessing
from snowflake.ml.modeling.impute import SimpleImputer
my_imputer = sfml.preprocessing.SimpleImputer(input_cols=['your_column'],
                                output_cols=['your_column'],
                                strategy='constant',
                                fill_value='OTHER')
my_imputer.fit(my_sdf)
my_sdf = my_imputer.transform(my_sdf)
```

In [40]:
# Imputation of Numeric Cols
my_imputer = SimpleImputer(input_cols= num_cols,
                           output_cols= num_cols,
                           strategy='median')
sdf_prepared = my_imputer.fit(snowdf).transform(snowdf)

Input value type doesn't match the target column data type, this replacement was skipped. Column Name: "C_BIRTH_YEAR", Type: LongType(), Input Value: 1958.0, Type: <class 'float'>
Input value type doesn't match the target column data type, this replacement was skipped. Column Name: "CD_DEP_COUNT", Type: LongType(), Input Value: 0.0, Type: <class 'float'>


### 3.2 One-Hot Encoding of Categorical Cols

In [41]:
# OHE of Categorical Cols
my_ohe_encoder = OneHotEncoder(input_cols=cat_cols, output_cols=cat_cols, drop_input_cols=True)
sdf_prepared = my_ohe_encoder.fit(sdf_prepared).transform(sdf_prepared)

In [42]:
sdf_prepared.limit(5).to_pandas()

Unnamed: 0,CD_GENDER_F,CD_GENDER_M,CD_MARITAL_STATUS_D,CD_MARITAL_STATUS_M,CD_MARITAL_STATUS_S,CD_MARITAL_STATUS_U,CD_MARITAL_STATUS_W,CD_CREDIT_RATING_Good,CD_CREDIT_RATING_High Risk,CD_CREDIT_RATING_Low Risk,...,CD_EDUCATION_STATUS_2 yr Degree,CD_EDUCATION_STATUS_4 yr Degree,CD_EDUCATION_STATUS_Advanced Degree,CD_EDUCATION_STATUS_College,CD_EDUCATION_STATUS_Primary,CD_EDUCATION_STATUS_Secondary,CD_EDUCATION_STATUS_Unknown,C_BIRTH_YEAR,CD_DEP_COUNT,TOTAL_SALES
0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1977,0,31417.18
1,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1983,0,30234.85
2,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1983,0,29805.64
3,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1941,0,33221.52
4,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1927,0,30050.24


### 3.3 Clean column names

In [45]:
# Cleaning column names to make it easier for future referencing
import re

cols = sdf_prepared.columns
for old_col in cols:
    new_col = re.sub(r'[^a-zA-Z0-9_]', '', old_col)
    new_col = new_col.upper()
    sdf_prepared = sdf_prepared.rename(col(old_col), new_col)

TypeError: 'list' object is not callable

# 4.0 ML Modeling

In [46]:
# Use Snowpark Optimized Warehouse
session.use_warehouse('snowpark_opt_wh')

### 4.1 Prepare data

In [47]:
# Prepare Data for modeling
feature_cols = sdf_prepared.columns
feature_cols.remove('TOTAL_SALES')
target_col = 'TOTAL_SALES'

In [48]:
# Save the train and test sets as time stamped tables in Snowflake
snowdf_train, snowdf_test = sdf_prepared.random_split([0.8, 0.2], seed=82) 
snowdf_train.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TRAIN")
snowdf_test.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TEST")

### 4.2 Initialize Model and Fit

In [50]:
# Define the XGBRegressor and fit the model
xgbmodel = XGBRegressor(random_state=123, input_cols=feature_cols, label_cols=target_col, output_cols='PREDICTION')
xgbmodel.fit(snowdf_train)

The version of package 'snowflake-snowpark-python' in the local environment is 1.10.0, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.


<snowflake.ml.modeling.xgboost.xgb_regressor.XGBRegressor at 0x7fa389157610>

### 4.3 Predict on test set

In [51]:
# Score the data using the fitted xgbmodel
sdf_scored = xgbmodel.predict(snowdf_test)

In [52]:
sdf_scored.limit(5).to_pandas()

Unnamed: 0,CD_CREDIT_RATING_High Risk,CD_EDUCATION_STATUS_Secondary,CD_EDUCATION_STATUS_Advanced Degree,CD_GENDER_M,CD_MARITAL_STATUS_M,CD_MARITAL_STATUS_D,C_BIRTH_YEAR,CD_MARITAL_STATUS_W,CD_DEP_COUNT,CD_EDUCATION_STATUS_4 yr Degree,...,CD_CREDIT_RATING_Low Risk,TOTAL_SALES,CD_GENDER_F,CD_EDUCATION_STATUS_College,CD_EDUCATION_STATUS_Primary,CD_EDUCATION_STATUS_Unknown,CD_CREDIT_RATING_Good,CD_CREDIT_RATING_Unknown,CD_EDUCATION_STATUS_2 yr Degree,PREDICTION
0,0.0,0.0,0.0,1.0,0.0,0.0,1968,0.0,0,1.0,...,0.0,30469.08,0.0,0.0,0.0,0.0,1.0,0.0,0.0,32310.421875
1,0.0,0.0,0.0,1.0,0.0,0.0,1992,0.0,0,1.0,...,0.0,29757.18,0.0,0.0,0.0,0.0,1.0,0.0,0.0,32447.109375
2,0.0,0.0,0.0,1.0,0.0,0.0,1956,0.0,0,1.0,...,0.0,28176.81,0.0,0.0,0.0,0.0,1.0,0.0,0.0,32326.164062
3,0.0,0.0,0.0,1.0,0.0,0.0,1989,0.0,0,1.0,...,0.0,41651.39,0.0,0.0,0.0,0.0,1.0,0.0,0.0,32334.818359
4,0.0,0.0,0.0,1.0,0.0,0.0,1926,0.0,0,1.0,...,0.0,33969.02,0.0,0.0,0.0,0.0,1.0,0.0,0.0,32325.320312


### 4.4 Save predictions in Snowflake

In [53]:
session.use_database('tpcds_xgboost')
session.use_schema('demo')
sdf_scored.write.mode('overwrite').save_as_table('predictions')

SnowparkSQLException: (1304): 100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "/home/udf/80666120/udf_py_1590301937.zip/udf_py_1590301937.py", line 118, in compute
    return lock_function_once(func, invoked)(*[df[idx] for idx in range(df.shape[1])])
  File "/home/udf/80666120/udf_py_1590301937.zip/udf_py_1590301937.py", line 111, in wrapper
    return f(*args, **kwargs)
  File "/Users/baluooj/anaconda3/envs/pysnowpark_mls/lib/python3.9/site-packages/snowflake/ml/modeling/_internal/snowpark_handlers.py", line 438, in vec_batch_infer
  File "/usr/lib/python_udf/76ea5cd402868097e7ca680c5ec94bc07fe402be32dc23fbe206bb07d545232c/lib/python3.9/site-packages/xgboost/sklearn.py", line 1114, in predict
    predts = self.get_booster().inplace_predict(
  File "/usr/lib/python_udf/76ea5cd402868097e7ca680c5ec94bc07fe402be32dc23fbe206bb07d545232c/lib/python3.9/site-packages/xgboost/core.py", line 2283, in inplace_predict
    data, fns, _ = _transform_pandas_df(data, enable_categorical)
  File "/usr/lib/python_udf/76ea5cd402868097e7ca680c5ec94bc07fe402be32dc23fbe206bb07d545232c/lib/python3.9/site-packages/xgboost/data.py", line 378, in _transform_pandas_df
    _invalid_dataframe_dtype(data)
  File "/usr/lib/python_udf/76ea5cd402868097e7ca680c5ec94bc07fe402be32dc23fbe206bb07d545232c/lib/python3.9/site-packages/xgboost/data.py", line 270, in _invalid_dataframe_dtype
    raise ValueError(msg)
ValueError: DataFrame.dtypes for data must be int, float, bool or category. When categorical type is supplied, The experimental DMatrix parameter`enable_categorical` must be set to `True`.  Invalid columns:C_BIRTH_YEAR: object
 in function SNOWPARK_TEMP_FUNCTION_WA0K3GA5O0 with handler udf_py_1590301937.compute

# 5.0 Deploying trained model as UDF for future usage

Steps to follow-
1. Get model in your local environment
2. Save the file in your local env. as .joblib file
3. Upload the file to Snowflake stage
4. Create UDF using model in stage

We can use `to_xgboost()` in order to get the actual xgboost model object which gives us access to all its attributes.

### Creating sample dataset for quick predictions

In [54]:
snowdf_test = session.table('tpc_TEST')
# Predicting with sample dataset
sample_data = snowdf_test.limit(100)
sample_data.write.mode("overwrite").save_as_table("temp_test")

In [55]:
test_sdf = session.table('temp_test')

### 5.1 Prepare model to convert to UDF
1. Get model in your local environment
2. Save the file in your local env. as .joblib file
3. Upload the file to Snowflake stage

In [56]:
import joblib
import cachetools

In [57]:
xgb_file = xgbmodel.to_xgboost()
xgb_file

In [58]:
MODEL_FILE = 'model.joblib.gz'
joblib.dump(xgb_file, MODEL_FILE) # we are just pickling it locally first

['model.joblib.gz']

In [59]:
# You can also save the pickled object into the stage we created earlier
session.file.put(MODEL_FILE, "@ML_MODELS", auto_compress=False, overwrite=True)

[PutResult(source='model.joblib.gz', target='model.joblib.gz', source_size=142023, target_size=142032, source_compression='GZIP', target_compression='GZIP', status='UPLOADED', message='')]

### 5.2 Create UDF for future reference


In [60]:
from snowflake.snowpark.functions import udf
import snowflake.snowpark.types as T

In [61]:
# Define a simple scoring function
from cachetools import cached

@cached(cache={})
def load_model(model_path: str) -> object:
    from joblib import load
    model = load(model_path)
    return model

def udf_score_xgboost_model_vec_cached(df: pd.DataFrame) -> pd.Series:
    import os
    import sys
    # file-dependencies of UDFs are available in snowflake_import_directory
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    model_name = 'model.joblib.gz'
    model = load_model(import_dir+model_name)
    df.columns = feature_cols
    scored_data = pd.Series(model.predict(df))
    return scored_data

In [62]:
# Register UDF
udf_clv = session.udf.register(func=udf_score_xgboost_model_vec_cached, 
                               name="TPCDS_PREDICT_CLV", 
                               stage_location='@ML_MODELS',
                               input_types=[T.FloatType()]*len(feature_cols),
                               return_type = T.FloatType(),
                               replace=True, 
                               is_permanent=True, 
                               imports=['@ML_MODELS/model.joblib.gz'],
                               packages=['pandas',
                                         'xgboost',
                                         'joblib',
                                         'cachetools'], 
                               session=session)

### 5.3 Extra Stuff

### Inference using UDF Created right here

Note we are using `udf_clv` that was defined earlier.

In [63]:
test_sdf_w_preds = test_sdf.with_column('PREDICTED', udf_clv(*feature_cols))
test_sdf_w_preds.limit(2).to_pandas()

Unnamed: 0,CD_GENDER_F,CD_GENDER_M,CD_MARITAL_STATUS_D,CD_MARITAL_STATUS_M,CD_MARITAL_STATUS_S,CD_MARITAL_STATUS_U,CD_MARITAL_STATUS_W,CD_CREDIT_RATING_Good,CD_CREDIT_RATING_High Risk,CD_CREDIT_RATING_Low Risk,...,CD_EDUCATION_STATUS_4 yr Degree,CD_EDUCATION_STATUS_Advanced Degree,CD_EDUCATION_STATUS_College,CD_EDUCATION_STATUS_Primary,CD_EDUCATION_STATUS_Secondary,CD_EDUCATION_STATUS_Unknown,C_BIRTH_YEAR,CD_DEP_COUNT,TOTAL_SALES,PREDICTED
0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,1964,0,36673.08,32294.164062
1,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,1989,0,29213.03,32304.625


### Inference using UDF Called from Snowflake

Notice we are calling the UDF created in snowflake using `F.call_udf()`

In [64]:
test_sdf_w_preds = test_sdf.with_column('PREDICTED',F.call_udf("TPCDS_PREDICT_CLV",
                                                               [F.col(c) for c in feature_cols]))
test_sdf_w_preds.limit(2).to_pandas()

Unnamed: 0,CD_GENDER_F,CD_GENDER_M,CD_MARITAL_STATUS_D,CD_MARITAL_STATUS_M,CD_MARITAL_STATUS_S,CD_MARITAL_STATUS_U,CD_MARITAL_STATUS_W,CD_CREDIT_RATING_Good,CD_CREDIT_RATING_High Risk,CD_CREDIT_RATING_Low Risk,...,CD_EDUCATION_STATUS_4 yr Degree,CD_EDUCATION_STATUS_Advanced Degree,CD_EDUCATION_STATUS_College,CD_EDUCATION_STATUS_Primary,CD_EDUCATION_STATUS_Secondary,CD_EDUCATION_STATUS_Unknown,C_BIRTH_YEAR,CD_DEP_COUNT,TOTAL_SALES,PREDICTED
0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,1964,0,36673.08,32294.164062
1,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,1989,0,29213.03,32304.625


# 6.0 Wrap up

In [65]:
session.close()