# Amazon SageMaker Batch Transform: Associate prediction results with their corresponding input records
_**Use SageMaker's XGBoost to train a binary classification model and for a list of tumors in batch file, predict if each is malignant**_

_**It also shows how to use the input output joining / filter feature in Batch transform in details**_

---



## Background
This purpose of this notebook is to train a model using SageMaker's XGBoost and UCI's breast cancer diagnostic data set to illustrate at how to run batch inferences and how to use the Batch Transform I/O join feature. UCI's breast cancer diagnostic data set is available at https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+%28Diagnostic%29. The data set is also available on Kaggle at https://www.kaggle.com/uciml/breast-cancer-wisconsin-data. The purpose here is to use this data set to build a predictve model of whether a breast mass image indicates benign or malignant tumor. 



---

## Setup

Let's start by specifying:

* The SageMaker role arn used to give training and batch transform access to your data. The snippet below will use the same role used by your SageMaker notebook instance. Otherwise, specify the full ARN of a role with the SageMakerFullAccess policy attached.
* The S3 bucket that you want to use for training and storing model objects.

In [24]:
import os
import boto3
import sagemaker

role = sagemaker.get_execution_role()
sess = sagemaker.Session()

bucket=sess.default_bucket()
prefix = 'sagemaker/breast-cancer-prediction-xgboost' # place to upload training files within the bucket

---
## Data preparation

Data Source: https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data
        https://www.kaggle.com/uciml/breast-cancer-wisconsin-data

Let's download the data and save it in the local folder with the name data.csv and take a look at it.

In [25]:
import pandas as pd
import numpy as np

data = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data', header = None)

# specify columns extracted from wbdc.names
data.columns = ["id","diagnosis","radius_mean","texture_mean","perimeter_mean","area_mean","smoothness_mean",
                "compactness_mean","concavity_mean","concave points_mean","symmetry_mean","fractal_dimension_mean",
                "radius_se","texture_se","perimeter_se","area_se","smoothness_se","compactness_se","concavity_se",
                "concave points_se","symmetry_se","fractal_dimension_se","radius_worst","texture_worst",
                "perimeter_worst","area_worst","smoothness_worst","compactness_worst","concavity_worst",
                "concave points_worst","symmetry_worst","fractal_dimension_worst"] 

# save the data
data.to_csv("data.csv", sep=',', index=False)

data.sample(8)


Unnamed: 0,id,diagnosis,radius_mean,texture_mean,perimeter_mean,area_mean,smoothness_mean,compactness_mean,concavity_mean,concave points_mean,...,radius_worst,texture_worst,perimeter_worst,area_worst,smoothness_worst,compactness_worst,concavity_worst,concave points_worst,symmetry_worst,fractal_dimension_worst
453,911201,B,14.53,13.98,93.86,644.2,0.1099,0.09242,0.06895,0.06495,...,15.8,16.93,103.1,749.9,0.1347,0.1478,0.1373,0.1069,0.2606,0.0781
32,85382601,M,17.02,23.98,112.8,899.3,0.1197,0.1496,0.2417,0.1203,...,20.88,32.09,136.1,1344.0,0.1634,0.3559,0.5588,0.1847,0.353,0.08482
342,89827,B,11.06,14.96,71.49,373.9,0.1033,0.09097,0.05397,0.03341,...,11.92,19.9,79.76,440.0,0.1418,0.221,0.2299,0.1075,0.3301,0.0908
459,9112712,B,9.755,28.2,61.68,290.9,0.07984,0.04626,0.01541,0.01043,...,10.67,36.92,68.03,349.9,0.111,0.1109,0.0719,0.04866,0.2321,0.07211
37,854941,B,13.03,18.42,82.61,523.8,0.08983,0.03766,0.02562,0.02923,...,13.3,22.81,84.46,545.9,0.09701,0.04619,0.04833,0.05013,0.1987,0.06169
319,894335,B,12.43,17.0,78.6,477.3,0.07557,0.03454,0.01342,0.01699,...,12.9,20.21,81.76,515.9,0.08409,0.04712,0.02237,0.02832,0.1901,0.05932
53,857392,M,18.22,18.7,120.3,1033.0,0.1148,0.1485,0.1772,0.106,...,20.6,24.13,135.1,1321.0,0.128,0.2297,0.2623,0.1325,0.3021,0.07987
103,862980,B,9.876,19.4,63.95,298.3,0.1005,0.09697,0.06154,0.03029,...,10.76,26.83,72.22,361.2,0.1559,0.2302,0.2644,0.09749,0.2622,0.0849


#### Key observations:
* The data has 569 observations and 32 columns.
* The first field is the 'id' attribute that we will want to drop before batch inference and add to the final inference output next to the probability of malignancy.
* Second field, 'diagnosis', is an indicator of the actual diagnosis ('M' = Malignant; 'B' = Benign).
* There are 30 other numeric features that we will use for training and inferencing.

Let's replace the M/B diagnosis with a 1/0 boolean value. 

In [26]:
data['diagnosis']=data['diagnosis'].apply(lambda x: ((x =="M"))+0)
data.sample(8)

Unnamed: 0,id,diagnosis,radius_mean,texture_mean,perimeter_mean,area_mean,smoothness_mean,compactness_mean,concavity_mean,concave points_mean,...,radius_worst,texture_worst,perimeter_worst,area_worst,smoothness_worst,compactness_worst,concavity_worst,concave points_worst,symmetry_worst,fractal_dimension_worst
406,905189,0,16.14,14.86,104.3,800.0,0.09495,0.08501,0.055,0.04528,...,17.71,19.58,115.9,947.9,0.1206,0.1722,0.231,0.1129,0.2778,0.07012
122,865423,1,24.25,20.2,166.2,1761.0,0.1447,0.2867,0.4268,0.2012,...,26.02,23.99,180.9,2073.0,0.1696,0.4244,0.5803,0.2248,0.3222,0.08009
183,873843,0,11.41,14.92,73.53,402.0,0.09059,0.08155,0.06181,0.02361,...,12.37,17.7,79.12,467.2,0.1121,0.161,0.1648,0.06296,0.1811,0.07427
20,8510653,0,13.08,15.71,85.63,520.0,0.1075,0.127,0.04568,0.0311,...,14.5,20.49,96.09,630.5,0.1312,0.2776,0.189,0.07283,0.3184,0.08183
16,848406,1,14.68,20.13,94.74,684.5,0.09867,0.072,0.07395,0.05259,...,19.07,30.88,123.4,1138.0,0.1464,0.1871,0.2914,0.1609,0.3029,0.08216
316,894090,0,12.18,14.08,77.25,461.4,0.07734,0.03212,0.01123,0.005051,...,12.85,16.47,81.6,513.1,0.1001,0.05332,0.04116,0.01852,0.2293,0.06037
78,8610862,1,20.18,23.97,143.7,1245.0,0.1286,0.3454,0.3754,0.1604,...,23.37,31.72,170.3,1623.0,0.1639,0.6164,0.7681,0.2508,0.544,0.09964
165,8712291,0,14.97,19.76,95.5,690.2,0.08421,0.05352,0.01947,0.01939,...,15.98,25.82,102.3,782.1,0.1045,0.09995,0.0775,0.05754,0.2646,0.06085


Let's split the data as follows: 80% for training, 10% for validation and let's set 10% aside for our batch inference job. In addition, let's drop the 'id' field on the training set and validation set as 'id' is not a training feature. For our batch set however, we keep the 'id' feature. We'll want to filter it out prior to running our inferences so that the input data features match the ones of training set and then ultimately, we'll want to join it with inference result. We are however dropping the diagnosis attribute for the batch set since this is what we'll try to predict.

In [27]:
#data split in three sets, training, validation and batch inference
rand_split = np.random.rand(len(data))
train_list = rand_split < 0.8
val_list = (rand_split >= 0.8) & (rand_split < 0.9)
batch_list = rand_split >= 0.9

data_train = data[train_list].drop(['id'],axis=1)
data_val = data[val_list].drop(['id'],axis=1)
data_batch = data[batch_list].drop(['diagnosis'],axis=1)
data_batch_noID = data_batch.drop(['id'],axis=1)


Let's upload those data sets in S3

In [28]:
#note that we are stripping off the header of the panda frame.
train_file = 'train_data.csv'
data_train.to_csv(train_file,index=False,header=False)
sess.upload_data(train_file, key_prefix='{}/train'.format(prefix))

validation_file = 'validation_data.csv'
data_val.to_csv(validation_file,index=False,header=False)
sess.upload_data(validation_file, key_prefix='{}/validation'.format(prefix))

batch_file = 'batch_data.csv'
data_batch.to_csv(batch_file,index=False,header=False)
sess.upload_data(batch_file, key_prefix='{}/batch'.format(prefix))
    
batch_file_noID = 'batch_data_noID.csv'
data_batch_noID.to_csv(batch_file_noID,index=False,header=False)
sess.upload_data(batch_file_noID, key_prefix='{}/batch'.format(prefix))   


's3://sagemaker-us-east-2-328296961357/sagemaker/breast-cancer-prediction-xgboost/batch/batch_data_noID.csv'

---

## Training job and model creation

The below cell uses the [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) to kick off the training job using both our training set and validation set. Not that the objective is set to 'binary:logistic' which trains a model to output a probability between 0 and 1 (here the probability of a tumor being malignant).

In [29]:
%%time
from time import gmtime, strftime
from sagemaker.amazon.amazon_estimator import get_image_uri


job_name = 'xgb-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
output_location = 's3://{}/{}/output/{}'.format(bucket, prefix, job_name)
image = get_image_uri(boto3.Session().region_name, 'xgboost')

sm_estimator = sagemaker.estimator.Estimator(image,
                                             role,
                                             train_instance_count=1,
                                             train_instance_type='ml.m5.4xlarge',
                                             train_volume_size=50,
                                             input_mode='File',
                                             output_path=output_location,
                                             sagemaker_session=sess)

sm_estimator.set_hyperparameters(objective="binary:logistic",
                                 max_depth=5,
                                 eta=0.2,
                                 gamma=4,
                                 min_child_weight=6,
                                 subsample=0.8,
                                 silent=0,
                                 num_round=100)

train_data = sagemaker.session.s3_input('s3://{}/{}/train'.format(bucket, prefix), distribution='FullyReplicated', 
                                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input('s3://{}/{}/validation'.format(bucket, prefix), distribution='FullyReplicated', 
                                             content_type='text/csv', s3_data_type='S3Prefix')
data_channels = {'train': train_data, 'validation': validation_data}


# Start training by calling the fit method in the estimator
sm_estimator.fit(inputs=data_channels, logs=True)

	get_image_uri(region, 'xgboost', '0.90-1').


2020-03-12 23:27:43 Starting - Starting the training job...
2020-03-12 23:27:45 Starting - Launching requested ML instances...
2020-03-12 23:28:40 Starting - Preparing the instances for training......
2020-03-12 23:29:39 Downloading - Downloading input data
2020-03-12 23:29:39 Training - Downloading the training image..[34mArguments: train[0m
[34m[2020-03-12:23:29:53:INFO] Running standalone xgboost training.[0m
[34m[2020-03-12:23:29:53:INFO] File size need to be processed in the node: 0.13mb. Available memory size in the node: 54849.04mb[0m
[34m[2020-03-12:23:29:53:INFO] Determined delimiter of CSV input is ','[0m
[34m[23:29:53] S3DistributionType set as FullyReplicated[0m
[34m[23:29:53] 448x30 matrix with 13440 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,[0m
[34m[2020-03-12:23:29:53:INFO] Determined delimiter of CSV input is ','[0m
[34m[23:29:53] S3DistributionType set as FullyReplicated[0m
[34m[23:29:53] 61x30 matrix with 1830 e


2020-03-12 23:30:05 Uploading - Uploading generated training model
2020-03-12 23:30:05 Completed - Training job completed
Training seconds: 42
Billable seconds: 42
CPU times: user 422 ms, sys: 4.83 ms, total: 426 ms
Wall time: 2min 41s


---

## Batch Transform

In SageMaker Batch Transform, we introduced 3 new attributes - __input_filter__, __join_source__ and __output_filter__. In the below cell, we use the [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) to kick-off several Batch Transform jobs using different configurations of these 3 new attributes. Please refer to [this page](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html) to learn more about how to use them.




#### 1. Create a transform job with the default configurations
Let's first skip these 3 new attributes and inspect the inference results. We'll use it as a baseline to compare to the results with data processing.

In [38]:
%%time
# set up a transformer
sm_transformer = sm_estimator.transformer(instance_count = 1, 
                                          instance_type = 'ml.m4.xlarge')

# start a transform job. Uses the above model to perform off-line inference. 
input_location = 's3://{}/{}/batch/{}'.format(bucket, prefix, batch_file_noID) # use input data without ID column
sm_transformer.transform(data=input_location, split_type='Line')
sm_transformer.wait()



....................[34mArguments: serve[0m
[34m[2020-03-13 00:18:21 +0000] [1] [INFO] Starting gunicorn 19.7.1[0m
[34m[2020-03-13 00:18:21 +0000] [1] [INFO] Listening at: http://0.0.0.0:8080 (1)[0m
[34m[2020-03-13 00:18:21 +0000] [1] [INFO] Using worker: gevent[0m
[34m[2020-03-13 00:18:21 +0000] [38] [INFO] Booting worker with pid: 38[0m
[34m[2020-03-13 00:18:21 +0000] [39] [INFO] Booting worker with pid: 39[0m
[34m[2020-03-13:00:18:21:INFO] Model loaded successfully for worker : 38[0m
[34m[2020-03-13 00:18:21 +0000] [40] [INFO] Booting worker with pid: 40[0m
[34m[2020-03-13:00:18:21:INFO] Model loaded successfully for worker : 39[0m
[34m[2020-03-13:00:18:21:INFO] Model loaded successfully for worker : 40[0m
[34m[2020-03-13 00:18:21 +0000] [41] [INFO] Booting worker with pid: 41[0m
[34m[2020-03-13:00:18:21:INFO] Model loaded successfully for worker : 41[0m
[34m[2020-03-13:00:18:51:INFO] Sniff delimiter as ','[0m
[34m[2020-03-13:00:18:51:INFO] Determined deli

Let's inspect the output of the Batch Transform job in S3. This is the result of running inference of batch_file_noID on our trained XGBoost model. It should show the list probabilities of tumors being malignant.

In [39]:
import json
import io
from urllib.parse import urlparse

def get_csv_output_from_s3(s3uri, file_name):
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    prefix = parsed_url.path[1:]
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, '{}/{}'.format(prefix, file_name))
    return obj.get()["Body"].read().decode('utf-8')    



In [40]:
output = get_csv_output_from_s3(sm_transformer.output_path, '{}.out'.format(batch_file_noID))
output_df = pd.read_csv(io.StringIO(output), sep=",", header=None)
output_df.head(8)
# we lost header information becasue the header was stripped off when we did panda->csv conversion above

Unnamed: 0,0
0,0.763916
1,0.037508
2,0.989652
3,0.989749
4,0.036365
5,0.168103
6,0.988446
7,0.026124


#### Now that we finished model inferencing, let us explore Filtering and Joining featues of SageMaker Batch Transform

![alt text](BatchTransformDiagram.png "Title")

#### 2. Join the input and the prediction results 
Now, let's associate the prediction results with their corresponding input records. We can also use the __input_filter__ to exclude the ID column easily and there's no need to have a separate file in S3.

* Set __input_filter__ to "$[1:]": indicates that we are excluding column 0 (the 'ID') before processing the inferences and keeping everything from column 1 to the last column (all the features or predictors)  
  
  
* Set __join_source__ to "Input": indicates our desire to join the input data with the inference results  

* Leave __output_filter__ to default ('$'), indicating that the joined input and inference results be will saved as output.

In [41]:
# content_type / accept and split_type / assemble_with are required to use IO joining feature
sm_transformer.assemble_with = 'Line'
sm_transformer.accept = 'text/csv'

# start a transform job
input_location = 's3://{}/{}/batch/{}'.format(bucket, prefix, batch_file) # use input data with ID column cause InputFilter will filter it out
sm_transformer.transform(input_location, split_type='Line', content_type='text/csv', input_filter='$[1:]', join_source='Input')
sm_transformer.wait()



....................[34mArguments: serve[0m
[34m[2020-03-13 00:25:49 +0000] [1] [INFO] Starting gunicorn 19.7.1[0m
[34m[2020-03-13 00:25:49 +0000] [1] [INFO] Listening at: http://0.0.0.0:8080 (1)[0m
[34m[2020-03-13 00:25:49 +0000] [1] [INFO] Using worker: gevent[0m
[34m[2020-03-13 00:25:49 +0000] [37] [INFO] Booting worker with pid: 37[0m
[34m[2020-03-13 00:25:49 +0000] [38] [INFO] Booting worker with pid: 38[0m
[34m[2020-03-13 00:25:49 +0000] [39] [INFO] Booting worker with pid: 39[0m
[34m[2020-03-13 00:25:49 +0000] [40] [INFO] Booting worker with pid: 40[0m
[34m[2020-03-13:00:25:50:INFO] Model loaded successfully for worker : 38[0m
[34m[2020-03-13:00:25:50:INFO] Model loaded successfully for worker : 39[0m
[34m[2020-03-13:00:25:50:INFO] Model loaded successfully for worker : 37[0m
[34m[2020-03-13:00:25:50:INFO] Model loaded successfully for worker : 40[0m

[34m[2020-03-13:00:26:09:INFO] Sniff delimiter as ','[0m
[34m[2020-03-13:00:26:09:INFO] Determined del

Let's inspect the output of the Batch Transform job in S3. It should show the list of tumors identified by their original feature columns and their corresponding probabilities of being malignant.

In [34]:
output = get_csv_output_from_s3(sm_transformer.output_path, '{}.out'.format(batch_file))
output_df = pd.read_csv(io.StringIO(output), sep=",", header=None)
output_df.head(8)


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,22,23,24,25,26,27,28,29,30,31
0,846381,15.85,23.95,103.7,782.7,0.08401,0.1002,0.09938,0.05364,0.1847,...,27.66,112.0,876.5,0.1131,0.1924,0.2322,0.1119,0.2809,0.06287,0.763916
1,8510426,13.54,14.36,87.46,566.3,0.09779,0.08129,0.06664,0.04781,0.1885,...,19.26,99.7,711.2,0.144,0.1773,0.239,0.1288,0.2977,0.07259,0.037508
2,855625,19.07,24.81,128.3,1104.0,0.09081,0.219,0.2107,0.09961,0.231,...,33.17,177.4,1651.0,0.1247,0.7444,0.7242,0.2493,0.467,0.1038,0.989652
3,857637,19.21,18.57,125.5,1152.0,0.1053,0.1267,0.1323,0.08994,0.1917,...,28.14,170.1,2145.0,0.1624,0.3511,0.3879,0.2091,0.3537,0.08294,0.989749
4,858981,8.598,20.98,54.66,221.8,0.1243,0.08963,0.03,0.009259,0.1828,...,27.04,62.06,273.9,0.1639,0.1698,0.09001,0.02778,0.2972,0.07712,0.036365
5,859471,9.029,17.33,58.79,250.5,0.1066,0.1413,0.313,0.04375,0.2111,...,22.65,65.5,324.7,0.1482,0.4365,1.252,0.175,0.4228,0.1175,0.168103
6,862028,15.06,19.83,100.3,705.6,0.1039,0.1553,0.17,0.08815,0.1855,...,24.23,123.5,1025.0,0.1551,0.4203,0.5203,0.2115,0.2834,0.08234,0.988446
7,862965,12.18,20.52,77.22,458.7,0.08013,0.04038,0.02383,0.0177,0.1739,...,32.84,84.58,547.8,0.1123,0.08862,0.1145,0.07431,0.2694,0.06878,0.026124


#### 3. Update the output filter to keep only ID and prediction results
Let's change __output_filter__ to "$[0,-1]", indicating that when presenting the output, we only want to keep column 0 (the 'ID') and the last column (the inference result i.e. the probability of a given tumor to be malignant)

In [35]:
# start another transform job
sm_transformer.transform(input_location, split_type='Line', content_type='text/csv', input_filter='$[1:]', join_source='Input', output_filter='$[0,-1]')
sm_transformer.wait()



.......................[34mArguments: serve[0m
[34m[2020-03-12 23:41:30 +0000] [1] [INFO] Starting gunicorn 19.7.1[0m
[34m[2020-03-12 23:41:30 +0000] [1] [INFO] Listening at: http://0.0.0.0:8080 (1)[0m
[34m[2020-03-12 23:41:30 +0000] [1] [INFO] Using worker: gevent[0m
[34m[2020-03-12 23:41:30 +0000] [40] [INFO] Booting worker with pid: 40[0m
[34m[2020-03-12 23:41:30 +0000] [41] [INFO] Booting worker with pid: 41[0m
[34m[2020-03-12 23:41:30 +0000] [42] [INFO] Booting worker with pid: 42[0m
[34m[2020-03-12 23:41:30 +0000] [43] [INFO] Booting worker with pid: 43[0m
[34m[2020-03-12:23:41:30:INFO] Model loaded successfully for worker : 41[0m
[34m[2020-03-12:23:41:30:INFO] Model loaded successfully for worker : 42[0m
[34m[2020-03-12:23:41:30:INFO] Model loaded successfully for worker : 40[0m
[34m[2020-03-12:23:41:30:INFO] Model loaded successfully for worker : 43[0m
[34m[2020-03-12:23:41:53:INFO] Sniff delimiter as ','[0m
[34m[2020-03-12:23:41:53:INFO] Determined d

Now, let's inspect the output of the Batch Transform job in S3 again. It should show 2 columns: the ID and their corresponding probabilities of being malignant.

In [36]:
output = get_csv_output_from_s3(sm_transformer.output_path, '{}.out'.format(batch_file))
output_df = pd.read_csv(io.StringIO(output), sep=",", header=None)
output_df.head(8)

Unnamed: 0,0,1
0,846381,0.763916
1,8510426,0.037508
2,855625,0.989652
3,857637,0.989749
4,858981,0.036365
5,859471,0.168103
6,862028,0.988446
7,862965,0.026124


In summary, we can use newly introduced 3 attributes - __input_filter__, __join_source__, __output_filter__ to 
1. Filter / select useful features from the input dataset. e.g. exclude ID columns.
2. Associate the prediction results with their corresponding input records.
3. Filter the original or joined results before saving to S3. e.g. keep ID and probability columns only.