## Snowflake Setup

In [None]:
import pandas as pd

from snowflake.snowpark import functions as F
from snowflake.snowpark import version as V

from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.preprocessing import KBinsDiscretizer, OneHotEncoder
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.snowpark.context import get_active_session

# Reuse the Snowpark session that is already active in this environment
# (presumably a Snowflake-hosted notebook); all later cells go through it.
session = get_active_session()

In [None]:
# Suppress all Python warnings to keep the notebook output readable.
# NOTE(review): this blanket filter also hides deprecation and library
# warnings; consider narrowing it to specific categories or modules.
import warnings 
warnings.filterwarnings("ignore")

In [None]:
# Provision the demo environment: a TPCDS database with a DEMO schema, a MEDIUM
# Snowpark-optimized warehouse with max_concurrency_level set to 1, and an
# ML_MODELS stage. NOTE: the statements use CREATE OR REPLACE, so re-running
# this cell drops and recreates the TPCDS database and everything in it.
_setup_sql = (
    'CREATE OR REPLACE DATABASE TPCDS',
    'CREATE OR REPLACE SCHEMA TPCDS.DEMO',
    "create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'",
    "alter warehouse snowpark_opt_wh set max_concurrency_level = 1",
    'CREATE OR REPLACE STAGE TPCDS.DEMO.ML_MODELS',
)
for _stmt in _setup_sql:
    session.sql(_stmt).collect()

# Make TPCDS.DEMO the session's default location for unqualified names.
session.use_database('TPCDS')
session.use_schema('DEMO')

In [None]:
TPCDS_SIZE_PARAM = 10
SNOWFLAKE_SAMPLE_DB = 'TPCDS_10TB'

# Requested TPC-DS size -> schema that holds data of that size.
_SCHEMA_FOR_SIZE = {
    100: 'TPCDS_SF100TCL',
    10: 'TPCDS_SF10TCL',
}
if TPCDS_SIZE_PARAM not in _SCHEMA_FOR_SIZE:
    raise ValueError("Invalid TPCDS_SIZE_PARAM selection")
TPCDS_SCHEMA = _SCHEMA_FOR_SIZE[TPCDS_SIZE_PARAM]

# Snowpark DataFrame handles on the source tables, all in <db>.<schema>.
_src = f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}'
store_sales = session.table(f'{_src}.store_sales')
catalog_sales = session.table(f'{_src}.catalog_sales')
web_sales = session.table(f'{_src}.web_sales')
date = session.table(f'{_src}.date_dim')
dim_stores = session.table(f'{_src}.store')
customer = session.table(f'{_src}.customer')
address = session.table(f'{_src}.customer_address')
demo = session.table(f'{_src}.customer_demographics')

## Data Engineering

In [None]:
# Total sales per customer for each sales channel. Each channel names its
# customer key differently, so the key is renamed to a common 'customer_sk'
# to let the three frames be unioned afterwards.
# NOTE(review): *_sales_price looks like a per-unit price in TPC-DS; if total
# revenue is the intended target, *_ext_sales_price or *_net_paid may be the
# better measure -- confirm.
store_sales_agged, web_sales_agged, catalog_sales_agged = (
    channel_df.group_by(key_col)
    .agg(F.sum(price_col).as_('total_sales'))
    .rename(key_col, 'customer_sk')
    for channel_df, key_col, price_col in (
        (store_sales, 'ss_customer_sk', 'ss_sales_price'),
        (web_sales, 'ws_bill_customer_sk', 'ws_sales_price'),
        (catalog_sales, 'cs_bill_customer_sk', 'cs_sales_price'),
    )
)

In [None]:
# Stack the three per-channel frames; all share the (customer_sk, total_sales) layout.
total_sales = (
    store_sales_agged
    .union_all(web_sales_agged)
    .union_all(catalog_sales_agged)
)

In [None]:
# After the union a customer can have up to one row per channel; collapse
# them into a single total per customer_sk.
total_sales = (
    total_sales
    .group_by(F.col('customer_sk'))
    .agg(F.sum(F.col('total_sales')).alias('total_sales'))
)


In [None]:
customer = customer.select('c_customer_sk','c_current_hdemo_sk', 'c_current_addr_sk', 'c_customer_id', 'c_birth_year')


In [None]:
customer = customer.join(address.select('ca_address_sk', 'ca_zip'), customer['c_current_addr_sk'] == address['ca_address_sk'] )
customer = customer.join(demo.select('cd_demo_sk', 'cd_gender', 'cd_marital_status', 'cd_credit_rating', 'cd_education_status', 'cd_dep_count'),
                                customer['c_current_hdemo_sk'] == demo['cd_demo_sk'] )
customer = customer.rename('c_customer_sk', 'customer_sk')

In [None]:
# Preview five joined customer rows as a pandas DataFrame.
customer.limit(5).to_pandas()

In [None]:
# Combine each customer's sales total with their profile columns. Snowpark's
# join defaults to an inner join, so only customers present on both sides remain.
final_df = total_sales.join(right=customer, on='customer_sk')
# Row count of the assembled dataset (shown as the cell output).
final_df.count()

In [None]:
# Persist the assembled dataset as the feature_store table in TPCDS.DEMO,
# replacing any table left by a previous run.
session.use_database('TPCDS')
session.use_schema('DEMO')
final_df.write.save_as_table('feature_store', mode='overwrite')

## Feature Engineering

In [None]:
# Point the session at the schema holding feature_store and at the
# Snowpark-optimized warehouse used for feature engineering and training.
session.use_database('TPCDS')
session.use_schema('DEMO')
session.use_warehouse('snowpark_opt_wh')

In [None]:
# Load the persisted feature table and drop identifier / join-key columns
# (plus the zip code) so they are not used as model features.
snowdf = (
    session.table("feature_store")
    .drop('CA_ZIP', 'CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK')
)


In [None]:
# Preview five rows of the model-ready feature set.
snowdf.limit(5).to_pandas()

In [None]:
# Categorical columns to one-hot encode, and numeric columns to impute.
cat_cols = 'CD_GENDER CD_MARITAL_STATUS CD_CREDIT_RATING CD_EDUCATION_STATUS'.split()
num_cols = 'C_BIRTH_YEAR CD_DEP_COUNT'.split()

### Missing Value Imputation

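We can use the `SimpleImputer` from `snowflake.ml.modeling.impute` to replace missing values with a per-column statistic (such as the median or the most frequent value) or with a constant. For example, to fill a column with a constant: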

```
# SimpleImputer in snowflake.ml.modeling.impute
from snowflake.ml.modeling.impute import SimpleImputer
my_imputer = SimpleImputer(input_cols=['your_column'],
                           output_cols=['your_column'],
                           strategy='constant',
                           fill_value='OTHER')
my_imputer.fit(my_sdf)
my_sdf = my_imputer.transform(my_sdf)

```

In [None]:
from snowflake.ml.modeling.impute import SimpleImputer


In [None]:
# Median-impute missing values in the numeric columns, writing the results
# back over the same column names.
# NOTE(review): the imputer is fit on all rows before the train/test split
# further down, so test rows influence the median -- consider fitting on the
# training split only.
my_imputer = SimpleImputer(
    input_cols=num_cols,
    output_cols=num_cols,
    strategy='median',
)
my_imputer.fit(snowdf)
sdf_prepared = my_imputer.transform(snowdf)

### One-hot Encoding of Categorical Cols

In [None]:
# One-hot encode the categorical columns; drop_input_cols=True so the raw
# categorical columns are not kept alongside the encoded ones.
my_ohe_encoder = OneHotEncoder(
    input_cols=cat_cols,
    output_cols=cat_cols,
    drop_input_cols=True,
)
my_ohe_encoder.fit(sdf_prepared)
sdf_prepared = my_ohe_encoder.transform(sdf_prepared)

In [None]:
# Preview five rows after median imputation and one-hot encoding.
sdf_prepared.limit(5).to_pandas()

### Clean Column Names

In [None]:
# Cleaning column names to make it easier for future referencing: strip every
# character outside [A-Za-z0-9_] (e.g. quotes or spaces that can appear in
# encoded column names) and upper-case the result.
import re
from collections import Counter

_INVALID_IDENT_CHARS = re.compile(r'[^a-zA-Z0-9_]')

cols = sdf_prepared.columns
new_cols = [_INVALID_IDENT_CHARS.sub('', c).upper() for c in cols]

# Two distinct columns collapsing onto one cleaned name would leave ambiguous
# duplicates in the frame; fail loudly instead.
_collisions = sorted(name for name, k in Counter(new_cols).items() if k > 1)
if _collisions:
    raise ValueError(f"Column names collide after cleaning: {_collisions}")

# Rename all columns in a single projection rather than stacking one
# rename (and one nested projection) per column.
sdf_prepared = sdf_prepared.select(
    [F.col(old_col).alias(new_col) for old_col, new_col in zip(cols, new_cols)]
)

In [None]:
# Split 80/20 with a fixed seed, zero-fill remaining nulls, and persist each
# split as its own table (tpc_train / tpc_test, overwritten on every run).
snowdf_train, snowdf_test = sdf_prepared.random_split([0.8, 0.2], seed=82)
for _split_df, _table_name in ((snowdf_train, 'tpc_train'), (snowdf_test, 'tpc_test')):
    _split_df.fillna(0).write.mode("overwrite").save_as_table(_table_name)

## ML Modelling


In [None]:
# Make sure training runs on the Snowpark-optimized warehouse.
session.use_warehouse('snowpark_opt_wh')

In [None]:
# Initialize a Model Registry handle that stores models in the TPCDS.DEMO schema.
from snowflake.ml.registry import registry

native_registry = registry.Registry(
    session=session,
    database_name='TPCDS',
    schema_name='demo',
)

### Get data

In [None]:
# Persisted, zero-filled training split written earlier as tpc_train.
snowdfTrain = session.table(['TPCDS', 'DEMO', 'TPC_TRAIN'])

In [None]:
# Prepare data for modelling: TOTAL_SALES is the target and every other
# column of the training split is a feature.
target_col = 'TOTAL_SALES'
feature_cols = list(snowdf_train.columns)
feature_cols.remove(target_col)

### Initialize Model and Fit

In [None]:
# Define the XGBRegressor and fit it on the persisted training split.
# snowdfTrain (TPC_TRAIN) was written with fillna(0), the same treatment as the
# TPC_TEST rows scored below; fitting on the in-memory snowdf_train instead
# would train on nulls that the test data never contains.
xgbmodel = XGBRegressor(
    n_estimators=100,
    random_state=123,
    max_depth=3,
    input_cols=feature_cols,
    label_cols=target_col,
    output_cols='PREDICTION',
)
xgbmodel.fit(snowdfTrain)

### Predict on a Small Test

In [None]:
# Score a 1,000-row slice of the persisted test split with the fitted model.
snowdf_test = session.table(['TPCDS', 'DEMO', 'TPC_TEST']).limit(1000)
sdf_scored = xgbmodel.predict(snowdf_test)

In [None]:
# Preview five scored rows; predictions are in the PREDICTION column.
sdf_scored.limit(5).to_pandas()

### Log model in Snowflake Model Registry

You can reference this model in a separate workflow without training it again.

In [None]:
# Register the trained estimator under a fixed name and version so later
# workflows can load it from the registry instead of retraining.
model_name = "DEMO_TPCDS"
model_version = "V1_" + model_name

model_ver = native_registry.log_model(
    model=xgbmodel,
    model_name=model_name,
    version_name=model_version,
    comment="Wasn't this super easy?",
)

In [None]:
# List the models registered in TPCDS.DEMO.
native_registry.show_models()

## 5.0 Using Model Registry for Inference

The Snowflake Model Registry stores machine learning models as first-class schema-level objects in Snowflake so they can easily be found and used by others in your organization.

Once you have stored a model, you can invoke its methods (equivalent to functions or stored procedures) to perform model operations, such as inference, in a Snowflake virtual warehouse.


### 5.1 View all model versions

In [None]:
# List every registered version of the model (shown as the cell output).
model_name = 'DEMO_TPCDS'
(
    native_registry
    .get_model(model_name)
    .show_versions()
)

### 5.2 Get Default Version

In [None]:
# Fetch the default version of the registered model for inference.
model = (
    native_registry
    .get_model(model_name)
    .default
)
