In [23]:
#default_exp modeling.premodel

In [24]:
#hide
from nbdev.showdoc import *
from sdsde import files
from sdsde.snowflake.query import SnowflakeConnect
from sdsde.wrapper.azurewrapper import blob_pusher
from sdsde.wrapper.azurewrapper import blob_puller

# Pre-modeling Functionality

These functions are designed to help with anything in the pre-modeling stage of the ML life cycle.

In [25]:
#export
import os
import sys
import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("azure.core").setLevel(logging.WARNING)
logging.getLogger("urllib3.connectionpool").setLevel(logging.CRITICAL)
logging.getLogger("snowflake.connector").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

## Data Lake Stages

### `stage_query_generator`

In [26]:
#export


def stage_query_generator(stage_name, url, sas_token, file_type='parquet'):
    """generates the snowflake query needed to create an external stage in
    azure blob

    Args:
    * stage_name (str): name of the stage in snowflake
    * url (str): azure formated string for account, container, and path
    * sas_token (str): blob sas token for shared access
    * file_type (str, optional): type of files expected in stage. Defaults to 'parquet'. Can use 'csv' as well.

    Returns:
    * str: snowflake query to create stage
    """
    stage_template = '''
    create or replace stage STAGE_NAME_HERE
      url='URL_HERE'
      credentials=(azure_sas_token='SAS_TOKEN_HERE')
      encryption=(type= 'NONE')
      file_format = (type = FILE_TYPE_HERE);
    '''
    stage_query = stage_template.replace('STAGE_NAME_HERE', stage_name)
    stage_query = stage_query.replace('SAS_TOKEN_HERE', sas_token)
    stage_query = stage_query.replace('URL_HERE', url)
    stage_query = stage_query.replace('FILE_TYPE_HERE', file_type)
    return stage_query

In [27]:
show_doc(stage_query_generator)

<h4 id="stage_query_generator" class="doc_header"><code>stage_query_generator</code><a href="__main__.py#L4" class="source_link" style="float:right">[source]</a></h4>

> <code>stage_query_generator</code>(**`stage_name`**, **`url`**, **`sas_token`**, **`file_type`**=*`'parquet'`*)

generates the snowflake query needed to create an external stage in
azure blob

Args:
* stage_name (str): name of the stage in snowflake
* url (str): azure formated string for account, container, and path
* sas_token (str): blob sas token for shared access
* file_type (str, optional): type of files expected in stage. Defaults to 'parquet'. Can use 'csv' as well.

Returns:
* str: snowflake query to create stage

In [28]:
query = stage_query_generator('STAGE_NAME_HERE', 'SAS_TOKEN_HERE', 'URL_HERE')
print(query)


    create or replace stage STAGE_NAME_HERE
      url='SAS_TOKEN_HERE'
      credentials=(azure_sas_token='SAS_TOKEN_HERE')
      encryption=(type= 'NONE')
      file_format = (type = parquet);
    


### `make_data_lake_stage`

In [29]:
#export


def make_data_lake_stage(sf_connection,
                         stage_name,
                         account,
                         container,
                         data_lake_path,
                         sas_token,
                         file_type='parquet'):
    """creates a data lake staging environment from snowflake

    Args:
    * sf_connection (``SnowflakeConnect``): snowflake connection
    * stage_name (str): name of stage in snowflake
    * account (str): blob storage account
    * container (str): blob container
    * data_lake_path (str): path in the container to stage in
    * sas_token (str): shared access token for blob
    * file_type (str, optional): type of files to stage. Defaults to 'parquet'.
    """
    stage_url = f'azure://{account}.blob.core.windows.net/{container}/{data_lake_path}'
    stage_query = stage_query_generator(stage_name, stage_url, sas_token, file_type='parquet')
    sf_connection.run_str_query(stage_query)

In [30]:
show_doc(make_data_lake_stage)

<h4 id="make_data_lake_stage" class="doc_header"><code>make_data_lake_stage</code><a href="__main__.py#L4" class="source_link" style="float:right">[source]</a></h4>

> <code>make_data_lake_stage</code>(**`sf_connection`**, **`stage_name`**, **`account`**, **`container`**, **`data_lake_path`**, **`sas_token`**, **`file_type`**=*`'parquet'`*)

creates a data lake staging environment from snowflake

Args:
* sf_connection (``SnowflakeConnect``): snowflake connection
* stage_name (str): name of stage in snowflake
* account (str): blob storage account
* container (str): blob container
* data_lake_path (str): path in the container to stage in
* sas_token (str): shared access token for blob
* file_type (str, optional): type of files to stage. Defaults to 'parquet'.

In [31]:
sf = SnowflakeConnect(sfAccount=os.environ['sfAccount'],
                   sfUser=os.environ['sfUser'],
                   sfPswd=os.environ['sfPswd'],
                   sfWarehouse=os.environ['sfWarehouse'],
                   sfDatabase=os.environ['sfDatabase'],
                   sfSchema=os.environ['sfSchema'],
                   sfRole=os.environ['sfRole'])

make_data_lake_stage(sf_connection=sf, 
                     stage_name='sdsdetest',
                     account=os.environ['azure_account'], 
                     container='sdsdetesting', 
                     data_lake_path='sdsde_library/testing', 
                     sas_token=os.environ['DATALAKE_SAS_TOKEN_SECRET'])

INFO:sdsde.snowflake.query:sqlalchemy snowflake engine created
INFO:sdsde.snowflake.query:connection to snowflake successful
INFO:sdsde.snowflake.query:testing connection
INFO:sdsde.snowflake.query:sqlalchemy snowflake engine created
INFO:sdsde.snowflake.query:executing query
INFO:sdsde.snowflake.query:data loaded from snowflake
INFO:sdsde.snowflake.query:connection to snowflake has been turned off
INFO:sdsde.snowflake.query:Stage area SDSDETEST successfully created.


## Feature Set Pulls

### `pull_static_feature_set_to_data_lake`

In [32]:
#export


def pull_static_feature_set_to_data_lake(sf_connection: object,
                                         stage_name: str,
                                         data_lake_path: str,
                                         features: list,
                                         grain='ECID',
                                         limit_statement='',
                                         overwrite=True,
                                         ):
    """given a list of features and a modeling gain this pulls data from snowflake to a
    data lake in raw file format. data will be in the format specified by the stage.
    parquet with snappy is recommended.

    Args:
    * sf_connection (object): snowflake connection
    * stage_name (str): stage name (already created)
    * data_lake_path (str): where in the data lake to dump data
    * features (list): feature set
    * grain (str, optional): gain from feature store for rows. Defaults to 'ECID'.
    * limit_statement (str, optional): limit statement to insert to SQL ie "limit 1000". Defaults to ''. Used for debugging.
    * overwrite (bool, optional): overwrite existing data or not. Defaults to True.
    """

    full_stage_path = os.path.join(stage_name, data_lake_path)
    select_query = f'''
    copy into @{full_stage_path} from
    (
        select
            {grain}
            FEATURES_HERE
        from "MACHINELEARNINGFEATURES"."PROD".FEATURESTORE_{grain}
        {limit_statement}
    )
    overwrite = {overwrite}
    '''
    logger.info(f'Pulling {len(features)} {grain} features to the datalake at path {data_lake_path}')
    for feature in features:
        select_query = select_query.replace('FEATURES_HERE', f', {feature}FEATURES_HERE')
    select_query = select_query.replace('FEATURES_HERE', '')
    response = sf_connection.run_str_query(select_query)
    logger.info(f'Copy response\n{response}')

In [33]:
show_doc(pull_static_feature_set_to_data_lake)

<h4 id="pull_static_feature_set_to_data_lake" class="doc_header"><code>pull_static_feature_set_to_data_lake</code><a href="__main__.py#L4" class="source_link" style="float:right">[source]</a></h4>

> <code>pull_static_feature_set_to_data_lake</code>(**`sf_connection`**:`object`, **`stage_name`**:`str`, **`data_lake_path`**:`str`, **`features`**:`list`, **`grain`**=*`'ECID'`*, **`limit_statement`**=*`''`*, **`overwrite`**=*`True`*)

given a list of features and a modeling gain this pulls data from snowflake to a
data lake in raw file format. data will be in the format specified by the stage.
parquet with snappy is recommended.

Args:
* sf_connection (object): snowflake connection
* stage_name (str): stage name (already created)
* data_lake_path (str): where in the data lake to dump data
* features (list): feature set
* grain (str, optional): gain from feature store for rows. Defaults to 'ECID'.
* limit_statement (str, optional): limit statement to insert to SQL ie "limit 1000". Defaults to ''. Used for debugging.
* overwrite (bool, optional): overwrite existing data or not. Defaults to True.

In [34]:
sf = SnowflakeConnect(sfAccount=os.environ['sfAccount'],
                   sfUser=os.environ['sfUser'],
                   sfPswd=os.environ['sfPswd'],
                   sfWarehouse=os.environ['sfWarehouse'],
                   sfDatabase=os.environ['sfDatabase'],
                   sfSchema=os.environ['sfSchema'],
                   sfRole=os.environ['sfRole'])

pull_static_feature_set_to_data_lake(sf_connection=sf,
                                     stage_name='sdsdetest', 
                                     data_lake_path='features/static/ecid/',
                                     features=['MARKETINGZONE', 'TOTALSEASONSSCANNED'],
                                     grain='ECID',
                                     limit_statement='limit 1000',
                                     overwrite=True
                                    )

INFO:sdsde.snowflake.query:sqlalchemy snowflake engine created
INFO:sdsde.snowflake.query:connection to snowflake successful
INFO:__main__:Pulling 2 ECID features to the datalake at path features/static/ecid/
INFO:sdsde.snowflake.query:testing connection
INFO:sdsde.snowflake.query:sqlalchemy snowflake engine created
INFO:sdsde.snowflake.query:executing query
INFO:sdsde.snowflake.query:data loaded from snowflake
INFO:sdsde.snowflake.query:connection to snowflake has been turned off
INFO:__main__:Copy response
   rows_unloaded  input_bytes  output_bytes
0           1000        10105         10105


### `temporal_and_static_dump_data_to_datalake`

In [35]:
#export


def temporal_and_static_dump_data_to_datalake(sf_connection: object,
                                              stage_name: str,
                                              data_lake_path: str,
                                              feature_dict: list,
                                              base_query: str,
                                              grain='ECID',
                                              limit_statement='',
                                              overwrite=True):
    """
    Populates stages datalake via a snowflake query.

    Args:
        sf_connection ([type]): SnowFlake Connection
        yaml_file (str, optional): Yaml file name . Defaults to 'dataload.yaml'.
        yaml_section (str, optional): Yaml section to read. Defaults to 'inputdata'.
        data_set (str, optional): Training or Test Set. Defaults to 'train_set'.
        overwrite (bool, optional): Overwrite exisiting file or not. Defaults to True.

    Returns:
        str: query used to created the dump
    """
    full_stage_path = os.path.join(stage_name, data_lake_path)
    # copy snowflake data into the stage
    select_query = f'''
    copy into @{full_stage_path} from
    (
    SELECT
        FEATURES_HERE
        TEMPORAL_HERE
    FROM
    (
     {base_query}
    ) base
    LEFT JOIN "MACHINELEARNINGFEATURES"."PROD"."FEATURESTORE_{grain}" mlf ON base."{grain}" = mlf.{grain}
    {limit_statement}
    )
    OVERWRITE={overwrite}
    ;
    '''
    for ind, feature in enumerate(feature_dict['static_features'].keys()):
        if ind == 0:
            if feature == 'ECID':
                select_query = select_query.replace('FEATURES_HERE', f'base.{feature}FEATURES_HERE')
            else:
                select_query = select_query.replace('FEATURES_HERE', f'mlf.{feature}FEATURES_HERE')
        else:
            if feature == 'ECID':
                select_query = select_query.replace('FEATURES_HERE', f', base.{feature}FEATURES_HERE')
            else:
                select_query = select_query.replace('FEATURES_HERE', f', mlf.{feature}FEATURES_HERE')
    for feature, values in feature_dict['temporal_features'].items():
        select_query = select_query.replace('TEMPORAL_HERE', f', machinelearningfeatures.{os.environ.get("prod_or_dev", "dev")}.{feature}({" , ".join(values["args"])})TEMPORAL_HERE')
    select_query = select_query.replace('FEATURES_HERE', '')
    select_query = select_query.replace('TEMPORAL_HERE', '')
    logging.info(f'query {select_query}')
    response = sf_connection.run_str_query(select_query)
    logging.info(f'Copy response\n{response}')
    return select_query

In [36]:
feature_dict = dict({
    'temporal_features': {
        'AvgResortsPerSeason_ECID_temporal': 
          {'args' : ['base.ECID', '20161101', '20210422'],
          'variabl_type': 'cont'}, 
    },
    'static_features': {
        'ECID': {'variable_type' : 'y'},
        'IsEpicMixActivated': {'variable_type' :'cat'}
    }
})

sf = SnowflakeConnect(sfAccount = os.environ.get('sfAccount', None), 
                   sfUser = os.environ.get('sfUser', None), 
                   sfPswd = os.environ.get('sfPswd', None), 
                   sfWarehouse = os.environ.get('sfWarehouse', None),
                   sfDatabase = os.environ.get('sfDatabase', None),
                   sfSchema = os.environ.get('sfSchema', None), 
                   sfRole = os.environ.get('sfRole', None))

make_data_lake_stage(sf_connection=sf, 
                     stage_name='sdsdetest',
                     account=os.environ['azure_account'], 
                     container='sdsdetesting', 
                     data_lake_path='projects/ltr/model-runs/latest', 
                     sas_token=os.environ.get('DATALAKE_SAS_TOKEN_SECRET', None))

base_query = """SELECT ECID FROM MACHINELEARNINGOUTPUTS.DEV.DL_LTR LIMIT 100"""
_ = temporal_and_static_dump_data_to_datalake(sf_connection=sf, stage_name='sdsdetest',
                                              data_lake_path='classification/feature_set/train_set',
                                              feature_dict=feature_dict, base_query=base_query,
                                              grain='ECID', overwrite=True, limit_statement='LIMIT 10')

INFO:sdsde.snowflake.query:sqlalchemy snowflake engine created
INFO:sdsde.snowflake.query:connection to snowflake successful
INFO:sdsde.snowflake.query:testing connection
INFO:sdsde.snowflake.query:sqlalchemy snowflake engine created
INFO:sdsde.snowflake.query:executing query
INFO:sdsde.snowflake.query:data loaded from snowflake
INFO:sdsde.snowflake.query:connection to snowflake has been turned off
INFO:sdsde.snowflake.query:Stage area SDSDETEST successfully created.
INFO:root:query 
    copy into @sdsdetest/classification/feature_set/train_set from
    (
    SELECT
        base.ECID, mlf.IsEpicMixActivated
        , machinelearningfeatures.dev.AvgResortsPerSeason_ECID_temporal(base.ECID , 20161101 , 20210422)
    FROM
    (
     SELECT ECID FROM MACHINELEARNINGOUTPUTS.DEV.DL_LTR LIMIT 100
    ) base
    LEFT JOIN "MACHINELEARNINGFEATURES"."PROD"."FEATURESTORE_ECID" mlf ON base."ECID" = mlf.ECID
    LIMIT 10
    )
    OVERWRITE=True
    ;
    
INFO:sdsde.snowflake.query:testing connect

## Feature Set Query from Data Lake

### `query_feature_set_from_data_lake`

In [37]:
#export


def query_feature_set_from_data_lake(sf_connection: object,
                                     stage_name: str,
                                     data_lake_path: str,
                                     features: list,
                                     limit_statement='',
                                     ):
    """once data resides in the data lake, this will allow you to query the data
    into python RAM

    Args:
    * sf_connection (object): snowflake connection
    * stage_name (str): data lake stage in snowflake
    * data_lake_path (str): extention path in the data lake
    * features (list): list of features
    * limit_statement (str, optional): limit statement to insert to SQL ie "limit 1000". Defaults to ''. Used for debugging.

    Returns:
    * [DataFrame]: feature set
    """

    # create query string
    query = f'''
        select
            FEATURES_HERE
        from @{os.path.join(stage_name, data_lake_path)}
        {limit_statement}
    '''
    for ind, feature in enumerate(features):
        if ind == 0:
            query = query.replace('FEATURES_HERE', f'$1:"_COL_{ind}" as {feature}FEATURES_HERE')
        else:
            query = query.replace('FEATURES_HERE', f', $1:"_COL_{ind}" as {feature}FEATURES_HERE')
    query = query.replace('FEATURES_HERE', '')

    # query data
    logger.info(f'Querying Feature Set From Data Lake {query}')

    df = sf_connection.run_str_query(query)
    df.columns = [i.upper() for i in df.columns]
    logger.info(f'Final Dataset Shape - {df.shape}')
    return df

In [38]:
show_doc(query_feature_set_from_data_lake)

<h4 id="query_feature_set_from_data_lake" class="doc_header"><code>query_feature_set_from_data_lake</code><a href="__main__.py#L4" class="source_link" style="float:right">[source]</a></h4>

> <code>query_feature_set_from_data_lake</code>(**`sf_connection`**:`object`, **`stage_name`**:`str`, **`data_lake_path`**:`str`, **`features`**:`list`, **`limit_statement`**=*`''`*)

once data resides in the data lake, this will allow you to query the data
into python RAM

Args:
* sf_connection (object): snowflake connection
* stage_name (str): data lake stage in snowflake
* data_lake_path (str): extention path in the data lake
* features (list): list of features
* limit_statement (str, optional): limit statement to insert to SQL ie "limit 1000". Defaults to ''. Used for debugging.

Returns:
* [DataFrame]: feature set

In [45]:
#skip
sf = SnowflakeConnect(sfAccount=os.environ['sfAccount'],
                   sfUser=os.environ['sfUser'],
                   sfPswd=os.environ['sfPswd'],
                   sfWarehouse=os.environ['sfWarehouse'],
                   sfDatabase=os.environ['sfDatabase'],
                   sfSchema=os.environ['sfSchema'],
                   sfRole=os.environ['sfRole'])

df = query_feature_set_from_data_lake(sf_connection=sf, 
                                      stage_name='sdsdetest', 
                                      data_lake_path='features/static/ecid/',
                                      features=['ECID', 'MARKETINGZONE', 'TOTALSEASONSSCANNED'],
                                      limit_statement='limit 100'
                                     )

INFO:sdsde.snowflake.query:sqlalchemy snowflake engine created
INFO:sdsde.snowflake.query:connection to snowflake successful
INFO:__main__:Querying Feature Set From Data Lake 
        select
            $1:"_COL_0" as ECID, $1:"_COL_1" as MARKETINGZONE, $1:"_COL_2" as TOTALSEASONSSCANNED
        from @dsdetest/features/static/ecid/
        limit 100
    
INFO:sdsde.snowflake.query:testing connection
INFO:sdsde.snowflake.query:sqlalchemy snowflake engine created
INFO:sdsde.snowflake.query:executing query
INFO:sdsde.snowflake.query:data loaded from snowflake
INFO:sdsde.snowflake.query:connection to snowflake has been turned off
INFO:__main__:Final Dataset Shape - (0, 3)


### ``query_feature_set_from_data_lake_dt``

In [40]:
#export


def query_feature_set_from_data_lake_dt(sf_connection: object,
                                        stage_name: str,
                                        data_lake_path: str,
                                        features: list,
                                        dtypes_list: list,
                                        limit_statement='',
                                        ):
    """once data resides in the data lake, this will allow you to query the data
    into python RAM

    Args:
    * sf_connection (object): snowflake connection
    * stage_name (str): data lake stage in snowflake
    * data_lake_path (str): extention path in the data lake
    * features (list): list of features
    * limit_statement (str, optional): limit statement to insert to SQL ie "limit 1000". Defaults to ''. Used for debugging.

    Returns:
    * [DataFrame]: feature set
    """
    query = f'''
        select
            FEATURES_HERE
        from @{os.path.join(stage_name, data_lake_path)}
        {limit_statement}
    '''
    for ind, feature in enumerate(zip(features, dtypes_list)):
        if ind == 0:
            query = query.replace('FEATURES_HERE', f'$1:"_COL_{ind}"::{feature[1].upper()} as {feature[0]}FEATURES_HERE')
        else:
            query = query.replace('FEATURES_HERE', f', $1:"_COL_{ind}"::{feature[1].upper()} as {feature[0]}FEATURES_HERE')
    query = query.replace('FEATURES_HERE', '')

    logging.info(f'Querying Feature Set From Data Lake {query}')
    df = sf_connection.run_str_query(query)
    df.columns = [i.upper() for i in df.columns]
    logging.info(f'Final Dataset Shape - {df.shape}')
    return df

### `query_pushed_parquet_table_data_lake`

In [41]:
#export


def query_pushed_parquet_table_data_lake(sf_connection: object,
                                         stage_name: str,
                                         data_lake_path: str,
                                         feature_dict: dict,
                                         limit_statement='',
                                         pattern='.*parquet'):
    """
    Function is used when there is a parquet table in azure datalake
    that need to be brought into memory for exploration

    Args:
    * sf_connection (object): Snowflake Engine
    * stage_name (str): Azure stage name
    * data_lake_path (str): Data Lake path
    * feature_dict (dict): feature dictionary
    * limit_statement (str, optional): limit statment. Defaults to ''.
    * pattern (str, optional): pattern to read partitions. Defaults to '.*parquet'.

    Returns:
    * pd.DataFrame: Data Frame
    """
    if not stage_name.endswith('/'):
        stage_name += '/'
    if data_lake_path.startswith('/'):
        logging.error('data_lake_path should not start with / please remove and re-run')
        sys.exit()
    query = f'''
        select
            FEATURES_HERE
        from @{os.path.join(stage_name, data_lake_path)} (pattern=>'{pattern}')
        {limit_statement}
    '''
    features = feature_dict.keys()
    for ind, feature in enumerate(features):
        if ind == 0:
            query = query.replace('FEATURES_HERE', f'$1:"{feature.lower()}" as {feature}FEATURES_HERE')
        else:
            query = query.replace('FEATURES_HERE', f', $1:"{feature.lower()}" as {feature}FEATURES_HERE')
    query = query.replace('FEATURES_HERE', '')
    logger.info(f'Querying Feature Set From Data Lake {query}')
    df = sf_connection.run_str_query(query)
    df.columns = [x.upper() for x in df.columns]
    logger.info('fixing dtypes')
    for k, v in feature_dict.items():
        if v['variable_type'] == 'cont':
            df[k] = df[k].astype('float')
    logger.info(f'Final Dataset Shape - {df.shape}')
    return df

# Create

In [42]:
#hide
from nbdev.export import notebook2script
notebook2script()

Converted 01_azure.ipynb.
Converted 02_utils_dataframes.ipynb.
Converted 02_utils_parseyaml.ipynb.
Converted 02_utils_stfp.ipynb.
Converted 02_utils_traininghelpers.ipynb.
Converted 02_utils_traininghelpers_fastai.ipynb.
Converted 03_dstools_preparedata.ipynb.
Converted 04_snowflake_copyinto.ipynb.
Converted 04_snowflake_copyinto2.ipynb.
Converted 04_snowflake_query.ipynb.
Converted 05_azure_wrappers.ipynb.
Converted 06_modeling_inference.ipynb.
Converted 06_modeling_inference_fastai.ipynb.
Converted 06_modeling_premodel.ipynb.
Converted 06_modeling_preprocessing.ipynb.
Converted 06_modeling_preprocessing_fastai.ipynb.
Converted 06_modeling_training.ipynb.
Converted 06_modeling_training_fastai.ipynb.
Converted 07_Binary_Classification_Fastai_Example_Notebook.ipynb.
Converted index.ipynb.
