In [None]:
# Base dir for this code
import os
base_dir = os.getcwd()
print(base_dir)

/content


In [None]:
#!pip uninstall Snowflake

In [None]:
!pip install snowflake



In [None]:
!pip install snowflake-snowpark-python==1.5.1



In [None]:
!pip install snowflake-ml-python



In [None]:
!pip install xgboost==1.7.3



In [None]:
import json
import pandas as pd

from snowflake.snowpark import functions as F
from snowflake.snowpark import version as v
from snowflake.snowpark.session import Session

from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.preprocessing import KBinsDiscretizer, OneHotEncoder
from snowflake.ml.modeling.impute import SimpleImputer

In [None]:
import warnings
warnings.filterwarnings("ignore")

# 1.0 Snowflake Setup

In [None]:
# Ensure that your credentials are stored in creds.json
with open('creds.json') as f:
    data = json.load(f)
    USERNAME = data['user']
    PASSWORD = data['password']
    SF_ACCOUNT = data['account']
    SF_WH = data['warehouse']

CONNECTION_PARAMETERS = {
   "account": SF_ACCOUNT,
   "user": USERNAME,
   "password": PASSWORD,
}

session = Session.builder.configs(CONNECTION_PARAMETERS).create()

DatabaseError: ignored

#### Ensure that TPC-DS dataset is available in your environment.

In [None]:
session.sql('CREATE DATABASE IF NOT EXISTS tpcds_xgboost').collect()
session.sql('CREATE SCHEMA IF NOT EXISTS tpcds_xgboost.demo').collect()
session.sql("create or replace warehouse FE_AND_INFERENCE_WH with warehouse_size='3X-LARGE'").collect()
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'").collect()
session.sql("alter warehouse snowpark_opt_wh set max_concurrency_level = 1").collect()
session.sql("CREATE OR REPLACE STAGE TPCDS_XGBOOST.DEMO.ML_MODELS").collect()
session.use_warehouse('FE_AND_INFERENCE_WH')
session.use_database('tpcds_xgboost')
session.use_schema('demo')

Select either 100 or 10 for the TPC-DS Dataset size to use below. See (https://docs.snowflake.com/en/user-guide/sample-data-tpcds.html)[here] for more information If you choose 100, I recommend >= 3XL warehouse.

In [None]:
TPCDS_SIZE_PARAM = 10
SNOWFLAKE_SAMPLE_DB = 'SNOWFLAKE_SAMPLE_DATA' # Name of Snowflake Sample Database might be different...

if TPCDS_SIZE_PARAM == 100:
    TPCDS_SCHEMA = 'TPCDS_SF100TCL'
elif TPCDS_SIZE_PARAM == 10:
    TPCDS_SCHEMA = 'TPCDS_SF10TCL'
else:
    raise ValueError("Invalid TPCDS_SIZE_PARAM selection")

store_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store_sales')
catalog_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.catalog_sales')
web_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.web_sales')
date = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.date_dim')
dim_stores = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store')
customer = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer')
address = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_address')
demo = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_demographics')

# 2.0 Data Engineering
We will aggregate sales by customer across all channels(web, store, catalogue) and join that to customer demographic data.

In [None]:
store_sales_agged = store_sales.group_by('ss_customer_sk').agg(F.sum('ss_sales_price').as_('total_sales'))
web_sales_agged = web_sales.group_by('ws_bill_customer_sk').agg(F.sum('ws_sales_price').as_('total_sales'))
catalog_sales_agged = catalog_sales.group_by('cs_bill_customer_sk').agg(F.sum('cs_sales_price').as_('total_sales'))
store_sales_agged = store_sales_agged.rename('ss_customer_sk', 'customer_sk')
web_sales_agged = web_sales_agged.rename('ws_bill_customer_sk', 'customer_sk')
catalog_sales_agged = catalog_sales_agged.rename('cs_bill_customer_sk', 'customer_sk')

In [None]:
total_sales = store_sales_agged.union_all(web_sales_agged)
total_sales = total_sales.union_all(catalog_sales_agged)

In [None]:
total_sales = total_sales.group_by('customer_sk').agg(F.sum('total_sales').as_('total_sales'))

In [None]:
customer = customer.select('c_customer_sk','c_current_hdemo_sk', 'c_current_addr_sk', 'c_customer_id', 'c_birth_year')

In [None]:
customer = customer.join(address.select('ca_address_sk', 'ca_zip'), customer['c_current_addr_sk'] == address['ca_address_sk'] )
customer = customer.join(demo.select('cd_demo_sk', 'cd_gender', 'cd_marital_status', 'cd_credit_rating', 'cd_education_status', 'cd_dep_count'),
                                customer['c_current_hdemo_sk'] == demo['cd_demo_sk'] )
customer = customer.rename('c_customer_sk', 'customer_sk')

In [None]:
customer.limit(5).to_pandas()

In [None]:
final_df = total_sales.join(customer, on='customer_sk')

In [None]:
# Size of the final DF is around 95 Million.
final_df.count()

In [None]:
session.use_database('tpcds_xgboost')
session.use_schema('demo')
final_df.write.mode('overwrite').save_as_table('feature_store')

# 3.0 Feature Engineering

In [None]:
session.use_warehouse('snowpark_opt_wh')
session.use_database('tpcds_xgboost')
session.use_schema('demo')

In [None]:
snowdf = session.table("feature_store")
snowdf = snowdf.drop(['CA_ZIP','CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])

In [None]:
from snowflake.snowpark.functions import col



snowdf = snowdf.withColumn("C_BIRTH_YEAR", col("C_BIRTH_YEAR").cast("float"))

snowdf = snowdf.withColumn("CD_DEP_COUNT", col("CD_DEP_COUNT").cast("float"))

In [None]:
snowdf.limit(5).to_pandas()

In [None]:
cat_cols = ['CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS']
num_cols = ['C_BIRTH_YEAR', 'CD_DEP_COUNT']

### 3.1 Missing Value Imputation

We can use the SimpleImputer in snowflake.ml.preprocessing to replace missing values with the most frequent.

```python
# SimpleImputer in snowflake.ml.preprocessing
from snowflake.ml.modeling.impute import SimpleImputer
my_imputer = sfml.preprocessing.SimpleImputer(input_cols=['your_column'],
                                output_cols=['your_column'],
                                strategy='constant',
                                fill_value='OTHER')
my_imputer.fit(my_sdf)
my_sdf = my_imputer.transform(my_sdf)
```

In [None]:
# Imputation of Numeric Cols
my_imputer = SimpleImputer(input_cols= num_cols,
                           output_cols= num_cols,
                           strategy='median')
sdf_prepared = my_imputer.fit(snowdf).transform(snowdf)

### 3.2 One-Hot Encoding of Categorical Cols

In [None]:
# OHE of Categorical Cols
my_ohe_encoder = OneHotEncoder(input_cols=cat_cols, output_cols=cat_cols, drop_input_cols=True)
sdf_prepared = my_ohe_encoder.fit(sdf_prepared).transform(sdf_prepared)

In [None]:
sdf_prepared.limit(5).to_pandas()

### 3.3 Clean column names

In [None]:
# Cleaning column names to make it easier for future referencing
import re

cols = sdf_prepared.columns
for old_col in cols:
    new_col = re.sub(r'[^a-zA-Z0-9_]', '', old_col)
    new_col = new_col.upper()
    sdf_prepared = sdf_prepared.withColumnRenamed(old_col, new_col)

# 4.0 ML Modeling

In [None]:
# Use Snowpark Optimized Warehouse
session.use_warehouse('snowpark_opt_wh')

### 4.1 Prepare data

In [None]:
# Prepare Data for modeling
feature_cols = sdf_prepared.columns
feature_cols.remove('TOTAL_SALES')
target_col = 'TOTAL_SALES'

In [None]:
# Save the train and test sets as time stamped tables in Snowflake
snowdf_train, snowdf_test = sdf_prepared.random_split([0.8, 0.2], seed=82)
snowdf_train.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TRAIN")
snowdf_test.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TEST")

### 4.2 Initialize Model and Fit

In [None]:
# Define the XGBRegressor and fit the model
xgbmodel = XGBRegressor(random_state=123, input_cols=feature_cols, label_cols=target_col, output_cols='PREDICTION')
xgbmodel.fit(snowdf_train)

### 4.3 Predict on test set

In [None]:
# Score the data using the fitted xgbmodel
sdf_scored = xgbmodel.predict(snowdf_test)

In [None]:
sdf_scored.limit(5).to_pandas()

### 4.4 Save predictions in Snowflake

In [None]:
session.use_database('tpcds_xgboost')
session.use_schema('demo')
sdf_scored.write.mode('overwrite').save_as_table('predictions')

# 5.0 Deploying trained model as UDF for future usage

Steps to follow-
1. Get model in your local environment
2. Save the file in your local env. as .joblib file
3. Upload the file to Snowflake stage
4. Create UDF using model in stage

We can use `to_xgboost()` in order to get the actual xgboost model object which gives us access to all its attributes.

### Creating sample dataset for quick predictions

In [None]:
snowdf_test = session.table('tpc_TEST')
# Predicting with sample dataset
sample_data = snowdf_test.limit(100)
sample_data.write.mode("overwrite").save_as_table("temp_test")

In [None]:
test_sdf = session.table('temp_test')

### 5.1 Prepare model to convert to UDF
1. Get model in your local environment
2. Save the file in your local env. as .joblib file
3. Upload the file to Snowflake stage

In [None]:
import joblib
import cachetools

In [None]:
xgb_file = xgbmodel.to_xgboost()
xgb_file

In [None]:
MODEL_FILE = 'model.joblib.gz'
joblib.dump(xgb_file, MODEL_FILE) # we are just pickling it locally first

In [None]:
# You can also save the pickled object into the stage we created earlier
session.file.put(MODEL_FILE, "@ML_MODELS", auto_compress=False, overwrite=True)

### 5.2 Create UDF for future reference


In [None]:
from snowflake.snowpark.functions import udf
import snowflake.snowpark.types as T

In [None]:
# Define a simple scoring function
from cachetools import cached

@cached(cache={})
def load_model(model_path: str) -> object:
    from joblib import load
    model = load(model_path)
    return model

def udf_score_xgboost_model_vec_cached(df: pd.DataFrame) -> pd.Series:
    import os
    import sys
    # file-dependencies of UDFs are available in snowflake_import_directory
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    model_name = 'model.joblib.gz'
    model = load_model(import_dir+model_name)
    df.columns = feature_cols
    scored_data = pd.Series(model.predict(df))
    return scored_data

In [None]:
# Register UDF
udf_clv = session.udf.register(func=udf_score_xgboost_model_vec_cached,
                               name="TPCDS_PREDICT_CLV",
                               stage_location='@ML_MODELS',
                               input_types=[T.FloatType()]*len(feature_cols),
                               return_type = T.FloatType(),
                               replace=True,
                               is_permanent=True,
                               imports=['@ML_MODELS/model.joblib.gz'],
                               packages=['pandas',
                                         'xgboost',
                                         'joblib',
                                         'cachetools'],
                               session=session)

### 5.3 Extra Stuff

### Inference using UDF Created right here

Note we are using `udf_clv` that was defined earlier.

In [None]:
test_sdf_w_preds = test_sdf.with_column('PREDICTED', udf_clv(*feature_cols))
test_sdf_w_preds.limit(2).to_pandas()

### Inference using UDF Called from Snowflake

Notice we are calling the UDF created in snowflake using `F.call_udf()`

In [None]:
test_sdf_w_preds = test_sdf.with_column('PREDICTED',F.call_udf("TPCDS_PREDICT_CLV",
                                                               [F.col(c) for c in feature_cols]))
test_sdf_w_preds.limit(2).to_pandas()

# 6.0 Wrap up

In [None]:
session.close()