In [1]:
import sys
sys.path.append('/Users/AndrewKatsipanos/Library/CloudStorage/OneDrive-KubrickGroup/Documents/Snowpark/sfguide-getting-started-machine-learning/hol')
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.window import Window
import preprocessing
import optuna
from xgboost import XGBRegressor
import sklearn
import json

import getpass
import pandas as pd

  from pandas import MultiIndex, Int64Index


In [2]:
# Load Snowflake connection settings from a local JSON file (kept out of the
# notebook so credentials are not committed) and open a Snowpark session.
# The context manager closes the file handle instead of leaking it.
with open('../connection1.json') as connection_file:
    connection_parameters = json.load(connection_file)
session = Session.builder.configs(connection_parameters).create()

In [3]:
# Raw salary records as stored in Snowflake; preview the first few rows.
df = session.table('SALARIES_FINAL')
raw_preview = df.limit(5)
raw_preview.to_pandas()

Unnamed: 0,YEAR,EXPERIENCE,EMPLOYMENT,JOB,SALARY,EMPLOYEE_RESIDENCE,REMOTE_RATIO,COMPANY_LOCATION,COMPANY_SIZE
0,2023,SE,FT,Principal Data Scientist,85847,ES,100,ES,L
1,2023,MI,CT,ML Engineer,30000,US,100,US,S
2,2023,MI,CT,ML Engineer,25500,US,100,US,S
3,2023,SE,FT,Data Scientist,175000,CA,100,CA,M
4,2023,SE,FT,Data Scientist,120000,CA,100,CA,M


In [150]:
# Peek at the raw job titles before they are normalised for encoding.
job_preview = df.select('Job').limit(5)
job_preview.to_pandas()

Unnamed: 0,JOB
0,Principal Data Scientist
1,ML Engineer
2,ML Engineer
3,Data Scientist
4,Data Scientist


In [151]:
def sproc_preprocess(session: Session,
                     raw_training_table: str) -> T.Variant:
    """Encode the raw salaries table and persist it as ENCODED_SALARIES_FINAL.

    Intended to run as a Snowflake stored procedure. Job titles are
    normalised so that every run of non-alphanumeric characters becomes a
    single underscore. Every column except REMOTE_RATIO and SALARY is
    one-hot encoded, and REMOTE_RATIO is min-max scaled.

    Args:
        session: Snowpark session supplied by Snowflake when the procedure runs.
        raw_training_table: Name of the table holding the raw salary records.

    Returns:
        A short status message once the encoded table has been written.
    """
    df = session.table(raw_training_table)

    # e.g. "Principal Data Scientist" -> "Principal_Data_Scientist"; presumably
    # so the generated one-hot column names are clean identifiers.
    df = df.with_column('JOB',
                        F.regexp_replace('JOB', '[^a-zA-Z0-9]+', '_'))

    # SALARY is the regression target and REMOTE_RATIO is scaled separately;
    # every other column (including YEAR) is treated as categorical.
    cols_to_ohe = [x for x in df.columns if x not in ['REMOTE_RATIO', 'SALARY']]
    one_hot_encoder = preprocessing.OneHotEncoder(input_cols=cols_to_ohe)
    one_hot_encoder.fit(df)
    df = one_hot_encoder.transform(df)

    min_max = preprocessing.MinMaxScaler(input_cols=['REMOTE_RATIO'])
    min_max.fit(df)
    df = min_max.transform(df)

    # mode='overwrite' replaces any previous output, so the procedure can be re-run.
    df.write.save_as_table(table_name='ENCODED_SALARIES_FINAL', mode='overwrite')
    return 'Preprocessing complete'


In [152]:
# Local directory of the `preprocessing` package; it is passed as a code
# import when registering the preprocessing stored procedure.
package_dirs = preprocessing.__path__
preprocessing_path = package_dirs[0]
print(preprocessing_path)

/Users/AndrewKatsipanos/Library/CloudStorage/OneDrive-KubrickGroup/Documents/Snowpark/sfguide-getting-started-machine-learning/hol/preprocessing


In [153]:
# Register the preprocessing function as a permanent stored procedure whose
# code (plus the local `preprocessing` package) is uploaded to @ML_MODELS.
# NOTE: this rebinds `sproc_preprocess` from the Python function to the
# registered procedure. Re-running this cell alone would pass that procedure,
# not the original function, to `register`; re-run the `def` cell first.
sproc_preprocess = session.sproc.register(
    func=sproc_preprocess,
    name='sproc_preprocess',
    packages=['snowflake-snowpark-python', 'scipy'],
    imports=[preprocessing_path],
    stage_location='@ML_MODELS',
    is_permanent=True,
    replace=True,
)

In [154]:
# Execute the preprocessing procedure inside Snowflake against the raw table.
sproc_preprocess('SALARIES_FINAL', session=session)

'"Preporcessing complete"'

In [155]:
# Inspect the encoded table written by the preprocessing procedure.
df_encoded = session.table('ENCODED_SALARIES_FINAL')
encoded_preview = df_encoded.limit(5)
encoded_preview.to_pandas()

Unnamed: 0,SALARY,YEAR_2020,YEAR_2021,YEAR_2022,YEAR_2023,EXPERIENCE_EN,EXPERIENCE_EX,EXPERIENCE_MI,EXPERIENCE_SE,EMPLOYMENT_CT,...,JOB_PRINCIPAL_DATA_SCIENTIST,JOB_PRINCIPAL_MACHINE_LEARNING_ENGINEER,JOB_PRODUCT_DATA_ANALYST,JOB_PRODUCT_DATA_SCIENTIST,JOB_RESEARCH_ENGINEER,JOB_RESEARCH_SCIENTIST,JOB_SOFTWARE_DATA_ENGINEER,JOB_STAFF_DATA_ANALYST,JOB_STAFF_DATA_SCIENTIST,REMOTE_RATIO
0,85847,0,0,0,1,0,0,0,1,0,...,1,0,0,0,0,0,0,0,0,1.0
1,30000,0,0,0,1,0,0,1,0,1,...,0,0,0,0,0,0,0,0,0,1.0
2,25500,0,0,0,1,0,0,1,0,1,...,0,0,0,0,0,0,0,0,0,1.0
3,175000,0,0,0,1,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,1.0
4,120000,0,0,0,1,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,1.0


In [157]:
def sproc_create_transformer(session: Session,
                             raw_training_table: str,
                             transformer_name: str) -> T.Variant:
    """Fit a scikit-learn preprocessing transformer and upload it to @ML_MODELS.

    Intended to run as a Snowflake stored procedure. The raw table is pulled
    into pandas. Every column except REMOTE_RATIO and SALARY is one-hot
    encoded, and REMOTE_RATIO is min-max scaled. The fitted ColumnTransformer
    is serialised with joblib and uploaded, uncompressed, to the @ML_MODELS
    stage.

    Args:
        session: Snowpark session supplied by Snowflake when the procedure runs.
        raw_training_table: Name of the table holding the raw salary records.
        transformer_name: File name for the serialised transformer on the stage.

    Returns:
        A short status message once the transformer has been uploaded.
    """
    import os
    from joblib import dump
    from sklearn.compose import ColumnTransformer
    from sklearn.preprocessing import MinMaxScaler, OneHotEncoder

    df = session.table(raw_training_table).to_pandas()

    cols_to_ohe = [x for x in df.columns if x not in ['REMOTE_RATIO', 'SALARY']]
    col_trans = ColumnTransformer([
        # handle_unknown='ignore': categories unseen at fit time encode to all zeros.
        ('ohe', OneHotEncoder(sparse=False, handle_unknown='ignore'), cols_to_ohe),
        ('mm', MinMaxScaler(), ['REMOTE_RATIO']),
    ])
    # Only the fitted transformer is persisted; the transformed matrix was
    # never used, so fit() is enough. SALARY is the target, not a feature.
    col_trans.fit(df.drop(['SALARY'], axis=1))

    local_path = os.path.join('/tmp', transformer_name)
    dump(col_trans, local_path)
    session.file.put(local_path, '@ML_MODELS', auto_compress=False, overwrite=True)
    return 'Transformer successfully created'

In [158]:
# Register the transformer-fitting function as a permanent stored procedure.
# NOTE: this rebinds `sproc_create_transformer` to the registered procedure;
# re-run the `def` cell before re-running this one.
sproc_create_transformer = session.sproc.register(
    func=sproc_create_transformer,
    name='sproc_create_transformer',
    packages=['snowflake-snowpark-python', 'scikit-learn==1.1.1', 'joblib'],
    stage_location='@ML_MODELS',
    is_permanent=True,
    replace=True,
)

In [159]:
# Fit the transformer inside Snowflake and upload it to @ML_MODELS as transformer.sav.
sproc_create_transformer('SALARIES_FINAL', 'transformer.sav', session=session)

'"Transformer successfully created"'

In [160]:
def sproc_train(session: Session,
                training_table: str,
                feature_cols: list,
                target_col: str,
                model_name: str) -> T.Variant:
    """Tune an XGBoost regressor with Optuna, fit it, and upload it to @ML_MODELS.

    Intended to run as a Snowflake stored procedure. The training table is
    pulled into pandas. Optuna then runs 100 trials maximising the mean 5-fold
    cross-validated R^2. The best configuration is refit on the full data,
    serialised with joblib and uploaded, uncompressed, to the @ML_MODELS stage.

    Args:
        session: Snowpark session supplied by Snowflake when the procedure runs.
        training_table: Table holding the encoded training data.
        feature_cols: Column names used as model inputs.
        target_col: Column name of the regression target.
        model_name: File name for the serialised model on the stage.

    Returns:
        The hyperparameters of the best trial.
    """
    import os
    from joblib import dump
    # Import the submodule explicitly: a bare `import sklearn` does not load
    # `sklearn.model_selection`, so this no longer depends on some other
    # package having imported it first.
    from sklearn.model_selection import cross_val_score

    n_trials = 100
    seed = 123  # shared by the Optuna sampler and every XGBoost model

    local_training_data = session.table(training_table).to_pandas()
    X = local_training_data[feature_cols]
    y = local_training_data[target_col]

    def build_model(params):
        """Create an unfitted regressor with the given tuned hyperparameters."""
        return XGBRegressor(**params,
                            random_state=seed,
                            verbosity=0,
                            booster='gbtree')

    def objective(trial):
        """Mean 5-fold cross-validated R^2 of one sampled configuration."""
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 50, 1000),
            'max_depth': trial.suggest_int('max_depth', 1, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.001, 0.5),
            'subsample': trial.suggest_float('subsample', 0.5, 1.0),
            'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
            # suggest_float(..., log=True) replaces the deprecated suggest_loguniform.
            'gamma': trial.suggest_float('gamma', 1e-8, 1.0, log=True),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.3, 1),
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
            # NOTE(review): scale_pos_weight is meant for imbalanced binary
            # classification; for a salary regression target it likely has no
            # useful effect and only widens the search space. Kept so the
            # returned parameter set is unchanged.
            'scale_pos_weight': trial.suggest_int('scale_pos_weight', 1, 5000),
        }
        scores = cross_val_score(build_model(params), X, y,
                                 n_jobs=1, cv=5, scoring='r2')
        return scores.mean()

    # Seeded sampler so the sequence of suggested trials is reproducible.
    study = optuna.create_study(direction='maximize',
                                sampler=optuna.samplers.TPESampler(seed=seed))
    study.optimize(objective, n_trials=n_trials)

    # Rebuild the winning configuration from its recorded parameters (instead
    # of stashing model objects in trial attributes) and fit it on all data.
    best_model = build_model(study.best_trial.params)
    best_model.fit(X, y)

    local_path = os.path.join('/tmp', model_name)
    dump(best_model, local_path)
    session.file.put(local_path, '@ML_MODELS', auto_compress=False, overwrite=True)
    return study.best_trial.params

In [161]:
# Optuna and its cmaes dependency are shipped to Snowflake as code imports,
# so look up where each is installed in the local environment.
import optuna
import cmaes

optuna_path, cmaes_path = (pkg.__path__[0] for pkg in (optuna, cmaes))
print(optuna_path)
print(cmaes_path)

/Users/AndrewKatsipanos/anaconda3/envs/pysnowpark/lib/python3.8/site-packages/optuna
/Users/AndrewKatsipanos/anaconda3/envs/pysnowpark/lib/python3.8/site-packages/cmaes


In [162]:
# Register the training procedure. Optuna and cmaes are uploaded from the
# local environment as code imports; everything else comes from the listed
# Snowflake packages (sqlalchemy, tqdm and colorlog presumably being Optuna's
# runtime dependencies).
# NOTE: this rebinds `sproc_train` to the registered procedure; re-run the
# `def` cell before re-running this one.
sproc_train = session.sproc.register(
    func=sproc_train,
    name='sproc_train',
    packages=[
        'snowflake-snowpark-python',
        'scikit-learn==1.1.1',
        'xgboost==1.5.0',
        'joblib',
        'sqlalchemy',
        'tqdm',
        'colorlog',
    ],
    imports=[optuna_path, cmaes_path],
    stage_location='@ML_MODELS',
    is_permanent=True,
    replace=True,
)

In [163]:
training_table = 'ENCODED_SALARIES_FINAL'
target_col = 'SALARY'
model_name = 'xgb_hp_model.sav'

# Every encoded column except the target is a feature. Build a fresh list
# (no in-place .remove) derived from `target_col`, so the two cannot drift
# apart. A missing target still fails loudly, as before.
if target_col not in df_encoded.columns:
    raise ValueError(f'{target_col!r} not found in {training_table}')
feature_cols = [col for col in df_encoded.columns if col != target_col]

In [164]:
# Run hyperparameter tuning and training inside Snowflake; the procedure
# hands back the best trial's parameters.
best_trial_params = sproc_train(training_table, feature_cols, target_col, model_name,
                                session=session)
best_trial_params

'{\n  "colsample_bytree": 0.4715671319655674,\n  "gamma": 3.377652971972353e-05,\n  "learning_rate": 0.17322895181770787,\n  "max_depth": 1,\n  "min_child_weight": 9,\n  "n_estimators": 361,\n  "reg_alpha": 0.03204219273432517,\n  "reg_lambda": 0.018296918796761773,\n  "scale_pos_weight": 2400,\n  "subsample": 0.8834392630252407\n}'

In [165]:
# Confirm the transformer and model files landed on the stage.
stage_rows = session.sql('list @ML_MODELS').collect()
pd.DataFrame(stage_rows)

Unnamed: 0,name,size,md5,last_modified
0,ml_models/c0a1a4820eff87add349c94b1ffa0ed32ac3...,353920,b7e75b05a438d4f78fb0cf4d27264351,"Mon, 15 May 2023 11:50:30 GMT"
1,ml_models/c63fd01776483bfda20e4ab09ae5ed464375...,13616,1f1ae49d787ef2b87b24b907e8b8eb8a,"Mon, 15 May 2023 11:50:30 GMT"
2,ml_models/d07ed8871d596551623ada8eafba6e999502...,9312,a9b2930cbb2e5d4f6c538d5ec8829b82,"Mon, 15 May 2023 12:11:40 GMT"
3,ml_models/model.joblib.gz,4560,3a458792adf5bc64edfeaaac1aeb9ea0,"Thu, 11 May 2023 14:40:41 GMT"
4,ml_models/transformer.joblib.gz,2768,7316d7b76fe1d8c6edb57fbed27f194a,"Thu, 11 May 2023 14:40:41 GMT"
5,ml_models/transformer.sav,8112,8901a69ec8edcc49657178ea314c78b4,"Wed, 17 May 2023 12:29:18 GMT"
6,ml_models/xgb_hp_model.sav,202224,2de522ace4519afbf739dcd35242d419,"Wed, 17 May 2023 12:39:10 GMT"


In [166]:
from cachetools import cached

@cached(cache={})
def load_model(model_path: str) -> object:
    """Deserialise a joblib model file, memoised per path.

    The cache lives for the lifetime of the Python process, so repeated calls
    with the same path (e.g. once per scoring batch) reuse the loaded model.
    joblib uses pickle, so only load files from a trusted stage.
    """
    from joblib import load as joblib_load
    return joblib_load(model_path)


def udf_predict(df: pd.DataFrame) -> pd.Series:
    """Score a batch of encoded feature rows with the tuned XGBoost model.

    With pandas type hints this is presumably registered as a vectorised UDF:
    each call receives a batch of rows as a DataFrame, one column per
    feature argument. It returns one prediction per row.
    """
    import os
    import sys
    # Snowflake exposes the directory holding the UDF's staged imports (here
    # the model file) through this interpreter option.
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    model = load_model(os.path.join(import_dir, 'xgb_hp_model.sav'))
    scored_data = pd.Series(model.predict(df))
    return scored_data

In [167]:
# Register the scoring function as a permanent UDF. The model file uploaded
# by sproc_train is attached from the stage, and every feature column is
# passed in as a FLOAT argument.
# NOTE: this rebinds `udf_predict` to the registered UDF; re-run the `def`
# cell before re-running this one.
udf_predict = session.udf.register(
    func=udf_predict,
    name="udf_predict",
    input_types=[T.FloatType()] * len(feature_cols),
    return_type=T.FloatType(),
    imports=['@ML_MODELS/xgb_hp_model.sav'],
    packages=['scikit-learn==1.1.1', 'pandas', 'joblib', 'cachetools', 'xgboost==1.5.0'],
    stage_location='@ML_MODELS',
    is_permanent=True,
    replace=True,
    session=session,
)

In [112]:
# Score every encoded row by passing all feature columns to the UDF.
# NOTE(review): df_encoded is the same table the model was trained on, so
# these predictions are in-sample, not a hold-out evaluation.
scored_df = df_encoded.with_column('PREDICTION', udf_predict(*feature_cols))
scored_df.limit(5).to_pandas()

Unnamed: 0,SALARY,YEAR_2020,YEAR_2021,YEAR_2022,YEAR_2023,EXPERIENCE_EN,EXPERIENCE_EX,EXPERIENCE_MI,EXPERIENCE_SE,EMPLOYMENT_CT,...,JOB_PRINCIPAL_MACHINE_LEARNING_ENGINEER,JOB_PRODUCT_DATA_ANALYST,JOB_PRODUCT_DATA_SCIENTIST,JOB_RESEARCH_ENGINEER,JOB_RESEARCH_SCIENTIST,JOB_SOFTWARE_DATA_ENGINEER,JOB_STAFF_DATA_ANALYST,JOB_STAFF_DATA_SCIENTIST,REMOTE_RATIO,PREDICTION
0,85847,0,0,0,1,0,0,0,1,0,...,0,0,0,0,0,0,0,0,1.0,147819.765625
1,30000,0,0,0,1,0,0,1,0,1,...,0,0,0,0,0,0,0,0,1.0,134357.890625
2,25500,0,0,0,1,0,0,1,0,1,...,0,0,0,0,0,0,0,0,1.0,134357.890625
3,175000,0,0,0,1,0,0,0,1,0,...,0,0,0,0,0,0,0,0,1.0,139880.25
4,120000,0,0,0,1,0,0,0,1,0,...,0,0,0,0,0,0,0,0,1.0,139880.25
