In [2]:
# !pip install scikit-learn
# !pip install plotly
# !pip install matplotlib
# !pip install pyarrow==8.0.0
# !pip install refractml 

In [1]:
import os
from snowflake.snowpark.session import Session
import configparser
import pandas as pd
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import pandas as pd
from sklearn.model_selection import train_test_split
import joblib
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score,classification_report,confusion_matrix,confusion_matrix, f1_score
from refractml import *
from refractml.constants import MLModelFlavours
import numpy as np
from joblib import load, dump
%matplotlib inline
import inspect
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.metrics import confusion_matrix, classification_report
from tqdm.auto import tqdm



# Connecting to Snowflake

In [2]:
# Read Snowflake credentials from a local INI file so they are never hardcoded here.
CONFIG_PATH = "snowflake_connection.ini"
config = configparser.ConfigParser()
# ConfigParser.read silently skips missing files and returns the list of files it
# parsed; fail loudly now instead of with a confusing KeyError('Snowflake') below.
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(f"Snowflake connection config not found: {CONFIG_PATH}")

snowflake_section = config["Snowflake"]
# Connection keys expected in the [Snowflake] section; values are passed through as-is.
connection_parameters = {
    key: snowflake_section[key]
    for key in ("user", "password", "account", "WAREHOUSE", "DATABASE", "SCHEMA")
}

def snowflake_connector(conn):
    """Open a Snowpark session from a mapping of connection parameters.

    Args:
        conn: mapping passed verbatim to ``Session.builder.configs``.

    Returns:
        The created Snowpark ``Session``.

    Raises:
        ValueError: if the session cannot be created. The underlying exception
            is chained as ``__cause__`` so the real reason is not lost.
    """
    try:
        session = Session.builder.configs(conn).create()
    except Exception as exc:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit propagate,
        # and chained so the traceback shows why the connection failed.
        raise ValueError("error while connecting with db") from exc
    print("connection successful!")
    return session

# Open the Snowpark session shared by every cell below.
session = snowflake_connector(conn=connection_parameters)

# code to write data to snowflake table
# df = pd.read_csv("/data/employees_data.csv")
# df.columns = map(str.upper, df.columns)
# session.write_pandas(df, table_name="ATTRITION", auto_create_table=True)

connection successful!


In [7]:
# Pull the full ATTRITION table into pandas for local exploration.
table_name = 'ATTRITION'
attrition_table = session.table(table_name)
df = attrition_table.to_pandas()

In [11]:
# Preview the first three rows of the raw table.
df.iloc[:3]

Unnamed: 0,SALARY,SENIORITY,TENURE_MONTHS,MONTHS_AFTER_COLLEGE,BIRTH_YEAR,MAPPED_ROLE_CLEAN,SEX,ETHNICITY,HOSPITAL_TYPE,HOSPITAL_OWNERSHIP,COMPANY_NAME,CITY_STATE,DISTANCE,DEGREE_CLEAN,SCHOOL_ENDDATE,JOB_STARTDATE,JOB_ENDDATE,USER_ID,OVERTIME_HOURS,CHURN
0,37806.933,1,88,3,1983,social,F,White,Psychiatric,Voluntary non-profit - Private,Adventist Health System Sunbelt Healthcare Corp.,"Orlando, FL",<2mi,Masters_Degree,2008-01-31,2008-04-01,2015-10-01,870835513,14,False
1,39922.66,1,7,13,1995,nurse,F,White,Acute Care Hospitals,Proprietary,Tenet Healthcare Corp.,"Modesto, CA",<2mi,Bachelors_Degree,2020-01-01,2021-02-01,2021-10-01,18255040,10,True
2,29633.849,1,25,11,1991,nurse,F,White,Acute Care Hospitals,Voluntary non-profit - Church,Mayo Clinic,"Rochester, MN",<2mi,Other,2015-01-01,2015-12-01,,813877070,3,False


In [None]:
# Split the flat ATTRITION extract into narrower views, each keyed by USER_ID.
departments = df[["USER_ID", "MAPPED_ROLE_CLEAN", "HOSPITAL_TYPE", "HOSPITAL_OWNERSHIP", "COMPANY_NAME"]]
roster = df[["USER_ID", "JOB_STARTDATE", "JOB_ENDDATE"]]
compensation = df[["USER_ID", "SALARY"]]
tenure = df[["USER_ID", "TENURE_MONTHS", "MONTHS_AFTER_COLLEGE"]]
# TODO(review): confirm which attributes belong in the employee master view; the
# personal/education columns not used by the views above are selected here
# (selecting a column named "" raises KeyError).
employee_master = df[["USER_ID", "BIRTH_YEAR", "SEX", "ETHNICITY", "DEGREE_CLEAN", "SCHOOL_ENDDATE"]]
empolyee_master = employee_master  # keep the original (misspelled) name working


# Stage Creation

In [4]:
# Create the stage that holds the uploaded code and the trained model artifacts.
# IF NOT EXISTS instead of CREATE OR REPLACE: replacing an internal stage drops
# every file already in it, so re-running this cell would silently delete models.
session.sql('CREATE STAGE IF NOT EXISTS ML_MODELS_HR').show()

-------------------------------------------------
|"status"                                       |
-------------------------------------------------
|Stage area ML_MODELS_HR successfully created.  |
-------------------------------------------------



# Model Training

SENDING PYTHON TRAINING FILE TO SNOWFLAKE STAGE

In [5]:
# Upload the training script so the stored procedure registered below can import it.
ml_stage = 'ML_MODELS_HR'
experiment_name = 'HR_CHURN_V2'
f_path = 'train_model_sp.py'
stage_target = f'@{ml_stage}/{experiment_name}'
session.file.put(f_path, stage_target, auto_compress=False, overwrite=True)

[PutResult(source='train_model_sp.py', target='train_model_sp.py', source_size=7266, target_size=7280, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

REGISTERING THE PYTHON TRAINING FILE AS A STORED PROCEDURE (SPROC)

In [6]:
# Register the staged training script as a Python stored procedure.
# The procedure name, the staged file and the handler all come from sproc_name,
# so they cannot drift apart.
# NOTE(review): runtime_version '3.8' may no longer be offered by Snowflake;
# confirm the runtimes available to your account before re-running.
sproc_name = 'train_model_sp'
sql = f'''
    create or replace procedure {sproc_name}(
        table_name varchar,
        target_col varchar,
        excluded_cols array,
        experiment_name varchar,
        stage varchar)
    returns variant
    language python
    runtime_version = '3.8'
    packages = ('snowflake-snowpark-python' ,'pandas', 'numpy', 'scikit-learn')
    imports = ('@{ml_stage}/{experiment_name}/{sproc_name}.py')
    handler = '{sproc_name}.main'
    ;
'''
session.sql(sql).collect()

[Row(status='Function TRAIN_MODEL_SP successfully created.')]

EXECUTING THE TRAINING SPROC THROUGH SNOWPARK SQL

In [7]:
import json

# Train on Snowflake: call the stored procedure with the source table, the target
# column, and the columns to leave out of the feature set.
ml_stage = 'ML_MODELS_HR'
experiment_name = 'HR_CHURN_V2'
target_col = 'CHURN'
table_name = "ATTRITION"
excluded_cols = ["USER_ID", "JOB_STARTDATE", "JOB_ENDDATE", "SCHOOL_ENDDATE"]
# json.dumps emits a real JSON array for PARSE_JSON instead of relying on
# Python's list repr with quotes swapped.
sql = f'''
    CALL train_model_sp(
        '{table_name}',
        '{target_col}',
        PARSE_JSON('{json.dumps(excluded_cols)}'),
        '{experiment_name}',
        '{ml_stage}'
    )
'''

# The CALL returns one row whose single VARIANT column holds the metrics report.
train_results = session.sql(sql).collect()[0][0]
print(train_results)
print(train_results)

{
  "f1-score": {
    "False": 0.8319672131147541,
    "True": 0.7657142857142858,
    "accuracy": 0.8042959427207638,
    "macro avg": 0.79884074941452,
    "weighted avg": 0.8083280421210309
  },
  "precision": {
    "False": 0.9290617848970252,
    "True": 0.6683291770573566,
    "accuracy": 0.8042959427207638,
    "macro avg": 0.7986954809771909,
    "weighted avg": 0.8360318926010099
  },
  "recall": {
    "False": 0.7532467532467533,
    "True": 0.8963210702341137,
    "accuracy": 0.8042959427207638,
    "macro avg": 0.8247839117404334,
    "weighted avg": 0.8042959427207638
  },
  "support": {
    "False": 539,
    "True": 299,
    "accuracy": 0.8042959427207638,
    "macro avg": 838,
    "weighted avg": 838
  }
}


DOWNLOADING FILES BACK TO REFRACT

In [6]:
# Download the artifacts written by the training procedure into a local folder.
local_directory = 'snowflake_files'
files = ['ml_pipeline.joblib', 'X.csv', 'X_test.csv', 'X_train.csv', 'clf_report.csv', 'cnf_matrix.csv',
         'feature_importances.csv', 'y.csv', 'y_test.csv', 'y_train.csv']

os.makedirs(local_directory, exist_ok=True)
stage_prefix = f'@{ml_stage}/{experiment_name}'
for artifact in tqdm(files):
    session.file.get(f'{stage_prefix}/{artifact}', local_directory)

# Load the fitted pipeline. joblib.load unpickles the file, so only load
# artifacts from a stage you trust.
model = joblib.load(os.path.join(local_directory, 'ml_pipeline.joblib'))

  0%|          | 0/10 [00:00<?, ?it/s]

In [9]:
# Show the pipeline loaded from ml_pipeline.joblib to inspect its steps.
display(model)

# PREDICTION

STORING THE PREDICTION PYTHON FILE IN SNOWFLAKE STAGE

In [10]:
# Upload the UDF handler module next to the model artifact it loads.
f_path = 'predict_churn_udf.py'
put_result = session.file.put(
    f_path,
    f'@{ml_stage}/{experiment_name}',
    auto_compress=False,
    overwrite=True,
)
put_result

[PutResult(source='predict_churn_udf.py', target='predict_churn_udf.py', source_size=1008, target_size=1024, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

REGISTERING PREDICTION FILE AS SNOWPARK UDF

In [11]:
# Register the staged prediction module as a Python UDF. The function name, the
# staged .py file and the handler all come from udf_name so they stay in sync;
# the pipeline artifact is imported alongside it for the handler to load.
udf_name = 'predict_churn_udf'
sql = f'''
    create or replace function {udf_name}(
        args array,
        staged_model_f_name varchar,
        stage_name varchar
        )
    returns variant
    language python
    runtime_version = '3.8'
    packages = ('snowflake-snowpark-python' ,'pandas', 'cachetools', 'joblib','scikit-learn')
    imports = ('@{ml_stage}/{experiment_name}/{udf_name}.py', '@{ml_stage}/{experiment_name}/ml_pipeline.joblib')
    handler = '{udf_name}.main'
    ;
'''
session.sql(sql).collect()

[Row(status='Function PREDICT_CHURN_UDF successfully created.')]

In [7]:
# Lazy Snowpark handle on the source table (scored below) plus a local pandas copy.
table_name = 'ATTRITION'
sdf = session.table(table_name)
# to_pandas() matches the spelling used elsewhere in this notebook.
pdf = sdf.to_pandas()

PREDICTING ON THE ENTIRE DATASET

In [8]:
# Score every row in Snowflake with the registered UDF and persist the result.
# The UDF receives the model features packed into one ARRAY, in this order.
feature_cols = [
    'SALARY', 'SENIORITY', 'TENURE_MONTHS', 'MONTHS_AFTER_COLLEGE', 'BIRTH_YEAR', 'OVERTIME_HOURS',
    'MAPPED_ROLE_CLEAN', 'SEX', 'ETHNICITY', 'HOSPITAL_TYPE', 'HOSPITAL_OWNERSHIP', 'COMPANY_NAME',
    'CITY_STATE', 'DISTANCE', 'DEGREE_CLEAN',
]
# NOTE(review): the recorded run fails inside predict_churn_udf.py (line 27,
# pd.DataFrame([args], columns=inputs)) with "5 columns passed, passed data had
# 15 columns"; the column list in that handler must be made to match feature_cols.
churn_prediction = F.call_udf(
    'predict_churn_udf',
    F.array_construct(*feature_cols),
    'ml_pipeline.joblib',
    ml_stage,
)
pred_sdf = sdf.with_column('CHURN_PREDICT', churn_prediction)

pred_sdf.write.mode("overwrite").save_as_table("EMPLOYEES_CHURN_PREDICTED", table_type="transient")

pred_df = session.table('EMPLOYEES_CHURN_PREDICTED').limit(10).to_pandas()

Failed to execute query [queryID: 01ac2a2c-0503-5757-0072-f30309ac21b6]  CREATE  OR  REPLACE  TRANSIENT  TABLE  EMPLOYEES_CHURN_PREDICTED AS  SELECT  *  FROM ( SELECT "SALARY", "SENIORITY", "TENURE_MONTHS", "MONTHS_AFTER_COLLEGE", "BIRTH_YEAR", "MAPPED_ROLE_CLEAN", "SEX", "ETHNICITY", "HOSPITAL_TYPE", "HOSPITAL_OWNERSHIP", "COMPANY_NAME", "CITY_STATE", "DISTANCE", "DEGREE_CLEAN", "SCHOOL_ENDDATE", "JOB_STARTDATE", "JOB_ENDDATE", "USER_ID", "OVERTIME_HOURS", "CHURN", predict_churn_udf(array_construct("SALARY", "SENIORITY", "TENURE_MONTHS", "MONTHS_AFTER_COLLEGE", "BIRTH_YEAR", "OVERTIME_HOURS", "MAPPED_ROLE_CLEAN", "SEX", "ETHNICITY", "HOSPITAL_TYPE", "HOSPITAL_OWNERSHIP", "COMPANY_NAME", "CITY_STATE", "DISTANCE", "DEGREE_CLEAN"), 'ml_pipeline.joblib', 'ML_MODELS_HR') AS "CHURN_PREDICT" FROM ATTRITION)
100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "/usr/lib/python_udf/0aba92af4bda7d2cbc536906794153a09bfe8885a9a99157663d4bcb296c9854/lib/python3.8/sit

SnowparkSQLException: (1304): 01ac2a2c-0503-5757-0072-f30309ac21b6: 100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "/usr/lib/python_udf/0aba92af4bda7d2cbc536906794153a09bfe8885a9a99157663d4bcb296c9854/lib/python3.8/site-packages/pandas/core/internals/construction.py", line 969, in _finalize_columns_and_data
    columns = _validate_or_indexify_columns(contents, columns)
  File "/usr/lib/python_udf/0aba92af4bda7d2cbc536906794153a09bfe8885a9a99157663d4bcb296c9854/lib/python3.8/site-packages/pandas/core/internals/construction.py", line 1017, in _validate_or_indexify_columns
    raise AssertionError(
AssertionError: 5 columns passed, passed data had 15 columns

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/udf/32355341555708802/predict_churn_udf.py", line 27, in main
    row = pd.DataFrame([args], columns=inputs)
  File "/usr/lib/python_udf/0aba92af4bda7d2cbc536906794153a09bfe8885a9a99157663d4bcb296c9854/lib/python3.8/site-packages/pandas/core/frame.py", line 746, in __init__
    arrays, columns, index = nested_data_to_arrays(
  File "/usr/lib/python_udf/0aba92af4bda7d2cbc536906794153a09bfe8885a9a99157663d4bcb296c9854/lib/python3.8/site-packages/pandas/core/internals/construction.py", line 510, in nested_data_to_arrays
    arrays, columns = to_arrays(data, columns, dtype=dtype)
  File "/usr/lib/python_udf/0aba92af4bda7d2cbc536906794153a09bfe8885a9a99157663d4bcb296c9854/lib/python3.8/site-packages/pandas/core/internals/construction.py", line 875, in to_arrays
    content, columns = _finalize_columns_and_data(arr, columns, dtype)
  File "/usr/lib/python_udf/0aba92af4bda7d2cbc536906794153a09bfe8885a9a99157663d4bcb296c9854/lib/python3.8/site-packages/pandas/core/internals/construction.py", line 972, in _finalize_columns_and_data
    raise ValueError(err) from err
ValueError: 5 columns passed, passed data had 15 columns
 in function PREDICT_CHURN_UDF with handler predict_churn_udf.main

In [14]:
def main(model, X):
    """Return the model's prediction for the first row of ``X`` as a Python bool."""
    first_prediction = model.predict(X)[0]
    return bool(first_prediction)

# Sanity-check the downloaded pipeline locally on one scored row, with the label
# and prediction columns removed so only input columns are passed to the model.
pred_1 = pred_df[:1].drop(["CHURN", "CHURN_PREDICT"], axis=1)
main(model, pred_1)

False

In [15]:
# Materialise the scored Snowpark DataFrame locally. This evaluates pred_sdf's query
# (including the UDF call) rather than reading EMPLOYEES_CHURN_PREDICTED, and rebinds
# the name pred_sdf to a pandas DataFrame from here on.
pred_sdf = pred_sdf.to_pandas()

# Registering Model in Refract

In [16]:
from math import ceil

# Keep only rows with a recorded job end date, then split positionally:
# the first 75% become `train`, the remaining rows become the hold-out `pred`.
# NOTE(review): the split is by row position with no shuffle, and the row order
# coming back from Snowflake is not guaranteed to be stable between runs.
TRAIN_FRACTION = 0.75
pred_sdf = pred_sdf[pred_sdf["JOB_ENDDATE"].notna()]
split_at = ceil(len(pred_sdf) * TRAIN_FRACTION)
train = pred_sdf.iloc[:split_at]
pred = pred_sdf.iloc[split_at:]

In [17]:
# Per-row probability of the positive class (CHURN == True) on the hold-out rows,
# taken from the fitted pipeline rather than random placeholder values.
# Extra columns (USER_ID, dates) are passed through as in the local sanity check;
# assumes the pipeline's final step supports predict_proba — TODO confirm.
holdout_features = pred.drop(["CHURN", "CHURN_PREDICT"], axis=1)
positive_class_idx = list(model.classes_).index(True)
y_prob = model.predict_proba(holdout_features)[:, positive_class_idx]

@scoring_func
def score(model, request):
    """Refract scoring entry point: predict churn for a JSON-encoded record.

    ``request.json["payload"]`` is expected to be a ``DataFrame.to_json()``
    string of the form ``{column: {row_index: value}}``, or the already-decoded
    dict. Non-feature columns (ids, dates) are dropped before predicting.

    Returns:
        The prediction for the first row as a native Python value.
    """
    import json

    inputs = ['SALARY', 'SENIORITY', 'TENURE_MONTHS', 'MONTHS_AFTER_COLLEGE', 'BIRTH_YEAR', 'OVERTIME_HOURS',
              'MAPPED_ROLE_CLEAN', 'SEX', 'ETHNICITY', 'HOSPITAL_TYPE', 'HOSPITAL_OWNERSHIP', 'COMPANY_NAME',
              'CITY_STATE', 'DISTANCE', 'DEGREE_CLEAN']
    payload = request.json["payload"]
    # json.loads, not eval: the payload is JSON (it can contain null/true/false,
    # which eval rejects) and eval would execute arbitrary client-supplied code.
    if isinstance(payload, str):
        payload = json.loads(payload)
    # Keep model inputs only, in training column order.
    features = {col: payload[col] for col in inputs if col in payload}
    data = pd.DataFrame(features)
    # tolist() turns numpy scalars (e.g. numpy.bool_) into Python types so the
    # result can be encoded by the standard json module.
    return model.predict(data).tolist()[0]

# Assemble registration inputs: feature frames for both splits (label and prediction
# columns removed), training labels, and hold-out labels vs. Snowflake predictions.
label_and_prediction = ["CHURN", "CHURN_PREDICT"]
X_train = train.drop(label_and_prediction, axis=1)
X_test = pred.drop(label_and_prediction, axis=1)
y_train = train["CHURN"]
y_test = pred["CHURN"]
y_pred = pred["CHURN_PREDICT"]

In [18]:
# pred_1 = X_train[:1]
# import requests
# req = requests.Request()
# req.json = {"payload":pred_1.head(1).to_json()}
# y = req
# score(model, y)

In [20]:
## registering the model in refract.
# NOTE(review): X_train still contains USER_ID and the date columns that training
# excluded (excluded_cols), so features/feature_names list more than the model uses.
registration_kwargs = dict(
    name="HR_ANALYTICS_Snowflake",
    description="HR ANALYTICS model trained on Snowflake",
    flavour=MLModelFlavours.sklearn,
    model_type="classification",
    y_true=y_test,
    y_pred=y_pred,
    features=X_train.columns,
    labels=[False, True],
    init_script="",
    input_type="json",
    explain_ai=True,
    prob=y_prob,
    x_train=X_train,
    x_test=X_test,
    y_train=y_train.tolist(),
    y_test=y_test.tolist(),
    feature_names=X_train.columns.tolist(),
    original_features=X_train.columns.tolist(),
    feature_ids=X_train.columns,
    target_names=['NOT LEFT', 'LEFT'],
    kyd=True,
    kyd_score=True,
)
tmp = register_model(model, score, **registration_kwargs)


Exception: Unable to register model. Please try after some time

{'payload': '{"SALARY":{"0":29633.849},"SENIORITY":{"0":1},"TENURE_MONTHS":{"0":25},"MONTHS_AFTER_COLLEGE":{"0":11},"BIRTH_YEAR":{"0":1991},"MAPPED_ROLE_CLEAN":{"0":"nurse"},"SEX":{"0":"F"},"ETHNICITY":{"0":"White"},"HOSPITAL_TYPE":{"0":"Acute Care Hospitals"},"HOSPITAL_OWNERSHIP":{"0":"Voluntary non-profit - Church"},"COMPANY_NAME":{"0":"Mayo Clinic"},"CITY_STATE":{"0":"Rochester, MN"},"DISTANCE":{"0":"<2mi"},"DEGREE_CLEAN":{"0":"Other"},"SCHOOL_ENDDATE":{"0":"2015-01-01"},"JOB_STARTDATE":{"0":"2015-12-01"},"JOB_ENDDATE":{"0":null},"USER_ID":{"0":813877070},"OVERTIME_HOURS":{"0":3}}'}

In [9]:
## getting data back to refract
# Read the persisted predictions (written by save_as_table above) into pandas.
table_name = 'EMPLOYEES_CHURN_PREDICTED'
sdf = session.table(table_name)
# to_pandas() matches the spelling used elsewhere in this notebook.
pdf = sdf.to_pandas()

In [10]:
# Preview the first five scored rows.
pdf.iloc[:5]

Unnamed: 0,SALARY,SENIORITY,TENURE_MONTHS,MONTHS_AFTER_COLLEGE,BIRTH_YEAR,MAPPED_ROLE_CLEAN,SEX,ETHNICITY,HOSPITAL_TYPE,HOSPITAL_OWNERSHIP,...,CITY_STATE,DISTANCE,DEGREE_CLEAN,SCHOOL_ENDDATE,JOB_STARTDATE,JOB_ENDDATE,USER_ID,OVERTIME_HOURS,CHURN,CHURN_PREDICT
0,37806.933,1,88,3,1983,social,F,White,Psychiatric,Voluntary non-profit - Private,...,"Orlando, FL",<2mi,Masters_Degree,2008-01-31,2008-04-01,2015-10-01,870835513,14,False,False
1,52052.06,2,21,44,1987,technologist,F,White,Acute Care Hospitals,Voluntary non-profit - Private,...,"Cleveland, OH",>10mi,Bachelors_Degree,2010-01-01,2013-09-01,2015-08-01,810298642,14,True,True
2,78365.115,4,40,88,1983,nurse,F,White,Psychiatric,Voluntary non-profit - Private,...,"Orlando, FL",<2mi,Professional_Certificate,2008-01-01,2015-05-01,2018-10-01,344881095,13,False,False
3,47710.345,1,7,31,1992,nurse,M,Black,Acute Care Hospitals,Proprietary,...,"Detroit, MI",>10mi,Masters_Degree,2018-01-01,2020-08-01,,566752971,14,False,True
4,58243.676,1,30,98,1985,nurse,F,White,Acute Care Hospitals,Voluntary non-profit - Private,...,"Cleveland, OH",>10mi,Bachelors_Degree,2010-01-01,2018-03-01,2020-11-01,810298642,13,False,False


In [12]:
# Export the scored table. It contains USER_ID and SALARY, so treat the file as sensitive.
scored_csv_path = "/data/HR_analytics_data_scored.csv"
pdf.to_csv(scored_csv_path, index=False)