# Multiprocess Ensembler
Two things are important here. Your time, and your results. Let's see if we can optimize for both! Use this notebook when you already have train, test, and validation data. Then you can train & tune a large number of models, and pull the results back in using an ensembling approach that takes the maximum prediction out of each classifier.

Finally, you'll use SageMaker Search to find the best performing models from your bucket, and run parallel batch transform jobs to run inference on all of your newly trained models.

In [2]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import boto3
import os
from sagemaker.amazon.amazon_estimator import get_image_uri
import sagemaker
from sagemaker import get_execution_role
from sklearn.model_selection import train_test_split
import numpy as np

import sagemaker
from random import shuffle
import multiprocessing
from multiprocessing import Pool
import csv
import nltk
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

In [3]:
# put the name of your bucket here
bucket = 'ensemble-modeling'

sess = sagemaker.Session()
role = get_execution_role()
client = boto3.client('sagemaker')

### 1. Upload your train and test data sets
Make sure you have the label in the first column. If you don't already have a train and test set ready to go, try creating one with the amazon-sagemaker-example notebooks pre-installed on your notebook instance. That path is:
- amazon-sagemaker-examples/introduction_to_applying_machine_learning/xgboost_direct_marketing

If you run all cells in that notebook, you can copy the data to this directory with:

In [36]:
!cp /home/ec2-user/SageMaker/xgboost_direct_marketing_2019-08-12/train.csv .
!cp /home/ec2-user/SageMaker/xgboost_direct_marketing_2019-08-12/validation.csv .
!cp /home/ec2-user/SageMaker/xgboost_direct_marketing_2019-08-12/test.csv .

In [4]:
train = pd.read_csv('train.csv', names = list(range(89)))
validation = pd.read_csv('validation.csv', names = list(range(89)))

In [5]:
train_labels = np.array(train[0]).astype("float32")
train_features = np.array(train.drop(0, axis=1)).astype("float32")
val_labels = np.array(validation[0]).astype("float32")
val_features  = np.array(validation.drop(0, axis=1)).astype("float32")

### 2. Define functions

In [6]:
def get_base_estimator(clf, sess, role):

    container = get_image_uri(boto3.Session().region_name, clf)

    est = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/output'.format(bucket, clf),
                                    sagemaker_session=sess)
    return est

In [7]:
def get_estimator(clf, sess, role):
    
    container = get_image_uri(boto3.Session().region_name, clf)

    
    if clf == 'xgboost':
        est = get_base_estimator(clf, sess, role)
        est.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)
        
    elif clf == 'linear-learner':
        
        est = sagemaker.LinearLearner(role=sagemaker.get_execution_role(),
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='binary_classifier',
                                               num_classes=2)

    elif clf == 'knn':
        est = sagemaker.KNN(role=sagemaker.get_execution_role(),
                                              k = 10,
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='classifier',
                                                sample_size = 200)
        

        
        
    elif clf == 'factorization-machines':
        est = sagemaker.FactorizationMachines(role=sagemaker.get_execution_role(),
                                               train_instance_count=1,
                                               train_instance_type='ml.m4.xlarge',
                                               predictor_type='binary_classifier',
                                                num_factors = 2)
        
        
    return est

In [8]:
def copy_to_s3(bucket):
    os.system('aws s3 cp train.csv s3://{}/train/train.csv'.format(bucket))
    os.system('aws s3 cp test.csv s3://{}/test/test.csv'.format(bucket))
    os.system('aws s3 cp validation.csv s3://{}/validation/validation.csv'.format(bucket))
        
copy_to_s3(bucket)

In [9]:
def get_tuner(clf, est):
        
    # this should search for the most recent hyperparameter tuning job, pull it in, and use for a warm start
        
    if clf == 'xgboost':
        objective_metric_name = 'validation:auc'

        hyperparameter_ranges = {'eta': ContinuousParameter(0, 1),
                        'min_child_weight': ContinuousParameter(1, 10),
                        'alpha': ContinuousParameter(0, 2),
                        'max_depth': IntegerParameter(1, 10)}
        
    elif clf == 'knn':
        
        objective_metric_name = 'test:accuracy'

        hyperparameter_ranges = {'k': IntegerParameter(1, 1024),
                        'sample_size': IntegerParameter(256, 20000000)}
        
    elif clf == 'linear-learner':
        objective_metric_name = 'test:recall'
        
        hyperparameter_ranges = {'l1': ContinuousParameter(0.0000001,1),
                            'use_bias': CategoricalParameter([True, False])}
        
    elif clf == 'factorization-machines':
        objective_metric_name = 'test:binary_classification_accuracy'
        
        hyperparameter_ranges = {'bias_wd': IntegerParameter(1, 1000)}
        
    tuner = HyperparameterTuner(est,
                    objective_metric_name,
                    hyperparameter_ranges,
                    max_jobs=30,
                    max_parallel_jobs=3)
    
    return tuner

In [10]:
def run_training_job(clf):

    
    # this should loop through splits in k-fold cross validation
    
    # build the estimator
    est = get_estimator(clf, sess, role)

    # get the hyperparameter tuner config 
    if clf == 'xgboost':
        
        tuner = get_tuner(clf, est)
        
        
        tuner.fit({'train': s3_input_train, 'validation': s3_input_validation}) 

    else:
        # set the records
        train_records = est.record_set(train_features, train_labels, channel='train')
        test_records = est.record_set(val_features, val_labels, channel='validation')

        tuner = get_tuner(clf, est)
        
        tuner.fit([train_records, test_records])

In [11]:
def magic_loop(models_to_run):
    pool = Pool(processes=multiprocessing.cpu_count())
    transformed_rows = pool.map(run_training_job, models_to_run)
    pool.close() 
    pool.join()

In [12]:
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/train'.format(bucket), content_type='csv')

s3_input_validation = sagemaker.s3_input(s3_data='s3://{}/validation/'.format(bucket), content_type='csv')

### 3. Define the models you want to use

In [46]:
# clfs = ['xgboost', 'linear-learner', 'factorization-machines', 'knn']

clfs = [ 'xgboost']

magic_loop(clfs)

### 4. Select the best models
Now, we're going to use SageMaker search to find the best performing models from the hyperparameter tuning jobs we just ran.

In [13]:
import boto3
smclient = boto3.client(service_name='sagemaker')

# Search the training job by Amazon S3 location of model artifacts
search_params={
   "MaxResults": 100,
   "Resource": "TrainingJob",
   "SearchExpression": { 
      "Filters": [ 
         { 
            "Name": "InputDataConfig.DataSource.S3DataSource.S3Uri",
            "Operator": "Contains",
             
             # set this to have a word that is in your bucket name
            "Value": '{}'.format(bucket)
         },
        { 
            "Name": "TrainingJobStatus",
            "Operator": "Equals",
            "Value": 'Completed'
         }, 
    ],
     
   },
    
    "SortBy": "Metrics.validation:auc",
    "SortOrder": "Descending"
}
results = smclient.search(**search_params)

In [14]:
from sagemaker.model import Model

def get_models(results):

    role = sagemaker.get_execution_role()

    models = []

    for each in results['Results']:

        job_name = each['TrainingJob']['TrainingJobName']


        artifact = each['TrainingJob']['ModelArtifacts']['S3ModelArtifacts']

        # get training image
        image =  each['TrainingJob']['AlgorithmSpecification']['TrainingImage']

        m = Model(artifact, image, role = role, sagemaker_session = sess, name = job_name)

        models.append(m)
        
    return models[:15]

models = get_models(results)

### 5. Ensemble Batch Transform
Now, we're going to run a separate batch transform job for each model. 

In [51]:
def run_batch_transform(model, bucket):

    transformer = model.transformer(
        instance_count=1,
        instance_type='ml.m4.xlarge',
        output_path='s3://{}/batch_results/{}'.format(bucket, model.name)
    )

    transformer.transform(data='s3://{}/test/test.csv'.format(bucket), content_type='text/csv')

    
for model in models:
    run_batch_transform(model, bucket)

Using already existing model: ee-190801-1852-030-9332e42c
Using already existing model: ee-190801-1852-027-c1697b33
Using already existing model: xgboost-190801-1850-024-bb419b7b
Using already existing model: ee-190801-1852-007-e64ab3dc
Using already existing model: ee-190801-1852-017-d9b8d13d
Using already existing model: xgboost-190801-1850-016-53897fd8
Using already existing model: xgboost-190801-1850-004-6a0fda05
Using already existing model: ee-190801-1852-016-5401644a
Using already existing model: xgboost-190801-1850-006-2f5b942e
Using already existing model: ee-190801-1852-028-19d1cc27
Using already existing model: xgboost-190801-1850-029-5cfc670c
Using already existing model: ee-190801-1852-018-43d4a224
Using already existing model: ee-190801-1852-011-97ee218e
Using already existing model: xgboost-190801-1850-005-66a65cac
Using already existing model: ee-190801-1852-010-03ce44d3


### 6. Consolidate batch results
Finally, we'll pull together all of the batch job inferences. For each one, we'll take the maximum confidence level and consider that a positive prediction. Then we'll see how well that performs, relative to using a single XGBoost model. 

This next cell is going to copy everything in the S3 bucket path under batch results to your local notebook instance.

In [53]:
os.system('aws s3 sync s3://{}/batch_results/ /home/ec2-user/SageMaker/batch_results/'.format(bucket))

0

In [24]:
def get_dataframe():
    '''
    Loops through the directory on your local notebook instance where the batch results were stored, 
        and generates a dataframe where each column is the output from a different model.
    '''
    frames  = []
    
    for sub_dir in os.listdir('/home/ec2-user/SageMaker/batch_results'):
        if '.ipynb' not in sub_dir and '.out' not in sub_dir:

            old_file = '/home/ec2-user/SageMaker/batch_results/{}/test.csv.out'.format(sub_dir)
            
            new_file = '/home/ec2-user/SageMaker/batch_results/{}/test.csv'.format(sub_dir)
            
            # remove the .out file formate
            os.system('cp {} {}'.format( old_file, new_file))
            
            df = pd.read_csv('/home/ec2-user/SageMaker/batch_results/{}/test.csv'.format(sub_dir), names = [sub_dir])

            frames.append(df)
            
    df = pd.concat(frames, axis=1)
                
    return df

In [40]:
def consolidate_results(df):

    df['max'] = 0
    df['min'] = 0
    df['diff'] = 0

    for idx, row in df.iterrows():

        top = max(row)
        bottom = min(row)

        diff = top - bottom

        df.loc[idx, 'max'] = top
        df.loc[idx, 'min'] = bottom
        df.loc[idx, 'diff'] = diff

    return df

bare_df = get_dataframe()
consolidated_df = consolidate_results(bare_df)

In [42]:
def add_label_to_results(df):
    test_data = pd.read_csv('test.csv')
    y_true = test_data['0'].values.tolist()
    df['y_true'] = y_true
    return df
    
    
df = add_label_to_results(consolidated_df)

In [43]:
df.head()

Unnamed: 0,xgboost-190730-1958-013-578ad74a,xgboost-190730-2044-005-b614948b,xgboost-190730-2020-026-b0f76816,xgboost-190730-1958-023-2e4c37ce,xgboost-190730-2020-029-b7c0ecf1,xgboost-190730-2020-028-3a863835,xgboost-190730-2044-004-739de6d8,xgboost-190730-2044-022-2352eede,xgboost-190730-2044-021-949ec8ac,xgboost-190730-1958-015-b671b4dc,xgboost-190730-2020-019-c5dba7b3,xgboost-190730-1958-024-b8a2fd71,xgboost-190730-2044-019-440eba24,xgboost-190730-1958-021-72e38862,xgboost-190730-2020-030-483f572c,max,min,diff,y_true
0,0.089065,0.092084,0.108009,0.094674,0.098098,0.09716,0.103718,0.100188,0.098029,0.086106,0.090717,0.074131,0.09253,0.098769,0.104903,0.108009,0.0,0.108009,1
1,0.189402,0.179116,0.210121,0.141365,0.208647,0.168847,0.176325,0.168533,0.171908,0.179298,0.190306,0.186766,0.182808,0.195914,0.189338,0.210121,0.0,0.210121,1
2,0.242742,0.254854,0.273822,0.267924,0.250157,0.256963,0.293109,0.282113,0.277161,0.222159,0.27004,0.251583,0.278672,0.275336,0.251772,0.293109,0.0,0.293109,1
3,0.029594,0.035632,0.021417,0.031481,0.026864,0.030731,0.039072,0.040475,0.040877,0.030913,0.036439,0.02426,0.038267,0.028601,0.035436,0.040877,0.0,0.040877,0
4,0.031521,0.032894,0.03192,0.040215,0.029252,0.033342,0.038584,0.039658,0.040352,0.031937,0.035477,0.037843,0.03744,0.028855,0.031971,0.040352,0.0,0.040352,0


### 7. Generate Confusion Matrix
At the end, let's chart a plot for the performance of each of these models. Did the ensembling help? Which model appears to be the best?

In [61]:
def get_confusion_matrix(df, model_column, accuracy=None):
    
    mx = pd.crosstab(index=df['y_true'], columns=np.round(df[model_column]), rownames=['actuals'], colnames=['predictions'])

    # lower right corner
    tps = mx.iloc[1, 1]
        
    # upper right corner
    fps = mx.iloc[0, 1]
    
    # lower left corner
    fns = mx.iloc[1, 0]
    
    recall = np.round(tps / (tps + fns), 4) * 100
    
    precision = np.round(tps / (tps + fps), 4) * 100
    
    print ('Precision = {}%, Recall = {}%'.format(precision, recall))
    
    if accuracy:
        
        # upper left corner 
        tns = mx.iloc[0, 0]
        
        accuracy = (tps + tns) / (fns + fps + tps + tns) * 100
        
        print ('Overall binary classification accuracy = {}%'.format(accuracy))
        
    return mx

### Results Without Ensembling

In [63]:
get_confusion_matrix(df,'xgboost-190730-2044-004-739de6d8', accuracy=True)

Precision = 19.25%, Recall = 67.88%
Overall binary classification accuracy = 89.46090335114133%


predictions,0.0,1.0
actuals,Unnamed: 1_level_1,Unnamed: 2_level_1
0,3591,44
1,390,93


### Results With Ensembling

In [64]:
get_confusion_matrix(df, 'max', accuracy=True)

Precision = 26.090000000000003%, Recall = 61.760000000000005%
Overall binary classification accuracy = 89.43661971830986%


predictions,0.0,1.0
actuals,Unnamed: 1_level_1,Unnamed: 2_level_1
0,3557,78
1,357,126


Interesting. Using this method we saw precision increase by 7 percentage points, but recall dropped by 7 points. Overall classfication accuracy did not seem to increase. 