In [0]:
from typing import Union

import pandas as pd
import numpy as np

import snowflake.snowpark as snp
from snowflake.snowpark import types as T
from snowflake.snowpark.types import *
from snowflake.snowpark.session import Session
from snowflake.snowpark import Window

from snowflake.snowpark.functions import ceil, col, pandas_udf
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
import onnx

## Bring data from Snowflake using Snowpark

In [0]:

# Every Snowflake connection setting comes from the Databricks "snowflake" secret
# scope, so no credential is ever typed into the notebook.
SECRET_SCOPE = "snowflake"
sfUser = dbutils.secrets.get(SECRET_SCOPE, "SNOW-AZURE-STROKE-USERNAME")
sfPassword = dbutils.secrets.get(SECRET_SCOPE, "SNOW-AZURE-STROKE-PASSWORD")
sfAccount = dbutils.secrets.get(SECRET_SCOPE, "SNOW-AZURE-STROKE-ACCOUNT")
sfDatabase = dbutils.secrets.get(SECRET_SCOPE, "SNOW-AZURE-STROKE-DB")
sfSchema = dbutils.secrets.get(SECRET_SCOPE, "SNOW-AZURE-STROKE-SCHEMA")
sfTrainingWarehouse = dbutils.secrets.get(SECRET_SCOPE, "SNOW-AZURE-STROKE-TRAINING-WAREHOUSE")
sfRole = dbutils.secrets.get(SECRET_SCOPE, "SNOW-AZURE-STROKE-ROLE")

In [0]:
# Snowpark connection settings; the training warehouse is the session's default warehouse.
CONNECTION_PARAMETERS = dict(
    account=sfAccount,
    user=sfUser,
    password=sfPassword,
    schema=sfSchema,
    database=sfDatabase,
    warehouse=sfTrainingWarehouse,
    role=sfRole,
)
session_builder = Session.builder.configs(CONNECTION_PARAMETERS)
session = session_builder.create()

## Check the data

In [0]:
# Lazily reference the raw table; only a 10-row preview is pulled to the client.
raw_table_name = 'STROKE'
stroke_snowdf = session.table(raw_table_name)
stroke_preview_pdf = stroke_snowdf.limit(10).to_pandas()
stroke_preview_pdf

Unnamed: 0,ID,GENDER,AGE,HYPERTENSION,HEART_DISEASE,EVER_MARRIED,WORK_TYPE,RESIDENCE_TYPE,AVG_GLUCOSE_LEVEL,BMI,SMOKING_STATUS,STROKE
0,30669,Male,3,0,0,No,children,Rural,95.12,18.0,,0
1,30468,Male,58,1,0,Yes,Private,Urban,87.96,39.2,never smoked,0
2,16523,Female,8,0,0,No,Private,Urban,110.89,17.6,,0
3,56543,Female,70,0,0,Yes,Private,Rural,69.04,35.9,formerly smoked,0
4,46136,Male,14,0,0,No,Never_worked,Rural,161.28,19.1,,0
5,32257,Female,47,0,0,Yes,Private,Urban,210.95,50.1,,0
6,52800,Female,52,0,0,Yes,Private,Urban,77.59,17.7,formerly smoked,0
7,41413,Female,75,0,1,Yes,Self-employed,Rural,243.53,27.0,never smoked,0
8,15266,Female,32,0,0,Yes,Private,Rural,77.67,32.3,smokes,0
9,28674,Female,74,1,0,Yes,Self-employed,Urban,205.84,54.6,never smoked,0


In [0]:
# Preview a few positive (STROKE == 1) cases.
stroke_positive_snowdf = stroke_snowdf.filter(col("STROKE") == 1)
stroke_positive_snowdf.limit(10).to_pandas()

Unnamed: 0,ID,GENDER,AGE,HYPERTENSION,HEART_DISEASE,EVER_MARRIED,WORK_TYPE,RESIDENCE_TYPE,AVG_GLUCOSE_LEVEL,BMI,SMOKING_STATUS,STROKE
0,9046,Male,67,0,1,Yes,Private,Urban,228.69,36.6,formerly smoked,1
1,51676,Female,61,0,0,Yes,Self-employed,Rural,202.21,,never smoked,1
2,31112,Male,80,0,1,Yes,Private,Rural,105.92,32.5,never smoked,1
3,60182,Female,49,0,0,Yes,Private,Urban,171.23,34.4,smokes,1
4,1665,Female,79,1,0,Yes,Self-employed,Rural,174.12,24.0,never smoked,1
5,56669,Male,81,0,0,Yes,Private,Urban,186.21,29.0,formerly smoked,1
6,53882,Male,74,1,1,Yes,Private,Rural,70.09,27.4,never smoked,1
7,10434,Female,69,0,0,No,Private,Urban,94.39,22.8,never smoked,1
8,27419,Female,59,0,0,Yes,Private,Rural,76.15,,,1
9,60491,Female,78,0,0,Yes,Private,Urban,58.57,24.2,,1


In [0]:
# Column names and Snowflake types of the raw table.
raw_schema = stroke_snowdf.schema
raw_schema

## Step 1: Push-down preprocessing / feature engineering with Snowpark

#### Let's convert GENDER, EVER_MARRIED, WORK_TYPE, RESIDENCE_TYPE and SMOKING_STATUS into label-encoded columns

In [0]:
import sys

# Make the custom `preprocessing` module stored on DBFS importable.
PREPROCESSING_ROOT = '/dbfs/snowpark/'
# Listing the directory acts as a sanity check that the module was uploaded
# (presumably raises if the path is missing — confirm for your runtime).
dbutils.fs.ls("/snowpark/preprocessing/")
# Guard the append so re-running this cell does not keep growing sys.path.
if PREPROCESSING_ROOT not in sys.path:
    sys.path.append(PREPROCESSING_ROOT)

Let's perform label encoding.

In [0]:
import preprocessing as pp

# Categorical columns to label-encode. Each encoder writes "<COL>_ENCODED"; the
# original string column is then dropped and the encoded column takes its name.
categorical_cols = ['GENDER', 'EVER_MARRIED', 'WORK_TYPE', 'RESIDENCE_TYPE', 'SMOKING_STATUS']
label_encoders = {
    cat_col: pp.LabelEncoder(input_col=cat_col, output_col=f"{cat_col}_ENCODED")
    for cat_col in categorical_cols
}
# Keep the individual encoder handles available under their original names.
le_gender = label_encoders['GENDER']
le_ever_married = label_encoders['EVER_MARRIED']
le_work_type = label_encoders['WORK_TYPE']
le_residence_type = label_encoders['RESIDENCE_TYPE']
le_smoking_status = label_encoders['SMOKING_STATUS']

# Fit each encoder on the output of the previous transform, in the order above.
stroke_snowdf_labelencoded = stroke_snowdf
for cat_col in categorical_cols:
    label_encoders[cat_col].fit(stroke_snowdf_labelencoded)
    stroke_snowdf_labelencoded = label_encoders[cat_col].transform(stroke_snowdf_labelencoded)

# Drop the ID and the original string columns, then give the encoded columns the original names.
columns_excluded = ['ID'] + categorical_cols
included_columns = [c for c in stroke_snowdf_labelencoded.columns if c not in columns_excluded]

stroke_snowdf_labelencoded = stroke_snowdf_labelencoded.select(*included_columns)
for cat_col in categorical_cols:
    stroke_snowdf_labelencoded = stroke_snowdf_labelencoded.with_column_renamed(
        col(f"{cat_col}_ENCODED"), cat_col
    )
# NOTE: NULL SMOKING_STATUS values stay NULL after encoding (see preview) and are
# removed by the na.drop() step further down.
stroke_snowdf_labelencoded.limit(10).to_pandas()

Unnamed: 0,AGE,HYPERTENSION,HEART_DISEASE,AVG_GLUCOSE_LEVEL,BMI,STROKE,GENDER,EVER_MARRIED,WORK_TYPE,RESIDENCE_TYPE,SMOKING_STATUS
0,3,0,0,95.12,18.0,0,1,0,4,0,
1,58,1,0,87.96,39.2,0,1,1,2,1,1.0
2,8,0,0,110.89,17.6,0,0,0,2,1,
3,70,0,0,69.04,35.9,0,0,1,2,0,0.0
4,14,0,0,161.28,19.1,0,1,0,1,0,
5,47,0,0,210.95,50.1,0,0,1,2,1,
6,52,0,0,77.59,17.7,0,0,1,2,1,0.0
7,75,0,1,243.53,27.0,0,0,1,3,0,1.0
8,32,0,0,77.67,32.3,0,0,1,2,0,2.0
9,74,1,0,205.84,54.6,0,0,1,3,1,1.0


In [0]:
# Row count after label encoding (before any filtering).
labelencoded_row_count = stroke_snowdf_labelencoded.count()
labelencoded_row_count

In [0]:
# Keep only rows strictly older than AGE_CUTOFF years.
AGE_CUTOFF = 2
stroke_snowdf_processed = stroke_snowdf_labelencoded.filter(col('age') > AGE_CUTOFF)
stroke_snowdf_processed.count()

#### Missing Data Management with Snowpark

In [0]:
# Remove every row that has a NULL in any column.
stroke_missing_removed_snowdf = stroke_snowdf_processed.dropna()
stroke_missing_removed_snowdf.count()

In [0]:
# Make BMI an explicit float column, then summarize it.
bmi_as_float = col('BMI').cast('float')
stroke_missing_removed_snowdf = stroke_missing_removed_snowdf.with_column('BMI', bmi_as_float)
bmi_summary_snowdf = stroke_missing_removed_snowdf.describe(['BMI'])
bmi_summary_snowdf.limit(10).to_pandas()

Unnamed: 0,SUMMARY,BMI
0,count,29072.0
1,mean,30.054166
2,stddev,7.193908
3,min,10.1
4,max,92.0


In [0]:
# Row count of the cleaned data.
missing_removed_row_count = stroke_missing_removed_snowdf.count()
missing_removed_row_count

In [0]:
# Persist the model-ready columns (features + label) to their own table and
# re-point stroke_snowdf at that table for the rest of the notebook.
snow_features_original_table_name = 'stroke_features'
featureColumns = [
    'GENDER', 'AGE', 'HYPERTENSION', 'HEART_DISEASE', 'EVER_MARRIED', 'WORK_TYPE',
    'RESIDENCE_TYPE', 'AVG_GLUCOSE_LEVEL', 'BMI', 'SMOKING_STATUS', 'STROKE',
]
features_to_persist = stroke_missing_removed_snowdf.select(*featureColumns)
features_to_persist.write.save_as_table(snow_features_original_table_name, mode="overwrite")
stroke_snowdf = session.table(snow_features_original_table_name)
stroke_snowdf.limit(10).to_pandas()

Unnamed: 0,GENDER,AGE,HYPERTENSION,HEART_DISEASE,EVER_MARRIED,WORK_TYPE,RESIDENCE_TYPE,AVG_GLUCOSE_LEVEL,BMI,SMOKING_STATUS,STROKE
0,1,58,1,0,1,2,1,87.96,39.2,1,0
1,0,70,0,0,1,2,0,69.04,35.9,0,0
2,0,52,0,0,1,2,1,77.59,17.7,0,0
3,0,75,0,1,1,3,0,243.53,27.0,1,0
4,0,32,0,0,1,2,0,77.67,32.3,2,0
5,0,74,1,0,1,3,1,205.84,54.6,1,0
6,1,79,0,1,1,2,1,57.08,22.0,0,0
7,0,37,0,0,1,2,0,162.96,39.4,1,0
8,0,37,0,0,1,2,0,73.5,26.1,0,0
9,0,40,0,0,1,2,0,95.04,42.4,1,0


#### Synthetic Minority Oversampling Technique (SMOTE) with Snowpark Stored Procedure

In [0]:
# Label distribution of the cleaned data: highly imbalanced.
label_counts_snowdf = stroke_snowdf.group_by('stroke').count()
label_counts_snowdf.to_pandas()

Unnamed: 0,STROKE,COUNT
0,0,28524
1,1,548


In [0]:
# Stage that holds stored-procedure code (procedures below are registered with
# is_permanent=True and stage_location='@ML_PROCS'). IF NOT EXISTS instead of
# OR REPLACE so re-running this cell does not wipe files those procedures use.
session.sql('CREATE STAGE IF NOT EXISTS ML_PROCS').collect()
pd.DataFrame(session.sql('SHOW STAGES').collect())

Unnamed: 0,created_on,name,database_name,schema_name,url,has_credentials,has_encryption_key,owner,comment,region,type,cloud,notification_channel,storage_integration
0,2022-11-01 10:02:04.562000-07:00,DATA_STAGE,[REDACTED],[REDACTED],,N,N,[REDACTED],,,INTERNAL,,,
1,2023-01-30 20:50:51.167000-08:00,FUNCTIONS,[REDACTED],[REDACTED],,N,N,[REDACTED],,,INTERNAL,,,
2,2023-01-31 18:04:05.065000-08:00,ML_PROCS,[REDACTED],[REDACTED],,N,N,[REDACTED],,,INTERNAL,,,


In [0]:
# This local Python function will be registered as a stored procedure and runs in Snowflake.

def sproc_oversample_smote(session: Session,
                           training_table: str,
                           feature_cols: list,
                           target_col: str,
                           target_table: str) -> T.Variant:
    """Rebalance ``training_table`` with SMOTE and write the result to ``target_table``.

    Args:
        session: Snowpark session (supplied by Snowflake when run as a procedure).
        training_table: Source table containing the feature and label columns.
        feature_cols: Columns used as SMOTE features.
        target_col: Label column whose minority class is oversampled.
        target_table: Output table; dropped and recreated on every call.

    Returns:
        A status message string.
    """
    # Fixed seed so the synthetic rows (and everything trained on them) are reproducible.
    smote_random_state = 42

    # The whole training table is loaded into the procedure's memory.
    local_training_data = session.table(training_table).to_pandas()

    # Features and label.
    X = local_training_data[feature_cols]
    y = local_training_data[target_col]

    # Oversample the minority class via SMOTE.
    from imblearn.over_sampling import SMOTE
    X_balance, y_balance = SMOTE(random_state=smote_random_state).fit_resample(X, y)

    # Re-attach the label so features and target are written as one table.
    X_balance[target_col] = y_balance

    # Replace any previous output table with the rebalanced data.
    session.sql('DROP TABLE IF EXISTS {}'.format(target_table)).collect()
    session.write_pandas(X_balance, table_name=target_table, auto_create_table=True)

    return "Successfully oversampled"

In [0]:
# Locate the locally installed imblearn package; its directory is shipped to
# Snowflake as an import of the SMOTE stored procedure.
import imblearn
imblearn_path = next(iter(imblearn.__path__))
print(imblearn_path)

In [0]:
# Register the Python function above as a permanent Snowflake stored procedure.
# NOTE(review): this rebinds `sproc_oversample_smote` to the registered procedure,
# so re-running this cell without re-running the `def` cell passes the
# StoredProcedure object (not the Python function) as `func`.
smote_sproc_packages = ['snowflake-snowpark-python', 'scikit-learn==1.1.1']
sproc_oversample_smote = session.sproc.register(
    func=sproc_oversample_smote,
    name='sproc_oversample_smote',
    stage_location='@ML_PROCS',
    is_permanent=True,
    replace=True,
    packages=smote_sproc_packages,
    imports=[imblearn_path],
)

In [0]:
# Optional: switch to a high-memory warehouse before running the stored procedure,
# e.g. session.use_warehouse(HIGH_MEM_WH), for more RAM for in-memory computation
# (the original note cites ~230GB — confirm for your warehouse size).

In [0]:
# Inputs for the SMOTE stored procedure: every column except the label is a feature.
target_col = 'STROKE'
training_table = snow_features_original_table_name
# Work on an explicit copy rather than mutating whatever `.columns` returned.
feature_cols = list(stroke_snowdf.columns)
feature_cols.remove(target_col)
target_table = 'STROKE_RESAMPLED_FEATURES'

sproc_oversample_smote(
    training_table, feature_cols, target_col, target_table, session=session
)

In [0]:
# Optional: switch back to the standard warehouse afterwards,
# e.g. session.use_warehouse(STANDARD_WH).

#### Preprocessing complete: Checking the Resampled Data

In [0]:
# The oversampled (now balanced) table written by the stored procedure.
rebalanced_snow_sdf = session.table(target_table)
rebalanced_preview_pdf = rebalanced_snow_sdf.limit(10).to_pandas()
rebalanced_preview_pdf

Unnamed: 0,GENDER,AGE,HYPERTENSION,HEART_DISEASE,EVER_MARRIED,WORK_TYPE,RESIDENCE_TYPE,AVG_GLUCOSE_LEVEL,BMI,SMOKING_STATUS,STROKE
0,1,58,1,0,1,2,1,87.96,39.2,1,0
1,0,70,0,0,1,2,0,69.04,35.9,0,0
2,0,52,0,0,1,2,1,77.59,17.7,0,0
3,0,75,0,1,1,3,0,243.53,27.0,1,0
4,0,32,0,0,1,2,0,77.67,32.3,2,0
5,0,74,1,0,1,3,1,205.84,54.6,1,0
6,1,79,0,1,1,2,1,57.08,22.0,0,0
7,0,37,0,0,1,2,0,162.96,39.4,1,0
8,0,37,0,0,1,2,0,73.5,26.1,0,0
9,0,40,0,0,1,2,0,95.04,42.4,1,0


In [0]:
# After SMOTE the label is balanced: both classes now have the same row count.
rebalanced_snow_sdf.group_by('STROKE').count().to_pandas()

Unnamed: 0,STROKE,COUNT
0,0,28524
1,1,28524


In [0]:
# 80/20 train/test split of the rebalanced data, persisted for Steps 2 and 5.
# A fixed seed makes the split reproducible across re-runs.
# NOTE(review): SMOTE runs before this split, so synthetic minority rows derived
# from test-set neighbours can land in training and the test set is artificially
# balanced; metrics may be optimistic. Consider splitting first, then oversampling
# only the training part.
weights = [0.8, 0.2]
SPLIT_SEED = 42
rebalanced_snow_sdf_parts = rebalanced_snow_sdf.random_split(weights, seed=SPLIT_SEED)
training_snowdf, testing_snowdf = rebalanced_snow_sdf_parts
training_table_name = "ONNX_TRAIN_STROKE"
testing_table_name = "ONNX_TEST_STROKE"
training_snowdf.write.save_as_table(training_table_name, mode="overwrite")
testing_snowdf.write.save_as_table(testing_table_name, mode="overwrite")

## Step 2: Model training with PySpark (SynapseML LightGBMClassifier)

In [0]:
# Train from the persisted split (mirrors Step 5, which reads testing_table_name),
# so this step does not depend on the in-memory split object from the cell above.
target_col = 'STROKE'
training_snowdf = session.table(training_table_name)
feature_cols = training_snowdf.drop(target_col).columns
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Keep only the label and the assembled feature vector; use target_col rather
# than a hard-coded label name so the two stay in sync.
training_sparkdf = (
    featurizer.transform(spark.createDataFrame(training_snowdf.to_pandas()))
    .select(target_col, "features")
)

In [0]:
# First 10 assembled training rows (label + feature vector).
training_preview_rows = training_sparkdf.head(10)
display(training_preview_rows)

STROKE,features
0,"Map(vectorType -> dense, length -> 10, values -> List(1.0, 58.0, 1.0, 0.0, 1.0, 2.0, 1.0, 87.96, 39.2, 1.0))"
0,"Map(vectorType -> dense, length -> 10, values -> List(0.0, 52.0, 0.0, 0.0, 1.0, 2.0, 1.0, 77.59, 17.7, 0.0))"
0,"Map(vectorType -> dense, length -> 10, values -> List(0.0, 32.0, 0.0, 0.0, 1.0, 2.0, 0.0, 77.67, 32.3, 2.0))"
0,"Map(vectorType -> dense, length -> 10, values -> List(0.0, 74.0, 1.0, 0.0, 1.0, 3.0, 1.0, 205.84, 54.6, 1.0))"
0,"Map(vectorType -> dense, length -> 10, values -> List(1.0, 79.0, 0.0, 1.0, 1.0, 2.0, 1.0, 57.08, 22.0, 0.0))"
0,"Map(vectorType -> dense, length -> 10, values -> List(0.0, 37.0, 0.0, 0.0, 1.0, 2.0, 0.0, 162.96, 39.4, 1.0))"
0,"Map(vectorType -> sparse, length -> 10, indices -> List(1, 4, 5, 7, 8), values -> List(37.0, 1.0, 2.0, 73.5, 26.1))"
0,"Map(vectorType -> dense, length -> 10, values -> List(0.0, 40.0, 0.0, 0.0, 1.0, 2.0, 0.0, 95.04, 42.4, 1.0))"
0,"Map(vectorType -> dense, length -> 10, values -> List(1.0, 35.0, 0.0, 0.0, 0.0, 2.0, 0.0, 85.37, 33.0, 1.0))"
0,"Map(vectorType -> dense, length -> 10, values -> List(0.0, 20.0, 0.0, 0.0, 0.0, 2.0, 1.0, 84.62, 19.7, 2.0))"


#### Use LightGBM to train a model

In [0]:
# SynapseML LightGBM binary classifier on the assembled feature vector.
# NOTE(review): no validation data is configured for this estimator, so confirm
# whether setEarlyStoppingRound(300) has any effect. The training data is already
# SMOTE-balanced, so setIsUnbalance(True) may be redundant.
lgbm_estimator = LightGBMClassifier(featuresCol="features", labelCol="STROKE")

# Objective / class handling
lgbm_estimator = lgbm_estimator.setObjective("binary").setIsUnbalance(True)
# Boosting schedule
lgbm_estimator = lgbm_estimator.setNumIterations(1000).setEarlyStoppingRound(300)
# Tree shape
lgbm_estimator = lgbm_estimator.setNumLeaves(31).setMaxDepth(-1)
# Regularization / split constraints
lgbm_estimator = (
    lgbm_estimator
    .setLambdaL1(0.5)
    .setMaxDeltaStep(0.5)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)
# Row / column subsampling
lgbm_estimator = lgbm_estimator.setBaggingFraction(0.7).setBaggingFreq(2).setFeatureFraction(0.7)
# Threading
lgbm_estimator = lgbm_estimator.setNumThreads(-1)

# Fitted model, kept under the name `model` used by the export step.
model = lgbm_estimator.fit(training_sparkdf)

## Step 3: Export the model to ONNX

In [0]:
from synapse.ml.core.platform import running_on_binder

# Only when running on Binder: install a pinned lightgbm (3.2.1) via an IPython shell escape.
# NOTE(review): `get_ipython` is imported but not used in this cell.
if running_on_binder():
    !pip install lightgbm==3.2.1
    from IPython import get_ipython
# Native LightGBM classes: Booster rebuilds the trained model from its string form.
import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(
    lgbm_model: Union[LGBMClassifier, Booster],
    input_size: int,
    target_opset: int = 9,
) -> bytes:
    """Convert a LightGBM model to a serialized ONNX model.

    Args:
        lgbm_model: A fitted ``LGBMClassifier`` or a raw ``Booster``.
        input_size: Number of features. The ONNX graph gets one float input named
            ``"input"`` with shape ``[-1, input_size]`` (batch dimension left open).
        target_opset: ONNX opset to target; defaults to 9, the previously hard-coded value.

    Returns:
        The ONNX model serialized to bytes.
    """
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    # The input name must match the feed key used at scoring time ({"input": ...}).
    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=target_opset
    )
    return onnx_model.SerializeToString()


# Pull the trained booster out of the SynapseML model as a LightGBM model string,
# rebuild a native Booster from it, and convert that to ONNX bytes.
booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
n_features = len(feature_cols)
model_payload_ml = convertModel(booster, n_features)

#### Write the ONNX model to the Databricks driver so Snowpark can upload it

In [0]:
# Local file on the driver; the UDF registration below uploads it through `imports`.
onnx_model_path = "pipeline_lightgbm.onnx"
with open(onnx_model_path, mode="wb") as onnx_file:
    onnx_file.write(model_payload_ml)

## Step 4: Snowpark Vectorized UDF that brings in ONNX model (to perform distributed batch predictions)

In [0]:
# Stage that holds UDF code (SCORE_ONNX_MODEL is registered permanently into
# @FUNCTIONS below). IF NOT EXISTS instead of OR REPLACE so re-running this cell
# does not wipe files that already-registered permanent UDFs use.
session.sql('CREATE STAGE IF NOT EXISTS FUNCTIONS').collect()

In [0]:
from cachetools import cached

@cached(cache={})
def load_model(model_path: str) -> object:
    """Return a CPU-only ONNX Runtime session for ``model_path``.

    Results are memoized in the dict passed to ``@cached``, so repeated calls with
    the same path (presumably once per UDF batch) reuse one session.
    """
    import onnxruntime as rt

    # A single intra-op thread; the original author notes this performs best here.
    session_options = rt.SessionOptions()
    session_options.intra_op_num_threads = 1
    return rt.InferenceSession(
        model_path,
        providers=['CPUExecutionProvider'],
        sess_options=session_options,
    )

@pandas_udf(
    name='SCORE_ONNX_MODEL',
    stage_location='@FUNCTIONS',
    input_types=[T.FloatType()] * 10,
    return_type=T.IntegerType(),
    replace=True,
    is_permanent=True,
    imports=['pipeline_lightgbm.onnx'],
    packages=['onnxruntime', 'cachetools', 'numpy'],
    max_batch_size=10000,
    session=session,
)
def score_onnx_model(df: pd.DataFrame) -> pd.Series:
    """Score one batch of rows with the ONNX model shipped as a UDF import.

    The batch's 10 feature columns are fed to the model as a float32 matrix under
    the input name "input"; the first model output is returned as the prediction.
    """
    import sys
    import numpy as np

    # Directory where Snowflake placed the files listed in `imports`.
    model_file = sys._xoptions["snowflake_import_directory"] + '/pipeline_lightgbm.onnx'
    onnx_session = load_model(model_file)
    feature_matrix = df.values.astype(np.float32)
    # run() returns a list of outputs; the first one is used (presumably the label).
    return onnx_session.run(None, {"input": feature_matrix})[0]

## Step 5: Native ONNX predictions within Snowflake

In [0]:
# Score the persisted hold-out split with the ONNX UDF; the scoring runs inside Snowflake.
target_col = 'STROKE'
testing_snowdf = session.table(testing_table_name)
feature_cols = testing_snowdf.drop(target_col).columns
predicted_stroke = score_onnx_model(*feature_cols)
snowpark_df_scored = testing_snowdf.with_column('PREDICTED_STROKE', predicted_stroke)

In [0]:
# Preview of actual vs. predicted labels.
scored_preview_pdf = snowpark_df_scored.limit(10).to_pandas()
scored_preview_pdf

Unnamed: 0,GENDER,AGE,HYPERTENSION,HEART_DISEASE,EVER_MARRIED,WORK_TYPE,RESIDENCE_TYPE,AVG_GLUCOSE_LEVEL,BMI,SMOKING_STATUS,STROKE,PREDICTED_STROKE
0,0,70,0,0,1,2,0,69.04,35.9,0,0,0
1,0,75,0,1,1,3,0,243.53,27.0,1,0,0
2,0,42,0,0,1,2,0,82.67,22.5,1,0,0
3,1,71,0,0,1,2,1,198.21,27.3,0,0,0
4,1,67,0,0,1,2,1,190.7,36.0,0,0,0
5,0,66,0,0,1,2,0,141.24,28.5,1,0,0
6,0,30,0,0,1,2,1,61.45,36.7,2,0,0
7,0,37,1,0,1,3,0,127.71,36.0,1,0,0
8,0,52,0,0,1,2,1,82.24,54.7,0,0,0
9,0,37,0,0,1,2,0,75.18,48.2,0,0,0


In [0]:
# Persist the scored test rows so the metrics stored procedure can read them.
results_table_name = "ONNX_PREDICTIONS_STROKE"
scored_writer = snowpark_df_scored.write
scored_writer.save_as_table(results_table_name, mode="overwrite")

#### Calculate classification metrics with Snowpark Stored Procedure

In [0]:
# This local Python function will be registered as a stored procedure and runs in Snowflake.

def sproc_eval_metrics(session: Session,
                       results_table: str,
                       actual_col_name: str,
                       pred_col_name: str) -> T.Variant:
    """Compute binary classification metrics over a scored results table.

    Args:
        session: Snowpark session (supplied by Snowflake when run as a procedure).
        results_table: Table holding both the actual and the predicted labels.
        actual_col_name: Column with the ground-truth label.
        pred_col_name: Column with the predicted label.

    Returns:
        Dict with ``accuracy``, ``f1``, ``precision`` and ``recall``.
    """
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

    # Load the results table into pandas for scikit-learn.
    results_pdf = session.table(results_table).to_pandas()
    actual = results_pdf[actual_col_name].values
    pred = results_pdf[pred_col_name].values

    # Cast to built-in float so the VARIANT return holds plain numbers.
    return {
        "accuracy": float(accuracy_score(actual, pred)),
        "f1": float(f1_score(actual, pred)),
        "precision": float(precision_score(actual, pred)),
        "recall": float(recall_score(actual, pred)),
    }

# Register as a permanent stored procedure. It only needs scikit-learn, so the
# imblearn sources are not shipped with it.
sproc_eval_metrics = session.sproc.register(func=sproc_eval_metrics,
                                            name='sproc_eval_metrics',
                                            is_permanent=True,
                                            replace=True,
                                            stage_location='@ML_PROCS',
                                            packages=['snowflake-snowpark-python', 'scikit-learn==1.1.1'])


In [0]:
# Optional: switch to a high-memory warehouse before running the stored procedure,
# e.g. session.use_warehouse(HIGH_MEM_WH), for more RAM for in-memory computation
# (the original note cites ~230GB — confirm for your warehouse size).

In [0]:
# Columns in results_table_name: ground truth vs. UDF output.
actual_target_col_name = "STROKE"
predicted_target_col_name = "PREDICTED_STROKE"

sproc_eval_metrics(
    results_table_name,
    actual_target_col_name,
    predicted_target_col_name,
    session=session,
)

In [0]:
# Optional: switch back to the standard warehouse afterwards,
# e.g. session.use_warehouse(STANDARD_WH).

In [0]:
# Close the Snowpark session opened at the top of the notebook.
session.close()