In this file, I use Snowpark to load the full insurance dataset csv(1M rows) into a snowflake table 

In [3]:
from snowflake.snowpark import Session
import json
import pandas as pd

# Initiate session
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Current Environment Details
print('\nConnection Established with the following parameters:')
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))


Connection Established with the following parameters:
User                        : COCONUT457
Role                        : "ACCOUNTADMIN"
Database                    : "INSURANCE"
Schema                      : "ML_PIPE"
Warehouse                   : "COMPUTE_WH"


In [10]:
#!kaggle datasets download -d sridharstreaks/insurance-data-for-machine-learning --unzip

Downloading insurance-data-for-machine-learning.zip to c:\Users\txsmi\Documents\Local Programming\Snowflake\Snowflake-ML-Pipeline




  0%|          | 0.00/21.3M [00:00<?, ?B/s]
  5%|▍         | 1.00M/21.3M [00:00<00:06, 3.20MB/s]
  9%|▉         | 2.00M/21.3M [00:00<00:05, 3.87MB/s]
 24%|██▎       | 5.00M/21.3M [00:00<00:01, 8.71MB/s]
 28%|██▊       | 6.00M/21.3M [00:00<00:01, 8.59MB/s]
 33%|███▎      | 7.00M/21.3M [00:01<00:01, 8.16MB/s]
 42%|████▏     | 9.00M/21.3M [00:01<00:01, 9.06MB/s]
 52%|█████▏    | 11.0M/21.3M [00:01<00:01, 9.85MB/s]
 61%|██████    | 13.0M/21.3M [00:01<00:00, 10.2MB/s]
 71%|███████   | 15.0M/21.3M [00:01<00:00, 10.7MB/s]
 80%|███████▉  | 17.0M/21.3M [00:01<00:00, 10.4MB/s]
 85%|████████▍ | 18.0M/21.3M [00:02<00:00, 10.2MB/s]
 94%|█████████▍| 20.0M/21.3M [00:02<00:00, 10.3MB/s]
 99%|█████████▉| 21.0M/21.3M [00:02<00:00, 10.1MB/s]
100%|██████████| 21.3M/21.3M [00:02<00:00, 9.21MB/s]


In [None]:
# Load full 1M dataset into dataframe
insurance_df = pd.read_csv('insurance_dataset.csv')

Use the write_pandas() method to write the first 10k rows into the 'SOURCE_OF_TRUTH' table created with the SQL commands in the SQL file. The method "returns a Snowpark DataFrame object referring to the table where the pandas DataFrame was written to." (Snowpark Documentation)

In [23]:
# Capitalize column names
insurance_df.columns = insurance_df.columns.str.upper()

# Rearrange columns to fit target schema
cols = insurance_df.columns.tolist()
cols = cols[:3] + cols[-1:] + cols[3:-1]

insurance_df = insurance_df[cols]

source_of_truth_df = session.write_pandas(insurance_df[:10000], table_name='SOURCE_OF_TRUTH',auto_create_table=True)

The code below writes the remaining 990k to the INCOMING_DATA_SOURCE table (created automatically) to simulate data being streamed in

In [24]:
incoming_data_source_df = session.write_pandas(insurance_df[10000:], table_name='INCOMING_DATA_SOURCE',auto_create_table=True)

  success, nchunks, nrows, ci_output = write_pandas(


In [None]:
create or replace stage ML_PIPE_STAGE;

In [None]:
import snowflake.snowpark
from snowflake.snowpark.dataframe import col as column
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import sproc
import snowflake.snowpark.types as T

import json
import pandas as pd
import numpy as np

# Snowpark ML
from snowflake.ml._internal.utils import identifier
from snowflake.ml.registry import registry

from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
import snowflake.ml.modeling.preprocessing as snowmlpp
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error, mean_squared_error


@sproc(name='predict_write_to_gold', stage_location='@ML_PIPE_STAGE', is_permanent=True, replace=True,packages=["snowflake-snowpark-python",'snowflake-ml-python', 'xgboost','pandas'])
def predict_write_to_gold(session: Session) -> str:
        try:    
                df = session.table('STREAM_ON_LANDING').filter(column('METADATA$ACTION') == 'INSERT')

        except Exception as e:
                return (f'Error with reading from stream: {e}')

        # Standardize values
        try:
                # Define Snowflake categorical types and determine which columns to OHE
                categorical_types = [T.StringType]
                cols_to_ohe = [col.name for col in df.schema.fields if (type(col.datatype) in categorical_types)]
                ohe_cols_output = [col + '_OHE' for col in cols_to_ohe]

                def fix_values(columnn):
                        return F.upper(F.regexp_replace(F.col(columnn), '[^a-zA-Z0-9]+', '_'))
                
                for col in cols_to_ohe:
                        df = df.na.fill('NONE', subset=col)
                        df = df.withColumn(col, fix_values(col))

        except Exception as e:
                return (f'Error standardizing values {e}')

        # Create model registry and load the default pipeline 
        try:
                model_registry = registry.Registry(session=session, database_name=session.get_current_database(), schema_name='ML_PIPE')
                model_version = model_registry.get_model('INSURANCE_CHARGES_PREDICTION').default

        except Exception as e:
                return (f'Error with creating model registry object: {e}')


        # Run the pipeline
        try:
                results = model_version.run(df,function_name = 'predict')

        except Exception as e:
                return (f'Error with running model: {e}')

        # Load the results into the gold table
        try:
                count = results.count()

                cols_to_update = {col: results[col] for col in session.table('INSURANCE_GOLD').columns if 'METADATA_UPDATED_AT' not in col}
                metadata_col_to_update = {'METADATA_UPDATED_AT': F.current_timestamp()}
                updates = {**cols_to_update, **metadata_col_to_update}
                target = session.table('INSURANCE_GOLD')
                merge_results = target.merge(results,target['METADATA$ROW_ID'] == results['METADATA$ROW_ID'], \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])
                

                return (f'{merge_results.rows_inserted} record(s) inserted, {merge_results.rows_updated} record(s) updated in the INSURANCE_GOLD table')

        except Exception as e:
                return (f'Error with writing results to gold table: {e}')


In [None]:
# Before running, make sure @ML_PIPE_STAGE exists (SQL File)
# Create the sproc that creates and fits the pipeline based on the table passed
@sproc(name='train_save_ins_model', stage_location='@ML_PIPE_STAGE', is_permanent=True, replace=True, packages=["snowflake-snowpark-python",'snowflake-ml-python', 'xgboost','pandas'])
def train_save_ins_model(session: Session, source_of_truth: str, major_version: bool = True) -> str:

    # Access the data from the source of truth table
    try:
        df = session.table(source_of_truth).limit(1000)

    except Exception as e:
        return (f'Error with getting table data: {e}')

    # Define label and feature columns
    LABEL_COLUMNS = ['CHARGES']
    FEATURE_COLUMN_NAMES = [i for i in df.schema.names if i not in LABEL_COLUMNS]
    OUTPUT_COLUMNS = ['PREDICTED_CHARGES']

    # Define Snowflake numeric types (possibly for scaling, ordinal encoding)
    # numeric_types = [T.DecimalType, T.DoubleType, T.FloatType, T.IntegerType, T.LongType]
    # numeric_columns = [col.name for col in df.schema.fields if (type(col.datatype) in numeric_types) and (col.name in FEATURE_COLUMN_NAMES)]

    # Define Snowflake categorical types and determine which columns to OHE
    categorical_types = [T.StringType]
    cols_to_ohe = [col.name for col in df.schema.fields if (type(col.datatype) in categorical_types)]
    ohe_cols_output = [col + '_OHE' for col in cols_to_ohe]


    # Standardize the values in the rows by removing spaces, capitalizing
    def fix_values(columnn):
            return F.upper(F.regexp_replace(F.col(columnn), '[^a-zA-Z0-9]+', '_'))

    try:
        for col in cols_to_ohe:
                df = df.na.fill('NONE', subset=col)
                df = df.withColumn(col, fix_values(col))

    except Exception as e:
        return (f'Error with standardizing values: {e}')

    # Define the pipeline
    try:
        pipe = Pipeline(
            steps=[
                #('imputer', SimpleImputer(input_cols=all_cols)),
                #('mms', snowmlpp.MinMaxScaler(input_cols=cols_to_scale, output_cols=scale_cols_output)),
                ('ohe', snowmlpp.OneHotEncoder(input_cols=cols_to_ohe, output_cols=ohe_cols_output, drop_input_cols=True)),
                ('grid_search_reg', GridSearchCV(estimator=XGBRegressor(),
                                                    param_grid={ "n_estimators":[50], # 25
                                                                "learning_rate":[0.4], # .5
                                                                },
                                                    n_jobs = -1,
                                                    scoring="neg_mean_absolute_percentage_error",
                                                    input_cols=FEATURE_COLUMN_NAMES.append(ohe_cols_output),
                                                    label_cols=LABEL_COLUMNS,
                                                    output_cols=OUTPUT_COLUMNS
                                                    )
                )
            ]      
        )

    except Exception as e:
        return (f'Error with defining the pipeline: {e}')


    # Split the data into training and testing
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)


    # Fit the pipeline
    try:
        pipe.fit(train_df)

    except Exception as e:
        return (f'Error with fitting pipeline: {e}')


    # Predict with the pipeline
    try:
        results = pipe.predict(test_df)

    except Exception as e:
        return (f'Error with predicting with pipeline: {e}')


    # Use Snowpark ML metrics to calculate MAPE and MSE

    # Calculate MAPE
    mape = mean_absolute_percentage_error(df=results, y_true_col_names=LABEL_COLUMNS, y_pred_col_names=OUTPUT_COLUMNS)

    # Calculate MSE
    mse = mean_squared_error(df=results, y_true_col_names=LABEL_COLUMNS, y_pred_col_names=OUTPUT_COLUMNS)

    def set_model_version(registry_object,model_name, major_version=True):
        # See what we've logged so far, dynamically set the model version
        import numpy as np
        import json
        
        model_list = registry_object.show_models()
        
        if len(model_list) == 0:
            return 'V1'
        
        model_list_filter = model_list[model_list['name'] ==  model_name]

        if len(model_list_filter) == 0:
            return 'V1'

        version_list_string = model_list_filter['versions'].iloc[0]
        version_list = json.loads(version_list_string)
        version_numbers = [float(s.replace('V', '')) for s in version_list]
        model_last_version = max(version_numbers)
        
        
        if np.isnan(model_last_version) == True:
            model_new_version = 'V1'

        elif np.isnan(model_last_version) == False and major_version == True:
            model_new_version = round(model_last_version + 1,2)
            model_new_version = 'V' + str(model_new_version)
            
        else:
            model_new_version = round(model_last_version + .1,2)
            model_new_version = 'V' + str(model_new_version)
            
        return model_new_version # This is the version we will use when we log the new model.

    # Create model regisry object
    try:
        model_registry = registry.Registry(session=session)

    except Exception as e:
        return (f'Error with creating model registry object: {e}')
    
    # Save model to registry
    try:
        LABEL_COLUMNS = ['CHARGES']
        FEATURE_COLUMN_NAMES = [i for i in df.schema.names if i not in LABEL_COLUMNS]
        X = train_df.select(FEATURE_COLUMN_NAMES).limit(100)

        model_name = 'INSURANCE_CHARGES_PREDICTION'
        version_name = set_model_version(model_registry, model_name, major_version=major_version)
        model_version = model_registry.log_model(
            model = pipe, 
            model_name = model_name, 
            version_name= f'"{version_name}"',
            sample_input_data=X,
            conda_dependencies=['snowflake-snowpark-python','snowflake-ml-python','scikit-learn', 'xgboost']
            )

        model_version.set_metric(metric_name='mean_abs_pct_err', value=mape)
        model_version.set_metric(metric_name='mean_sq_err', value=mse)
    
    except Exception as e:
        return (f'Error with saving model to registry: {e}')
    
    try:
        session.sql(f'alter model INSURANCE_CHARGES_PREDICTION set default_version = "{version_name}";')
    
    except Exception as e:
        return (f'Error with setting default version: {e}')

    return f'Model {model_name} has been logged with version {version_name} and has a MAPE of {mape} and MSE of {mse}'



In [None]:

-- Create the task that calls the training sproc
CREATE or REPLACE TASK TRAIN_SAVE_TASK
  WAREHOUSE = WH
  SCHEDULE = '5 MINUTE' --
  AS
    CALL train_save_ins_model('SOURCE_OF_TRUTH',FALSE);

-- Tasks are created in a suspended state. Resume it
ALTER TASK TRAIN_SAVE_TASK RESUME;

-- Execute immediately so that you have a trained model in registry
EXECUTE TASK TRAIN_SAVE_TASK;

-- Create the landing table (where streamed-in records could land)
CREATE or REPLACE TABLE LANDING_TABLE (
	AGE NUMBER(38,0),
	GENDER VARCHAR(16777216),
	BMI FLOAT,
	CHARGES FLOAT,
	CHILDREN NUMBER(38,0),
	SMOKER VARCHAR(16777216),
	REGION VARCHAR(16777216),
	MEDICAL_HISTORY VARCHAR(16777216),
	FAMILY_MEDICAL_HISTORY VARCHAR(16777216),
	EXERCISE_FREQUENCY VARCHAR(16777216),
	OCCUPATION VARCHAR(16777216),
	COVERAGE_LEVEL VARCHAR(16777216)
);

-- Create the stream on the landing table
CREATE OR REPLACE STREAM STREAM_ON_LANDING ON TABLE LANDING_TABLE;

-- Create a gold table for the records and their predictions to land
CREATE OR REPLACE TABLE INSURANCE_GOLD(
    AGE NUMBER(38,0),
	GENDER VARCHAR(16777216),
	BMI FLOAT,
	CHILDREN NUMBER(38,0),
	SMOKER VARCHAR(16777216),
	REGION VARCHAR(16777216),
	MEDICAL_HISTORY VARCHAR(16777216),
	FAMILY_MEDICAL_HISTORY VARCHAR(16777216),
	EXERCISE_FREQUENCY VARCHAR(16777216),
	OCCUPATION VARCHAR(16777216),
	COVERAGE_LEVEL VARCHAR(16777216),
    METADATA$ROW_ID VARCHAR(16777216),
    METADATA$ISUPDATE BOOLEAN,
    METADATA$ACTION VARCHAR(16777216),
    METADATA_UPDATED_AT DATE,
    CHARGES FLOAT,
    PREDICTED_CHARGES FLOAT
);

-- Insert records into the landing table to simulate streamed data
INSERT INTO LANDING_TABLE(
    AGE ,
	GENDER,
	BMI,
	CHARGES ,
	CHILDREN,
	SMOKER,
	REGION,
	MEDICAL_HISTORY ,
	FAMILY_MEDICAL_HISTORY,
	EXERCISE_FREQUENCY ,
	OCCUPATION ,
	COVERAGE_LEVEL
) SELECT 
    AGE,
	GENDER,
	BMI,
	CHARGES ,
	CHILDREN,
	SMOKER,
	REGION,
	MEDICAL_HISTORY ,
	FAMILY_MEDICAL_HISTORY,
	EXERCISE_FREQUENCY ,
	OCCUPATION ,
	COVERAGE_LEVEL
FROM INCOMING_DATA_SOURCE
LIMIT 100; -- Change this number to test prediction speed at different quantities

-- View the inserted records in the stream, along with the added metadata columns
SELECT * FROM STREAM_ON_LANDING;

-- Run the predict_and_write.py file here. This will create the prediction/write to gold sproc

-- Call the prediction SPROC to see it work on the data you loaded into the stream.
CALL PREDICT_WRITE_TO_GOLD();

-- Testing the capacity to update records that already exist in the gold table
update landing_table set coverage_level = 'STANDARD'
where age = 41;

-- View the stream after updating
SELECT * FROM STREAM_ON_LANDING;

-- Call the prediction SPROC again to see how it handles updates
CALL PREDICT_WRITE_TO_GOLD();

-- Create the predict and write task
CREATE or REPLACE TASK PREDICT_WRITE_TASK
  WAREHOUSE = WH
  SCHEDULE = '1 MINUTE'
  WHEN
    SYSTEM$STREAM_HAS_DATA('STREAM_ON_LANDING')
  AS
    CALL PREDICT_WRITE_TO_GOLD();

-- Again, tasks are created in a suspended state. Resume it
ALTER TASK PREDICT_WRITE_TASK RESUME;

-- Clean up
--ALTER TASK PREDICT_WRITE_TASK SUSPEND;
--ALTER TASK TRAIN_SAVE_TASK SUSPEND;
--DROP DATABASE INSURANCE;


In [None]:
CALL train_save_ins_model('SOURCE_OF_TRUTH',FALSE);

In [None]:
EXECUTE TASK TRAIN_SAVE_TASK;
