# NOTEBOOK 3: END TO END ML USING SNOWPARK AND SCIKIT-LEARN

In this notebook we train a Scikit-Learn ML pipeline that includes common feature engineering tasks such as imputation, scaling and one-hot encoding. The pipeline also includes a `RandomForestRegressor` model that predicts median house values in California.

We will train the pipeline using a Snowpark Python Stored Procedure (SPROC) and then log it to the Snowflake Model Registry. The example concludes by showing how a logged model/pipeline can run in a scalable fashion on a Snowflake warehouse.

We will also use a Snowpark-optimized warehouse in this notebook.

![Snowpark ML](images/snowflake_e2e_ml.png)

### Create a session with Snowflake

In [None]:
# Snowpark libs
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as snow_funcs
from snowflake.snowpark import types as snow_types
from snowflake.snowpark import version

# Snowpark ML libs for Model Registry
from snowflake.ml.registry import Registry

# Snowflake Python libs
from snowflake.core import Root
from snowflake.core.warehouse import Warehouse
from snowflake.core.stage import Stage

# Scikit-learn libs
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

import json

#Snowflake connection info
from config import snowflake_conn_prop

print(f'Snowpark version : {version.VERSION}')

Connect to Snowflake

In [None]:
session = Session.builder.configs(snowflake_conn_prop).create()
#root = Root(session)

print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

### Create a Snowflake stage

To create a permanent Stored Procedure that runs the model training in Snowflake, we need a Snowflake stage to hold its code.

In [None]:
root = Root(session)

stage_name="qs_sklearn_stage"

code_stage = Stage(
  name=stage_name
)

code_stage = root.databases[snowflake_conn_prop['database']].schemas[snowflake_conn_prop['schema']].stages.create(code_stage, mode='or_replace')


### Stored Procedure that fits the pipeline and the model and then saves it in Snowflake

Start by creating a training function that builds a pipeline with the data preprocessing steps and then trains a `RandomForestRegressor` model.

We already saw some preprocessing steps in the previous notebook. Here we wrap them in a function, which will then be packaged as a Stored Procedure so that the entire Python function runs in Snowflake.

We will use scikit-learn for this.

In [None]:
# Training function
def fit_pipeline(X, y, cat_attribs, num_attribs):

    # create a pipeline for numerical features
    num_pipeline = Pipeline([
            ('imputer', SimpleImputer(strategy="median")),
            ('std_scaler', StandardScaler()),
        ])

    # Pipeline for categorical features
    cat_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])
    
    # Create the preprocessor
    preprocessor = ColumnTransformer([
            ("num", num_pipeline, num_attribs),
            ("cat", cat_pipeline, cat_attribs)
        ])

    # Create the full pipeline including the model training
    full_pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)),
        ])

    # fit the preprocessing pipeline and the model together
    full_pipeline.fit(X, y)

    return full_pipeline
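
To make the numeric branch of the pipeline concrete, here is a minimal standard-library sketch of what median imputation followed by standard scaling does to one column. It is an illustration, not scikit-learn's implementation: the helper names `impute_median` and `standardize` and the sample values are hypothetical.

```python
import statistics

def impute_median(values):
    """Replace missing (None) entries with the median of the observed values."""
    observed = [v for v in values if v is not None]
    median = statistics.median(observed)
    return [median if v is None else v for v in values]

def standardize(values):
    """Scale values to zero mean and unit variance (population standard deviation)."""
    mean = statistics.mean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std for v in values]

# Hypothetical TOTAL_BEDROOMS column with one missing value
total_bedrooms = [2.0, None, 4.0, 6.0]

imputed = impute_median(total_bedrooms)  # the missing entry becomes the median
scaled = standardize(imputed)            # centered on 0 with unit variance

print(imputed)
print(scaled)
```

In the real pipeline the median, mean and standard deviation are learned during `fit` and reused unchanged at prediction time, which is why the preprocessing and the model are fitted together.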


To test the training function locally we need to pull the data back into a Pandas DataFrame. By using the **sample** method we can get roughly 10% of the data.

In [None]:
pd_test = session.table("HOUSING_DATA").sample(frac=0.10).to_pandas()
pd_test.shape

Test the function with the sample data

In [None]:

X = pd_test.loc[:, pd_test.columns != 'MEDIAN_HOUSE_VALUE']
y = pd_test['MEDIAN_HOUSE_VALUE']

test_full_pipe = fit_pipeline(X, y,  ['OCEAN_PROXIMITY'], ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS',
       'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME'])
test_full_pipe

In [None]:
# Get the first prediction
test_full_pipe.predict(X)[0]
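
The `handle_unknown='ignore'` setting in the categorical branch matters at prediction time: a category that was not seen during fitting is encoded as all zeros instead of raising an error. The sketch below imitates that behaviour in plain Python. The helper names and the category values are hypothetical and only stand in for `OCEAN_PROXIMITY`.

```python
def fit_categories(values):
    """Learn the sorted list of distinct categories seen during fitting."""
    return sorted(set(values))

def one_hot(value, categories):
    """Encode one value; a category unseen at fit time yields an all-zero row."""
    return [1 if value == category else 0 for category in categories]

# Hypothetical OCEAN_PROXIMITY values seen during training
categories = fit_categories(["INLAND", "NEAR BAY", "INLAND", "NEAR OCEAN"])

known = one_hot("NEAR BAY", categories)   # one column is set
unknown = one_hot("ISLAND", categories)   # not seen during fitting, so all zeros

print(categories)
print(known)
print(unknown)
```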

We can now deploy the training function as a Python Stored Procedure in Snowflake, so we can run the training on Snowflake compute and do not need to move data around.

We will also create a wrapper function for our training function that reads the data and converts it to a Pandas DataFrame before calling the training function. This wrapper is the function that becomes the logic of the Stored Procedure.

In [None]:
# Stored Procedure function
def train_model(snf_session: Session # A stored procedure receives an already authenticated session object when executed in Snowflake
                , training_table: str # Table name that has the data to be used for training and test
                , target_col: str # name of the target column
                , m_name: str # name of the model in the Model Registry
                , m_version: str # name of the model version
                ) -> dict: # model name, model version and the names of the train/test tables
    
    # Libraries used in the function that have not been imported as part of the Python session
    from datetime import datetime
    import numpy as np

    now = datetime.now() # Get the date and time when this run started
    
    # Get the training table and split into a training and test Snowpark DataFrames
    snowdf_train, snowdf_test = snf_session.table(training_table).random_split([0.8, 0.2], seed=82) # use seed to make the split repeatable

    # Get the categorical and numeric column names
    cat_attribs = [c.name for c in snowdf_train.schema.fields if (type(c.datatype) == snow_types.StringType) and (c.name != target_col)]
    numeric_types = [snow_types.DecimalType, snow_types.LongType, snow_types.DoubleType, snow_types.FloatType, snow_types.IntegerType]
    num_attribs = [c.name for c in snowdf_train.schema.fields if (type(c.datatype) in numeric_types) and (c.name != target_col)]


    # save the train and test sets as time stamped tables in Snowflake 
    table_suffix = now.strftime("%Y%m%d%H%M%S")
    train_table_name = training_table + '_TRAIN_' + table_suffix
    test_table_name = training_table + '_TEST_' + table_suffix
    snowdf_train.write.mode("overwrite").save_as_table(train_table_name)
    snowdf_test.write.mode("overwrite").save_as_table(test_table_name)

    pd_train = snowdf_train.to_pandas()
    
    X_train = pd_train.loc[:, pd_train.columns != target_col]
    y_train = pd_train[target_col]
    
    # Fit the model (pipeline)
    full_pipeline = fit_pipeline(X_train, y_train, cat_attribs, num_attribs)

    # Predict on the test set and compute the mean squared error (MSE) and root mean squared error (RMSE)
    pd_test = snowdf_test.to_pandas()
    
    X_test = pd_test.loc[:, pd_test.columns != target_col]
    y_test = pd_test[target_col]
    
    # Calculate test metrics
    housing_predictions = full_pipeline.predict(X_test)
    lin_mse = mean_squared_error(y_test, housing_predictions)

    lin_rmse = np.sqrt(lin_mse)

    # Connect to Model Registry
    snowml_registry = Registry(snf_session)
    
    # Log the fitted pipeline in the Model Registry
    sklearn_mv = snowml_registry.log_model(
                                    full_pipeline,
                                    model_name=m_name,
                                    version_name=m_version,
                                    sample_input_data= X_train[:50],
                                    conda_dependencies=["scikit-learn"],
                                    options={"relax_version": False},
                                    comment = 'SKLearn pipeline to predict housing prices'
                                    ,metrics={'test_mse': lin_mse, 'test_rmse': lin_rmse}
                        )

    # Create a dict to return with the logged model's name and version and the train/test table names
    ret_dict = {
        "model_name": sklearn_mv.model_name
        ,"model_version": sklearn_mv.version_name
        , "fully_qualified_model_name": sklearn_mv.fully_qualified_model_name
        , "train_table": train_table_name
        , "test_table": test_table_name
    }
    return ret_dict
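
The stored procedure logs two test metrics with the model, `test_mse` and `test_rmse`. As a reminder of how they relate, here is a small standard-library sketch that computes both. The helper name `mse` and the sample values are hypothetical and are not output from the model.

```python
import math

def mse(y_true, y_pred):
    """Mean of the squared differences between actual and predicted values."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return sum(squared_errors) / len(squared_errors)

# Hypothetical actual and predicted house values
y_true = [100.0, 200.0, 300.0]
y_pred = [110.0, 190.0, 320.0]

test_mse = mse(y_true, y_pred)
test_rmse = math.sqrt(test_mse)  # same unit as the target column

print(test_mse, test_rmse)
```

RMSE is usually the easier number to read because it is expressed in the same unit as `MEDIAN_HOUSE_VALUE`, while MSE is in squared units.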

Deploy the `train_model` function to Snowflake as a Python stored procedure. Snowpark will also include the `fit_pipeline` function.

When deploying a stored procedure we also need to specify which third-party Python libraries the functions depend on. These libraries must be available in the Snowflake Anaconda channel. By using the **packages** parameter we make sure that we only include the ones needed for this stored procedure.

In [None]:
train_model_sp = snow_funcs.sproc(func=train_model, name="train_house_sp" ,replace=True, is_permanent=True
                                  , stage_location=f"{stage_name}/sp/", session=session
                                  , packages=['snowflake-snowpark-python==1.23.0', 'snowflake-ml-python==1.6.4' ,'scikit-learn'])

### Run the training within the SPROC
We will now train, test and deploy a new model in Snowflake. Everything is running on Snowflake compute without any need to move data outside of Snowflake. The resulting model will be deployed in Snowflake using Model Registry.

In [None]:
return_dict = json.loads(train_model_sp(session, "HOUSING_DATA", "MEDIAN_HOUSE_VALUE",  "sklearn_housing", "V1"))
return_dict

Now that the model is trained and deployed in Snowflake using the Model Registry, we can connect to the registry and list the models.

In [None]:
snow_registry = Registry(session)
snow_registry.show_models()

We can also list the versions available for a specific model.

In [None]:
snow_model = snow_registry.get_model("sklearn_housing")
snow_model.show_versions()

### Optional: for large training datasets, speed up model training by using a Snowpark-optimized warehouse

In [None]:
# creating a snowpark optimised warehouse
#session.sql("create or replace warehouse LAB_SCIKIT_SNOWPARK_WH with \
#                WAREHOUSE_SIZE = MEDIUM \
#                AUTO_SUSPEND = 60 \
#                WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED' \
#                AUTO_RESUME = TRUE").collect()

# Create warehouse
so_wh_name = "LAB_SCIKIT_SNOWPARK_WH"
so_wh = Warehouse(
    name=so_wh_name, 
    warehouse_size="MEDIUM", 
    auto_suspend=600, 
    auto_resume='true', 
    warehouse_type='SNOWPARK-OPTIMIZED'
)

warehouses = root.warehouses
so_wh = warehouses.create(so_wh, mode='or_replace')

session.use_warehouse(so_wh_name)
session.get_current_warehouse()

Call the training using the new Snowpark-optimized warehouse

In [None]:
# calling the training stored procedure
return_dict = json.loads(train_model_sp(session, "HOUSING_DATA", "MEDIAN_HOUSE_VALUE",  "sklearn_housing", "V2"))
return_dict
# suspending the snowpark optimised warehouse
so_wh.suspend()

# using regular warehouse
session.use_warehouse(snowflake_conn_prop['warehouse'])
session.get_current_warehouse()

Check that we now have a new version of the model registered

In [None]:
snow_model.show_versions()

#### Run the Model Version in Snowflake to make predictions over the test dataset

Since the model and its versions are logged in Snowflake using the Model Registry, we can use a version to make predictions on our data. We can use either the default version, V1, or our second version. For this example we will use the first version, which is the default.

Start by checking which functions we can call for the model version

In [None]:
# Get the default version of the model
def_mv = snow_model.default
# Show available functions for the version
def_mv.show_functions()

Above we can see that we have a **predict** function and an **explain** function. The **explain** function will give us SHAP values and the **predict** function will give us the predictions.

We can run the model on the test dataset that was created as part of the model training. To get the table name we can execute `SHOW TABLES`.

In [None]:
session.sql("show tables").show()
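
As an alternative to copying the name from the `SHOW TABLES` output, the test table name can be read from the dictionary that the stored procedure returns, since it includes a `test_table` key. The sketch below shows how the timestamped name is formed and how it is read back. The JSON payload and the fixed timestamp are hypothetical stand-ins for a real run.

```python
import json
from datetime import datetime

# The stored procedure builds the name from the training table and a timestamp suffix
started = datetime(2024, 11, 8, 7, 22, 39)  # hypothetical start time of a run
table_suffix = started.strftime("%Y%m%d%H%M%S")
expected_name = "HOUSING_DATA" + "_TEST_" + table_suffix

# Hypothetical JSON string shaped like the stored procedure's return value
raw_result = json.dumps({
    "model_name": "SKLEARN_HOUSING",
    "model_version": "V1",
    "train_table": "HOUSING_DATA_TRAIN_" + table_suffix,
    "test_table": expected_name,
})

result = json.loads(raw_result)
test_table_name = result["test_table"]

print(test_table_name)
```

In this notebook that would correspond to `session.table(return_dict["test_table"])`. Keep in mind that `return_dict` holds the result of the most recent training call.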

In [None]:
snowdf_test = session.table("HOUSING_DATA_TEST_20241108072239") # REPLACE THE TABLE NAME WITH ONE FROM ABOVE!
inputs = snowdf_test.drop("MEDIAN_HOUSE_VALUE")
                    
snowdf_results = def_mv.run(inputs, function_name='predict')

snowdf_results.show()

If we want to save the predictions in Snowflake we can do `snowdf_results.write.save_as_table("my_table_name", mode='overwrite')`

In [None]:
session.close()