![title](RefractArchitecture.png)

Refract leverages powerful capabilities of snowflake like snowparkml, snowpark container services to fast track the Machine learning life cycle on data available in snowflake. Snowpark ML pushes down the compute to snowflake warehouses for all your data preprocessing and machine learning workflows. 



# Steps to preprocess, train and deploy a Snowpark ML Model

In [3]:
# ! python -m pip install snowflake-ml-python==1.1.0 -U

In [1]:
# from snowflake.snowpark import Session
# from snowflake.ml.modeling.pipeline import Pipeline
# from snowflake.ml.modeling.xgboost import XGBClassifier
# from snowflake.ml.modeling.preprocessing import MinMaxScaler, OrdinalEncoder, OneHotEncoder
# from sklearn.metrics import mean_absolute_percentage_error
# # Pandas Tools
# from snowflake.connector.pandas_tools import write_pandas
# # Data Science Libs
# import numpy as np
# import pandas as pd
# # create_temp_table warning suppresion
# import warnings; warnings.simplefilter('ignore')
# import configparser

## Create connection to snowflake account

In [22]:
config = configparser.ConfigParser()
config.read("snowflake_connection.ini")

connection_parameters = {
    "user": f'{config["Snowflake"]["user"]}',
    "password": f'{config["Snowflake"]["password"]}',
    "account": f'{config["Snowflake"]["account"]}',
    "WAREHOUSE": f'{config["Snowflake"]["WAREHOUSE"]}',
    "DATABASE": f'{config["Snowflake"]["DATABASE"]}',
    "SCHEMA": f'{config["Snowflake"]["SCHEMA"]}'
}

def snowflake_connector(conn):
    try:
        session = Session.builder.configs(conn).create()
        print("connection successful!")
    except:
        raise ValueError("error while connecting with db")
    return session

session = snowflake_connector(connection_parameters)

connection successful!


## Fetch reference of data from snowflake table

In [23]:
table_name = "ATTRITION"
sf_df = session.table(table_name)
sf_df = sf_df.na.drop()
sf_df = sf_df.drop("USER_ID", "JOB_STARTDATE", "JOB_ENDDATE", "SCHOOL_ENDDATE")
df = sf_df.to_pandas()
df["CHURN"] = df["CHURN"].astype(int)
sf_df = session.create_dataframe(df)

In [24]:
CATEGORICAL_COLUMNS = ["MAPPED_ROLE_CLEAN","SEX", "ETHNICITY","HOSPITAL_TYPE", "HOSPITAL_OWNERSHIP",
                       "COMPANY_NAME","CITY_STATE","DISTANCE", "DEGREE_CLEAN"]
NUMERICAL_COLUMNS = ["SALARY", "SENIORITY", "TENURE_MONTHS", "MONTHS_AFTER_COLLEGE", "BIRTH_YEAR", "OVERTIME_HOURS"]
LABEL_COLUMNS = ["CHURN"]
OUTPUT_COLUMNS = ["PREDICTION"]
train_df, test_df = sf_df.random_split([0.8,0.2], seed=25)

## Create pipeline - Includes preprocessing and modeling
Defining the pipeline with preprocessing steps and algorithm setup.

In [25]:
## train_df and test_df are both snowpark dataframes
pipeline = Pipeline(
    steps=[(
                "OE",
                OrdinalEncoder(
                input_cols=CATEGORICAL_COLUMNS,
                output_cols=CATEGORICAL_COLUMNS,
                handle_unknown='use_encoded_value',
                unknown_value=-1
                )),
               ("MMS",
                MinMaxScaler(
                clip=True,
                input_cols=NUMERICAL_COLUMNS,
                output_cols=NUMERICAL_COLUMNS,
                )),
               ("classification",
                XGBClassifier(
                input_cols=CATEGORICAL_COLUMNS+NUMERICAL_COLUMNS,
                label_cols=LABEL_COLUMNS,
                output_cols=OUTPUT_COLUMNS
                ))])


### Training the model

In [None]:
pipeline.fit(train_df) ## fiting the dataset
result = pipeline.predict(test_df) 

In [26]:
from joblib import dump, load
filename = "snowflake_event_model.joblib"
dump(pipeline, filename)

['snowflake_event_model.joblib']

# Registering Model In Snowflake

In [28]:
# session = Session.builder.configs(SnowflakeLoginOptions()).create()
session = snowflake_connector(connection_parameters)

connection successful!


In [29]:
REGISTRY_DATABASE_NAME = "FOSFOR_REFRACT"
REGISTRY_SCHEMA_NAME = "HR_ANALYTICS"

In [30]:
from snowflake.ml.registry import model_registry

### Creating Model Registry

In [31]:
model_registry.create_model_registry(session=session, database_name=REGISTRY_DATABASE_NAME, schema_name=REGISTRY_SCHEMA_NAME)

create_model_registry() is in private preview since 0.2.0. Do not use it in production. 


True

In [32]:
registry = model_registry.ModelRegistry(session=session, database_name=REGISTRY_DATABASE_NAME, schema_name=REGISTRY_SCHEMA_NAME)

In [33]:
registry.get_history().show()



-------------------------------------------------------------------------------------------------------------------
|"EVENT_TIMESTAMP"  |"EVENT_ID"  |"MODEL_ID"  |"ROLE"  |"OPERATION"  |"ATTRIBUTE_NAME"  |"VALUE[ATTRIBUTE_NAME]"  |
-------------------------------------------------------------------------------------------------------------------
|                   |            |            |        |             |                  |                         |
-------------------------------------------------------------------------------------------------------------------



### Registering the model to snowpak ml registry

In [35]:
model_id = registry.log_model(
    model_name=MODEL_NAME,
    model_version=MODEL_VERSION,
    model=pipeline,
    tags={"stage": "snowflake_event", "classifier_type": "xgboost"},
    sample_input_data=test_df.show(),
    options={"embed_local_ml_library": True}
)



--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"SALARY"   |"SENIORITY"  |"TENURE_MONTHS"  |"MONTHS_AFTER_COLLEGE"  |"BIRTH_YEAR"  |"MAPPED_ROLE_CLEAN"  |"SEX"  |"ETHNICITY"  |"HOSPITAL_TYPE"       |"HOSPITAL_OWNERSHIP"                         |"COMPANY_NAME"           |"CITY_STATE"     |"DISTANCE"  |"DEGREE_CLEAN"         |"OVERTIME_HOURS"  |"CHURN"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|62668.834  |2            |8                |106                     |198



In [17]:
# registry.get_history().show()

In [36]:
model_list = registry.list_models()
model_list.to_pandas()

Unnamed: 0,CREATION_CONTEXT,CREATION_ENVIRONMENT_SPEC,CREATION_ROLE,CREATION_TIME,ID,INPUT_SPEC,NAME,OUTPUT_SPEC,RUNTIME_ENVIRONMENT_SPEC,TYPE,URI,VERSION,ARTIFACT_IDS,DESCRIPTION,METRICS,TAGS,REGISTRATION_TIMESTAMP
0,,"{\n ""python"": ""3.8.16""\n}","""FOSFOR_REFRACT""",2023-11-29 12:08:52.332000+00:00,0d9ef3188eb011ee8022aeeb3b7fbcf0,,ChurnPrediction,,,snowml,sfc://FOSFOR_REFRACT.HR_ANALYTICS.SNOWML_MODEL...,1,,,,"{\n ""classifier_type"": ""xgboost"",\n ""stage"":...",2023-11-29 12:08:53.648000+00:00


### Deploying the model from Registry

In [37]:
# Pick a deployment name and deploy
# model_deployment_name = model_name + f"{model_version}" + "_UDF"

registry.deploy(model_name="ChurnPrediction",
                model_version="01",
                deployment_name="ChurnPrediction_01_UDF", 
                target_method="predict", 
                permanent=True, 
                options={"relax_version": True})



{'name': 'FOSFOR_REFRACT.HR_ANALYTICS.ChurnPrediction_01_UDF',
 'platform': <TargetPlatform.WAREHOUSE: 'warehouse'>,
 'target_method': 'predict',
 'signature': ModelSignature(
                     inputs=[
                         FeatureSpec(dtype=DataType.DOUBLE, name='SALARY'),
 		FeatureSpec(dtype=DataType.INT8, name='SENIORITY'),
 		FeatureSpec(dtype=DataType.INT16, name='TENURE_MONTHS'),
 		FeatureSpec(dtype=DataType.INT16, name='MONTHS_AFTER_COLLEGE'),
 		FeatureSpec(dtype=DataType.INT16, name='BIRTH_YEAR'),
 		FeatureSpec(dtype=DataType.STRING, name='MAPPED_ROLE_CLEAN'),
 		FeatureSpec(dtype=DataType.STRING, name='SEX'),
 		FeatureSpec(dtype=DataType.STRING, name='ETHNICITY'),
 		FeatureSpec(dtype=DataType.STRING, name='HOSPITAL_TYPE'),
 		FeatureSpec(dtype=DataType.STRING, name='HOSPITAL_OWNERSHIP'),
 		FeatureSpec(dtype=DataType.STRING, name='COMPANY_NAME'),
 		FeatureSpec(dtype=DataType.STRING, name='CITY_STATE'),
 		FeatureSpec(dtype=DataType.STRING, name='DISTANCE'),
 		Fe

In [38]:
# Let's confirm it was added
registry.list_deployments(MODEL_NAME, MODEL_VERSION).to_pandas()



Unnamed: 0,MODEL_NAME,MODEL_VERSION,DEPLOYMENT_NAME,CREATION_TIME,TARGET_METHOD,TARGET_PLATFORM,SIGNATURE,OPTIONS,STAGE_PATH,ROLE
0,ChurnPrediction,1,ChurnPrediction_01_UDF,2023-11-29 12:09:19.720000+00:00,predict,warehouse,"{\n ""inputs"": [\n {\n ""name"": ""SALARY...","{\n ""permanent_udf_stage_location"": ""@FOSFOR_...",@FOSFOR_REFRACT.HR_ANALYTICS._SYSTEM_REGISTRY_...,"""FOSFOR_REFRACT"""


Creating pipeline reference from registry

In [39]:
model_ref = model_registry.ModelReference(registry=registry, model_name=MODEL_NAME, model_version=MODEL_VERSION)

In [33]:
table_name = "ATTRITION"
sf_df = session.table(table_name)
sf_df = sf_df.na.drop()
sf_df = sf_df.drop("USER_ID", "JOB_STARTDATE", "JOB_ENDDATE", "SCHOOL_ENDDATE")
df = sf_df.to_pandas()

In [25]:
# test_df=df.tail(100)
train_df, test_df = sf_df.random_split([0.8,0.2], seed=25)

### Inferencing a deployed model

In [40]:
result_sdf = model_ref.predict(deployment_name="ChurnPrediction_01_UDF", data=test_df)
#result_sdf.rename(F.col('"output_feature_0"'),"PREDICTED_PRICE").show()
# result_sdf.show()



In [41]:
result_sdf.to_pandas()


Unnamed: 0,CHURN,MAPPED_ROLE_CLEAN,SEX,ETHNICITY,HOSPITAL_TYPE,HOSPITAL_OWNERSHIP,COMPANY_NAME,CITY_STATE,DISTANCE,DEGREE_CLEAN,SALARY,SENIORITY,TENURE_MONTHS,MONTHS_AFTER_COLLEGE,BIRTH_YEAR,OVERTIME_HOURS,PREDICTION
0,0,1.0,0.0,3.0,1.0,9.0,4.0,14.0,3.0,1.0,0.323021,0.00,0.024221,0.144397,0.711864,0.296296,0
1,0,1.0,0.0,2.0,1.0,9.0,3.0,12.0,2.0,1.0,0.402295,0.25,0.017301,0.161638,0.694915,0.444444,0
2,1,1.0,1.0,3.0,1.0,9.0,4.0,14.0,1.0,1.0,0.364635,0.00,0.025952,0.075431,0.762712,0.296296,1
3,1,0.0,0.0,3.0,1.0,9.0,4.0,14.0,2.0,5.0,0.205920,0.00,0.022491,0.064655,0.694915,0.111111,0
4,1,1.0,0.0,3.0,1.0,7.0,8.0,68.0,3.0,3.0,0.450423,0.50,0.017301,0.247845,0.694915,0.000000,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
801,1,1.0,1.0,3.0,3.0,9.0,1.0,56.0,2.0,3.0,0.183309,0.00,0.010381,0.034483,0.830508,0.407407,0
802,1,2.0,1.0,0.0,1.0,9.0,5.0,60.0,3.0,3.0,0.171754,0.00,0.005190,0.002155,0.881356,0.074074,1
803,1,1.0,0.0,3.0,1.0,9.0,4.0,14.0,1.0,1.0,0.323021,0.00,0.024221,0.144397,0.711864,0.333333,1
804,1,1.0,0.0,3.0,1.0,9.0,8.0,33.0,1.0,1.0,0.392205,0.50,0.022491,0.178879,0.627119,0.481481,1


In [2]:
!pip install snowflake-ml-python snowflake-connector-python

Collecting snowflake-ml-python
[?25l  Downloading https://files.pythonhosted.org/packages/51/72/e84a68e53a1cad8e9549ea8db0028bb4b8c84ca0a884c0bfadd0619cf76c/snowflake_ml_python-1.1.1-py3-none-any.whl (1.7MB)
[K     |████████████████████████████████| 1.7MB 9.9MB/s eta 0:00:01
[?25hCollecting snowflake-connector-python
  Using cached https://files.pythonhosted.org/packages/45/c9/b15f5a5652fa52dbb9a15c86a8d117d52a3d4630626f6a1bbf7e2675583f/snowflake_connector_python-3.6.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Collecting scipy<2,>=1.9
  Using cached https://files.pythonhosted.org/packages/69/f0/fb07a9548e48b687b8bf2fa81d71aba9cfc548d365046ca1c791e24db99d/scipy-1.10.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Collecting anyio<4,>=3.5.0
[?25l  Downloading https://files.pythonhosted.org/packages/19/24/44299477fe7dcc9cb58d0a57d5a7588d6af2ff403fdd2d47a246c91a3246/anyio-3.7.1-py3-none-any.whl (80kB)
[K     |████████████████████████████████| 81kB 1.3MB/s s e

Collecting joblib>=1.1.1
  Using cached https://files.pythonhosted.org/packages/10/40/d551139c85db202f1f384ba8bcf96aca2f329440a844f924c8a0040b6d02/joblib-1.3.2-py3-none-any.whl
Collecting aiohttp!=4.0.0a0,!=4.0.0a1; extra == "http"
  Using cached https://files.pythonhosted.org/packages/25/19/91b9f2b737c9117cf8f622ee02a84788ac6fcda0aa4b736dad1321c6dd93/aiohttp-3.9.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Collecting zipp>=3.1.0; python_version < "3.10"
  Downloading https://files.pythonhosted.org/packages/d9/66/48866fc6b158c81cc2bfecc04c480f105c6040e8b077bc54c634b4a67926/zipp-3.17.0-py3-none-any.whl
Collecting aiobotocore<3.0.0,>=2.5.4
[?25l  Downloading https://files.pythonhosted.org/packages/f9/5e/ca5fd1c417f6a1c8cde8519611d82faedb368d7e3684e3a6069c95721bdf/aiobotocore-2.8.0-py3-none-any.whl (75kB)
[K     |████████████████████████████████| 81kB 6.3MB/s  eta 0:00:01
[?25hCollecting wheel
  Using cached https://files.pythonhosted.org/packages/c7/c3/55076fc728723ef927

You should consider upgrading via the 'pip install --upgrade pip' command.[0m
