In [None]:
# access data from snowflake
import pandas as pd
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *

# Connection settings for the Snowpark session. The empty strings are
# placeholders that must be filled in before this cell is run.
# NOTE(review): do not commit a real password here; consider reading it from an
# environment variable or getpass instead.
connection_parameters = {
    "account": "",
    "user": "", 
    "host": "", # e.g. "sn00111.snowflakecomputing.com"
    "password": "",
    "role": "ACCOUNTADMIN",  # NOTE(review): highly privileged role - confirm it is really required
    "warehouse": "HOL_WH",
    "database": "HOL_DB",
    "schema": "PUBLIC"
    }
# Open the session that every later cell in this notebook uses.
session = Session.builder.configs(connection_parameters).create()

# Look up the three source tables through the session.
maintenance_df = session.table('maintenance')
humidity_df = session.table('humidity')
# NOTE(review): this table is later joined on the UDI column, yet it is named
# 'city_udf' - confirm the table name is not a typo.
hum_udi_df = session.table('city_udf')

# Look at each of the DataFrames

In [None]:
# Preview five rows. limit() is applied in Snowflake, so only those rows are
# downloaded instead of the whole table.
maintenance_df.limit(5).to_pandas()

In [None]:
# Preview five rows. limit() is applied in Snowflake, so only those rows are
# downloaded instead of the whole table.
humidity_df.limit(5).to_pandas()

In [None]:
# Preview five rows. limit() is applied in Snowflake, so only those rows are
# downloaded instead of the whole table.
hum_udi_df.limit(5).to_pandas()

In [None]:
# Build the training set with two joins: first attach the UDI lookup table to
# the maintenance records, then attach the humidity rows whose CITY_NAME equals
# the record's CITY. Only the columns needed downstream are kept.
maintenance_city = maintenance_df.join(hum_udi_df, ["UDI"])

city_matches = maintenance_city.col("CITY") == humidity_df.col("CITY_NAME")
training_columns = [
    col("TYPE"),
    col("AIR_TEMPERATURE_K"),
    col("PROCESS_TEMPERATURE"),
    col("ROTATIONAL_SPEED_RPM"),
    col("TORQUE_NM"),
    col("TOOL_WEAR_MIN"),
    col("HUMIDITY_RELATIVE_AVG"),
    col("MACHINE_FAILURE"),
]
maintenance_hum = maintenance_city.join(humidity_df, city_matches).select(*training_columns)

In [None]:
# Persist the training set as the MAINTENANCE_HUM table (overwriting any earlier
# copy), then read that table back into a local pandas DataFrame.
training_writer = maintenance_hum.write.mode("overwrite")
training_writer.save_as_table("MAINTENANCE_HUM")

maintenance_hum_df = session.table('MAINTENANCE_HUM').to_pandas()

In [None]:
# Drop the TYPE column, which is not one of the model features.
# errors="ignore" keeps this cell idempotent: re-running it after TYPE has
# already been removed does not raise KeyError.
maintenance_hum_df = maintenance_hum_df.drop(columns=["TYPE"], errors="ignore")

# Build a model that predicts machine failure

In [None]:
# split data into train and test
import numpy as np
from sklearn.model_selection import train_test_split

# Select the target with single brackets so y is a 1-D array of shape
# (n_samples,). A double-bracket selection would give an (n_samples, 1) column
# vector, which makes scikit-learn's fit() emit a DataConversionWarning.
y = maintenance_hum_df["MACHINE_FAILURE"].to_numpy()
X = maintenance_hum_df.drop(columns=["MACHINE_FAILURE"]).to_numpy()

# Hold out 25% of the rows for evaluation; the fixed seed makes the split
# reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=123)

In [None]:
from sklearn.linear_model import LogisticRegression

# Create the classifier (seeded with random_state=0) and fit it on the training
# split. fit() returns the estimator itself, so the name stays bound to the
# fitted model.
logistic_model = LogisticRegression(random_state=0)
logistic_model = logistic_model.fit(X_train, y_train)

In [None]:
# evaluate model on test
from sklearn import metrics
from sklearn.metrics import roc_auc_score, roc_curve, RocCurveDisplay
import matplotlib.pyplot as plt

# Predicted probability for the second class in logistic_model.classes_
# (presumably MACHINE_FAILURE = 1 - confirm the label encoding).
y_pred = logistic_model.predict_proba(X_test)[:,1]

# Points of the ROC curve and the area under it.
fpr, tpr, thresholds = roc_curve(y_test, y_pred)
roc_auc = metrics.auc(fpr, tpr)

# The plot object is named roc_display rather than `display` so that the
# notebook's built-in display() function is not shadowed for later cells.
# The legend label names the model that was actually fitted.
roc_display = RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc,
                              estimator_name='Logistic regression')
roc_display.plot()
plt.show()

In [None]:
# AUC on the held-out test split, computed directly from the true labels and the
# predicted probabilities; the value is shown as the cell output.
roc_auc_score(y_test, y_pred)

# Variable Importance

In [None]:
from sklearn.inspection import permutation_importance

# Labels for the bars; expected to be in the same order as the columns of X.
feature_names = [
    'AIR_TEMPERATURE_K',
    'PROCESS_TEMPERATURE',
    'ROTATIONAL_SPEED_RPM',
    'TORQUE_NM',
    'TOOL_WEAR_MIN',
    'HUMIDITY_RELATIVE_AVG',
]

# Permute each feature 10 times on the test split (seeded, two parallel jobs).
result = permutation_importance(
    logistic_model,
    X_test,
    y_test,
    n_repeats=10,
    random_state=42,
    n_jobs=2,
)

# Bar chart of the mean importance per feature, with the standard deviation
# across repeats drawn as error bars.
forest_importances = pd.Series(result.importances_mean, index=feature_names)
fig, ax = plt.subplots()
forest_importances.plot.bar(yerr=result.importances_std, ax=ax)
ax.set(
    title="Feature importances using permutation on full model",
    ylabel="Mean accuracy decrease",
)
fig.tight_layout()
plt.show()

# Deploy the model to Snowflake

In [None]:
# dump model to local directory
import pickle

# A context manager guarantees the file is flushed and closed before it is
# uploaded to the stage later in the notebook.
with open('model.pkl', 'wb') as model_file:
    pickle.dump(logistic_model, model_file)

In [None]:
# Create the stage that will hold the pickled model. CREATE OR REPLACE means the
# stage is recreated every time this cell runs.
session.sql("CREATE OR REPLACE STAGE HOL_DB.PUBLIC.maint_stage").collect()

In [None]:
# Upload the pickled model to the stage. The file is uploaded uncompressed
# (auto_compress=False) and replaces any copy already on the stage
# (overwrite=True).
session.file.put('model.pkl', "HOL_DB.PUBLIC.maint_stage", auto_compress=False, overwrite=True)

In [None]:
# Clear any imports and packages already registered on the session so that
# re-running this cell starts from a clean state.
session.clear_imports()
session.clear_packages()

import cachetools
from snowflake.snowpark.types import PandasSeries, PandasDataFrame

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
# NOTE(review): package versions are not pinned - confirm the server-side
# scikit-learn can unpickle a model trained with the local version.
session.add_import('@maint_stage/model.pkl')
session.add_packages('pandas','scikit-learn','cachetools')

@cachetools.cached(cache={})
def load_model(filename):
    """Load a pickled model from the UDF import directory.

    The result is memoised per ``filename`` by ``cachetools.cached``, so the
    file is read at most once per process.

    Args:
        filename: Name of the pickle file inside the import directory,
            e.g. ``'model.pkl'``.

    Returns:
        The unpickled model object.

    Raises:
        KeyError: If ``snowflake_import_directory`` is not set in
            ``sys._xoptions``.
        RuntimeError: If the import directory option is set but empty.
        OSError: If the model file cannot be opened.
    """
    # Imported locally so the function is self-contained and does not depend on
    # a `pickle` import made in some other notebook cell.
    import os
    import pickle
    import sys

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    # Fail loudly instead of returning (and caching) None, which would only
    # surface later as an AttributeError on the missing model.
    if not import_dir:
        raise RuntimeError(
            "'" + IMPORT_DIRECTORY_NAME + "' is empty; cannot locate " + filename
        )

    # os.path.join works whether or not import_dir ends with a path separator.
    model_file = os.path.join(import_dir, filename)
    with open(model_file, 'rb') as f:
        # NOTE: pickle.load can execute arbitrary code; only load model files
        # that were produced by a trusted source.
        return pickle.load(f)

# Register predict_failure as a permanent UDF on this session, replacing any
# existing one, with '@maint_stage' as its stage location.
@udf(name='predict_failure',session=session,replace=True,is_permanent=True,stage_location='@maint_stage')
def predict_failure(df: PandasDataFrame[int, int, int, int, int, int]) -> PandasSeries[float]:
    """Score a batch of rows with the model loaded from 'model.pkl'.

    Args:
        df: Batch of input rows with six columns, given positionally in the
            order AIR_TEMPERATURE_K, PROCESS_TEMPERATURE, ROTATIONAL_SPEED_RPM,
            TORQUE_NM, TOOL_WEAR_MIN, HUMIDITY_RELATIVE_AVG.

    Returns:
        For each row, column 1 of the model's ``predict_proba`` output.
    """
    # NOTE(review): all six inputs are declared as int - confirm the source
    # columns are integer-valued; otherwise fractional values may be lost
    # before scoring.
    import sklearn
    import pandas as pd
    # Name the positional columns in the feature order listed above.
    df.columns = ['AIR_TEMPERATURE_K', 'PROCESS_TEMPERATURE', 'ROTATIONAL_SPEED_RPM','TORQUE_NM','TOOL_WEAR_MIN','HUMIDITY_RELATIVE_AVG']
    # load_model is cached, so repeated batches reuse the same model object.
    model = load_model('model.pkl')
    # NOTE(review): the model was fitted on a NumPy array, while a named
    # DataFrame is passed here - scikit-learn may warn about feature names.
    return model.predict_proba(df)[:,1]