# Model Utils

> Functions Used In Modeling Efforts

In [None]:
#| default_exp modeling.utils

In [None]:
#| hide
from nbdev.showdoc import *
from fastcore.test import *

In [None]:
#| export

from data_system_utilities.azure.storage import FileHandling
from data_system_utilities.snowflake.utils import make_stage_query_generator

from machine_learning_utilities import preprocessing

from LTBP.data.utils import snowflake_query, get_yaml_dicts, generate_data_lake_query

from sklearn.model_selection import train_test_split

from rfpimp import *  # noqa:

import os
import logging
import pickle
import pandas as pd

  warn_incompatible_dep(


In [None]:
#| export




def return_dict_type(
    pre_process_type: dict  # {k:v} dictionary of columns name and tranformation type
):
    """
    Simplify the standard process for sklearn preprocessing pipelines

    Expands each ``{column_name: transformer_name}`` entry into the nested
    dictionary layout ``{"transformation": {"name", "args"}, "variable_type"}``.

    Every entry of ``pre_process_type`` is expanded and returned; an empty
    input yields an empty dictionary.

    Raises:
        ValueError: if a transformer name is not one of ``OrdinalEncoder``,
            ``OneHotEncoder``, ``StandardScaler`` or ``RobustScaler``.
    """
    # transformer name -> (constructor args, variable type)
    supported = {
        "OrdinalEncoder": (
            {"handle_unknown": "use_encoded_value", "unknown_value": -1},
            "cat",
        ),
        "OneHotEncoder": ({"handle_unknown": "ignore", "sparse": False}, "cat"),
        "StandardScaler": ({}, "cont"),
        "RobustScaler": ({}, "cont"),
    }
    pre_process_dict = {}
    for k, v in pre_process_type.items():
        # Only strings can name a transformer; guard so unhashable values
        # reach the explicit error below instead of failing inside the lookup.
        spec = supported.get(v) if isinstance(v, str) else None
        if spec is None:
            raise ValueError(
                f"Unsupported transformation {v!r} for column {k!r}; "
                f"expected one of {sorted(supported)}"
            )
        args, variable_type = spec
        pre_process_dict[f"{k}"] = {
            # copy the args so columns never share one mutable dictionary
            "transformation": {"name": v, "args": dict(args)},
            "variable_type": variable_type,
        }
    return pre_process_dict


In [None]:
# Example: expand the first categorical feature from the features yaml.
from data_system_utilities.file_parsers import yaml

features=yaml.yaml_reader('./LTBP/files/yaml_files/features.yaml')
experiment_name='BASELINE'
# One single-entry dict per feature: {lowercased name: transformation for this
# experiment}, keeping features typed 'cat' that are not the LABEL.
cat_vars =[{f.lower() : values['transformation'][experiment_name]} for f, values in features.items() 
            if values['var_type'][experiment_name] == 'cat'
            and values['input_definition'] != 'LABEL']

# Display the expanded preprocessing entry for the first categorical feature.
return_dict_type(cat_vars[0])

{'destinationgeoafinitylabel': {'transformation': {'name': 'OrdinalEncoder',
   'args': {'handle_unknown': 'use_encoded_value', 'unknown_value': -1}},
  'variable_type': 'cat'}}

In [None]:
#| hide
test_eq(return_dict_type(cat_vars[0]).keys(), cat_vars[0].keys())
test_eq(return_dict_type(cat_vars[0])[list(cat_vars[0].keys())[0]].keys(), ['transformation', 'variable_type'])

In [None]:
#| export


def create_sklearn_preprocess_baseline_dict(
    cat_vars: list,  # list of categorical variables with sklearn transformer
    cont_vars: list,  # list of continous variables with sklearn transformer
):
    """Merge the ``return_dict_type`` expansion of every cat and cont item.

    ``None`` for either argument is treated as an empty list. Later items
    overwrite earlier ones when they expand to the same key.
    """
    categorical = [] if cat_vars is None else cat_vars
    continuous = [] if cont_vars is None else cont_vars
    merged = {}
    for spec in categorical + continuous:
        merged.update(return_dict_type(spec))
    return merged


In [None]:
# Example: build the full preprocessing dictionary for one experiment.
features=yaml.yaml_reader('./LTBP/files/yaml_files/features.yaml')
experiment_name='BASELINE'

# {lowercased name: transformation} for every non-LABEL feature typed 'cat'.
cat_vars =[{f.lower() : values['transformation'][experiment_name]} for f, values in features.items() 
            if values['var_type'][experiment_name] == 'cat'
            and values['input_definition'] != 'LABEL']
# Same selection for features typed 'cont'.
cont_vars =[{f.lower(): values['transformation'][experiment_name]} for f, values in features.items() 
            if values['var_type'][experiment_name] == 'cont'
            and values['input_definition'] != 'LABEL']

# Expand both lists into a single dictionary and log it.
feature_dict = create_sklearn_preprocess_baseline_dict(cat_vars=cat_vars, 
                                                       cont_vars=cont_vars)
logging.info(feature_dict)

INFO:root:{'destinationgeoafinitylabel': {'transformation': {'name': 'OrdinalEncoder', 'args': {'handle_unknown': 'use_encoded_value', 'unknown_value': -1}}, 'variable_type': 'cat'}, 'evercorepass': {'transformation': {'name': 'OrdinalEncoder', 'args': {'handle_unknown': 'use_encoded_value', 'unknown_value': -1}}, 'variable_type': 'cat'}, 'everpass': {'transformation': {'name': 'OrdinalEncoder', 'args': {'handle_unknown': 'use_encoded_value', 'unknown_value': -1}}, 'variable_type': 'cat'}, 'gendercode': {'transformation': {'name': 'OrdinalEncoder', 'args': {'handle_unknown': 'use_encoded_value', 'unknown_value': -1}}, 'variable_type': 'cat'}, 'guestbehavior': {'transformation': {'name': 'OrdinalEncoder', 'args': {'handle_unknown': 'use_encoded_value', 'unknown_value': -1}}, 'variable_type': 'cat'}, 'isepicmixactivated': {'transformation': {'name': 'OrdinalEncoder', 'args': {'handle_unknown': 'use_encoded_value', 'unknown_value': -1}}, 'variable_type': 'cat'}, 'marketingzone': {'transfo

In [None]:
#| hide
test_eq(feature_dict[list(cat_vars[0].keys())[0]].keys(), ['transformation', 'variable_type'])

In [None]:
#| export


def return_list_of_vars(variables):
    """Flatten a list of dictionaries into one list of their keys.

    Keys keep the order of the dictionaries and of the keys within each one.
    Returns ``None`` when ``variables`` is ``None``.
    """
    if variables is None:
        return None
    return [name for entry in variables for name in entry.keys()]


In [None]:
# Example: replace the lists of {name: transformation} dicts with plain lists
# of feature names, then log them.
cat_vars = return_list_of_vars(cat_vars)
cont_vars = return_list_of_vars(cont_vars)
logging.info(f'categorical variables: \n {cat_vars}')
logging.info(f'continous variables: \n {cont_vars}')

INFO:root:categorical variables: 
 ['destinationgeoafinitylabel', 'evercorepass', 'everpass', 'gendercode', 'guestbehavior', 'isepicmixactivated', 'marketingzone', 'mostcommonticketcomp', 'mostsubseasonvisited', 'mostvisitedregion', 'mostvisitedresort', 'onlysingleresortkey', 'partnerresortscannerflag', 'skierabilitylabel', 'totalseasonsscanned', 'visitmostinpeak']
INFO:root:continous variables: 
 ['age', 'avgvisitperseason', 'resortsvisited', 'subseasonsperyear', 'totalvisits']


In [None]:
#| hide
# NOTE(review): the yaml is loaded into `feature_dict`, but the assertions
# below read `features` from an earlier cell — confirm which name was intended.
feature_dict=yaml.yaml_reader('./LTBP/files/yaml_files/features.yaml')
# The flattened names must match the lowercased non-LABEL 'cat' feature names.
test_eq(cat_vars, [f.lower() for f, values in features.items() 
                    if values['var_type'][experiment_name] == 'cat'
                    and values['input_definition'] != 'LABEL'])
# Same check for the 'cont' features.
test_eq(cont_vars, [f.lower() for f, values in features.items() 
                    if values['var_type'][experiment_name] == 'cont'
                    and values['input_definition'] != 'LABEL'])

In [None]:
#| export


def _label_frame(values, y_var):
    """Wrap label values in a DataFrame whose columns are assigned from ``y_var``."""
    frame = pd.DataFrame(values)
    # NOTE(review): y_var is a list, so `[y_var]` is a nested list; pandas may
    # build a single-level MultiIndex from it — confirm whether flat column
    # names were intended before changing this.
    frame.columns = [y_var]
    return frame


def _features_frame(values, sklearn_pipe):
    """Wrap transformed features in a DataFrame named by the pipeline's output columns."""
    cols = preprocessing.get_column_names_from_transformer(sklearn_pipe)
    frame = pd.DataFrame(values)
    frame.columns = cols
    return frame


def prepare_training_set(df: pd.DataFrame,
                         y_var: list,
                         y_scaler_type: object,
                         sklearn_pipe: object,
                         etl_dict: dict,
                         models_dict: dict,
                         adls_path: str,
                         experiment_name: str,
                         connection_str: str,
                         identifiers: list = ['ECID'],
                         test_set: bool = True,
                         validation_split: float = .20,
                         test_split: float = .15,
                         seed: int = 1320,
                         as_type=int):
    """Split ``df``, preprocess features and labels, and save the fitted objects.

    TODO: Working on Multi-Col Labels split and preprocess data set for model training purposes

    Steps:
      1. ``train_test_split`` holds out ``validation_split`` of ``df``; when
         ``test_set`` is True, ``test_split`` of that hold-out becomes the test set.
      2. If ``y_scaler_type`` is truthy it is fitted on the training labels only,
         applied to the validation/test labels, and saved to the data lake.
         Otherwise the labels are passed through with a reset index.
      3. ``sklearn_pipe`` is fitted on the training features only, applied to
         the validation/test features, and saved to the data lake.

    Args:
        df: full data set, containing the feature, label and identifier columns.
        y_var: list of label column names.
        y_scaler_type: scaler object for the labels, or a falsy value to skip scaling.
        sklearn_pipe: preprocessing object exposing ``fit_transform``/``transform``.
        etl_dict: dictionary providing the ``azure_container`` key.
        models_dict: dictionary providing, under ``experiment_name``, the
            ``x_preprocess_object_name`` and ``y_preprocess_object_name`` keys.
        adls_path: data lake path the fitted objects are uploaded to.
        experiment_name: key into ``models_dict``.
        connection_str: connection string used for the upload.
        identifiers: identifier columns returned for the hold-out rows.
        test_set: whether to carve a test set out of the validation split.
        validation_split: ``test_size`` of the first split.
        test_split: ``test_size`` of the second split.
        seed: ``random_state`` used for both splits.
        as_type: dtype the label columns are cast to before splitting.

    Returns:
        ``(X_train, X_valid, X_test, y_train, y_valid, y_test, sklearn_pipe,
        scaler, id_list)``; ``X_test`` and ``y_test`` are ``None`` when
        ``test_set`` is not True, in which case ``id_list`` comes from the
        validation rows instead of the test rows.
    """
    scaler = y_scaler_type
    # Sklearn basic split method
    X_train, X_valid, y_train, y_valid = train_test_split(df,
                                                          df[y_var].astype(as_type),
                                                          test_size=validation_split,
                                                          random_state=seed)
    if test_set is True:
        X_valid, X_test, y_valid, y_test = train_test_split(X_valid,
                                                            y_valid,
                                                            test_size=test_split,
                                                            random_state=seed)
        logging.info(f'Successfully Spilt Data\nTrain: {X_train.shape}, {y_train.shape}\nValid: {X_valid.shape}, {y_valid.shape}\nTest: {X_test.shape}, {y_test.shape}')
    else:
        y_test = None
        X_test = None
        logging.info(f'Successfully Spilt Data\nTrain: {X_train.shape}, {y_train.shape}\nValid: {X_valid.shape}, {y_valid.shape}')
    # Identifiers must be captured before the pipeline replaces the raw frames.
    id_list = X_test[identifiers] if test_set is True else X_valid[identifiers]
    logging.info(f'Size of the id_list for the hold set {id_list.shape}')

    if scaler:
        # Fit with the same 2-D, all-label selection used for transform below;
        # fitting on the 1-D first-label Series did not match those calls.
        y_train = _label_frame(scaler.fit_transform(y_train.reset_index()[y_var]), y_var)
        y_valid = _label_frame(scaler.transform(y_valid.reset_index()[y_var]), y_var)
        if test_set is True:
            y_test = _label_frame(scaler.transform(y_test.reset_index()[y_var]), y_var)
        logging.info('saving y_var scaler to adls')
        save_sklearn_object_to_data_lake(save_object=scaler,
                                         file_name=(os.environ.get('CI_COMMIT_SHA', 'LocalRunNBS')
                                                    + models_dict[experiment_name]['y_preprocess_object_name']),
                                         adls_path=adls_path,
                                         container_name=etl_dict['azure_container'],
                                         connection_str=connection_str)
    else:
        logging.info('This project relies on the query to have accurate labels with no preprocessing..')
        y_train = _label_frame(y_train.reset_index()[y_var], y_var)
        y_valid = _label_frame(y_valid.reset_index()[y_var], y_var)
        if test_set is True:
            y_test = _label_frame(y_test.reset_index()[y_var], y_var)

    # Fit on the training rows only so hold-out rows never inform the pipeline.
    X_train = _features_frame(sklearn_pipe.fit_transform(X_train), sklearn_pipe)
    X_valid = _features_frame(sklearn_pipe.transform(X_valid), sklearn_pipe)
    if test_set is True:
        X_test = _features_frame(sklearn_pipe.transform(X_test), sklearn_pipe)

    save_sklearn_object_to_data_lake(save_object=sklearn_pipe,
                                     file_name=(os.environ.get('CI_COMMIT_SHA', 'LocalRunNBS')
                                                + models_dict[experiment_name]['x_preprocess_object_name']),
                                     adls_path=adls_path,
                                     container_name=etl_dict['azure_container'],
                                     connection_str=connection_str)
    return X_train, X_valid, X_test, y_train, y_valid, y_test, sklearn_pipe, scaler, id_list


In [None]:
# scaler_type = None
# test_set=True
# experiment_name='BASELINE'
# sklearn_pipe=pipe
# etl=etl
# models=models

# adls_path = os.path.join((os.path.join(etl['data_lake_path'], 'experiments', experiment_name)
#     if experiment 
#     else os.path.join(etl_dict['data_lake_path'], 
#     os.environ.get('CI_COMMIT_SHA', 'LocalRunNBS')))
#     , models['preprocessors_adls_path'])
# result = prepare_training_set(df,
#                               y_var=[k.upper() for k, v in features.items() if v['input_definition'] == 'LABEL'],
#                               y_scaler_type=models[experiment_name]['y_scaler_type'],
#                               adls_path=adls_path,
#                               sklearn_pipe=pipe,
#                               test_set=True,
#                               etl_dict=etl,
#                               models_dict=models,
#                               connection_str=os.environ[models["connection_str"]],
#                               identifiers=['ECID']
#                               )
# X_train, X_valid, X_test, y_train, y_valid, y_test, sklearn_pipe, scaler, id_list = result

This function needs a test, but I am holding off for now because writing it would take a little more time than I want to spend on documenting this process at the moment.

The idea of this function is to ensure that the user is using the pre-processor in the correct fashion so that the validation set is not being considered in the pre-processing dictionary

In [None]:
#| export


def preprocess_data(X_train, X_valid, X_test, sklearn_pipe):
    """Fit ``sklearn_pipe`` on the training set and transform all three sets.

    The pipeline is fitted on ``X_train`` only; ``X_valid`` and ``X_test`` are
    transformed with the already fitted pipeline. Each result is returned as a
    DataFrame whose columns come from
    ``preprocessing.get_column_names_from_transformer``.

    Returns ``(X_train, X_valid, X_test, sklearn_pipe)``.
    """
    def _as_frame(values):
        # Column names are looked up again after every transform.
        names = preprocessing.get_column_names_from_transformer(sklearn_pipe)
        frame = pd.DataFrame(values)
        frame.columns = names
        return frame

    train_frame = _as_frame(sklearn_pipe.fit_transform(X_train))
    valid_frame = _as_frame(sklearn_pipe.transform(X_valid))
    test_frame = _as_frame(sklearn_pipe.transform(X_test))
    return train_frame, valid_frame, test_frame, sklearn_pipe

save_sklearn_object_to_data_lake

This function simply wraps DSU functionality so that a model can be pushed to ADLS. The tests for that functionality are written and evaluated inside DSU.

In [None]:
#| export


def save_sklearn_object_to_data_lake(
    save_object, file_name, adls_path, container_name, connection_str
):
    """moves a sklearn object to azure data lake as a pickle file at a given path

    The object is pickled to a local file named ``file_name``, uploaded with
    ``FileHandling.upload_file`` (``overwrite=True``) and the local file is
    removed afterwards, whether or not the dump or the upload succeeded.

    Args:
        save_object: object to pickle.
        file_name: name of the local pickle file that is created and uploaded.
        adls_path: destination passed to the upload as ``azure_file_path``.
        container_name: container the file is uploaded to.
        connection_str: connection string handed to ``FileHandling``.

    Raises:
        Any exception raised while pickling or uploading is propagated after
        the local file has been cleaned up.
    """
    logging.info(
        f"Pushing Sklearn Object to Azure: {os.path.join(adls_path, file_name)}"
    )
    try:
        with open(file_name, "wb") as f:
            pickle.dump(save_object, f)
        az = FileHandling(connection_str)
        az.upload_file(
            azure_file_path=adls_path,
            local_file_path=file_name,
            container_name=container_name,
            overwrite=True,
        )
    finally:
        # Never leave the temporary pickle behind, even when the upload fails.
        if os.path.isfile(file_name):
            os.unlink(file_name)
    logging.info(f"{file_name} successfully pushed to {adls_path}")

Will generate a test for this at a later time as LTBP doesn't need this type of massaging



The y_var inside of LTBP isn't a great use case for this, but in a project like RVF, where we might want to scale the y_var for regression or time series using MinMaxScaler or StandardScaler, this gives the flexibility needed.

In [None]:
#| skip


def y_var_scaler(y_train, y_valid, y_test, y_var, scaler_type):
    """Scale the label column of the train, validation and test sets.

    ``scaler_type`` is fitted on the training labels only and then applied to
    the validation and test labels.

    Args:
        y_train: training labels containing the column ``y_var``.
        y_valid: validation labels containing the column ``y_var``.
        y_test: test labels containing the column ``y_var``.
        y_var: name of the label column.
        scaler_type: scaler object exposing ``fit_transform`` and ``transform``.

    Returns:
        ``(y_train, y_valid, y_test, scaler_type)`` where the three label sets
        are single-column DataFrames named ``y_var`` and ``scaler_type`` is the
        fitted scaler.
    """
    def _scaled_frame(values):
        frame = pd.DataFrame(values)
        frame.columns = [y_var]
        return frame

    # `[[y_var]]` keeps the selection two-dimensional for the scaler.
    y_train = _scaled_frame(scaler_type.fit_transform(y_train.reset_index()[[y_var]]))
    y_valid = _scaled_frame(scaler_type.transform(y_valid.reset_index()[[y_var]]))
    y_test = _scaled_frame(scaler_type.transform(y_test.reset_index()[[y_var]]))
    # Return the fitted scaler that was passed in (the original returned an
    # undefined name).
    return y_train, y_valid, y_test, scaler_type

In [None]:
#| export


def create_stage_and_query_stage_sf(
    sf,  # Snowflake connection
    etl: dict,  # template etl input expected format
    udf_inputs: dict,  # template udf input expected format
    train_or_inference: str,  # training or inference
    experiment_name: str,  # name of experiment being ran
    indentification: list = ['ECID'],  # list of identification defaults to ECID
    experiment: bool = True,  # Boolean fed to function from script to say if its an experiment
    extra_statement: str = None,  # defaults to None to allow for experimentation
):
    """Create a Snowflake stage over the data lake folder and query it into a DataFrame.

    Builds a parquet stage pointing at the experiment folder (or at
    ``LocalRunTest`` when ``experiment`` is falsy), then selects the
    identification columns plus every upper-cased feature name from
    ``<train_or_inference>_data/`` in that stage.

    NOTE(review): the ``sf``, ``etl`` and ``udf_inputs`` arguments are replaced
    inside the function (by ``snowflake_query()`` and ``get_yaml_dicts``), so
    the values passed by the caller are not used — confirm whether that is
    intended.

    Requires the ``DATALAKE_SAS_TOKEN_SECRET`` environment variable; the stage
    name uses ``CI_COMMIT_SHA`` when set and ``LocalRunTest`` otherwise.

    Returns the result of running the generated query.
    """
    features, udf_inputs, etl = get_yaml_dicts(['features.yaml', 'udf_inputs.yaml', 'etl.yaml'])

    # Folder under the data lake path that the stage points at.
    if experiment:
        sub_folder = os.path.join('experiments', experiment_name)
    else:
        sub_folder = os.path.join('LocalRunTest')
    raw_url = (
        f"azure://{etl['azure_account']}.blob.core.windows.net/"
        f"{etl['azure_container']}/{etl['data_lake_path']}{sub_folder}"
    )
    stage_url = raw_url.replace(' ', '')

    stage_name = etl["stage_name"] + etl['FY_folder'] + os.environ.get('CI_COMMIT_SHA', 'LocalRunTest')
    stage_query = make_stage_query_generator(
        stage_name=stage_name,
        url=stage_url,
        sas_token=os.environ["DATALAKE_SAS_TOKEN_SECRET"],
        file_type="parquet",
    )
    sf = snowflake_query()
    _ = sf.run_sql_str(stage_query)

    # TODO: Figure out a identification feature like season year
    # Udf grain is ECID, which is easy to get, but season year isn't obivous some thought is needed
    if indentification is None:
        indentification = [col.split('.')[-1] for col in udf_inputs[train_or_inference]['UDF_GRAIN']]
    feature_columns = [name.upper() for name in features.keys()]

    query = generate_data_lake_query(
        stage_name=stage_name,
        stage_path=train_or_inference.lower()+'_data/',
        columns=indentification + feature_columns,
        extra_statement=extra_statement,
    )
    logging.info(f'adls snowflake stage query {query}')
    # A fresh query object is created for the data query, as in the stage step.
    sf = snowflake_query()
    df = sf.run_sql_str(query)
    logging.info(f'Preview dataframe queried {df.head()}')
    return df

In [None]:
# skip
# stage_url = f"""azure://{etl_dict['azure_account']}.blob.core.windows.net/
# {etl['azure_container']}/{etl_dict['data_lake_path']}{
# (os.path.join('experiments', etl_dict['exp_name'])
# if experiment 
# else os.path.join(os.environ.get('CI_COMMIT_SHA', 'LocalRunTest')))}""".replace('\n', '')
# stage_query = make_stage_query_generator(
#     stage_name=etl["stage_name"] + etl['FY_folder'] + os.environ.get('CI_COMMIT_SHA', 'LocalRunTest'),
#     url=stage_url,
#     sas_token=os.environ["DATALAKE_SAS_TOKEN_SECRET"],
#     file_type="parquet",
# )
# _ = sf.run_sql_str(stage_query)
# # TODO: Figure out a identification feature like season year 
# # Udf grain is ECID, which is easy to get, but season year isn't obivous some thought is needed
# indentification = [col.split('.')[-1] for col in udf_inputs[train_or_inference]['UDF_GRAIN']]
# columns = [col.upper() for col in features.keys()]
# query = generate_data_lake_query(stage_name=(etl["stage_name"] 
#                                              + etl['FY_folder'] 
#                                              + os.environ.get('CI_COMMIT_SHA', 'LocalRunTest')),
#      stage_path=train_or_inference.lower()+'_data/',
#      columns=indentification + columns,
#      extra_statement=None)
# logging.info(f'adls snowflake stage query {query}')
# df = sf.run_sql_str(query)
# logging.info(f'Preview dataframe queried {df.head()}')

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()