In [40]:
import ast
import json
import warnings
import os

import pandas as pd
import pickle
import yaml

In [41]:
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.metrics import accuracy_score
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.preprocessing import OneHotEncoder
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.registry import Registry
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session
from snowflake.snowpark import types as T
from snowflake.snowpark.functions import col

warnings.simplefilter(action="ignore", category=UserWarning)

In [42]:
session = Session.builder.configs(SnowflakeLoginOptions()).getOrCreate()

model_version = yaml.safe_load(open("../params.yaml"))["model"]["version"]

In [43]:
titanic_df = session.table("titanic")

In [44]:
titanic_df.show()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"SURVIVED"  |"PCLASS"  |"AGE"  |"SIBSP"  |"PARCH"  |"FARE"   |"ADULT_MALE"  |"DECK"  |"ALIVE"  |"ALONE"  |"SEX"   |"EMBARKED"  |"CLASS"  |"WHO"  |"EMBARK_TOWN"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
|0           |3         |22.00  |1        |0        |7.2500   |True          |NULL    |False    |False    |MALE    |S           |THIRD    |MAN    |SOUTHAMPTON    |
|1           |1         |38.00  |1        |0        |71.2833  |False         |C       |True     |False    |FEMALE  |C           |FIRST    |WOMAN  |CHERBOURG      |
|1           |3         |26.00  |0        |0        |7.9250   |False         |NULL    |True     |True     |FEMALE  |S           |THIRD    |WOMAN  |SOUTHAMPTON    |
|1           |1 

In [45]:
# Columns with null values and their respective counts
{
    column_name: count_of_null
    for column_name, count_of_null in {
        col_name: titanic_df.where(col(col_name).is_null()).count()
        for col_name in titanic_df.columns
    }.items()
    if count_of_null > 0
}

{'AGE': 177, 'DECK': 688, 'EMBARKED': 2, 'EMBARK_TOWN': 2}

In [46]:
titanic_df = titanic_df.drop(
    ["AGE", "DECK", "ALIVE", "ADULT_MALE", "EMBARKED", "SEX", "PCLASS", "ALONE"]
)

In [47]:
titanic_df = titanic_df.withColumn("FARE", titanic_df["FARE"].astype(T.FloatType()))

titanic_df.show()

------------------------------------------------------------------------------
|"SURVIVED"  |"SIBSP"  |"PARCH"  |"CLASS"  |"WHO"  |"EMBARK_TOWN"  |"FARE"   |
------------------------------------------------------------------------------
|0           |1        |0        |THIRD    |MAN    |SOUTHAMPTON    |7.25     |
|1           |1        |0        |FIRST    |WOMAN  |CHERBOURG      |71.2833  |
|1           |0        |0        |THIRD    |WOMAN  |SOUTHAMPTON    |7.925    |
|1           |1        |0        |FIRST    |WOMAN  |SOUTHAMPTON    |53.1     |
|0           |0        |0        |THIRD    |MAN    |SOUTHAMPTON    |8.05     |
|0           |0        |0        |THIRD    |MAN    |QUEENSTOWN     |8.4583   |
|0           |0        |0        |FIRST    |MAN    |SOUTHAMPTON    |51.8625  |
|0           |3        |1        |THIRD    |CHILD  |SOUTHAMPTON    |21.075   |
|1           |0        |2        |THIRD    |WOMAN  |SOUTHAMPTON    |11.1333  |
|1           |1        |0        |SECOND   |CHILD  |

In [48]:
cat_cols = ["CLASS", "WHO", "EMBARK_TOWN"]
num_cols = ["SIBSP", "PARCH", "FARE"]

In [49]:
impute_cat = SimpleImputer(
    input_cols=cat_cols,
    output_cols=cat_cols,
    strategy="most_frequent",
    drop_input_cols=True,
)

titanic_df = impute_cat.fit(titanic_df).transform(titanic_df)
titanic_df.show()

------------------------------------------------------------------------------
|"CLASS"  |"WHO"  |"EMBARK_TOWN"  |"SURVIVED"  |"SIBSP"  |"PARCH"  |"FARE"   |
------------------------------------------------------------------------------
|THIRD    |MAN    |SOUTHAMPTON    |0           |1        |0        |7.25     |
|FIRST    |WOMAN  |CHERBOURG      |1           |1        |0        |71.2833  |
|THIRD    |WOMAN  |SOUTHAMPTON    |1           |0        |0        |7.925    |
|FIRST    |WOMAN  |SOUTHAMPTON    |1           |1        |0        |53.1     |
|THIRD    |MAN    |SOUTHAMPTON    |0           |0        |0        |8.05     |
|THIRD    |MAN    |QUEENSTOWN     |0           |0        |0        |8.4583   |
|FIRST    |MAN    |SOUTHAMPTON    |0           |0        |0        |51.8625  |
|THIRD    |CHILD  |SOUTHAMPTON    |0           |3        |1        |21.075   |
|THIRD    |WOMAN  |SOUTHAMPTON    |1           |0        |2        |11.1333  |
|SECOND   |CHILD  |CHERBOURG      |1           |1   

In [50]:
with open('../models/imputer.pkl', 'wb') as file: 
      
    # A new file will be created 
    pickle.dump(impute_cat, file) 


In [51]:
OHE = OneHotEncoder(
    input_cols=cat_cols,
    output_cols=cat_cols,
    drop_input_cols=True,
    drop="first",
    handle_unknown="ignore",
)

titanic_df = OHE.fit(titanic_df).transform(titanic_df)
titanic_df.show()

  state_pandas[[_CATEGORY]] = state_pandas[[_CATEGORY]].applymap(convert_to_string_excluding_nan)
  state_pandas[[_FITTED_CATEGORY]] = state_pandas[[_FITTED_CATEGORY]].applymap(convert_to_string_excluding_nan)
  state_pandas[[_CATEGORY]] = state_pandas[[_CATEGORY]].applymap(convert_to_string_excluding_nan)
  state_pandas[[_FITTED_CATEGORY]] = state_pandas[[_FITTED_CATEGORY]].applymap(convert_to_string_excluding_nan)


--------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CLASS_SECOND"  |"CLASS_THIRD"  |"WHO_MAN"  |"WHO_WOMAN"  |"EMBARK_TOWN_QUEENSTOWN"  |"EMBARK_TOWN_SOUTHAMPTON"  |"SURVIVED"  |"SIBSP"  |"PARCH"  |"FARE"   |
--------------------------------------------------------------------------------------------------------------------------------------------------------------
|0.0             |1.0            |1.0        |0.0          |0.0                       |1.0                        |0           |1        |0        |7.25     |
|0.0             |0.0            |0.0        |1.0          |0.0                       |0.0                        |1           |1        |0        |71.2833  |
|0.0             |1.0            |0.0        |1.0          |0.0                       |1.0                        |1           |0        |0        |7.925    |
|0.0             |0.0            |0.0        |

In [52]:
with open('../models/onehotencoder.pkl', 'wb') as file: 
      
    # A new file will be created 
    pickle.dump(OHE, file) 


In [53]:
data_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir)) + '/data'
titanic_pd = pd.DataFrame(titanic_df.collect())
cleaned_dir = os.makedirs(data_dir + "/training", exist_ok=True)
titanic_pd.to_csv("../data/training/titanic.csv", index=False)



In [None]:
train_df, test_df = titanic_df.random_split(weights=[0.8, 0.2], seed=8)

In [None]:
parameters = {
    "n_estimators": [100, 200, 300, 400, 500],
    "learning_rate": [0.1, 0.2, 0.3, 0.4, 0.5],
    "max_depth": list(range(3, 6, 1)),
    "min_child_weight": list(range(1, 6, 1)),
}

In [None]:
parameters

In [None]:
session.sql(
    f"ALTER WAREHOUSE {session.get_current_warehouse()[1:-1]} SET WAREHOUSE_SIZE=LARGE;"
).collect()

Data scientists may not have the ability to change the warehouse size.  They will usually have access to a larger warehouse and can easily switch as well using session.use_warehouse('bigger_warehouse')

In [None]:
grid_search = GridSearchCV(
    estimator=XGBClassifier(),
    param_grid=parameters,
    n_jobs=-1,
    scoring="accuracy",
    input_cols=train_df.drop("SURVIVED").columns,
    label_cols="SURVIVED",
    output_cols="PRED_SURVIVED",
)

# Train
grid_search.fit(train_df)

In [None]:
session.sql(
    f"ALTER WAREHOUSE {session.get_current_warehouse()[1:-1]} SET WAREHOUSE_SIZE=XSMALL;"
).collect()

In [None]:
result = grid_search.predict(test_df)

In [None]:
accuracy = accuracy_score(
    df=result, y_true_col_names="SURVIVED", y_pred_col_names="PRED_SURVIVED"
)

print(f"Accuracy: {accuracy}")

In [None]:
# Print each combination of hyperparameters with their accuracy
results = grid_search.to_sklearn().cv_results_
data = {"accuracy": results["mean_test_score"]}
for i, param in enumerate(results["params"]):
    for key, value in param.items():
        if key not in data:
            data[key] = [None] * len(results["params"])
        data[key][i] = value

# Create DataFrame
hp_df = pd.DataFrame(data).sort_values(by="accuracy", ascending=False)
hp_df.head()

# Model Registry


In [None]:
optimal_model = grid_search.to_sklearn().best_estimator_

In [None]:
# create function to add one to our model versions if it already exists


def check_and_update(df, model_name):
    if df.empty:
        return "V_1"
    elif df[df["name"] == model_name].empty:
        return "V_1"
    else:
        # Increment model_version if df is not a pandas Series
        lst = sorted(ast.literal_eval(df["versions"][0]))
        last_value = lst[-1]
        prefix, num = last_value.rsplit("_", 1)
        new_last_value = f"{prefix}_{int(num)+1}"
        lst[-1] = new_last_value
        return new_last_value

In [None]:
# Create Registry Object
reg = Registry

# Get sample input data to pass into the registry logging function
X = train_df.drop("SURVIVED").limit(100)


# Define model name and version (use uppercase for name)

titanic_model = reg.log_model(
    model_name="TITANIC",
    version_name=model_version,
    model=optimal_model,
    sample_input_data=X,
)

# Add evaluation metric
titanic_model.set_metric(
    metric_name="accuracy",
    value=hp_df["accuracy"][0],
)

session.file.put("../models/imputer.pkl", "@ml_data/models_" + model_version, overwrite=True)
session.file.put("../models/onehotencoder.pkl", "@ml_data/models_" + model_version, overwrite=True)
session.file.put("../data/training/titanic.csv", "@ml_data/data_" + model_version + "/training", overwrite=True)

In [None]:
reg.show_models()

In [None]:
hyperparameters = {
    k: v for k, v in optimal_model.get_params().items() if v and k != "missing"
}
titanic_model.set_metric(metric_name="hyperparameters", value=hyperparameters)

In [None]:
pd.options.display.max_colwidth = 500
reg.get_model(model_name).show_versions()

If you have multiple versions of the model, we want the UDF to be deployed as the version with the highest accuracy


In [None]:
reg_df = reg.get_model(model_name).show_versions()
reg_df["accuracy"] = reg_df["metadata"].apply(
    lambda x: json.loads(x)["metrics"]["accuracy"]
)
best_model = reg_df.sort_values(by="accuracy", ascending=False)

In [None]:
deployed_version = best_model["name"].iloc[0]
deployed_version

Set the default version to the deployed version (best model)

In [None]:
m = reg.get_model(model_name)
m.default = deployed_version
mv = m.default
mv.version_name

In [None]:
remote_prediction = mv.run(test_df, function_name="predict_proba")
remote_prediction.show()

In [None]:
# To test in SQL write test data back to a table

test_df.write.mode("overwrite").save_as_table("TEST_DATA")

## Add images to stage for Streamlit App


In [None]:
session.file.put("../streamlit_images/*", "@ML_DATA")

## Calling Model from SQL 

In [None]:
#Copy this code in a snowflake worksheet or run via session.sql
inference_df = session.sql('''
select *, TITANIC!predict_proba(*):output_feature_0
as surv_pred
from (
select * exclude survived
from test_data)
            ''')
inference_df.show()

# Calling model from a new notebook

In [None]:
# Point to the registry

reg = Registry(session=session)

# Get the default version of your model (Model with best accuracy in our case)

mv = reg.get_model("titanic").default

remote_prediction = mv.run(test_df, function_name="predict_proba")
remote_prediction.drop('"output_feature_0"').with_column_renamed(
    '"output_feature_1"', "pred_survived"
).show()

## To delete your model and all of it's versions

In [None]:
#reg.delete_model("TITANIC")