In [1]:
import argparse
import os
import gc
import sys
import glob
import shutil
import itertools
import re
import json
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from pandas.api.types import CategoricalDtype
import numpy as np
from time import time
from functools import reduce

import dask
import dask.dataframe as dd
from dask.distributed import Client, wait, progress, get_worker
from datetime import datetime

import sklearn
from sklearn.model_selection import train_test_split
import xgboost as xgb
from xgboost.dask import DaskDMatrix

# Notebook to python script conversion: set run_as_script=True. May want to clear all notebook outputs first, then save.
# Run this in CLI: jupyter nbconvert --to=script python_telco_data_ENC_ML_hdfs.ipynb --output=do-ml_gscv
run_as_script = False # If true, need to MANUALLY wrap converted script in "if __name__ == '__main__':" to resolve dask issues with multiprocess.


start_time_str = datetime.now().strftime("%m/%d/%Y %H:%M:%S")

# Vars to change when running on internal clusters (e.g. spark1, spark2a). Other params such as hdfs and dask hosts are preconfigured.
# run_type, cluster_type, data_format, etl_output_dir, enc_output_dir
# Example local run: ./do-ml.py --run_type=gpu
# Example cluster run: ./do-ml.py --run_type=cpu --cluster_type=spark1 --data_format=parquet --etl_output_dir=/data/erife-output/churn/10k/gpu/churn-etl.parquet --enc_output_dir=/data/erife-output/churn/churn-encoded

# etl_num_parts = 4 # Used for local testing of physical data partitioning only
default_run_type = 'gpu'
default_enable_gridSearchCV = True # Run gridSearchCV for HPO.
default_cv_strategy = 'node' # Select 'node' or 'cluster'. Node trains each fold independently at all grid points. Cluster schedules everything at once.
default_data_format = 'parquet'

# Default data dirs assumes local run. 
# Use ABSOLUTE directories for enc_output_dir since externally started dask cluster cannot resolve relative dirs.
# default_etl_output_dir = '/data/telco_churn/etl_data/sf1k_' + default_data_format # CHANGE ME
# default_enc_output_dir = '/data/telco_churn/enc_data' # WARNING: Directory will be wiped with each run.
default_etl_output_dir = './churn-etl.parquet' # CHANGE ME
default_enc_output_dir = './enc_data' # WARNING: Directory will be wiped with each run.

default_hdfs_host = 'localhost' # Also default_dask_host
default_hadoop_home = os.environ.get('HADOOP_HOME')
default_java_home = os.environ.get('JAVA_HOME')

parser = argparse.ArgumentParser()
parser.add_argument('--run_type', help='Run type (default="%s").' % default_run_type, default=default_run_type)
parser.add_argument('--enable_gridSearchCV', help='Run GridSearchCV (default="%s").' % default_enable_gridSearchCV, default=default_enable_gridSearchCV)
parser.add_argument('--cv_strategy', help='Run type (default="%s").' % default_cv_strategy, default=default_cv_strategy)
parser.add_argument('--num_folds', help='Number of folds for cross-validation.', default=8, type=int)
parser.add_argument('--cuda_visible_devices', help='CUDA_VISIBLE_DEVICES for multi-GPU configs (default="0").', default='0')
parser.add_argument('--data_format', help='Data source format (default="%s).' % default_data_format, default=default_data_format)
parser.add_argument('--storage_backend', help='Data storage backend. Select "local" or "hdfs" (default="local").', default='local')
parser.add_argument('--cluster_type', help='Dask cluster type. Select "local", "spark1", "spark2a" (default="local").', default='local')
parser.add_argument('--etl_output_dir', help='ETL data output directory "%s".' % default_etl_output_dir, default=default_etl_output_dir)
parser.add_argument('--enc_output_dir', help='Encoded data output directory "%s".' % default_enc_output_dir, default=default_enc_output_dir)
parser.add_argument('--hdfs_host', help='HDFS namenode (default="localhost").', default=default_hdfs_host)
parser.add_argument('--hdfs_port', help='HDFS namenode port (default=9000).', default=9000, type=int)
parser.add_argument('--dask_host', help='Dask host (default=hdfs_host).', default=default_hdfs_host)
parser.add_argument('--dask_port', help='Dask scheduler port (default=8786).', default=8786, type=int)

parser.add_argument('--hadoop_home', help='Hadoop home directory (default="%s").' % default_hadoop_home, default=default_hadoop_home)
parser.add_argument('--java_home', help='Java home directory (default="%s").' % default_java_home, default=default_java_home)

parser.add_argument('--enc_num_row_groups', help='Encoded data parquet file num_row_groups (default=8).', default=8, type=int)
parser.add_argument('--max_depth', help='xgboost max_depth param (default=10).', default=10, type=int)
parser.add_argument('--num_boost_round', help='xgboost num_boost_round param (default=200).', default=200, type=int)
parser.add_argument('--score_metric', help='Scoring metric for inferencing (default=acc).', default='auc')


if run_as_script == True:
    args = parser.parse_args()
else:
    # Run within notebook. Passing empty string allows argparse to work within jupyter notebook.
    args = parser.parse_args([])
    
# Run settings:
run_type = args.run_type # Select cpu/gpu
enable_gridSearchCV = args.enable_gridSearchCV
cv_strategy = args.cv_strategy
num_folds = args.num_folds
cuda_visible_devices = args.cuda_visible_devices
data_format = args.data_format # csv or parquet
storage_backend = args.storage_backend # 'local' or 'hdfs'
cluster_type = args.cluster_type # local

# Data dirs:
etl_output_dir = args.etl_output_dir
enc_output_dir = args.enc_output_dir

# Parameters for encoding ETL data and outputting to parquet:
enc_num_row_groups = args.enc_num_row_groups

# HDFS settings:
hdfs_host = args.hdfs_host
hdfs_port = args.hdfs_port

# Dask settings:
dask_host = args.dask_host
dask_port = args.dask_port

# Env settings:
hadoop_home = args.hadoop_home
java_home = args.java_home

# xgboost params:
max_depth = args.max_depth
num_boost_round = args.num_boost_round
score_metric = args.score_metric


def setenv():
    """
    Function to be ran on dask cluster to overide env vars.
    Example: client.run(setenv)
    """
    import os
    import subprocess
    
    # Need env configured on all workers for ARROW_LIBHDFS_DIR so that pyarrow HDFS can be used.
    # https://stackoverflow.com/questions/55922487/what-could-be-the-explaination-of-this-pyarrow-lib-arrowioerror-hdfs-file-does
    classpath = subprocess.Popen([hadoop_home+"/bin/hdfs", "classpath", "--glob"], stdout=subprocess.PIPE).communicate()[0]
    
    os.environ["HADOOP_HOME"] = hadoop_home
    os.environ["JAVA_HOME"] = java_home
    os.environ["CLASSPATH"] = classpath.decode("utf-8")
    os.environ["ARROW_LIBHDFS_DIR"] = hadoop_home+"/lib/native"
#     os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices


if cluster_type == 'local':
    if run_type == 'gpu':
        from dask_cuda import LocalCUDACluster

#         # LocalCUDACluster for multi-GPU on same node:
# #         cuda_cluster = LocalCUDACluster() #n_workers=1, threads_per_worker=8) # p3.2xlarge has 1x V100, 8 vCPU
#         cuda_cluster = LocalCUDACluster(n_workers=2, threads_per_worker=8) # p3.8xlarge has 4x V100, 32 vCPU
#         client = Client(cuda_cluster)
    
        # Something broke in localCUDACluster. Start dask-scheduler and dask-cuda-worker manually. Then connect to endpoint.
        client = Client(n_workers=1, threads_per_worker=8)
#         client = Client('tcp://127.0.0.1:8786')
    else:
        # xgboost training seems to benefit from fewer workers and more threads. Same with inference against entire directory. 
        # Set n_workers equal to number of physical machines.
#         client = Client(n_workers=2, threads_per_worker=30)
        client = Client(n_workers=1, threads_per_worker=32)
    
elif cluster_type == 'spark1':
    storage_backend = 'hdfs'
    hdfs_host = '10.150.162.36'
    dask_host_str = 'tcp://'+hdfs_host+':8786'
    hadoop_home = '/opt/hadoop-3.2.1'
    java_home = '/usr/lib/jvm/java-1.8.0-openjdk-amd64'
    cuda_visible_devices = "0,1"
    
    client = Client(dask_host_str)
    client.run(setenv) # Apply env settings on entire dask cluster
    
elif cluster_type == 'spark2a':
    storage_backend = 'hdfs'
    hdfs_host = '10.150.166.218'
    dask_host_str = 'tcp://'+hdfs_host+':8786'
    hadoop_home = '/opt/hadoop/hadoop-3.2.1'
    java_home = '/usr/lib/jvm/java-1.8.0-openjdk-amd64'
    cuda_visible_devices = "0,1"
    
    client = Client(dask_host_str)
    client.run(setenv) # Apply env settings on entire dask cluster
#     client = Client('tcp://127.0.0.1:8786') # localhost
#     client = Client(n_workers=6, threads_per_worker=20) # overide
    
else:
    raise ValueError('Choose cluster_type as "local", "spark1", "spark2a".')
    
print('xgboost version: ', xgb.__version__)
client

xgboost version:  1.5.0-dev


0,1
Connection method: Cluster object,Cluster type: LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Status: running,Using processes: True
Dashboard: http://127.0.0.1:8787/status,Workers: 1
Total threads:  8,Total memory:  251.65 GiB

0,1
Comm: tcp://127.0.0.1:44487,Workers: 1
Dashboard: http://127.0.0.1:8787/status,Total threads:  8
Started:  Just now,Total memory:  251.65 GiB

0,1
Comm: tcp://127.0.0.1:37321,Total threads: 8
Dashboard: http://127.0.0.1:33155/status,Memory: 251.65 GiB
Nanny: tcp://127.0.0.1:34445,
Local directory: /home/btong/github/data-science-blueprints/churn/dask-worker-space/worker-tikamitf,Local directory: /home/btong/github/data-science-blueprints/churn/dask-worker-space/worker-tikamitf
GPU: Quadro RTX 8000,GPU memory: 47.46 GiB


In [2]:
if storage_backend == 'local':
    etl_files = glob.glob(etl_output_dir+'/*.'+data_format) # ETL data directory (to be encoded)
    
elif storage_backend == 'hdfs':
    hdfs = pa.hdfs.connect(hdfs_host, hdfs_port)
    hdfs_con_str = 'hdfs://'+hdfs_host+':'+str(hdfs_port)
    
    # Assumes ETL data already on hdfs:
    etl_output_dir = hdfs_con_str + etl_output_dir
    enc_output_dir = hdfs_con_str + enc_output_dir
#     xgb_hdfs_output_dir = hdfs_con_str + '/data/erife-output/churn/xgb_models' # Output directory for xgboost model file. Use None for no output.
    xgb_hdfs_output_dir = None
    
    # Issue with xgboost .save_model() when writing out to hdfs. Requires compiling xgboost with hdfs feature enabled. Write locally and store backup on hdfs.
    
    etl_files = hdfs.ls(etl_output_dir[len(hdfs_con_str):]) # ETL data directory (to be encoded)
    etl_files = [hdfs_con_str+fn for fn in etl_files if fn.split('.')[-1]==data_format]
    
    print('Number of '+data_format+' found in ETL folder:', len(etl_files))


# xgboost params:
xgb_params = { 
    'random_state': 0,
    'eta': 0.1,
    'gamma': 0.1,
    'max_depth': max_depth, 
#     'num_boost_round': num_boost_round, # Number of boosting rounds: num_boost_round in xgboost, numRound in xgboost4j, n_estimators in scikit-learn.
    'max_leaves': 4*256,
    'objective': 'binary:logistic',
    'eval_metric': 'auc', # Applied to eval_set/test_data if provided
    'booster': 'gbtree',
}

if run_type == 'cpu':
    tree_method = 'hist'
elif run_type == 'gpu':   
    tree_method = 'gpu_hist'

xgb_params['tree_method'] = tree_method

# Load JSON with Schema for ETL Data

In [3]:
with open('churn_etl_schema.json') as json_file: 
    schema_json = json.load(json_file)

label_col = 'Churn' # Needs to be numeric encoded (i.e., with StringIndexer)

cat_str_cols = list(schema_json['categorical'].keys())
num_cols = schema_json['numeric']
ignore_cols = schema_json['unique']

all_cols = cat_str_cols + num_cols + ignore_cols

# Determine categorical binary cols which need to be label encoded:
binary_cols = [cat for cat in schema_json['categorical'] if len(schema_json['categorical'][cat])==2]

# One hot encode only non-binary, categorical columns:
ohe_cols = [cc for cc in cat_str_cols if cc not in binary_cols] # Preserve order

print('Columns to one-hot-encode:', ohe_cols)
print('Binary categorical columns:', binary_cols)
print('Numeric columns:', num_cols)

# TODO: low cardinality numeric cols. These may require one-hot-encoding.

# Get unique categories for binary and one-hot columns. Set alphabetical with explicit order to be used as global index.
cat_index_codes = {cc: CategoricalDtype(sorted(schema_json['categorical'][cc]), ordered=True) for cc in cat_str_cols}

# Generate column names with prefixes:
dummy_cols = [name+'_'+val for name in ohe_cols for val in schema_json['categorical'][name]]

# Update column labels:
dummy_cols = [re.sub('[()]', '', cc.replace(' ', '-')) for cc in dummy_cols]
all_enc_cols = num_cols + binary_cols + dummy_cols + ['cv_idx']

# print('All encoded output columns:', all_enc_cols)
print()
print('Number of columns after encoding: ', len(all_enc_cols))


Columns to one-hot-encode: ['MultipleLines', 'OnlineBackup', 'InternetService', 'OnlineSecurity', 'Contract', 'PaymentMethod', 'StreamingMovies', 'StreamingTV', 'TechSupport', 'DeviceProtection']
Binary categorical columns: ['PaperlessBilling', 'Partner', 'gender', 'Dependents', 'PhoneService']
Numeric columns: ['SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges', 'Churn']

Number of columns after encoding:  42


In [4]:
# Feature engineering via taking combinations of top few feature categorical columns:
from itertools import combinations

combo_cols = ['Contract', 'OnlineSecurity', 'InternetService', 'PaymentMethod'] # Top categorical features identified by SHAP
# combo_cols = ['SeniorCitizen'] + ohe_cols + binary_cols # Take all columns

combo_pairs = list(combinations(combo_cols, 2))
print('Number of column combinations', len(combo_pairs))

# Combine pairs of columns, categorize, then OHE. 

Number of column combinations 6


In [5]:
def telco_metadata_ttsplitter(data_path, test_size=0.2):
    """
    Generate metadata for original customerID in telco churn data. Use metadata to construct stratified train_test_split with less leakage.
    """
    with Client(n_workers=8, threads_per_worker=1) as client_local:
        # Read subset of columns needed to perform train/test split based on original customer source:
        df_etl = dd.read_parquet(data_path, columns=['customerID', label_col]) #.persist()

        # Truncate customerID to obtain original customer info:
        df_etl['customerID_src'] = df_etl['customerID'].str[:10]

        # Stratified split requires information in Churn column. Select first replica in each unique customerID_src group.
        df_etl_uniq = df_etl[['customerID_src', label_col]].groupby('customerID_src').first()
        df_etl_uniq = df_etl_uniq[[label_col]].reset_index()
        df_etl_uniq = df_etl_uniq.sort_values('customerID_src') # Ensure consistent ordering before running train_test_split()
        df_etl_uniq = df_etl_uniq.compute()

    # Can get rid of a lot of code by going directly to source csv file. 

    # Use sklearn train_test_split() to stratify and split keys:
    strat_train_idx, strat_test_idx = train_test_split(range(len(df_etl_uniq)), test_size=test_size, stratify=df_etl_uniq[label_col].values, random_state=42)
    df_etl_uniq['train_set'] = False
    df_etl_uniq.loc[strat_train_idx, 'train_set'] = True
    df_etl_uniq = df_etl_uniq.drop(columns=label_col)
    return(df_etl_uniq)

tic = time()
test_size = 0.2 # Fraction of data used in test set within train_test_split()
tt_split_toc = telco_metadata_ttsplitter(etl_output_dir, test_size)
tt_split_toc = tt_split_toc.sample(frac=1, random_state=0) # Shuffle rows to ensure data well mixed.
tt_split_toc['cv_idx'] = np.mod(np.arange(len(tt_split_toc)), num_folds) # Assign fold indices for cross-validation
print('Metadata generation time: ', str(np.round(time()-tic,2)) + 's')
tt_split_toc

Perhaps you already have a cluster running?
Hosting the HTTP server on port 41555 instead


Metadata generation time:  2.2s


Unnamed: 0,customerID_src,train_set,cv_idx
2188,7838-LAZFO,True,0
2287,8189-DUKMV,True,1
1066,3758-CKOQL,True,2
1594,5619-PTMIK,True,3
2528,9115-YQHGA,False,4
...,...,...,...
6623,6960-HVYXR,True,3
3527,4676-WLUHT,False,4
657,2403-BCASL,False,5
5669,3722-WPXTK,False,6


# Read ETL'd Files and Output Encoded Data to Disk

In [6]:
def naive_ohe_enc_parts(run_type, data_path, ohe_cols, binary_cols, cat_index_codes, drop_cols=None, test_size=0.2):
    """
    Data encoding pipeline using pandas, operating on dictionary encoded parquet files. Updated to handle global categories.
    
    run_type: str
        Select 'cpu' or 'gpu' as hardware for running.
    data_path: str
        Location of parquet files (either directory or filename).
    split_data: bool
        Split data into train/test set.
    ohe_cols: list
        List of strings with categorical columns to be one-hot-encoded.
    binary_cols: list
        List of strings with binary categorical columns to encode. 
    cat_index_codes: dictionary of CategoricalDtype
        Global mapping of ordered classes in each categorical column.
    drop_cols: list
        List of strings with columns to drop.
    test_size: float between [0,1]
        Split data into train/test set.
    """
    if run_type == 'gpu':
        import cudf as hw
    elif run_type == 'cpu':
        import pandas as hw
    else:
        raise ValueError('Select cpu or gpu. Case not handled.')
    
    if data_format == 'parquet':
        data_pd = hw.read_parquet(data_path, read_dictionary=binary_cols+ohe_cols)
    elif data_format == 'csv':
        data_pd = hw.read_csv(data_path, dtype=cat_index_codes) # Apply global categories
        
    data_pd = data_pd.dropna() # Make sure data doesn't contain NA's
    
    # Merge table with training metadata for subsequent train/test split:
    data_pd['customerID_src'] = data_pd['customerID'].str[:10]
    data_pd = data_pd.merge(tt_split_toc[['customerID_src', 'train_set', 'cv_idx']], on='customerID_src', how='left')
    
    if drop_cols != None:
        data_pd.drop(columns=drop_cols, inplace=True)
    
    if data_format == 'parquet':
        # Update categorical dictionary to global values:
        for cc in binary_cols+ohe_cols:
    #         data_pd[cc].cat.set_categories(list(cat_index_codes[cc].categories), ordered=True, inplace=True) # Works in pandas, but not cudf
            data_pd[cc] = data_pd[cc].astype(cat_index_codes[cc]) # Works in pandas, but not cudf
        
    # One hot encoding of categorical columns (3+ categories):
    data_pd_enc = hw.get_dummies(data_pd, columns=ohe_cols, drop_first=False, dtype='bool')
    
    # Binary encode columns (e.g., [Female, Male], [False, True], or [No, Yes] => [0, 1]):
    for cc in binary_cols:
        data_pd_enc[cc] = data_pd_enc[cc].cat.codes.astype('bool')
        
    # Convert uint8 to int8 for Spark compatibility.
    uint8_cols = data_pd_enc.select_dtypes('uint8').columns
    data_pd_enc[uint8_cols] = data_pd_enc[uint8_cols].astype('int8')
    
    # Replace special characters in column name:
    data_pd_enc.columns = [re.sub('[()]', '', cc.replace(' ', '-')) for cc in data_pd_enc.columns]
    return(data_pd_enc)


@dask.delayed
def enc_and_write_pq(fn):
    enc_naive_pd = naive_ohe_enc_parts('cpu', fn, ohe_cols, binary_cols, cat_index_codes, drop_cols='customerID', test_size=test_size)
#     enc_naive_pd[['MonthlyCharges', 'TotalCharges']] = enc_naive_pd[['MonthlyCharges', 'TotalCharges']].astype('float32')
    
    row_group_size=np.ceil(len(enc_naive_pd)/enc_num_row_groups)
    # Dask might be assembling cols based on order of completion. Rearrange columns prior to writing output for consistency!
    
    
    if test_size == 0:
        # All data passed into training:
        enc_naive_pd = enc_naive_pd[all_enc_cols]
        enc_naive_pd.to_parquet(os.path.join(enc_output_dir+'/train', fn.split('/')[-1].replace('.csv', '.parquet')), flavor='spark', row_group_size=row_group_size)
    else:
        enc_naive_pd_grp = enc_naive_pd.groupby('train_set')
        
        train_df = enc_naive_pd_grp.get_group(True)
        train_df[all_enc_cols].to_parquet(os.path.join(enc_output_dir+'/train', fn.split('/')[-1].replace('.csv', '.parquet')), flavor='spark', row_group_size=row_group_size)

        test_df = enc_naive_pd_grp.get_group(False)
        test_df[all_enc_cols].to_parquet(os.path.join(enc_output_dir+'/test', fn.split('/')[-1].replace('.csv', '.parquet')), flavor='spark', row_group_size=row_group_size)
    return(fn)


# Wipe contents of enc_output_dir:
if storage_backend == 'hdfs':
    try:
        hdfs.delete(enc_output_dir[len(hdfs_con_str):], recursive=True)
    except:
        pass
    
    hdfs.mkdir(enc_output_dir[len(hdfs_con_str):])
    hdfs.mkdir(enc_output_dir[len(hdfs_con_str):]+'/train')
    hdfs.mkdir(enc_output_dir[len(hdfs_con_str):]+'/test')
        
elif storage_backend == 'local':
    try:
        # RECURSIVELY DELETE DIRECTORY and then add it
        shutil.rmtree(enc_output_dir)
    except:
        pass
    
    os.mkdir(enc_output_dir)
    os.mkdir(enc_output_dir+'/train')
    os.mkdir(enc_output_dir+'/test')


# Create local dask client to perform encoding. GPU run near memory limit so needed to use nthreads=1 in primary dask cluster.
tic = time()
with Client(n_workers=8, threads_per_worker=1) as client_local:
    out = [enc_and_write_pq(fn) for fn in etl_files]
    dask.compute(out, scheduler=client_local)
t_encode = time() - tic

print('Data encoding took:', '{:0.2f}'.format(t_encode) + 's')

if data_format == 'parquet':
    etl_num_recs = len(dd.read_parquet(etl_output_dir))
    enc_train_num_recs = len(dd.read_parquet(enc_output_dir+'/train'))
    enc_test_num_recs = len(dd.read_parquet(enc_output_dir+'/test'))
    diff_num_recs = etl_num_recs - enc_train_num_recs - enc_test_num_recs
    print('Number of input records from ETL files:', etl_num_recs)
    print('  Encoded records in train set:', enc_train_num_recs)
    print('  Encoded records in test set:', enc_test_num_recs)
    print('Number of records removed during encoding:', diff_num_recs)
    
    if diff_num_recs != 0:
        raise ValueError('Different number of records between ETL input and ENC output detected.')
        
    # TODO: add fast line counter for csv without having to read entire file?

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40585 instead


Data encoding took: 2.45s
Number of input records from ETL files: 703200
  Encoded records in train set: 562500
  Encoded records in test set: 140700
Number of records removed during encoding: 0


# Load Encoded Data and Run ML Pipeline

In [7]:
def dask_xgb_train_from_enc(client, run_type, data_path, label_col, xgb_params, xgb_model_name):
    """
    Use dask + xgboost for training against already encoded data.
    """
    if run_type == 'cpu':
        ddf_enc = dd.read_parquet(data_path)
    elif run_type == 'gpu':
        import dask_cudf
        import cupy as cp
        ddf_enc = dask_cudf.read_parquet(data_path)
    else:
        raise ValueError('Select run_type of cpu or gpu.')
    
    tic = time()
    ddf_enc = client.persist(ddf_enc)
    wait([ddf_enc])
    
    feature_cols = [cc for cc in ddf_enc.columns if cc not in [label_col, 'cv_idx']]
    X = ddf_enc[feature_cols].astype('float32')
    y = ddf_enc[label_col].astype('float32')
    
#     # Persist data:
#     X = client.persist(X)
#     y = client.persist(y)
#     wait([X, y])
    
    # Use XGBOOST API:
    if run_type == 'gpu':
        # See example: https://github.com/dmlc/xgboost/blob/master/demo/dask/gpu_training.py
        # `DaskDeviceQuantileDMatrix` is used instead of `DaskDMatrix`, be careful
        # that it can not be used for anything else than training.
        dtrain = xgb.dask.DaskDeviceQuantileDMatrix(client, X, y)
    else:
        dtrain = DaskDMatrix(client, X, y)
    
    del ddf_enc, X, y # Cleanup
    data_load_time = np.round(time() - tic, 2)
    
    tic = time()
    xgb_model = xgb.dask.train(client, xgb_params, dtrain, num_boost_round=num_boost_round)['booster']
    train_model_time = np.round(time() - tic, 2)
    
    # Save xgb model to disk:
    tic = time()
    xgb_model.save_model(xgb_model_name+'.model') # Save model to current dir
    xgb_model.save_model('/tmp/'+xgb_model_name+'.json') # Experimental JSON format. To be nested in output file with timings/metrics.
    
    # Move local xgb-model to hdfs if xgb_hdfs_output_dir specified:
    if (storage_backend == 'hdfs'):
        if xgb_hdfs_output_dir != None:
            with open(xgb_model_name+'.model','rb') as f:
                hdfs.upload(os.path.join(xgb_hdfs_output_dir, xgb_model_name+'.model'), f)
    export_model_time = np.round(time() - tic, 2)
    
    metrics = {
        'data_load_time': data_load_time,
        'train_model_time': train_model_time,
        'export_model_time': export_model_time
        }
    
    del dtrain
    gc.collect() # Manual garbage collection needed since python may not clean up GPU resources automatically.
    return(metrics)


print('Starting xgboost '+run_type+' training....')
xgb_model_name = 'python_telco_dask_xgb_'+run_type

tic = time()
training_metrics = dask_xgb_train_from_enc(client, run_type, enc_output_dir+'/train', label_col, xgb_params, xgb_model_name)
t_dask_train = time() - tic

training_metrics['total_time'] = t_dask_train
print(training_metrics)
print('Dask xgboost '+run_type+' training time:', '{:0.2f}'.format(t_dask_train) + 's')

Starting xgboost gpu training....


[20:52:46] task [xgboost.dask]:tcp://127.0.0.1:37321 got new rank 0


{'data_load_time': 4.55, 'train_model_time': 5.01, 'export_model_time': 0.08, 'total_time': 10.778921604156494}
Dask xgboost gpu training time: 10.78s


# Load XGBOOST Model and Run Inferencing

In [8]:
def score_pred(run_type, y_true, y_pred, score_metric):
    """
    Prediction scoring function.
    """
    if run_type == 'gpu':
#         import dask.dataframe as hw # Has internal switching for compatibility. Takes a long time to perform AUC score, but fast at prediction.
        from cuml.metrics import roc_auc_score
        from cuml.metrics.accuracy import accuracy_score
        # TODO: inplace scoring to pair with inplace prediction? Only works with certain methods.
    else:
        # TODO: need parallelized roc_auc_score computation for auc.
        from sklearn.metrics import roc_auc_score # Single-threaded?
#         from sklearn.metrics import accuracy_score # Single-threaded
        from dask_ml.metrics import accuracy_score # CPU parallelized

    if score_metric == 'auc':
        # AUC uses probabilities. https://scikit-learn.org/stable/modules/generated/sklearn.metrics.roc_auc_score.html
        score_value = roc_auc_score(y_true, y_pred)
    elif score_metric == 'acc':
        # Accuracy score uses threshold value due to ==. https://scikit-learn.org/stable/modules/generated/sklearn.metrics.accuracy_score.html
        score_value = accuracy_score(y_true.round(), y_pred.round()) # y_pred needs to be converted explicitly. Dask doesn't know shape of y, y_pred if dask.dataframe.
    return(score_value)
    
def dask_xgb_infer(client, run_type, data_path, label_col, score_metric='auc'):
    """
    Use dask + xgboost for inferencing against already encoded data.
    """    
    if run_type == 'gpu':
        import dask_cudf as hw # Needed for xgb.dask.inplace_predict(). 
#         import dask.dataframe as hw # Has internal switching for compatibility. Takes a long time to perform AUC score, but fast at prediction.
    else:
        import dask.dataframe as hw
    
    model_file = 'python_telco_dask_xgb_'+run_type+'.model'
    model = xgb.Booster(model_file=model_file)
    
    tic = time()
    ddf_enc = hw.read_parquet(data_path).persist()

    # Format data for xgboost:
    feature_cols = [cc for cc in ddf_enc.columns if cc not in [label_col, 'cv_idx']]
    X = ddf_enc[feature_cols].astype('float32')
    y = ddf_enc[label_col].astype('float32')
    
#     # Persist data:
#     X = client.persist(X)
#     y = client.persist(y)
#     wait([X, y])
    data_load_time = np.round(time() - tic, 2)
    
    # Run predictions:
    tic = time()
    model.set_param('predictor', run_type + '_predictor')
    model = client.scatter(model, broadcast=True)

    # See https://xgboost.readthedocs.io/en/latest/tutorials/dask.html#running-prediction:
#     y_pred = xgb.dask.predict(client, model, xgb.dask.DaskDMatrix(client, X, y)) # More efficient to avoid xgb.dask.DaskDMatrix()
#     y_pred = xgb.dask.predict(client, model, X) # Use inplace_predict() can sometimes be faster.

    # IMPORTANT: X.values required to get consistent CPU scoring. Issue with column ordering in dask.dataframe?
#     y_pred = xgb.dask.inplace_predict(client, model, X.values) # Use inplace_predict() can sometimes be faster.
    y_pred = xgb.dask.predict(client, model, X.values) # inplace_predict() doesn't work with xgb 1.4.0dev on rapids 21.06 build.

    # Force computation of y's required for libraries without dask.dataframe support:
    y_pred = y_pred.compute()
    y = y.compute()
    wait([y_pred, y])
        
    predict_time = np.round(time() - tic, 2)
    
    # Score predictions:
    print('Scoring predictions in '+ data_path)
    tic = time()
    score_value = score_pred(run_type, y, y_pred, score_metric)
    score_time = np.round(time() - tic, 2)

    # TODO: have inference run distributively. Need roc_auc_score equivalent for dask-gpu. Maybe dask_cudf has it?
    metrics = {
        'data_load_time': data_load_time,
        'predict_time': predict_time,
        'score':
            {score_metric: score_value,
            score_metric+'_time': score_time
                }
        }
    
    del ddf_enc, X, y, y_pred, model
    gc.collect()
    return(metrics)


print('Starting inference rounds....')
tic = time()
if test_size == 0:
    inference_metrics_train = dask_xgb_infer(client, run_type, enc_output_dir+'/train', label_col, score_metric=score_metric)
    inference_metrics_test = []
else:
    # Inference against entire data set in two steps:
    inference_metrics_train = dask_xgb_infer(client, run_type, enc_output_dir+'/train', label_col, score_metric=score_metric)
    inference_metrics_test = dask_xgb_infer(client, run_type, enc_output_dir+'/test', label_col, score_metric=score_metric)
t_dask_infer = np.round(time() - tic, 2)

end_time_str = datetime.now().strftime("%m/%d/%Y %H:%M:%S")

print('Inference against training data: ', inference_metrics_train)
print('Inference against (hold-out) test data: ', inference_metrics_test)
print('Total prediction time:', '{:0.2f}'.format(t_dask_infer) + 's')

Starting inference rounds....
Scoring predictions in ./enc_data/train
Scoring predictions in ./enc_data/test
Inference against training data:  {'data_load_time': 0.05, 'predict_time': 5.05, 'score': {'auc': 0.9999648332595825, 'auc_time': 1.18}}
Inference against (hold-out) test data:  {'data_load_time': 0.07, 'predict_time': 3.76, 'score': {'auc': 0.8218247294425964, 'auc_time': 0.01}}
Total prediction time: 10.30s


# Run GridSearchCV for Hyper-parameter Optimization

In [9]:
def get_grid_pt(grid_idx):
    """
    Convert grid point to dict.
    """
    # Update xgb_params:
    xgb_params_ = xgb_params.copy()
    cur_param = dict(gridsearch_df.iloc[grid_idx])
    cur_param_start = cur_param.copy()
    #     print('Running CV with: ', cur_param_start)

    # Move num_boost_round out of dict:
    if 'num_boost_round' in cur_param.keys():
        n_rounds = int(cur_param['num_boost_round'])
        cur_param.pop('num_boost_round')
    else:
        n_rounds = num_trees

    # Some issues with dtype conversion when converting to dict?
    try:
        cur_param['max_depth'] = int(cur_param['max_depth'])
    except:
        pass

    xgb_params_.update(cur_param)
    return(xgb_params_)

def assemble_ddmatrix(cv_train, cv_test, method=None):
    """
    Assemble DMatrix for CPU/GPU.
    """
    if method == 'dask':
        if run_type == 'gpu':
            # See example: https://github.com/dmlc/xgboost/blob/master/demo/dask/gpu_training.py
            # `DaskDeviceQuantileDMatrix` is used instead of `DaskDMatrix`, be careful
            # that it can not be used for anything else than training.
            dcv_train = xgb.dask.DaskDeviceQuantileDMatrix(client, *cv_train)
            dcv_test = xgb.dask.DaskDeviceQuantileDMatrix(client, *cv_test)
        else:
            dcv_train = DaskDMatrix(client, *cv_train)
            dcv_test = DaskDMatrix(client, *cv_test)
            
    elif method=='local':
        if run_type == 'gpu':
            dcv_train = xgb.DeviceQuantileDMatrix(*cv_train)
            dcv_test = xgb.DeviceQuantileDMatrix(*cv_test)
        else:
            dcv_train = xgb.DMatrix(*cv_train)
            dcv_test = xgb.DMatrix(*cv_test)
    else:
        raise ValueError("Method not recognized. Select method in ['dask', 'local']")
    return(dcv_train, dcv_test)

def xgb_cv_training_dask(xgb_params_cur, grid_idx, fold_idx, dcv_train, dcv_test):
    """
    Use entire dask cluster each iteration for training. 
    """
    tic = time()
    xgb_fold_model = xgb.dask.train(client, xgb_params_cur, dcv_train, num_boost_round=num_trees, 
                                    evals=[(dcv_train, 'train'), (dcv_test, 'test')], verbose_eval=False)
    eval_history = xgb_fold_model['history']
        
    total_fold_train_time = np.round(time() - tic, 2)

    boost_history = {}
    for key, value in eval_history.items():
        boost_history[key+'-'+score_metric] = value[score_metric]

    boost_history = pd.DataFrame(boost_history).reset_index().rename(columns={'index': 'num_boost_round'})
    boost_history['grid_idx'] = grid_idx
    boost_history['fold_idx'] = fold_idx
    boost_history['fold_train_time'] = total_fold_train_time
    return(boost_history)


def GridSearchCV_dask_sequential(client, train_data_loc):
    """
    Sequential training of fully distributed dask xgboost model with grid search and cross-validation.
    """
    from dask_ml.model_selection import KFold

    if run_type == 'cpu':
        import dask.dataframe as hw
    elif run_type == 'gpu':
        import dask_cudf as hw

    df_enc_train = hw.read_parquet(train_data_loc).persist()
    bool_cols = list(df_enc_train.select_dtypes(bool).columns)
    df_enc_train[bool_cols] = df_enc_train[bool_cols].astype('int8') # DMatrix doesn't understand boolean
    
    cv_idx = df_enc_train['cv_idx'].to_dask_array(lengths=True)
    feature_cols = [cc for cc in df_enc_train.columns if cc not in [label_col, 'cv_idx']]
    X_train = df_enc_train[feature_cols].to_dask_array(lengths=True)
    y_train = df_enc_train[label_col].to_dask_array(lengths=True)

    # Grid search performed sequentially with each (fold_idx, grid_idx) using the entire dask cluster. 
    cv_results = []
    tic = time()
    iter_cnt = 0
    for fold_idx in range(num_folds):
        t_start = time()

        # Slice X_train into KFolds for cross-validation.
        Xii_train = X_train[cv_idx != fold_idx]
        yii_train = y_train[cv_idx != fold_idx]

        Xii_test = X_train[cv_idx == fold_idx]
        yii_test = y_train[cv_idx == fold_idx]
        
        dcv_train, dcv_test = assemble_ddmatrix([Xii_train, yii_train], [Xii_test, yii_test], method='dask')
        del Xii_train, yii_train, Xii_test, yii_test
        gc.collect()
        
        for grid_idx in range(num_grid_pts):
            xgb_params_cur = get_grid_pt(grid_idx)
            cv_results.append(xgb_cv_training_dask(xgb_params_cur, grid_idx, fold_idx, dcv_train, dcv_test))
            iter_cnt += 1
            print('Completed cv'+str(fold_idx)+', grid_idx='+str(grid_idx)+
                  '. Elapsed time within cv'+str(fold_idx)+': '+str(np.round(time()-t_start ,2))+'s.  ['+str(iter_cnt)+'/'+str(num_grid_pts*num_folds)+']')
        
        del dcv_train, dcv_test
        gc.collect
        
    del df_enc_train, X_train, y_train
    gc.collect()
    return(pd.concat(cv_results))

@dask.delayed
def GridSearchCV_per_node(fold_idx, data_loc):
    """
    Train single model per node. Make sure number of nodes is compatible with number of KFolds. 
    For example, 9-folds is compatible with [1,3,9] nodes and 8-fold compatible with [1,2,4,8], etc. 
    This reduces the impact of stragglers since each node operates on same amount of data and sweeps through all grid points.
    """
    t_start = time()
    if run_type == 'cpu':
        import pandas as hw
    elif run_type == 'gpu':
        import cudf as hw

    df_enc_train = hw.read_parquet(data_loc)
    
    if run_type == 'gpu':
        bool_cols = list(df_enc_train.select_dtypes(bool).columns)
        df_enc_train[bool_cols] = df_enc_train[bool_cols].astype('int8') # DMatrix doesn't understand boolean
    else:
        df_enc_train = df_enc_train.astype('float32')
        
    cv_idx = df_enc_train['cv_idx'].values
    feature_cols = [cc for cc in df_enc_train.columns if cc not in [label_col, 'cv_idx']]
    X_train = df_enc_train[feature_cols]
    y_train = df_enc_train[label_col]

    # Slice single fold and sweep across all grid points:
    # Slice X_train into KFolds for cross-validation.
    Xii_train = X_train[cv_idx != fold_idx]
    yii_train = y_train[cv_idx != fold_idx]

    Xii_test = X_train[cv_idx == fold_idx]
    yii_test = y_train[cv_idx == fold_idx]

    del df_enc_train, X_train, y_train
    gc.collect()
    
#     dcv_train, dcv_test = assemble_ddmatrix([Xii_train, yii_train], [Xii_test, yii_test], method='local')
    
    # Need to construct dmatrix within function. Calling external function has issues with dask.delayed pickling.
    if run_type == 'gpu':
        dcv_train = xgb.DeviceQuantileDMatrix(Xii_train, yii_train)
        dcv_test = xgb.DeviceQuantileDMatrix(Xii_test, yii_test)
    else:
        dcv_train = xgb.DMatrix(Xii_train, yii_train)
        dcv_test = xgb.DMatrix(Xii_test, yii_test)

    del Xii_train, yii_train, Xii_test, yii_test
    gc.collect()
    
    print('Loaded data and constructed DMatrix for cv'+str(fold_idx)+' in '+str(np.round(time()-t_start,2))+'s')
    
    cv_results = []
    iter_cnt = 0
    for grid_idx in range(num_grid_pts):
        tic = time()
        eval_history = {}
        xgb_params_cur = get_grid_pt(grid_idx)
        
        # Dask delayed doesn't like having eval_history dict in separate function.
        xgb_fold_model = xgb.train(xgb_params_cur, dcv_train, num_boost_round=num_trees, 
                                        evals=[(dcv_train, 'train'), (dcv_test, 'test')], evals_result=eval_history, verbose_eval=False)
        iter_cnt += 1
        print('Completed cv'+str(fold_idx)+', grid_idx='+str(grid_idx)+
              '. Elapsed time within cv'+str(fold_idx)+': '+str(np.round(time()-t_start ,2))+'s.  ['+str(iter_cnt)+'/'+str(num_grid_pts)+']')
        
        
        total_fold_train_time = np.round(time() - tic, 2)

        boost_history = {}
        for key, value in eval_history.items():
            boost_history[key+'-'+score_metric] = value[score_metric]

        boost_history = pd.DataFrame(boost_history).reset_index().rename(columns={'index': 'num_boost_round'})
        boost_history['grid_idx'] = grid_idx
        boost_history['fold_idx'] = fold_idx
        boost_history['fold_train_time'] = total_fold_train_time
        cv_results.append(boost_history)
        
    del dcv_train, dcv_test, xgb_fold_model
    gc.collect()
    return(pd.concat(cv_results))

if enable_gridSearchCV == True:
#     cv_strategy = 'node' # Select 'cluster' or 'node'. 'node' should have lower TCO if model can be trained in single node.
    num_trees = 200 # Number of boosting rounds. Held constant without early stopping.
    param_grid = {
#         'max_depth': [2], # range() cannot be saved to JSON. Use explicit list.
#         'eta': [0.1],
        'max_depth': list(range(4, 11, 2)), # range() cannot be saved to JSON. Use explicit list.
        'eta': [0.01, 0.05, 0.1] # eta==learning_rate
    }
    
    param_names = list(param_grid.keys())
    gridsearch_df = pd.DataFrame(list(itertools.product(*param_grid.values())), columns=param_names)
    num_grid_pts = reduce(lambda x,y: x*y, [len(param_grid[key]) for key in param_grid.keys()])
    
    print('Dask cross-validation strategy:', cv_strategy)
    print('Number of grid points:', str(num_grid_pts))
    print('Total number of evaluations (with cross-validation):', num_grid_pts*num_folds)

    tic = time()
    if cv_strategy == 'cluster':
        # Has issues when running multiple CV folds on dask. Mem not being released from dask.
        gs_results = GridSearchCV_dask_sequential(client, enc_output_dir+'/train')
    elif cv_strategy == 'node':
        gs_results_delayed = [GridSearchCV_per_node(fold_idx, enc_output_dir+'/train') for fold_idx in range(num_folds)]
#         progress(gs_results_delayed) # Show progress in CLI. Won't be in notebook unless last line of cell. Blocking until complete.
        gs_results = pd.concat(dask.compute(gs_results_delayed, scheduler=client)[0])
    
    t_gridSearchCV = np.round(time() - tic, 2)
    
    # Compute mean score per fold:
    gscv_tbl = (gs_results
                     .groupby(['grid_idx', 'num_boost_round'])
                     .mean()
                    ).drop(columns=['fold_idx']).rename(columns={
        'fold_train_time': 'mean_fold_train_time', 
        'train-'+score_metric: 'train-'+score_metric+'-mean', 'test-'+score_metric: 'test-'+score_metric+'-mean'})

    gscv_tbl = gscv_tbl.merge(gridsearch_df, left_on='grid_idx', right_index=True, how='left').reset_index()
    
    # Get best round within each grid point:
    ref_score_col = 'test-'+score_metric+'-mean'
    best_idx = gscv_tbl[ref_score_col].argmax()
    best_params_ = dict(gscv_tbl.iloc[best_idx])
    print('Best params:', best_params_)
    
    # Pick best score within each run_id, then use mean_fit_time to determine if model is desirable.
    # .idxmax() returns first occurence of maxima.
    best_idx_per_group = gscv_tbl.groupby('grid_idx')[ref_score_col].idxmax()
    gscv_tbl_sm = gscv_tbl.iloc[best_idx_per_group].sort_values(ref_score_col, ascending=False)
    
    print('Total grid search cv wall time: '+str(t_gridSearchCV) + 's')


Dask cross-validation strategy: node
Number of grid points: 12
Total number of evaluations (with cross-validation): 96
Best params: {'grid_idx': 2.0, 'num_boost_round': 49.0, 'train-auc-mean': 0.881754625, 'test-auc-mean': 0.842367625, 'mean_fold_train_time': 5.25625, 'max_depth': 4.0, 'eta': 0.1}
Total grid search cv wall time: 212.77s


# Output Results to JSON

In [10]:
# Record important variables/metrics to JSON output:
output_dict = {}

output_dict['cluster_type'] = cluster_type
output_dict['run_type'] = run_type
output_dict['data_format'] = data_format
output_dict['test_size'] = test_size
output_dict['enc_num_row_groups'] = enc_num_row_groups
output_dict['storage_backend'] = storage_backend
output_dict['t_encode'] = np.round(t_encode, 2)
output_dict['t_train'] = np.round(t_dask_train, 2)
output_dict['t_infer'] = np.round(t_dask_infer, 2)
output_dict['training_metrics'] = training_metrics
output_dict['inference_metrics_train'] = inference_metrics_train
output_dict['inference_metrics_test'] = inference_metrics_test

if enable_gridSearchCV == True:
    output_dict['t_gridSearchCV'] = t_gridSearchCV
#     output_dict['best_model_hold_out_test_score'] = best_model_hold_out_test_score
    output_dict['gridSearchCV_results'] = {
        'num_trees': num_trees,
        'param_grid': str(param_grid),
#         'cv_results_': str(gs_results.cv_results_),
#         'best_params_': str(gs_results.best_params_),
#         'best_score_': gs_results.best_score_,
#         'best_estimator_': str(gs_results.best_estimator_)
        'best_params_': str(best_params_),
        'best_score_by_group_': gscv_tbl_sm.to_json(), 
        'cv_results_': gscv_tbl.to_json(),   
    }
    

# if run_type == 'gpu':
#     output_dict['fil_metrics'] = fil_metrics

output_dict['xgb_params'] = xgb_params

# Load xgb model output json and insert into metrics file:
with open('/tmp/'+xgb_model_name+'.json') as json_file:
    xgb_model_json = json.load(json_file)

output_dict['xgb_save_model'] = xgb_model_json

output_dict['start_time'] = start_time_str
output_dict['end_time'] = end_time_str


# TODO: write metrics file to hdfs?

# Output ML log files:
if not os.path.exists('ml_logs'):
    os.makedirs('ml_logs')

with open('ml_logs/'+run_type+'_'+re.sub('[/: ]', '', start_time_str)+'.json', 'w') as json_file:
    json.dump(output_dict, json_file)
    
print('****************************')    
print('Dask xgboost '+run_type+' training time:', '{:0.2f}'.format(t_dask_train) + 's')
print('Total prediction time:', '{:0.2f}'.format(t_dask_infer) + 's')

if enable_gridSearchCV == True:
    print('Grid search time:', str(t_gridSearchCV) + 's')

# client.close()

****************************
Dask xgboost gpu training time: 10.78s
Total prediction time: 10.30s
Grid search time: 212.77s
