# NOTEBOOK 3: END TO END ML USING SNOWPARK AND SCIKIT-LEARN

In this notebook we fit/train a Scikit-Learn ML pipeline that includes common feature engineering tasks such as Imputations, Scaling and One-Hot Encoding. The pipeline also includes a `RandomForestRegressor` model that will predict median house values in California. 

We will fit/train the pipeline using a Snowpark Python Stored Procedure (SPROC) and then save the pipeline to a Snowflake stage. This example concludes by showing how a saved model/pipeline can be loaded and run in a scalable fashion on a Snowflake warehouse using Snowpark Python User-Defined Functions (UDFs).

We will also use a Snowpark-optimized warehouse in this notebook.

![Snowpark ML](images/snowflake_e2e_ml.png)

### Create a session with Snowflake

In [None]:
# Snowpark libs
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark import version

# Scikit-learn libs
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

import pandas as pd
import json
import cachetools

#Snowflake connection info
from config import snowflake_conn_prop

print(f'Snowpark version : {version.VERSION}')

Connect to Snowflake

In [None]:
import sys
sys.path.append('..')
from utilities.creds import Credentials
session = Session.builder.configs(Credentials().__dict__).create()

# Setting up the session environment
session.use_role("LEARNINGSNOWPARKROLE")
session.use_database("SCIKIT_LEARN")
session.use_schema("SCIKIT_LEARN.PUBLIC")
session.use_warehouse("LEARNINGSNOWPARKVW")

print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

### Create a Snowflake stage to save the ML model/pipeline and permanent UDFs

We need a Snowflake stage to hold the permanent stored procedure that trains the model, the permanent UDF that scores with it, and the model file itself.

In [None]:
stage_name = 'qs_sklearn_stage'
# collect function triggers execution of the SQL
session.sql(f"create or replace stage {stage_name}").collect()

### Stored Proc fits the pipeline and the model and then saves it in Snowflake

Start by creating a training function that builds a pipeline to preprocess the data and then trains a RandomForestRegressor model.

We already saw some preprocessing steps in the previous notebook, but now we wrap them in a function that will be packaged as a stored procedure, so the entire Python function runs in Snowflake.

We will use scikit-learn for this.

In [None]:
# Training function
def fit_pipeline(X, y, cat_attribs, num_attribs):

    # create a pipeline for numerical features
    num_pipeline = Pipeline([
            ('imputer', SimpleImputer(strategy="median")),
            ('std_scaler', StandardScaler()),
        ])

    # Pipeline for categorical features
    cat_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])
    
    # Create the preprocessor
    preprocessor = ColumnTransformer([
            ("num", num_pipeline, num_attribs),
            ("cat", cat_pipeline, cat_attribs)
        ])

    # Create the full pipeline including the model training
    full_pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)),
        ])

    # fit the preprocessing pipeline and the model together
    full_pipeline.fit(X, y)

    return full_pipeline


To test the training function locally we need to pull the data back into a Pandas DataFrame. Using the **sample** method we can fetch 10% of the data.

In [None]:
pd_test = session.table("HOUSING_DATA").sample(frac=0.10).to_pandas()
pd_test.shape

Test the function with the sample data

In [None]:

X = pd_test.loc[:, pd_test.columns != 'MEDIAN_HOUSE_VALUE']
y = pd_test['MEDIAN_HOUSE_VALUE']

test_full_pipe = fit_pipeline(X, y,  ['OCEAN_PROXIMITY'], ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS',
       'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME'])
test_full_pipe

In [None]:
# Get the first prediction
test_full_pipe.predict(X)[0]

We can now deploy the training function as a Python Stored Procedure in Snowflake, so we can run the training on Snowflake compute and do not need to move data around.

We also want to save the trained model (pipeline) as a file so we can use it later in a UDF for scoring. We save it to a Snowflake stage using a small helper function. In real life that helper would already live in a utility module we can reuse across projects.

In [None]:
# Function to save a Python object to a Snowflake internal stage
def save_file(snf_session, object, stage_name, stage_path, file_name):
  import io
  import joblib

  save_path = stage_name + '/' + stage_path
  input_stream = io.BytesIO()
  input_stream.name = file_name
  joblib.dump(object, input_stream)
  put_result = snf_session.file.put_stream(input_stream, save_path, overwrite=True)
  
  return f'{save_path}/{put_result.target}'
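
The helper above serializes the object into an in-memory buffer rather than a local file. Below is a minimal sketch of that round trip. It uses the standard-library `pickle` as a stand-in for `joblib` and a plain dict as a stand-in for the fitted pipeline. Both are assumptions made so the example runs without Snowflake, and the file name is hypothetical.

```python
import io
import pickle

# Hypothetical stand-in for a fitted pipeline object
fake_model = {"name": "housing_forest", "n_estimators": 100}

# Serialize into an in-memory buffer instead of a file on disk
buffer = io.BytesIO()
buffer.name = "model.pkl"  # hypothetical file name attached to the stream
pickle.dump(fake_model, buffer)

# Rewind before reading, because dump leaves the position at the end
buffer.seek(0)
restored = pickle.load(buffer)

print(restored == fake_model)
```

In the notebook the buffer is handed to `put_stream` instead of being read back locally. The point here is only that the serialized bytes never touch the local file system.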

We will create a wrapper around our training function that fetches the data and converts it to a Pandas DataFrame for training. This wrapper is the logic of the stored procedure.

In [None]:
# Stored Procedure function
def train_model(snf_session: Session # A stored procedure receives an authenticated session object when executed in Snowflake
                , training_table: str # Name of the table holding the data used for training and testing
                , target_col: str # Name of the target column
                , save_stage: str # Name of the stage where the fitted pipeline object is saved
                ) -> dict: # Test metrics, model path and names of the train/test tables
    
    # Libraries used in the function that have not been imported as part of the Python session
    from datetime import datetime
    import numpy as np

    now = datetime.now() # Get the date and time when this run started
    
    # Get the training table and split into a training and test Snowpark DataFrames
    snowdf_train, snowdf_test = snf_session.table(training_table).random_split([0.8, 0.2], seed=82) # use seed to make the split repeatable

    # Get the categorical and numeric column names
    cat_attribs = [c.name for c in snowdf_train.schema.fields if type(c.datatype) == T.StringType and c.name != target_col]
    numeric_types = [T.DecimalType, T.LongType, T.DoubleType, T.FloatType, T.IntegerType]
    num_attribs = [c.name for c in snowdf_train.schema.fields if type(c.datatype) in numeric_types and c.name != target_col]


    # save the train and test sets as time stamped tables in Snowflake 
    table_suffix = now.strftime("%Y%m%d%H%M%S")
    train_table_name = training_table + '_TRAIN_' + table_suffix
    test_table_name = training_table + '_TEST_' + table_suffix
    snowdf_train.write.mode("overwrite").save_as_table(train_table_name)
    snowdf_test.write.mode("overwrite").save_as_table(test_table_name)

    pd_train = snowdf_train.to_pandas()
    
    X_train = pd_train.loc[:, pd_train.columns != target_col]
    y_train = pd_train[target_col]
    
    # Fit the model (pipeline)
    full_pipeline = fit_pipeline(X_train, y_train, cat_attribs, num_attribs)


    # save the full pipeline including the model
    
    # Save the model to stage
    save_path = now.strftime("%Y-%m-%d-%H%M%S")
    object_saved_path = save_file(snf_session, full_pipeline, f"@{save_stage}/models", save_path, 'housing_fores_reg.joblib')


    # predict on the test set and return the root mean squared error (RMSE)
    pd_test = snowdf_test.to_pandas()
    
    X_test = pd_test.loc[:, pd_test.columns != target_col]
    y_test = pd_test[target_col]
    
    housing_predictions = full_pipeline.predict(X_test)
    lin_mse = mean_squared_error(y_test, housing_predictions)

    lin_rmse = np.sqrt(lin_mse)

    # Create a dict to return with test metrics and the path to the saved model pipeline
    ret_dict = {
        "MSE": lin_mse
        ,"RMSE": lin_rmse
        , "model_path": object_saved_path
        , "train_table": train_table_name
        , "test_table": test_table_name
    }
    return ret_dict
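
The procedure derives the train/test table names and the model folder from a single timestamp. Below is a small sketch of that naming scheme. It uses a fixed, made-up timestamp so the output is reproducible, whereas the procedure itself uses `datetime.now()`.

```python
from datetime import datetime

# Fixed, hypothetical timestamp; the stored procedure uses datetime.now()
now = datetime(2023, 1, 15, 9, 30, 5)
training_table = "HOUSING_DATA"

# Same format strings as in train_model
table_suffix = now.strftime("%Y%m%d%H%M%S")
train_table_name = training_table + "_TRAIN_" + table_suffix
test_table_name = training_table + "_TEST_" + table_suffix
model_folder = now.strftime("%Y-%m-%d-%H%M%S")

print(train_table_name)  # HOUSING_DATA_TRAIN_20230115093005
print(test_table_name)   # HOUSING_DATA_TEST_20230115093005
print(model_folder)      # 2023-01-15-093005
```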

Deploy the train_model function to Snowflake as a Python stored procedure. Snowpark will also include the fit_pipeline and save_file functions.

When deploying a stored procedure we also need to specify which third-party Python libraries the functions depend on. These libraries must be available in the Snowflake Anaconda channel. Calling **clear_packages** and **clear_imports** first makes sure that we only include the ones needed for this stored procedure.

In [None]:
session.clear_packages()
session.clear_imports()
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools')
train_model_sp = F.sproc(func=train_model, name="train_house_sp" ,replace=True, is_permanent=True, stage_location=f"{stage_name}/sp/", session=session)

### Run the training within the SPROC

In [None]:
return_dict = json.loads(train_model_sp(session, "HOUSING_DATA", "MEDIAN_HOUSE_VALUE", stage_name))
return_dict
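
The `MSE` and `RMSE` entries in the returned dictionary are related by a square root. Below is a worked sketch in plain Python. The house values are made up for illustration, and the stored procedure itself uses `mean_squared_error` and `np.sqrt`.

```python
import math

# Hypothetical actual and predicted house values
actual = [200000.0, 150000.0, 320000.0]
predicted = [210000.0, 140000.0, 300000.0]

# Mean of the squared differences between actual and predicted values
squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
mse = sum(squared_errors) / len(squared_errors)

# RMSE is expressed in the same unit as the target (here: dollars)
rmse = math.sqrt(mse)

print(mse)
print(round(rmse, 2))
```

Because RMSE is in the same unit as the target column, it is usually the easier of the two numbers to interpret.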

### Optional: for large training data sets, speed up model training with a Snowpark-optimized warehouse

In [None]:
# creating a snowpark optimised warehouse
#session.sql("create or replace warehouse LAB_SCIKIT_SNOWPARK_WH with \
#                WAREHOUSE_SIZE = MEDIUM \
#                AUTO_SUSPEND = 60 \
#                WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED' \
#                AUTO_RESUME = TRUE").collect()
#session.use_warehouse("LAB_SCIKIT_SNOWPARK_WH")
# calling the training stored procedure
return_dict = json.loads(train_model_sp(session, "HOUSING_DATA", "MEDIAN_HOUSE_VALUE", stage_name))
return_dict
# suspending the snowpark optimised warehouse
#session.sql("ALTER WAREHOUSE LAB_SCIKIT_SNOWPARK_WH SUSPEND").collect()
# using regular warehouse
session.use_warehouse(snowflake_conn_prop['warehouse'])

Check that the model file is stored on the stage

In [None]:
session.sql(f"ls @{stage_name}").show(max_width=150)

### Model/Pipeline Deployment 

To use the fitted model on new data we can create a UDF in Snowflake, which lets us do the scoring where the data is.

Since the model is stored on a stage, it has to be loaded as part of the UDF call. To avoid reading it from the stage on every call, we create a dedicated function for loading the file and use cachetools to cache its result.

In [None]:
# To make sure we do not have previous imports and packages added
session.clear_imports()
session.clear_packages()

@cachetools.cached(cache={})
def read_file(filename):
       import sys
       import os
       import joblib
       # Get the path where files added through imports are available
       import_dir = sys._xoptions.get("snowflake_import_directory")
       if import_dir:
              with open(os.path.join(import_dir, filename), 'rb') as file:
                     m = joblib.load(file)
                     return m
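
To see the effect of caching the loader, here is a minimal sketch. It uses the standard-library `functools.lru_cache` as a stand-in for `cachetools.cached`, and a hypothetical `load_model` function in place of the real file read. Both are assumptions made for illustration.

```python
import functools

calls = []  # records every time the expensive load actually runs

@functools.lru_cache(maxsize=None)
def load_model(filename):
    # Hypothetical loader: stands in for reading and deserializing a staged file
    calls.append(filename)
    return {"source": filename}

# Simulate three UDF batches asking for the same model file
models = [load_model("model.joblib.gz") for _ in range(3)]

print(len(calls))              # 1
print(models[0] is models[2])  # True
```

Only the first call executes the function body. Later calls with the same argument return the cached object, which is the behavior the scoring UDF relies on.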

We can then create the scoring function, predict_house_value. Applying the @F.udf decorator deploys it to Snowflake automatically as a UDF named predict_house_value. Because the input is typed as PandasDataFrame and the return value as PandasSeries, the UDF runs as a vectorized UDF that receives a batch of rows for each call.

Since the fitted pipeline is saved as a file on a stage, we need to add it as an import so the function can access it through the read_file function created above.

In [None]:
features = ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS',
       'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME', 'OCEAN_PROXIMITY']

@F.udf(name="predict_house_value", is_permanent=True, stage_location=f'@{stage_name}/udf/', replace=True
              , imports=[return_dict['model_path']]
              , packages=['scikit-learn', 'pandas', 'joblib', 'cachetools'])
def predict_house_value(pd_df: T.PandasDataFrame[float, float, float, float, float, float, float, float
                                                 , str]) -> T.PandasSeries[float]:
       pd_df.columns = features
       m = read_file('housing_fores_reg.joblib.gz') 
       return m.predict(pd_df)

#### Run the UDF to make predictions over the test dataset

We can now use the UDF to score data that is in Snowflake. Here we use the Snowpark DataFrame API, but the UDF can also be called from SQL or Java/Scala.

In [None]:
# to call the udf for inferencing we will use call_function

snowdf_test = session.table("HOUSING_TEST")
inputs = snowdf_test.drop("MEDIAN_HOUSE_VALUE")
                    
snowdf_results = snowdf_test.select(*inputs,
                    F.call_function("predict_house_value",*inputs).alias('PREDICTION'), 
                    (F.col('MEDIAN_HOUSE_VALUE')).alias('ACTUAL_LABEL')
                    )

snowdf_results.show()
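
The SQL route mentioned above could look roughly like the sketch below. It only builds the query string, so it runs without a Snowflake connection. The helper `build_scoring_query` is hypothetical, and the table and column names are taken from the cells above.

```python
features = ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS',
            'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME',
            'OCEAN_PROXIMITY']

def build_scoring_query(table_name, udf_name, feature_cols, label_col):
    # Hypothetical helper: pass the feature columns to the UDF in the same
    # order the UDF expects them
    args = ", ".join(feature_cols)
    return (
        f"SELECT {args}, "
        f"{udf_name}({args}) AS PREDICTION, "
        f"{label_col} AS ACTUAL_LABEL "
        f"FROM {table_name}"
    )

query = build_scoring_query("HOUSING_TEST", "predict_house_value",
                            features, "MEDIAN_HOUSE_VALUE")
print(query)
# With an open session, this string could be run as: session.sql(query).show()
```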

In [None]:
session.close()