In [None]:
# Import python packages
import pandas as pd
import streamlit as st

# Snowpark is available for the analyses too: attach to the Snowflake session
# that is active for this notebook.
from snowflake.snowpark.context import get_active_session

session = get_active_session()


In [None]:
# Snowpark for Python
from snowflake.snowpark.types import DoubleType
import snowflake.snowpark.functions as F

In [None]:

-- Point this notebook at the warehouse, database and schema created during Setup.
USE WAREHOUSE ML_HOL_WH;
-- A fully qualified USE SCHEMA sets both the current database and the current schema.
USE SCHEMA ML_HOL_DB.ML_HOL_SCHEMA;

In [None]:
# Get Snowflake Session object and turn on Snowpark's SQL simplifier.
session = get_active_session()
session.sql_simplifier_enabled = True

# Tag the queries this session issues with the quickstart's metadata.
quickstart_query_tag = {
    "origin": "sf_sit-is",
    "name": "e2e_ml_snowparkpython",
    "version": {"major": 1, "minor": 0},
    "attributes": {"is_quickstart": 1},
}
session.query_tag = quickstart_query_tag

# Current Environment Details: each template is filled with the value of its session getter.
connection_details = (
    ('User      : {}', session.get_current_user),
    ('Role      : {}', session.get_current_role),
    ('Database  : {}', session.get_current_database),
    ('Schema    : {}', session.get_current_schema),
    ('Warehouse : {}', session.get_current_warehouse),
)
print('Connection Established with the following parameters:')
for template, current_value in connection_details:
    print(template.format(current_value()))

In [None]:
# Load the CSV file(s) on the DIAMONDS_ASSETS stage into a Snowpark DataFrame.
# Column types are inferred and column names are taken from the header row.
csv_read_options = {
    "field_delimiter": ",",
    "field_optionally_enclosed_by": '"',
    "infer_schema": True,
    "parse_header": True,
}
diamonds_df = session.read.options(csv_read_options).csv("@DIAMONDS_ASSETS")

diamonds_df

In [None]:
# Force headers to uppercase. The "table" column is special-cased to TABLE_PCT
# (presumably to avoid clashing with the SQL keyword TABLE).
special_column_names = {'"table"': "TABLE_PCT"}
for original_name in diamonds_df.columns:
    target_name = special_column_names.get(original_name, str.upper(original_name))
    diamonds_df = diamonds_df.with_column_renamed(original_name, target_name)

diamonds_df
def fix_values(columnn):
    """Build a Snowpark column expression that normalizes the values of `columnn`.

    Every run of characters outside [a-zA-Z0-9] is replaced by a single '_', and the
    result is upper-cased (e.g. "Very Good" -> "VERY_GOOD").

    Args:
        columnn: name of the column to clean.

    Returns:
        A Snowpark Column expression; nothing is evaluated until it is used in a DataFrame.
    """
    source = F.col(columnn)
    underscored = F.regexp_replace(source, '[^a-zA-Z0-9]+', '_')
    return F.upper(underscored)

# Normalize the text values of the listed categorical columns in place.
columns_to_clean = ["CUT"]
for col in columns_to_clean:
    cleaned_values = fix_values(col)
    diamonds_df = diamonds_df.with_column(col, cleaned_values)

diamonds_df
list(diamonds_df.schema)

# Cast the numeric measurement columns to DOUBLE before persisting the table.
double_columns = ["CARAT", "X", "Y", "Z", "DEPTH", "TABLE_PCT"]
for measure in double_columns:
    as_double = diamonds_df[measure].cast(DoubleType())
    diamonds_df = diamonds_df.with_column(measure, as_double)

diamonds_df
# Persist the cleaned data (unquoted name, so presumably stored as DIAMONDS).
diamonds_df.write.mode('overwrite').save_as_table('diamonds')


In [None]:

# Snowpark for Python
import snowflake.snowpark.functions as F
from snowflake.snowpark.types import DecimalType

# Snowflake ML
import snowflake.ml.modeling.preprocessing as snowml
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.metrics.correlation import correlation

# Data Science Libs
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Misc
import json
import joblib

# warning suppresion
import warnings; warnings.simplefilter('ignore')

In [None]:
session = get_active_session()

# Tag this session's queries with the quickstart metadata.
session.query_tag = dict(
    origin="sf_sit-is",
    name="e2e_ml_snowparkpython",
    version={"major": 1, "minor": 0},
    attributes={"is_quickstart": 1},
)
session
# First, we read in the data from a Snowflake table into a Snowpark DataFrame
# **Change this only if you named your table something else in the data ingest notebook **
diamonds_df = session.table("DIAMONDS")

# Min-max scale CARAT into a new CARAT_NORM column (fitted and applied on the same data).
carat_scaler = snowml.MinMaxScaler(input_cols=["CARAT"], output_cols=["CARAT_NORM"])
normalized_diamonds_df = carat_scaler.fit(diamonds_df).transform(diamonds_df)

# Reduce the number of decimals: store CARAT_NORM as DECIMAL(7, 6).
normalized_diamonds_df = normalized_diamonds_df.with_column(
    "CARAT_NORM",
    normalized_diamonds_df.col("CARAT_NORM").cast(DecimalType(7, 6)),
)

normalized_diamonds_df
# Ordinal-encode CUT and CLARITY. Each level list is given in its intended rank order,
# so the integer codes follow that ordering.
cut_levels = ["IDEAL", "PREMIUM", "VERY_GOOD", "GOOD", "FAIR"]
clarity_levels = ["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]
categories = {"CUT": np.array(cut_levels), "CLARITY": np.array(clarity_levels)}

ordinal_encoder = snowml.OrdinalEncoder(
    input_cols=["CUT", "CLARITY"],
    output_cols=["CUT_OE", "CLARITY_OE"],
    categories=categories,
)
ord_encoded_diamonds_df = ordinal_encoder.fit(normalized_diamonds_df).transform(normalized_diamonds_df)

# Show the encoding (reads a private attribute of the encoder; may change between releases).
print(ordinal_encoder._state_pandas)

ord_encoded_diamonds_df

# One-hot encode the categoricals into <name>_OHE columns (this result is only inspected here).
ohe_input_cols = ["CUT", "COLOR", "CLARITY"]
one_hot_encoder = snowml.OneHotEncoder(
    input_cols=ohe_input_cols,
    output_cols=[name + "_OHE" for name in ohe_input_cols],
)
transformed_diamonds_df = one_hot_encoder.fit(ord_encoded_diamonds_df).transform(ord_encoded_diamonds_df)

np.array(transformed_diamonds_df.columns)
# Categorize all the features for processing
CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
# Names of the ordinal-encoded versions of the categoricals: CUT_OE, COLOR_OE, CLARITY_OE
CATEGORICAL_COLUMNS_OE = [f"{column}_OE" for column in CATEGORICAL_COLUMNS]
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]

# Levels of each categorical, in the rank order the ordinal encoder should use.
cut_order = ["IDEAL", "PREMIUM", "VERY_GOOD", "GOOD", "FAIR"]
clarity_order = ["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]
color_order = list("DEFGHIJ")
categories = {
    "CUT": np.array(cut_order),
    "CLARITY": np.array(clarity_order),
    "COLOR": np.array(color_order),
}
# Build the pipeline: ordinal-encode the categoricals into *_OE columns, then min-max
# scale the numeric columns in place (output_cols == input_cols). clip=True clips
# transformed values to the fitted range.
ordinal_step = snowml.OrdinalEncoder(
    input_cols=CATEGORICAL_COLUMNS,
    output_cols=CATEGORICAL_COLUMNS_OE,
    categories=categories,
)
scaling_step = snowml.MinMaxScaler(
    clip=True,
    input_cols=NUMERICAL_COLUMNS,
    output_cols=NUMERICAL_COLUMNS,
)
preprocessing_pipeline = Pipeline(steps=[("OE", ordinal_step), ("MMS", scaling_step)])

# Persist the pipeline locally with joblib. NOTE(review): this dump runs BEFORE fit()
# below, so the pickled file holds the *unfitted* pipeline definition. The training
# section later re-fits it on its own training split, so keep this order if that is intended.
PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib'
joblib.dump(preprocessing_pipeline, PIPELINE_FILE) # We are just pickling it locally first

# Fit on the full DIAMONDS data and transform it; this frame feeds the exploratory plot below.
transformed_diamonds_df = preprocessing_pipeline.fit(diamonds_df).transform(diamonds_df)
transformed_diamonds_df
# You can also save the pickled object into the stage we created earlier for deployment.
# The uploaded file is the unfitted pipeline written above, not the one fitted in this cell.
# (It is read back later as preprocessing_pipeline.joblib.gz, i.e. put() apparently gzips it.)
session.file.put(PIPELINE_FILE, "@ML_HOL_ASSETS", overwrite=True)
# Set up a plot to look at CARAT and PRICE. Rows are counted per
# (PRICE, CARAT, CLARITY_OE) combination so point size can encode frequency.
# Note CARAT here is the min-max scaled value produced by the pipeline.
counts = (
    transformed_diamonds_df.to_pandas()
    .groupby(['PRICE', 'CARAT', 'CLARITY_OE'])
    .size()
    .reset_index(name='Count')
)

fig, ax = plt.subplots(figsize=(20, 20))
ax.set_title('Price vs Carat', fontsize=28)
ax = sns.scatterplot(data=counts, x='CARAT', y='PRICE', size='Count',
                     hue='CLARITY_OE', markers='o', ax=ax)
ax.grid(axis='y')

# The relationship is not linear - it appears exponential which makes sense given the rarity of the large diamonds
sns.move_legend(ax, "upper left")
sns.despine(left=True, bottom=True)

In [None]:
# Snowpark for Python
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import udf
import snowflake.snowpark.functions as F

# Snowflake ML
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.registry import Registry
from snowflake.ml._internal.utils import identifier

# data science libs
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error

# misc
import json
import joblib
import cachetools

# warning suppresion
import warnings; warnings.simplefilter('ignore')
# Establish Secure Connection to Snowflake
session = get_active_session()

# Add a query tag to the session.
query_tag_payload = {
    "origin": "sf_sit-is",
    "name": "e2e_ml_snowparkpython",
    "version": {"major": 1, "minor": 0},
    "attributes": {"is_quickstart": 1},
}
session.query_tag = query_tag_payload
session
# Load the DIAMONDS table written by the data-ingest step
diamonds_df = session.table("DIAMONDS")
diamonds_df
# Feature groups used for modelling
CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
# Names of the ordinal-encoded columns produced by the preprocessing pipeline
CATEGORICAL_COLUMNS_OE = [name + "_OE" for name in CATEGORICAL_COLUMNS]
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]

# Target column, and the column the model writes its predictions to
LABEL_COLUMNS, OUTPUT_COLUMNS = ['PRICE'], ['PREDICTED_PRICE']
# Load the preprocessing pipeline object from stage: download the staged
# preprocessing_pipeline.joblib.gz into /tmp where this notebook runs, then unpickle it.
# joblib.load unpickles arbitrary objects, so only load files this project wrote itself.
STAGED_PIPELINE = '@ML_HOL_ASSETS/preprocessing_pipeline.joblib.gz'
session.file.get(STAGED_PIPELINE, '/tmp')

PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib.gz'
preprocessing_pipeline = joblib.load(PIPELINE_FILE)
# Split the data roughly 90/10 into train and test sets; the fixed seed keeps the split reproducible.
split_weights = [0.9, 0.1]
diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=split_weights, seed=0)

# Fit the preprocessing pipeline on the training rows only, then apply the same fitted
# transformation to the test rows.
train_df = preprocessing_pipeline.fit(diamonds_train_df).transform(diamonds_train_df)
test_df = preprocessing_pipeline.transform(diamonds_test_df)
# Baseline XGBoost regressor: learns PRICE from the encoded categoricals and the scaled numerics,
# writing predictions to PREDICTED_PRICE.
feature_cols = CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS
regressor = XGBRegressor(
    input_cols=feature_cols,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

regressor.fit(train_df)              # train on the preprocessed training split
result = regressor.predict(test_df)  # score the held-out split
# Just to illustrate, we can also pass in a Pandas DataFrame to Snowflake ML's model.predict().
# Print a few rows: a bare expression in the middle of a cell is never displayed.
pandas_predictions = regressor.predict(test_df[CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS].to_pandas())
print(pandas_predictions.head())

# Baseline model's MAPE on the held-out test split.
mape = mean_absolute_percentage_error(df=result,
                                      y_true_col_names="PRICE",
                                      y_pred_col_names="PREDICTED_PRICE")

# .show() is required: a bare Snowpark select() mid-cell is lazy and is neither executed nor displayed.
result.select("PRICE", "PREDICTED_PRICE").show()
print(f"Mean absolute percentage error: {mape}")

# Plot actual vs predicted; points on the red y = x line are perfect predictions.
g = sns.relplot(data=result["PRICE", "PREDICTED_PRICE"].to_pandas().astype("float64"), x="PRICE", y="PREDICTED_PRICE", kind="scatter")
g.ax.axline((0,0), slope=1, color="r")

plt.show()

In [None]:
-- Scale the warehouse up for the hyperparameter search, and block until the resize has
-- finished provisioning so the search runs on the larger size (default is not to wait).
ALTER WAREHOUSE ML_HOL_WH SET WAREHOUSE_SIZE = LARGE WAIT_FOR_COMPLETION = TRUE;


In [None]:
# Exhaustive search over 5 x 5 = 25 (n_estimators, learning_rate) combinations.
# Each candidate is scored by negated MAPE; n_jobs=-1 requests all available workers.
xgb_param_grid = {
    "n_estimators": list(range(100, 501, 100)),
    "learning_rate": [0.1, 0.2, 0.3, 0.4, 0.5],
}
grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid=xgb_param_grid,
    n_jobs=-1,
    scoring="neg_mean_absolute_percentage_error",
    input_cols=CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Train
grid_search.fit(train_df)

In [None]:
-- The grid search is done: size the warehouse back down ('X-SMALL' is the same size as XSMALL).
ALTER WAREHOUSE ML_HOL_WH SET WAREHOUSE_SIZE = 'X-SMALL';

In [None]:
# Convert the fitted grid search to its scikit-learn equivalent once and reuse it,
# instead of calling to_sklearn() again for every attribute read below.
sk_grid_search = grid_search.to_sklearn()
print(sk_grid_search.best_estimator_)

# Analyze grid search results: one entry per hyper-parameter combination, in cv_results_ order.
gs_results = sk_grid_search.cv_results_
n_estimators_val = [params["n_estimators"] for params in gs_results["params"]]
learning_rate_val = [params["learning_rate"] for params in gs_results["params"]]
# Scoring was "neg_mean_absolute_percentage_error", so negate to recover MAPE.
mape_val = gs_results["mean_test_score"]*-1

gs_results_df = pd.DataFrame(data={
    "n_estimators":n_estimators_val,
    "learning_rate":learning_rate_val,
    "mape":mape_val})

sns.relplot(data=gs_results_df, x="learning_rate", y="mape", hue="n_estimators", kind="line")

plt.show()

# Predict on the held-out split (GridSearchCV predicts with its refit best estimator).
result = grid_search.predict(test_df)

# Test-set MAPE of the tuned model. It is kept under its own name so it does NOT overwrite
# `mape`, which holds the baseline model's score and is what gets logged for version V0.
gs_mape = mean_absolute_percentage_error(df=result,
                                         y_true_col_names="PRICE",
                                         y_pred_col_names="PREDICTED_PRICE")

result.select("PRICE", "PREDICTED_PRICE").show()
print(f"Mean absolute percentage error: {gs_mape}")

# Plot actual vs predicted
g = sns.relplot(data=result["PRICE", "PREDICTED_PRICE"].to_pandas().astype("float64"), x="PRICE", y="PREDICTED_PRICE", kind="scatter")
g.ax.axline((0,0), slope=1, color="r")

plt.show()

optimal_model = sk_grid_search.best_estimator_
optimal_n_estimators = optimal_model.n_estimators
optimal_learning_rate = optimal_model.learning_rate

# Cross-validated MAPE of the winning combination. best_index_ indexes cv_results_, which is
# also the row order of gs_results_df, so no float-equality lookup on learning_rate is needed.
optimal_mape = gs_results_df["mape"].iloc[sk_grid_search.best_index_]





In [None]:
# Get sample input data to pass into the registry logging function
X = train_df.select(CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS).limit(100)

db = identifier._get_unescaped_name(session.get_current_database())
schema = identifier._get_unescaped_name(session.get_current_schema())

# Define model name
model_name = "DIAMONDS_PRICE_PREDICTION"

# Create a registry and log the model
native_registry = Registry(session=session, database_name=db, schema_name=schema)


def _test_set_mape(predictions_df):
    """Return the MAPE of PREDICTED_PRICE against PRICE in a prediction DataFrame.

    Both registered versions are scored with this helper on the same held-out test_df,
    so their "mean_abs_pct_err" metrics are comparable. The metrics also do not depend
    on whichever global `mape` an earlier cell happened to leave behind.
    """
    return mean_absolute_percentage_error(df=predictions_df,
                                          y_true_col_names="PRICE",
                                          y_pred_col_names="PREDICTED_PRICE")


# Let's first log the very first model we trained
model_ver = native_registry.log_model(
    model_name=model_name,
    version_name='V0',
    model=regressor,
    sample_input_data=X, # to provide the feature schema
    options={"enable_explainability": True}
)

# Add evaluation metric: baseline model's test-set MAPE
model_ver.set_metric(metric_name="mean_abs_pct_err", value=_test_set_mape(regressor.predict(test_df)))

# Add a description
model_ver.comment = "This is the first iteration of our Diamonds Price Prediction model. It is used for demo purposes."

# Now, let's log the optimal model from GridSearchCV
model_ver2 = native_registry.log_model(
    model_name=model_name,
    version_name='V1',
    model=optimal_model,
    sample_input_data=X, # to provide the feature schema
    options={"enable_explainability": True}
)

# Add evaluation metric: test-set MAPE of the tuned model. grid_search.predict() uses the refit
# best estimator, i.e. the same model as optimal_model.
model_ver2.set_metric(metric_name="mean_abs_pct_err", value=_test_set_mape(grid_search.predict(test_df)))

# Add a description (implicit concatenation, so no indentation whitespace leaks into the text)
model_ver2.comment = ("This is the second iteration of our Diamonds Price Prediction model "
                      "where we performed hyperparameter optimization.")

In [None]:
# Let's confirm they were added
diamonds_model = native_registry.get_model(model_name)
diamonds_model.show_versions()
diamonds_model.default.version_name

# Score the test split with the version logged as 'V1' (looked up here as 'v1').
model_ver = diamonds_model.version('v1')
result_sdf2 = model_ver.run(test_df, function_name="predict")
result_sdf2.show()

# Persist the test split, then compute per-feature explanations for the training split.
test_df.write.mode('overwrite').save_as_table('DIAMONDS_TEST')
mv_explanations = model_ver.run(train_df, function_name="explain")
mv_explanations
import shap

mv_explanations_pd = mv_explanations.to_pandas()

# One "<feature>_explanation" column per model input, in the same order as the features
# the model was trained on (CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS).
explanation_cols = [f"{feature}_explanation" for feature in CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS]
mv_explanations_pd_2 = mv_explanations_pd[explanation_cols]

# Wrap the explanation values in SHAP's public Explanation type so shap.plots can render them
# (the previous shap._explanation path is a private module and may move between releases).
shap_exp = shap.Explanation(mv_explanations_pd_2.values,
                            feature_names=list(mv_explanations_pd_2.columns))
shap.plots.bar(shap_exp)
train_df.write.mode('overwrite').save_as_table('DIAMONDS_TRAIN')