In [None]:
#Snowpark for Python
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import udf
import snowflake.snowpark.functions as F

import numpy as np
#Override np.float_ with np.float64
np.float_ = np.float64

from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.registry import Registry
from snowflake.ml._internal.utils import identifier

# data science libs
import pandas as pd  
# need to add numpy code from previous notebook to handle int64 issue for numpy
import matplotlib.pyplot as plt 
import seaborn as sns 

from snowflake.ml.modeling.metrics import mean_absolute_percentage_error 

# other libs 
import json
import joblib 
import cachetools 

# warning suppression 
import warnings; warnings.simplefilter('ignore')


In [None]:
# Obtain the Snowpark session that is already active in this notebook environment
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Attach a query tag to the session (origin, solution name and version)
session.query_tag = {"origin":"sf_sit-is","name":"e2e_ml_snowparkpython", "version":{"major":1,"minor":0}}

# Set session context
# NOTE(review): the role is hard-coded to ACCOUNTADMIN; confirm that such a
# privileged role is really required for this notebook.
session.use_role("ACCOUNTADMIN")

# Keep the current warehouse name as the solution prefix.
# The prefix-extraction step below is commented out, so solution_prefix holds
# whatever get_current_warehouse() returns, unmodified.
solution_prefix = session.get_current_warehouse()
#.strip("_").split("_DS_WH")[0]

# Report the current role, warehouse, and database/schema
print(f"Current role: {session.get_current_role()} | Current warehouse: {session.get_current_warehouse()} | DB SCHEMA: {session.sql('select current_database(), current_schema()').collect()}")

In [None]:
# Data loading: reference the DIAMONDS table.
# Note that session.table gives back a Snowpark (Snowflake) data frame by default.
diamonds_df = session.table("DIAMONDS")
# Bare expression so the notebook renders the data frame
diamonds_df

In [None]:
#strip double quotes from column names

def strip_double_quotes_from_column_names(df):
    """Return ``df`` renamed so that no column name contains a double quote.

    Every name in ``df.columns`` has all of its ``"`` characters removed, and
    the cleaned names are passed, in the same order, to ``df.to_df``.

    Args:
        df: An object exposing ``columns`` (iterable of str) and ``to_df``.

    Returns:
        Whatever ``df.to_df`` returns for the cleaned column names.
    """
    unquoted_names = (name.replace('"', '') for name in df.columns)
    return df.to_df(*unquoted_names)

# Rebind diamonds_df to the renamed data frame so that later cells can refer
# to columns by names that contain no double quotes
diamonds_df = strip_double_quotes_from_column_names(diamonds_df)


In [None]:
# Feature groups used by the preprocessing and modelling cells
CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
# Ordinal-encoded counterparts: each categorical name with an "_OE" suffix
CATEGORICAL_COLUMNS_OE = [f"{name}_OE" for name in CATEGORICAL_COLUMNS]
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_", "X", "Y", "Z"]

# Target column and the column that receives the model's predictions
LABEL_COLUMNS = ["PRICE"]
OUTPUT_COLUMNS = ["PREDICTED_PRICE"]


In [None]:
# load the preprocessing model which already exists in the model registry
# model_registry = Registry(session, database_name="DATASCIENCE", schema_name="PUBLIC")
# preprocessing_pipeline = model_registry.get_model('pre_process_diamond')


In [None]:
# Database/schema switching is currently disabled:
#session.use_database(f"{solution_prefix}_PROD")
#session.use_schema("ANALYTICS")

# Local path where the downloaded pipeline file is expected
PIPELINE_FILE = "/tmp/preprocessing_pipeline.joblib.gz"

# Build the stage path from the database the session is currently using
db = session.get_current_database()
file_path = f"@{db}.PUBLIC.models/preprocessing_pipeline.joblib.gz"

# Download the staged file into /tmp, then deserialize it
session.file.get(file_path, "/tmp")
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code; only load artifacts from a stage you trust.
preprocessing_pipeline = joblib.load(PIPELINE_FILE)


In [None]:

# Split the data 90/10 into training and test sets using random seed 0
diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=[0.9, 0.1], seed=0)

# Fit the preprocessing pipeline on the training split only, then use it to
# transform the training split
fitted_pipeline = preprocessing_pipeline.fit(diamonds_train_df)
train_df = fitted_pipeline.transform(diamonds_train_df)

# Apply the preprocessing pipeline to the test split
test_df = preprocessing_pipeline.transform(diamonds_test_df)



In [None]:
# Create the model: an XGBoost regressor that reads the ordinal-encoded and
# numerical feature columns, learns the label column(s) and writes its
# predictions to the output column(s)
regressor = XGBRegressor(
    input_cols=[*CATEGORICAL_COLUMNS_OE, *NUMERICAL_COLUMNS],
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Train the model on the preprocessed training split
regressor.fit(train_df)

# Predict on the preprocessed test split; the prediction is kept in `result`
# (a Snowpark data frame, per the original note on this cell)
result = regressor.predict(test_df)


In [None]:
# Using the same model, predict again, this time on a pandas copy of the
# test features.
from IPython.display import display, HTML

# Keep the prediction in a variable and preview it: previously the result was
# computed and then thrown away, because it was neither assigned nor the last
# expression of the cell.
pandas_predictions = regressor.predict(
    test_df[CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS].to_pandas()
)
display(pandas_predictions.head())

# Display a smiley face upon successful execution
display(HTML('<h1>😊</h1>'))



In [None]:
# Mean absolute percentage error of the baseline model on the test split
mape = mean_absolute_percentage_error(
    df=result,
    y_true_col_names="PRICE",
    y_pred_col_names="PREDICTED_PRICE",
)

# A bare select() in the middle of a cell only builds a new data frame and
# discards it; call show() so actual and predicted prices are really printed.
result.select("PRICE", "PREDICTED_PRICE").show()
print(f"Mean absolute percentage error: {mape}")




In [None]:
# Scatter plot of actual price against predicted price
price_pairs = result["PRICE","PREDICTED_PRICE"].to_pandas().astype("float64")
g = sns.relplot(data=price_pairs, x="PRICE", y="PREDICTED_PRICE", kind="scatter")

# Red reference line through the origin with slope 1, i.e. the line on which
# the predicted price equals the actual price
g.ax.axline((0, 0), slope=1, color="r")
plt.show()

In [None]:
# Hyperparameters are settings that control how an ML model learns from the data, for example
# the kind of distance measurement to be used in a k-nearest-neighbour model.
# Ordinary parameters are learned by the model during training, whereas hyperparameters are
# chosen beforehand and control how the model learns. Since hyperparameters dictate how the
# model learns, it is a good idea to find the best ones so that your model learns in the best way.
# GridSearchCV is a tool that can be used to find the best hyperparameter settings for your model.
# see: https://datagy.io/sklearn-gridsearchcv/#:~:text=In%20this%20tutorial%2C%20you’ll%20learn%20how%20to%20use,for%20the%20best%20model%20is%20Scikit-Learn’s%20GridSearchCV%20class.
# One way to tune your hyperparameters is to use a grid search. This is probably the simplest method, as well as the crudest. In a grid search, you try a
# grid of hyperparameters and evaluate the performance of each combination of hyperparameters.

#estimator= takes an estimator object, such as a classifier or a regression model.
#param_grid= takes a dictionary or a list of dictionaries. Each dictionary holds key-value pairs, where the key is the hyperparameter name and the value is the list of hyperparameter values to test.
#cv= takes an integer that determines the cross-validation strategy to apply. If None is passed, then 5 is used.
#scoring= takes a string or a callable. This represents the strategy to evaluate the performance of the test set.
#n_jobs= represents the number of jobs to run in parallel. Since this is a time-consuming process, running more jobs in parallel (if your computer can handle it) can speed up the process.

# tested this 10/05/2025 started at 13:32 - finished around 13:33 (so very fast actually)

# Hyperparameter grid: every n_estimators value is combined with every
# learning_rate value
hyperparameter_grid = {
    "n_estimators": [100, 200, 300, 400, 500, 600],
    "learning_rate": [0.1, 0.2, 0.3, 0.4, 0.5],
}

# Grid search over an XGBoost regressor, scored by negated MAPE, using the
# same feature, label and output columns as the baseline model
grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid=hyperparameter_grid,
    n_jobs=5,
    scoring="neg_mean_absolute_percentage_error",
    input_cols=[*CATEGORICAL_COLUMNS_OE, *NUMERICAL_COLUMNS],
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Run the search on the preprocessed training split
grid_search.fit(train_df)


In [None]:
# Print the best estimator reported by the underlying scikit-learn grid search
best_estimator = grid_search.to_sklearn().best_estimator_
print(best_estimator)


In [None]:
# Use the mean absolute percentage error (MAPE) to gauge how much error each
# combination of the 'learning_rate' and 'n_estimators' hyperparameters
# produces. The best combination is the one carried by best_estimator_; the
# plot below shows the whole grid visually.

gs_results = grid_search.to_sklearn().cv_results_

# One entry per evaluated combination, in the order stored in cv_results_
n_estimators_val = [params["n_estimators"] for params in gs_results["params"]]
learning_rate_val = [params["learning_rate"] for params in gs_results["params"]]

# Flip the sign of the mean test scores so that they read as MAPE values
mape_val = -gs_results["mean_test_score"]

gs_results_df = pd.DataFrame(
    data={
        "n_estimators": n_estimators_val,
        "learning_rate": learning_rate_val,
        "mape": mape_val,
    }
)

# One line per n_estimators value: MAPE as a function of learning_rate
sns.relplot(data=gs_results_df, x="learning_rate", y="mape", hue="n_estimators", kind="line")
plt.show()

In [None]:
# Predict on the test split with the tuned model found by the grid search
tuned_result_sdf = grid_search.predict(test_df)

# Compute the metric on the Snowpark data frame, BEFORE converting to pandas:
# the snowflake.ml.modeling.metrics functions expect a Snowpark DataFrame, so
# handing them the pandas frame (as this cell previously did) does not work.
mape = mean_absolute_percentage_error(
    df=tuned_result_sdf,
    y_true_col_names="PRICE",
    y_pred_col_names="PREDICTED_PRICE",
)

# As before, `result` ends up as a pandas data frame holding only the actual
# and predicted prices
result = tuned_result_sdf.select("PRICE", "PREDICTED_PRICE").to_pandas()

# Report the score (same message format as the baseline evaluation cell)
print(f"Mean absolute percentage error: {mape}")
