# Multithreaded Ensembling
Two things are important here. Your time, and your results. Let's see if we can optimize for both! Use this notebook when you already have train, test, and validation data. Then you can train & tune a large number of models, and pull the results back in using an ensembling approach that takes the maximum prediction out of each classifier.

Finally, you'll use SageMaker Search to find the best performing models from your bucket, and run multi-threaded batch transform jobs to run inference on all of your newly trained models.

In [29]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import boto3
import os
from sagemaker.amazon.amazon_estimator import get_image_uri
import sagemaker
from sagemaker import get_execution_role
from sklearn.model_selection import train_test_split
import numpy as np

import sagemaker
from random import shuffle
import multiprocessing
from multiprocessing import Pool
import csv
import nltk
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

### 1. Upload your train and test data sets
Make sure you have the label in the first column.

In [30]:
train = pd.read_csv('train.csv', names = list(range(89)))
test = pd.read_csv('test.csv', names = list(range(89)))

In [31]:
train_labels = np.array(train[0]).astype("float32")
train_features = np.array(train.drop(0, axis=1)).astype("float32")
test_labels = np.array(test[0]).astype("float32")
test_features  = np.array(test.drop(0, axis=1)).astype("float32")

### 2. Define functions

In [32]:
def get_base_estimator(clf, sess, role):

    container = get_image_uri(boto3.Session().region_name, clf)

    est = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/output'.format(bucket, clf),
                                    sagemaker_session=sess)
    return est

In [33]:
def get_estimator(clf, sess, role):
    
    container = get_image_uri(boto3.Session().region_name, clf)

    
    if clf == 'xgboost':
        est = get_base_estimator(clf, sess, role)
        est.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)
        
    elif clf == 'linear-learner':
        
        est = sagemaker.LinearLearner(role=sagemaker.get_execution_role(),
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='binary_classifier',
                                               num_classes=2)

    elif clf == 'knn':
        est = sagemaker.KNN(role=sagemaker.get_execution_role(),
                                              k = 10,
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='classifier',
                                                sample_size = 200)
        

        
        
    elif clf == 'factorization-machines':
        est = sagemaker.FactorizationMachines(role=sagemaker.get_execution_role(),
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='binary_classifier',
                                                num_factors = 2)
        
        
    return est

In [34]:
def copy_to_s3():
    os.system('!aws s3 cp train.csv s3://ensemble-modeling/csv/train/train.csv')
    os.system('!aws s3 cp test.csv s3://ensemble-modeling/csv/test/test.csv')
    os.system('!aws s3 cp test.csv s3://ensemble-modeling/csv/validation/validation.csv')
        
copy_to_s3()

In [35]:
def get_tuner(clf, est):
        
    if clf == 'xgboost':
        objective_metric_name = 'validation:auc'

        hyperparameter_ranges = {'eta': ContinuousParameter(0, 1),
                        'min_child_weight': ContinuousParameter(1, 10),
                        'alpha': ContinuousParameter(0, 2),
                        'max_depth': IntegerParameter(1, 10)}
        
    elif clf == 'knn':
        
        objective_metric_name = 'test:accuracy'

        hyperparameter_ranges = {'k': IntegerParameter(1, 1024),
                        'sample_size': IntegerParameter(256, 20000000)}
        
    elif clf == 'linear-learner':
        objective_metric_name = 'test:recall'
        
        hyperparameter_ranges = {'l1': ContinuousParameter(0.0000001,1),
                            'use_bias': CategoricalParameter([True, False])}
        
    elif clf == 'factorization-machines':
        objective_metric_name = 'test:binary_classification_accuracy'
        
        hyperparameter_ranges = {'bias_wd': IntegerParameter(1, 1000)}
        
    tuner = HyperparameterTuner(est,
                    objective_metric_name,
                    hyperparameter_ranges,
                    max_jobs=30,
                    max_parallel_jobs=3)
    
    return tuner

In [38]:
def run_training_job(clf):

    # build the estimator
    est = get_estimator(clf, sess, role)

    # get the hyperparameter tuner config 
    if clf == 'xgboost':
        
        tuner = get_tuner(clf, est)
        
        
        tuner.fit({'train': s3_input_train, 'validation': s3_input_validation}) 

    else:
        # set the records
        train_records = est.record_set(train_features, train_labels, channel='train')
        test_records = est.record_set(test_features, test_labels, channel='validation')

        tuner = get_tuner(clf, est)
        
        tuner.fit([train_records, test_records])
    
    
run_training_job('linear-learner')

In [24]:
def magic_loop(models_to_run):
    pool = Pool(processes=multiprocessing.cpu_count())
    transformed_rows = pool.map(run_training_job, models_to_run)
    pool.close() 
    pool.join()

In [37]:
sess = sagemaker.Session()
role = get_execution_role()
client = boto3.client('sagemaker')
bucket = 'ensemble-modeling'

s3_input_train = sagemaker.s3_input(s3_data='s3://{}/train'.format(bucket), content_type='csv')
s3_input_test = sagemaker.s3_input(s3_data='s3://{}/test/'.format(bucket), content_type='csv')

# XGboost only likes a validation channel for hyperparameter tuning, not a test channel. So we'll set that up
s3_input_validation = sagemaker.s3_input(s3_data='s3://{}/validation/'.format(bucket), content_type='csv')

### 3. Define the models you want to use

In [39]:
# clfs = ['xgboost', 'linear-learner', 'factorization-machines', 'knn']

clfs = [ 'xgboost']

magic_loop(clfs)

### 4. Select the best models
Now, we're going to use SageMaker search to find the best performing models from the hyperparameter tuning jobs we just ran.

In [65]:
import boto3
smclient = boto3.client(service_name='sagemaker')

# Search the training job by Amazon S3 location of model artifacts
search_params={
   "MaxResults": 100,
   "Resource": "TrainingJob",
   "SearchExpression": { 
      "Filters": [ 
         { 
            "Name": "InputDataConfig.DataSource.S3DataSource.S3Uri",
            "Operator": "Contains",
             
             # set this to have a word that is in your bucket name
            "Value": 'ensemble'
         },
        { 
            "Name": "TrainingJobStatus",
            "Operator": "Equals",
            "Value": 'Completed'
         }, 
    ],
     
   },
    
    "SortBy": "Metrics.validation:auc",
    "SortOrder": "Descending"
}
results = smclient.search(**search_params)

In [67]:
from sagemaker.model import Model

def get_models(results):

    role = sagemaker.get_execution_role()

    models = []

    for each in results['Results']:

        job_name = each['TrainingJob']['TrainingJobName']


        artifact = each['TrainingJob']['ModelArtifacts']['S3ModelArtifacts']

        # get training image
        image =  each['TrainingJob']['AlgorithmSpecification']['TrainingImage']

        m = Model(artifact, image, role = role, sagemaker_session = sess, name = job_name)

        models.append(m)
        
    return models[:15]

models = get_models(results)

### 5. Ensemble Batch Transform
Now, we're going to run a separate batch transform job for each model. 

In [123]:
test_data = pd.read_csv('actual_test.csv')

In [125]:
test_data.shape

(4118, 60)

In [86]:
test_df = test_data.drop('0', axis=1)

test_df.to_csv('test_data.csv', index=False, header=False)

In [88]:
!aws s3 cp test_data.csv s3://ensemble-modeling/batch_test/test.csv

upload: ./test_data.csv to s3://ensemble-modeling/batch_test/test.csv


In [91]:
def run_batch_transform(model):

    transformer = model.transformer(
        instance_count=1,
        instance_type='ml.m4.xlarge',
        output_path='s3://ensemble-modeling/batch_results/{}'.format(model.name)
    )

    transformer.transform(data='s3://ensemble-modeling/batch_test/test.csv', content_type='text/csv')

    
for model in models:
    run_batch_transform(model)

Using already existing model: xgboost-190730-1958-024-b8a2fd71


In [92]:
# pool = Pool(processes=multiprocessing.cpu_count())
# transformed_rows = pool.map(run_batch_transform, models)
# pool.close() 
# pool.join()

### 6. Consolidate batch results
Finally, we'll pull together all of the batch job inferences. For each one, we'll take the maximum confidence level and consider that a positive prediction. Then we'll see how well that performs, relative to using a single XGBoost model. 

In [103]:
# !aws s3 sync s3://ensemble-modeling/batch_results/ /home/ec2-user/SageMaker/batch_results/

In [104]:
y_true = test_data['0'].values.tolist()

In [142]:
def get_dataframe(y_true):
    
    frames  = []
    
    for sub_dir in os.listdir('/home/ec2-user/SageMaker/batch_results'):
        if '.ipynb' not in sub_dir:

            old_file = '/home/ec2-user/SageMaker/batch_results/{}/test.csv.out'.format(sub_dir)
            
            new_file = '/home/ec2-user/SageMaker/batch_results/{}/test.csv'.format(sub_dir)
            
            os.system('cp {} {}'.format( old_file, new_file))
            
            df = pd.read_csv('/home/ec2-user/SageMaker/batch_results/{}/test.csv'.format(sub_dir), names = [sub_dir])

            frames.append(df)
            
    df = pd.concat(frames, axis=1)
    
    df['y_true'] = y_true
            
    return df
        
df = get_dataframe(y_true)

In [149]:
def consolidate_results(df):

    df['max'] = 0
    df['min'] = 0
    df['diff'] = 0

    for idx, row in df.iterrows():
        top = max(row)
        bottom = min(row)

        diff = top - bottom


        df.at[idx, 'max'] = top
        df.at[idx, 'min'] = bottom
        df.at[idx, 'diff'] = diff
        
    return df

df = consolidate_results(df)

In [155]:
df.head()

Unnamed: 0,xgboost-190730-2044-022-2352eede,xgboost-190730-2044-004-739de6d8,xgboost-190730-2020-030-483f572c,xgboost-190730-2020-029-b7c0ecf1,xgboost-190730-2044-005-b614948b,xgboost-190730-2044-019-440eba24,xgboost-190730-2020-028-3a863835,xgboost-190730-1958-021-72e38862,xgboost-190730-2020-019-c5dba7b3,xgboost-190730-2044-021-949ec8ac,xgboost-190730-1958-024-b8a2fd71,xgboost-190730-1958-023-2e4c37ce,xgboost-190730-1958-015-b671b4dc,xgboost-190730-2020-026-b0f76816,xgboost-190730-1958-013-578ad74a,y_true,max,min,diff
0,0.100188,0.103718,0.104903,0.098098,0.092084,0.09253,0.09716,0.098769,0.090717,0.098029,0.074131,0.094674,0.086106,0.108009,0.089065,1,1,0,1
1,0.168533,0.176325,0.189338,0.208647,0.179116,0.182808,0.168847,0.195914,0.190306,0.171908,0.186766,0.141365,0.179298,0.210121,0.189402,1,1,0,1
2,0.282113,0.293109,0.251772,0.250157,0.254854,0.278672,0.256963,0.275336,0.27004,0.277161,0.251583,0.267924,0.222159,0.273822,0.242742,1,1,0,1
3,0.040475,0.039072,0.035436,0.026864,0.035632,0.038267,0.030731,0.028601,0.036439,0.040877,0.02426,0.031481,0.030913,0.021417,0.029594,0,0,0,0
4,0.039658,0.038584,0.031971,0.029252,0.032894,0.03744,0.033342,0.028855,0.035477,0.040352,0.037843,0.040215,0.031937,0.03192,0.031521,0,0,0,0


### 7. Generate Confusion Matrix
At the end, let's chart a plot for the performance of each of these models. Did the ensembling help? Which model appears to be the best?

### Results without ensembling

In [184]:
def get_confusion_matrix(df, model_column):
    
    mx = pd.crosstab(index=df['y_true'], columns=np.round(df[model_column]), rownames=['actuals'], colnames=['predictions'])

    # lower right corner
    tps = mx.iloc[1, 1]
        
    # upper right corner
    fps = mx.iloc[0, 1]
    
    # lower left corner
    fns = mx.iloc[1, 0]
    
    precision = np.round(tps / (tps + fns), 4) * 100
    
    recall = np.round(tps / (tps + fps), 4) * 100
    
    print ('Precision = {}%, Recall = {}%'.format(precision, recall))
    
    return mx

get_confusion_matrix(df,'xgboost-190730-2044-022-2352eede')

Precision = 19.46%, Recall = 70.15%


predictions,0.0,1.0
actuals,Unnamed: 1_level_1,Unnamed: 2_level_1
0,3595,40
1,389,94


### Results With Ensembling

In [185]:
get_confusion_matrix(df, 'max')

Precision = 100.0%, Recall = 100.0%


predictions,0,1
actuals,Unnamed: 1_level_1,Unnamed: 2_level_1
0,3635,0
1,0,483


Amazing! In this small example, we were able to see remarkable performance, simply by dramatically increasing the number of models we used to make inference.