# Snowflake ML Intro Notebook - ML Forecasting
This notebook introduces several key features of Snowflake ML in the process of training a machine learning model for forecasting Chicago bus ridership.


* Establish secure connection to Snowflake
* Load features and target from Snowflake table into Snowpark DataFrame
* Add features into a feature store
* Prepare features for model training
* Train ML model using Snowpark ML distributed processing
* Save the model to the Snowflake Model Registry
* Run model predictions inside Snowflake
* Deploy model to Snowpark Container Services

This notebook is intended to highlight Snowflake functionality and should not be taken as a best practice for time series forecasting. 

[Get Notebook](https://github.com/rajshah4/snowflake-notebooks/blob/main/Forecasting_ChicagoBus/Snowpark_Forecasting_Bus_FeatureStore.ipynb)  

[Go to folder with dataset](https://github.com/rajshah4/snowflake-notebooks/blob/main/Forecasting_ChicagoBus/)  

## 1. Setup Environment

In [None]:
# Snowflake connector
#from snowflake import connector
#from snowflake.ml.utils import connection_params

# Snowpark for Python
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import Variant
from snowflake.snowpark.version import VERSION
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

# Snowpark ML
from snowflake.ml.modeling.compose import ColumnTransformer
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import StandardScaler, OrdinalEncoder
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml import version
mlversion = version.VERSION

#Feature Store
from snowflake.ml.feature_store import FeatureStore, CreationMode, Entity, FeatureView

# Misc
import pandas as pd
import json
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

import sys
print(sys.version) ##Last run used Python 3.11

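# get_active_session must be imported before it can be called
from snowflake.snowpark.context import get_active_session
session = get_active_session()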



In [82]:
# Reuse the active Snowpark session
session = get_active_session()

from snowflake.core.warehouse import Warehouse
from snowflake.core import Root
root = Root(session)

In [None]:
snowflake_environment = session.sql('select current_user(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Snowflake version           : {}'.format(snowflake_environment[0][1]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))
# version.VERSION is a string such as "1.2.3"; print it whole rather than indexing single characters
print('Snowflake ML version        : {}'.format(mlversion))


In [None]:
session.sql("CREATE OR REPLACE SCHEMA ML_DEMO").collect()
session.sql("USE SCHEMA ML_DEMO").collect()

In [85]:
fs = FeatureStore(
    session=session,
    database="MODEL_SERVING_DB",
    name="FEATURE_STORE_MLDEMO",
    default_warehouse="DEMO_BUILD_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

## 2. Load Data in Snowflake 

In [None]:
df_clean = pd.read_csv('CTA_Daily_Totals_by_Route.csv')
df_clean.columns = df_clean.columns.str.upper()
print (df_clean.shape)
print (df_clean.dtypes)
df_clean.head()

Let's create a Snowpark DataFrame from the pandas DataFrame and save it as a table, so we never have to upload this dataset manually again. Once the data is in Snowflake, later operations such as the train/test split run inside Snowflake and not in your local environment.

PRO TIP: Snowpark infers the Snowflake schema from the pandas DataFrame. Either fix your data types before importing or change them after the data has landed in Snowflake. People who put models into production are very careful about data types.

In [None]:
input_df = session.create_dataframe(df_clean)
schema = input_df.schema
print(schema)

In [88]:
input_df.write.mode('overwrite').save_as_table('MODEL_SERVING_DB.FEATURE_STORE_MLDEMO.CHICAGO_BUS_RIDES')

Let's read from the table, since that is generally what you will be doing in production. 

In [None]:
df = session.read.table('MODEL_SERVING_DB.FEATURE_STORE_MLDEMO.CHICAGO_BUS_RIDES')
print (df.count())
df.show()

An entity is an abstraction over a set of primary keys used for looking up feature data. An Entity represents a real-world "thing" that has data associated with it. The cell below registers an entity called "route" in the Feature Store.

In [None]:
entity = Entity(
    name="route",
    join_keys=["DATE"],
)
fs.register_entity(entity)

#Show the entities
fs.list_entities().show()

## 3. Distributed Feature Engineering

Let's do some feature engineering and then move that logic to the feature store. The feature engineering includes adding a day of the week, aggregating the data by day, and later joining in weather data.

These operations run inside the Snowflake warehouse, which provides improved performance and scalability with distributed execution for these scikit-learn preprocessing functions. This dataset uses a SMALL warehouse, but you can always move up to a larger one, including Snowpark-optimized warehouses (16x the memory per node of a standard warehouse), e.g., `session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'").collect()`

In [None]:
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'").collect()

Simple feature engineering

In [None]:
from snowflake.snowpark.functions import col, to_timestamp, dayofweek, month,sum, listagg, lag
from snowflake.snowpark import Window

df = df.with_column('DATE', to_timestamp(col('DATE'), 'MM/DD/YYYY'))

# Add a new column for the day of the week
# The day of week is represented as an integer, with 0 = Sunday, 1 = Monday, ..., 6 = Saturday
df = df.with_column('DAY_OF_WEEK', dayofweek(col('DATE')))

df.show()
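
When checking these values outside Snowflake, keep in mind that Python's `date.weekday()` counts from Monday = 0, which differs from the Sunday = 0 convention described in the comment above. The following is a minimal sketch of the conversion, assuming that Sunday = 0 convention applies to your session. The helper name `snowflake_style_dayofweek` is hypothetical and not part of any Snowflake library.

```python
from datetime import date

def snowflake_style_dayofweek(d: date) -> int:
    """Map Python's Monday=0..Sunday=6 to Sunday=0..Saturday=6."""
    return (d.weekday() + 1) % 7

# January 6, 2019 was a Sunday, so this prints 0
print(snowflake_style_dayofweek(date(2019, 1, 6)))
```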

In [None]:
# Add a new column for the month
df = df.with_column('MONTH', month(col('DATE')))

# Group by DATE, DAY_OF_WEEK, and MONTH, then aggregate
total_riders = df.group_by('DATE','DAY_OF_WEEK','MONTH').agg(
    F.listagg('DAYTYPE', is_distinct=True).alias('DAYTYPE'),
    F.sum('RIDES').alias('TOTAL_RIDERS')
).order_by('DATE')

#Define a window specification
window_spec = Window.order_by('DATE')

# Add a lagged column for total ridership of the previous day
total_riders = total_riders.with_column('PREV_DAY_RIDERS', lag(col('TOTAL_RIDERS'), 1).over(window_spec))

# Show the resulting dataframe
print(total_riders.count())
total_riders.show()  # show() prints the rows itself and returns None
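
To make the aggregation and lag logic concrete, here is a small local sketch in plain Python. It is only an illustration of what the group-by and window steps compute, not how Snowpark executes them. The function name `daily_totals_with_lag` and the sample rows are hypothetical.

```python
from collections import defaultdict

def daily_totals_with_lag(rows):
    """Sum rides per date, then attach the previous row's total as a lag feature.

    rows: iterable of (date_iso, route, rides) tuples; ISO dates sort chronologically.
    """
    totals = defaultdict(int)
    for date_iso, _route, rides in rows:
        totals[date_iso] += rides

    result, previous_total = [], None
    for date_iso in sorted(totals):
        result.append({
            "DATE": date_iso,
            "TOTAL_RIDERS": totals[date_iso],
            "PREV_DAY_RIDERS": previous_total,  # None for the first date, like LAG
        })
        previous_total = totals[date_iso]
    return result

# Hypothetical sample rows, not taken from the CTA dataset
sample = [("2019-01-01", "3", 100), ("2019-01-01", "4", 50), ("2019-01-02", "3", 70)]
for row in daily_totals_with_lag(sample):
    print(row)
```

Note that the lag refers to the previous row in date order. If a calendar day is missing from the data, the "previous day" value comes from the last available date instead.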


**Feature Views**

In [None]:
agg_fv = FeatureView(
    name="AggBusData",
    entities=[entity],
    feature_df=total_riders,
    timestamp_col="DATE",
)

agg_fv = fs.register_feature_view(agg_fv, version="1", overwrite=True)

# Show our newly created Feature View and display as Pandas DataFrame
fs.list_feature_views().to_pandas()

In [None]:
agg_fv = fs.get_feature_view(
    name="AGGBUSDATA",
    version="1"
)

## Join in the Weather Data from the Snowflake Marketplace

Instead of downloading data and building pipelines, you can use the Snowflake Marketplace, which offers a lot of useful data, including weather data. This means the data is only a SQL query away.

 [Cybersyn Weather](https://app.snowflake.com/marketplace/listing/GZTSZAS2KIM/cybersyn-inc-weather-environmental-essentials?search=weather)

SQL QUERY: 
```
SELECT
  ts.noaa_weather_station_id,
  ts.DATE,
  COALESCE(MAX(CASE WHEN ts.variable = 'minimum_temperature' THEN ts.Value ELSE NULL END), 0) AS minimum_temperature,
  COALESCE(MAX(CASE WHEN ts.variable = 'precipitation' THEN ts.Value ELSE NULL END), 0) AS precipitation,
  COALESCE(MAX(CASE WHEN ts.variable = 'maximum_temperature' THEN ts.Value ELSE NULL END), 0) AS maximum_temperature
FROM
  cybersyn.noaa_weather_metrics_timeseries AS ts
JOIN
  cybersyn.noaa_weather_station_index AS idx
ON
  (ts.noaa_weather_station_id = idx.noaa_weather_station_id)
WHERE
  idx.NOAA_WEATHER_STATION_ID = 'USW00014819'
  AND (ts.VARIABLE = 'minimum_temperature' OR ts.VARIABLE = 'precipitation' OR ts.VARIABLE = 'maximum_temperature')
GROUP BY
  ts.noaa_weather_station_id,
  ts.DATE
LIMIT 1000;
```


In [None]:
create or replace table MODEL_SERVING_DB.FEATURE_STORE_MLDEMO.CHICAGO_WEATHER
as 
SELECT
  ts.noaa_weather_station_id,
  ts.DATE,
  COALESCE(MAX(CASE WHEN ts.variable = 'minimum_temperature' THEN ts.Value ELSE NULL END), 0) AS minimum_temperature,
  COALESCE(MAX(CASE WHEN ts.variable = 'precipitation' THEN ts.Value ELSE NULL END), 0) AS precipitation,
  COALESCE(MAX(CASE WHEN ts.variable = 'maximum_temperature' THEN ts.Value ELSE NULL END), 0) AS maximum_temperature
FROM
  WEATHER__ENVIRONMENT.cybersyn.noaa_weather_metrics_timeseries AS ts
JOIN
  WEATHER__ENVIRONMENT.cybersyn.noaa_weather_station_index AS idx
ON
  (ts.noaa_weather_station_id = idx.noaa_weather_station_id)
WHERE
  idx.NOAA_WEATHER_STATION_ID = 'USW00014819'
  AND (ts.VARIABLE = 'minimum_temperature' OR ts.VARIABLE = 'precipitation' OR ts.VARIABLE = 'maximum_temperature')
GROUP BY
  ts.noaa_weather_station_id,
  ts.DATE

In [None]:
weather = session.read.table('MODEL_SERVING_DB.FEATURE_STORE_MLDEMO.CHICAGO_WEATHER')

from snowflake.snowpark.types import DoubleType
weather = weather.withColumn('MINIMUM_TEMPERATURE', weather['MINIMUM_TEMPERATURE'].cast(DoubleType()))
weather = weather.withColumn('MAXIMUM_TEMPERATURE', weather['MAXIMUM_TEMPERATURE'].cast(DoubleType()))
weather = weather.withColumn('PRECIPITATION', weather['PRECIPITATION'].cast(DoubleType()))

weather.show()

Here we create a feature view for the weather data. Because this data changes constantly, the feature view is set to refresh every 24 hours (`refresh_freq="1 day"`), and Snowflake uses a dynamic table to manage the refresh.

In [96]:
weather_fv = FeatureView(
    name="weather",
    entities=[entity],
    feature_df=weather,
    timestamp_col="DATE",
    refresh_freq="1 day", 
)

weather_fv = fs.register_feature_view(weather_fv, version="1", overwrite=True)

In [None]:
weather_fv= fs.get_feature_view(
    name="weather",
    version="1"
)

## Generate Training Dataset

Our feature store is filled with data, but we don't need to use it all. Here we select a subset of the feature store for training. 

In [97]:
# Create a date range between 2013 and 2019
date_range = pd.date_range(start='01/01/2013', end='12/31/2019')
date_column = date_range.strftime('%m/%d/%Y')
df = pd.DataFrame(date_column, columns=['DATE'])
spine_df = session.create_dataframe(df)
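
The spine decides which rows end up in the training set: each spine row is looked up in the feature views by its join key. The following plain-Python sketch illustrates that idea under the assumption that the lookup behaves like a left join on `DATE`. The helper names `build_spine` and `left_join_on_date` and the feature values are hypothetical.

```python
from datetime import date, timedelta

def build_spine(start, end):
    """Return one 'MM/DD/YYYY' string per day from start to end, inclusive."""
    days = (end - start).days + 1
    return [(start + timedelta(days=i)).strftime("%m/%d/%Y") for i in range(days)]

def left_join_on_date(spine, features_by_date, feature_names):
    """Keep every spine row; fill missing features with None."""
    joined = []
    for d in spine:
        feats = features_by_date.get(d, {})
        joined.append({"DATE": d, **{name: feats.get(name) for name in feature_names}})
    return joined

spine = build_spine(date(2013, 1, 1), date(2019, 12, 31))
# Hypothetical feature values for a single date
features = {"01/01/2013": {"TOTAL_RIDERS": 1000}}
rows = left_join_on_date(spine, features, ["TOTAL_RIDERS"])
print(len(rows), rows[0], rows[1])
```

Dates without matching features keep their spine row and carry `None` values, which is why a null-handling step is useful afterwards.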

Generate a training dataset 

In [98]:
training_set = fs.generate_training_set(
    spine_df=spine_df,
    features=[agg_fv,weather_fv])

In [None]:
training_set.show()

In [100]:
## Dropping any null values
from snowflake.snowpark.functions import col, is_null, to_date

# Build a filter condition that is true when any column is null
any_null_filter = None

# Iterate over all columns and OR the per-column null checks together
for column in training_set.columns:
    current_filter = is_null(col(column))
    any_null_filter = current_filter if any_null_filter is None else (any_null_filter | current_filter)

# Apply the negated filter to keep only rows without null values
df_filtered = training_set.filter(~any_null_filter)
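
The loop above ORs one null check per column, and the filter keeps the rows where that combined condition is false. As a rough local equivalent, here is a plain-Python sketch over a list of dictionaries. The function name `drop_rows_with_nulls` and the sample rows are hypothetical.

```python
def drop_rows_with_nulls(rows):
    """Keep only rows in which every value is non-null (not None)."""
    return [row for row in rows if all(value is not None for value in row.values())]

# Hypothetical sample rows
sample_rows = [
    {"DATE": "01/01/2013", "TOTAL_RIDERS": 1000, "PRECIPITATION": 0.0},
    {"DATE": "01/02/2013", "TOTAL_RIDERS": None, "PRECIPITATION": 1.2},
]
print(drop_rows_with_nulls(sample_rows))
```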

In [101]:
#Split the data into training and test sets
df_filtered = df_filtered.withColumn("DATE", to_date(col("DATE"), 'MM/dd/yyyy'))
train = df_filtered.filter(col('DATE') < '01/01/2019')
test = df_filtered.filter(col('DATE') >= '01/01/2019')
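
The split is chronological: everything before 2019 is used for training and 2019 is held out. The sketch below shows the same idea in plain Python and why the dates are parsed first: comparing `MM/DD/YYYY` values as strings would order them by month, not by year. The function name `split_by_date` and the sample rows are hypothetical.

```python
from datetime import datetime

def split_by_date(rows, cutoff="01/01/2019", fmt="%m/%d/%Y"):
    """Rows dated before the cutoff go to train, the rest go to test."""
    cutoff_date = datetime.strptime(cutoff, fmt).date()
    train, test = [], []
    for row in rows:
        row_date = datetime.strptime(row["DATE"], fmt).date()
        (train if row_date < cutoff_date else test).append(row)
    return train, test

# Hypothetical sample rows
sample_rows = [{"DATE": "02/15/2013"}, {"DATE": "12/31/2018"}, {"DATE": "01/01/2019"}]
train_rows, test_rows = split_by_date(sample_rows)
print(len(train_rows), len(test_rows))
```

As plain strings, `"02/15/2013"` sorts after `"01/01/2019"`, so a string comparison would wrongly place that 2013 row in the test set.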

In [None]:
print (train.count())
print (test.count())

In [None]:
schema_info = train.schema

for field in schema_info.fields:
    print(f"Column: {field.name}, Type: {field.datatype}")

train = train.with_column("DATE", col("DATE").cast("STRING"))
test = test.with_column("DATE", col("DATE").cast("STRING"))

## 4. Distributed Feature Engineering in a Pipeline

Feature engineering + XGBoost

In [None]:
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'").collect()
session.sql("alter warehouse snowpark_opt_wh set max_concurrency_level = 1").collect()

In [104]:
## Distributed Preprocessing - 25X to 50X faster
numeric_features = ['DAY_OF_WEEK','MONTH','PREV_DAY_RIDERS','MINIMUM_TEMPERATURE','MAXIMUM_TEMPERATURE','PRECIPITATION']
numeric_transformer = Pipeline(steps=[('scaler', StandardScaler())])

categorical_cols = ['DAYTYPE']
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('ordinal', OrdinalEncoder(handle_unknown='use_encoded_value',unknown_value=-99999))  # step named for the encoder actually used
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_cols)
        ])

pipeline = Pipeline(steps=[('preprocessor', preprocessor),('model', XGBRegressor())])
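
For intuition about what the two preprocessing steps do to the data, here is a plain-Python sketch. It assumes standard scaling with the population standard deviation (as in scikit-learn's `StandardScaler`) and ordinal codes assigned in sorted category order. The helper names and the category values are hypothetical, and this is not how Snowpark ML implements the transformers.

```python
from statistics import mean, pstdev

def fit_standard_scaler(values):
    """Return a function that centers on the mean and divides by the population std."""
    mu, sigma = mean(values), pstdev(values)
    scale = sigma if sigma > 0 else 1.0  # avoid dividing by zero for constant columns
    return lambda x: (x - mu) / scale

def fit_ordinal_encoder(categories, unknown_value=-99999):
    """Return a function mapping each seen category to an integer code."""
    mapping = {cat: i for i, cat in enumerate(sorted(set(categories)))}
    return lambda cat: mapping.get(cat, unknown_value)

scale = fit_standard_scaler([1.0, 2.0, 3.0])
encode = fit_ordinal_encoder(["W", "A", "U", "A"])  # hypothetical category values
print(scale(2.0), encode("A"), encode("X"))
```

A category that was not seen during fitting, such as `"X"` here, receives the `unknown_value` code instead of raising an error.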

## 5. Distributed Training

These operations are done inside the Snowpark warehouse which provides improved performance and scalability with distributed execution for these scikit-learn preprocessing functions, hyperparameter tuning (grid and random) and XGBoost training (and many other types of models).

In [None]:
## Distributed HyperParameter Optimization
hyper_param = dict(
        model__max_depth=[2,4],
        model__learning_rate=[0.1,0.3],
    )

xg_model = GridSearchCV(
    estimator=pipeline,
    param_grid=hyper_param,
    #cv=5,
    input_cols=numeric_features + categorical_cols,
    label_cols=['TOTAL_RIDERS'],
    output_cols=["TOTAL_RIDERS_FORECAST"],
)

# Fit and Score
xg_model.fit(train)
##Takes 25 seconds
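
A grid search evaluates every combination of the listed values. The sketch below only enumerates the candidates for the grid used above and trains nothing. The `model__` prefix follows the scikit-learn convention of routing a parameter to the pipeline step named `model`. The helper name `expand_grid` is hypothetical.

```python
from itertools import product

def expand_grid(param_grid):
    """Return one dict per combination of parameter values."""
    names = list(param_grid)
    value_lists = [param_grid[name] for name in names]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]

hyper_param = {"model__max_depth": [2, 4], "model__learning_rate": [0.1, 0.3]}
candidates = expand_grid(hyper_param)
for params in candidates:
    print(params)
```

With two values for each of two parameters, the search compares four candidates, and each one is fitted once per cross-validation fold.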

## 6. Model Evaluation
Look at the results of the model. `cv_results_` is a dictionary where each key is a string describing one of the metrics or parameters, and the corresponding value is an array with one entry per combination of parameters.

In [None]:
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'SMALL'").collect()

In [None]:
cv_results = xg_model.to_sklearn().cv_results_

for i in range(len(cv_results['params'])):
    print(f"Parameters: {cv_results['params'][i]}")
    print(f"Mean Test Score: {cv_results['mean_test_score'][i]}")
    print()

Look at the accuracy of the model

In [None]:
from snowflake.ml.modeling.metrics import mean_absolute_error
testpreds = xg_model.predict(test)
# The metric computed here is the mean absolute error (MAE), not MSE
print('MAE:', mean_absolute_error(df=testpreds, y_true_col_names='TOTAL_RIDERS', y_pred_col_names='"TOTAL_RIDERS_FORECAST"'))
testpreds.select("DATE", "TOTAL_RIDERS", "TOTAL_RIDERS_FORECAST").show(10)
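
For reference, the mean absolute error is the average of the absolute differences between actual and forecast values. Here is a minimal local sketch of the formula with hypothetical numbers. The function name `mean_absolute_error_local` is made up to avoid confusion with the Snowflake ML function used above.

```python
def mean_absolute_error_local(y_true, y_pred):
    """Average of |actual - forecast| over all rows."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

# Hypothetical ridership values: errors of 10 and 20 average to 15.0
print(mean_absolute_error_local([100, 200], [110, 180]))
```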

Materialize the results to a table

In [109]:
testpreds.write.save_as_table(table_name='MODEL_SERVING_DB.FEATURE_STORE_MLDEMO.CHICAGO_BUS_RIDES_FORECAST', mode='overwrite')

The error metric above is computed with `snowflake.ml.modeling.metrics`, so the calculation is done inside Snowflake.

## 7. Save to the Model Registry and use for Predictions (Python & SQL)

Connect to the registry

In [110]:
from snowflake.ml.registry import Registry
reg = Registry(session=session, database_name="MODEL_SERVING_DB", schema_name="FEATURE_STORE_MLDEMO")

In [None]:
model_ref = reg.log_model(
    model_name="Forecasting_Bus_Ridership",
    version_name="v2",    
    model=xg_model,
    target_platforms=["WAREHOUSE"],
    conda_dependencies=["scikit-learn","xgboost"],
    sample_input_data=train,
    comment="XGBoost model, Oct 9"
)

Let's retrieve the model from the registry

In [112]:
reg_model = reg.get_model("Forecasting_Bus_Ridership").version("v2")

In [None]:
DESC MODEL MODEL_SERVING_DB.FEATURE_STORE_MLDEMO.Forecasting_Bus_Ridership

Let's do predictions inside the warehouse for some evaluation

In [None]:
remote_prediction = reg_model.run(test, function_name='predict')
remote_prediction.sort("DATE").select("DATE","TOTAL_RIDERS","TOTAL_RIDERS_FORECAST").show(10)

Save evaluation metrics in the model registry

In [116]:
from snowflake.ml.modeling.metrics import mean_absolute_error
mae = mean_absolute_error(df=remote_prediction, y_true_col_names='TOTAL_RIDERS', y_pred_col_names='"TOTAL_RIDERS_FORECAST"')
reg_model.set_metric("MAE", value=mae)

In [None]:
reg_model.show_metrics()

## 8. Get SHAP Explanations

In [120]:
explanations = reg_model.run(test, function_name="explain")

## 9. Show lineage

In [121]:
# Trace lineage downstream from the source table
SOURCE_TABLE = "MODEL_SERVING_DB.FEATURE_STORE_MLDEMO.CHICAGO_BUS_RIDES"
df = session.lineage.trace(SOURCE_TABLE, "TABLE", direction="downstream", distance=4)

In [None]:
df.show()

In [None]:
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'SMALL'").collect()

## 10. Deploy the model to SPCS for Inference

It's now possible to deploy and run a model in Snowpark Container Services (SPCS). This makes the Snowflake Model Registry more broadly useful: it can serve large models that need distributed clusters or GPUs for execution, as well as models that have pip dependencies on open-source or the user's own libraries and frameworks. All of these benefits are available without in-depth knowledge of Docker containers, Kubernetes, etc.

In [None]:
CREATE COMPUTE POOL mypool
  MIN_NODES = 1
  MAX_NODES = 1
  INSTANCE_FAMILY = GPU_NV_S;

ALTER COMPUTE POOL mypool SET AUTO_RESUME = true; 

describe compute pool mypool;

create or replace schema notebooks;

create or replace image repository MODEL_SERVING_DB.notebooks.my_inference_images;

CREATE OR REPLACE NETWORK RULE allow_all_rule
MODE = 'EGRESS'
TYPE = 'HOST_PORT'
VALUE_LIST = ('0.0.0.0:443','0.0.0.0:80');

alter NETWORK RULE allow_all_rule set value_list = ('0.0.0.0:443','0.0.0.0:80','<YOUR_ACCOUNT_IDENTIFIER>.snowflakecomputing.com');

-- Please create a public and private key 
alter user XXXX  SET RSA_PUBLIC_KEY='XXXXX';

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION allow_all_integration
ALLOWED_NETWORK_RULES = (allow_all_rule)
ENABLED = true;

In [None]:
reg_model.create_service(service_name="ChicagoBusForecastv12",
                  service_compute_pool="mypool",
                  image_repo="MODEL_SERVING_DB.notebooks.my_inference_images",
                  build_external_access_integration="allow_all_integration",
                  ingress_enabled=True)

In [None]:
spcs_prediction = reg_model.run(test, function_name='predict', service_name="CHICAGOBUSFORECASTV12")
spcs_prediction.sort("DATE").select("DATE","TOTAL_RIDERS","TOTAL_RIDERS_FORECAST").show(10)

In [None]:
session.sql("SHOW ENDPOINTS IN SERVICE MODEL_SERVING_DB.FEATURE_STORE_MLDEMO.CHICAGOBUSFORECASTV12").collect()