# Amazon SageMaker Batch Transform: Trigger a Batch Transform job with SageMaker Pipelines
_**Use SageMaker's XGBoost to train a binary classification model, then predict whether each tumor in a batch file is malignant**_

_**It also shows in detail how to use the input/output joining and filtering features of Batch Transform**_

---



## Background
The purpose of this notebook is to train a model using SageMaker's XGBoost and UCI's breast cancer diagnostic data set to illustrate how to run batch inference and how to use the Batch Transform I/O join feature. UCI's breast cancer diagnostic data set is available at https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+%28Diagnostic%29. The data set is also available on Kaggle at https://www.kaggle.com/uciml/breast-cancer-wisconsin-data. The goal is to use this data set to build a predictive model of whether a breast mass image indicates a benign or malignant tumor.



---

## Setup

Let's start by specifying:

* The SageMaker role ARN used to give training and batch transform access to your data. The snippet below uses the same role as your SageMaker notebook instance. Otherwise, specify the full ARN of a role with the AmazonSageMakerFullAccess policy attached.
* The S3 bucket that you want to use for training and storing model objects.

In [None]:
import os
import boto3
import sagemaker

role = sagemaker.get_execution_role()
sess = sagemaker.Session()

bucket = sess.default_bucket()
prefix = "DEMO-breast-cancer-prediction-xgboost-highlevel"

---
## Data sources

> Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

> Breast Cancer Wisconsin (Diagnostic) Data Set [https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+(Diagnostic)].

> _Also see:_ Breast Cancer Wisconsin (Diagnostic) Data Set [https://www.kaggle.com/uciml/breast-cancer-wisconsin-data].

## Data preparation


Let's download the data, save it locally as `data.csv` with column names added, and take a look at it.

In [None]:
import pandas as pd
import numpy as np

s3 = boto3.client("s3")

filename = "wdbc.csv"
s3.download_file("sagemaker-sample-files", "datasets/tabular/breast_cancer/wdbc.csv", filename)
data = pd.read_csv(filename, header=None)

# specify columns extracted from wdbc.names
data.columns = [
    "id",
    "diagnosis",
    "radius_mean",
    "texture_mean",
    "perimeter_mean",
    "area_mean",
    "smoothness_mean",
    "compactness_mean",
    "concavity_mean",
    "concave points_mean",
    "symmetry_mean",
    "fractal_dimension_mean",
    "radius_se",
    "texture_se",
    "perimeter_se",
    "area_se",
    "smoothness_se",
    "compactness_se",
    "concavity_se",
    "concave points_se",
    "symmetry_se",
    "fractal_dimension_se",
    "radius_worst",
    "texture_worst",
    "perimeter_worst",
    "area_worst",
    "smoothness_worst",
    "compactness_worst",
    "concavity_worst",
    "concave points_worst",
    "symmetry_worst",
    "fractal_dimension_worst",
]

# save the data
data.to_csv("data.csv", sep=",", index=False)

data.sample(8)

#### Key observations:
* The data has 569 observations and 32 columns.
* The first field is the 'id' attribute, which we will drop before batch inference and then add to the final inference output next to the probability of malignancy.
* The second field, 'diagnosis', is an indicator of the actual diagnosis ('M' = Malignant; 'B' = Benign).
* There are 30 other numeric features that we will use for training and inference.

Let's replace the M/B diagnosis with a 1/0 boolean value. 

In [None]:
data["diagnosis"] = (data["diagnosis"] == "M").astype(int)  # 1 = malignant, 0 = benign
data.sample(8)

Let's split the data as follows: 80% for training, 10% for validation, and 10% set aside for our batch inference job. Because the split is random, these proportions are approximate. We drop the 'id' field from the training and validation sets because it is not a training feature. For the batch set, however, we keep 'id': we'll filter it out before running inference so that the input features match those of the training set, and ultimately we'll join it with the inference results. We do drop the 'diagnosis' attribute from the batch set, since that is what we're trying to predict.

In [None]:
# data split in three sets, training, validation and batch inference
rand_split = np.random.rand(len(data))
train_list = rand_split < 0.8
val_list = (rand_split >= 0.8) & (rand_split < 0.9)
batch_list = rand_split >= 0.9

data_train = data[train_list].drop(["id"], axis=1)
data_val = data[val_list].drop(["id"], axis=1)
data_batch = data[batch_list].drop(["diagnosis"], axis=1)
data_batch_noID = data_batch.drop(["id"], axis=1)
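
The split above draws fresh random numbers on every run, so the three sets change between executions. As a minimal sketch, assuming a fixed seed is acceptable for this demo, the same thresholding logic can be made reproducible and checked for completeness. The helper `assign_split` and the seed value are illustrative and not part of the original notebook.

```python
import random


def assign_split(n_rows, seed=42, train_frac=0.8, val_frac=0.1):
    """Assign each row index to 'train', 'val', or 'batch' using one uniform draw per row."""
    rng = random.Random(seed)  # fixed seed (assumption) so reruns give the same split
    labels = []
    for _ in range(n_rows):
        u = rng.random()
        if u < train_frac:
            labels.append("train")
        elif u < train_frac + val_frac:
            labels.append("val")
        else:
            labels.append("batch")
    return labels


labels = assign_split(569)
counts = {name: labels.count(name) for name in ("train", "val", "batch")}
print(counts)
```

In the notebook itself, calling `np.random.seed(42)` before `np.random.rand(len(data))` would have a similar effect.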

Let's upload those data sets to S3.

In [None]:
train_file = "train_data.csv"
data_train.to_csv(train_file, index=False, header=False)
sess.upload_data(train_file, key_prefix="{}/train".format(prefix))

validation_file = "validation_data.csv"
data_val.to_csv(validation_file, index=False, header=False)
sess.upload_data(validation_file, key_prefix="{}/validation".format(prefix))

batch_file = "batch_data.csv"
data_batch.to_csv(batch_file, index=False, header=False)
sess.upload_data(batch_file, key_prefix="{}/batch".format(prefix))

batch_file_noID = "batch_data_noID.csv"
data_batch_noID.to_csv(batch_file_noID, index=False, header=False)
sess.upload_data(batch_file_noID, key_prefix="{}/batch".format(prefix))

---

## Training job and model creation

The cell below uses the [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) to kick off the training job using both our training set and validation set. Note that the objective is set to 'binary:logistic', which trains a model to output a probability between 0 and 1 (here, the probability of a tumor being malignant).

In [None]:
%%time
from time import gmtime, strftime

job_name = "xgb-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
output_location = "s3://{}/{}/output/{}".format(bucket, prefix, job_name)
image = sagemaker.image_uris.retrieve(
    framework="xgboost", region=boto3.Session().region_name, version="1.5-1"
)

sm_estimator = sagemaker.estimator.Estimator(
    image,
    role,
    instance_count=1,
    instance_type="ml.m5.4xlarge",
    volume_size=50,
    input_mode="File",
    output_path=output_location,
    sagemaker_session=sess,
)

sm_estimator.set_hyperparameters(
    objective="binary:logistic",
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    verbosity=0,
    num_round=100,
)

train_data = sagemaker.inputs.TrainingInput(
    "s3://{}/{}/train".format(bucket, prefix),
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
)
validation_data = sagemaker.inputs.TrainingInput(
    "s3://{}/{}/validation".format(bucket, prefix),
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
)
data_channels = {"train": train_data, "validation": validation_data}

# Start training by calling the fit method in the estimator
sm_estimator.fit(inputs=data_channels, logs=True)

---

## Batch Transform

In SageMaker Batch Transform, we introduced 3 new attributes: __input_filter__, __join_source__ and __output_filter__. In the cells below, we use the [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) to kick off Batch Transform jobs. Please refer to [this page](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html) to learn more about how to use these attributes.
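
As a rough illustration of what these attributes do, the sketch below mimics them locally on one CSV record in plain Python. It is not the service's implementation: the helper `simulate_io_join`, the shortened sample record, and the fixed prediction are hypothetical. It mirrors a configuration along the lines of `input_filter="$[1:]"`, `join_source="Input"`, and `output_filter="$[0,-1]"` passed to `Transformer.transform`: drop the ID column before inference, append the prediction to the original record, and keep only the ID and the prediction.

```python
def simulate_io_join(record, predict):
    """Mimic input_filter="$[1:]", join_source="Input", output_filter="$[0,-1]" on one CSV line."""
    fields = record.split(",")
    model_input = fields[1:]  # input filter: drop the first column (the ID)
    prediction = predict(model_input)  # stand-in for the model's inference
    joined = fields + [str(prediction)]  # join: append the prediction to the input record
    return ",".join([joined[0], joined[-1]])  # output filter: keep first and last columns


def fake_model(features):
    """Hypothetical stand-in for the model: always returns a fixed probability."""
    return 0.93


print(simulate_io_join("842302,17.99,10.38,122.8", fake_model))
```

With a real job, the joined output would be written to S3 by the service, so the ID never has to be removed from the input file by hand.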




#### 1. Create a transform job with the default configurations
Let's first skip these 3 new attributes and inspect the inference results. We'll use it as a baseline to compare to the results with data processing.

In [None]:
input_location = "s3://{}/{}/batch/{}".format(bucket, prefix, batch_file_noID)
input_location

In [None]:
%%time

sm_transformer = sm_estimator.transformer(1, "ml.m5.xlarge")

# start a transform job
input_location = "s3://{}/{}/batch/{}".format(
    bucket, prefix, batch_file_noID
)  # use input data without ID column
sm_transformer.transform(input_location, content_type="text/csv", split_type="Line")
sm_transformer.wait()

Let's inspect the output of the Batch Transform job in S3. It should show the list of probabilities of tumors being malignant.

In [None]:
import re


def get_csv_output_from_s3(s3uri, batch_file):
    file_name = "{}.out".format(batch_file)
    match = re.match("s3://([^/]+)/(.*)", "{}/{}".format(s3uri, file_name))
    output_bucket, output_prefix = match.group(1), match.group(2)
    s3.download_file(output_bucket, output_prefix, file_name)
    return pd.read_csv(file_name, sep=",", header=None)
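
The regular expression in `get_csv_output_from_s3` separates the bucket from the object key. A minimal standalone sketch of that parsing step, using a hypothetical helper name `split_s3_uri` and a made-up URI, shows what it returns and adds an explicit error for malformed input:

```python
import re


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key' into (bucket, key); raise ValueError if the URI does not match."""
    match = re.match(r"s3://([^/]+)/(.*)", s3_uri)
    if match is None:
        raise ValueError("Not an S3 URI: {}".format(s3_uri))
    return match.group(1), match.group(2)


# Made-up URI for illustration only
bucket_name, key = split_s3_uri("s3://example-bucket/demo/output/batch_data_noID.csv.out")
print(bucket_name, key)
```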

In [None]:
output_df = get_csv_output_from_s3(sm_transformer.output_path, batch_file_noID)
output_df.head(8)
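
Each row of the output is a probability rather than a class label. If labels are needed, one option is to apply a cutoff. The sketch below assumes a threshold of 0.5, which is an illustrative choice and not something this notebook prescribes; the sample probabilities are made up.

```python
def to_labels(probabilities, threshold=0.5):
    """Map malignancy probabilities to 1 (malignant) or 0 (benign) using a cutoff."""
    return [1 if p >= threshold else 0 for p in probabilities]


sample_probabilities = [0.02, 0.97, 0.51, 0.49]  # made-up values for illustration
print(to_labels(sample_probabilities))
```

With the DataFrame returned above, the same idea could be written as `(output_df[0] >= 0.5).astype(int)`.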

In [None]:
output_location = sm_transformer.output_path
output_location

In [None]:
pipelines_output_location = "{}-pipelines".format(output_location)
pipelines_output_location

In [None]:
xgboost_model_name = sm_transformer.model_name
xgboost_model_name

## Orchestrate Jobs to run SageMaker Batch Transform with Amazon SageMaker Pipelines

Amazon SageMaker Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines. It also enables them to deploy custom-built models for inference in real-time with low latency, run offline inferences with Batch Transform, and track lineage of artifacts. They can institute sound operational practices in deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage through a simple interface, adhering to safety and best practice paradigms for ML application development.

The SageMaker Pipelines service supports a SageMaker Pipeline domain specific language (DSL), which is a declarative JSON specification. This DSL defines a directed acyclic graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Developer Kit (SDK) streamlines the generation of the pipeline DSL using constructs that engineers and scientists are already familiar with.

## Define Parameters to Parametrize Pipeline Execution

Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - represents a `str` Python type
* `ParameterInteger` - represents an `int` Python type
* `ParameterFloat` - represents a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.

The parameters defined in this workflow include:

* `instance_type` - The `ml.*` instance type of the batch transform job.
* `batch_data_input` - The S3 bucket URI location of the batch data.

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

instance_type = ParameterString(name="BatchTransformInstanceType", default_value="ml.m5.xlarge")

batch_data_input = ParameterString(
    name="BatchDataInput",
    default_value=input_location,
)
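
To make the default-and-override behaviour concrete, here is a plain-Python sketch that does not use the SDK. The helper `resolve_parameters` is hypothetical, the S3 URI is a placeholder, and rejecting unknown names is a design choice of this sketch rather than documented service behaviour.

```python
def resolve_parameters(defaults, overrides=None):
    """Return the values an execution would use: defaults, replaced by any overrides."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError("Unknown parameter(s): {}".format(sorted(unknown)))
    return {**defaults, **overrides}


defaults = {
    "BatchTransformInstanceType": "ml.m5.xlarge",
    "BatchDataInput": "s3://example-bucket/batch/batch_data_noID.csv",  # placeholder URI
}
print(resolve_parameters(defaults, {"BatchTransformInstanceType": "ml.m5.2xlarge"}))
```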

## Define a Transform Step to Perform Batch Transformation

Now that a model exists, create a `Transformer` instance with the model name, compute instance type, and desired output S3 URI.

Specifically, pass in the model name. This notebook reuses the model that `sm_estimator.transformer(...)` created for the earlier transform job; its name was saved in `xgboost_model_name`.

In [None]:
from sagemaker.workflow.pipeline_context import PipelineSession

pipeline_session = PipelineSession()

In [None]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=xgboost_model_name,
    instance_type=instance_type,
    instance_count=1,
    output_path=pipelines_output_location,
    sagemaker_session=pipeline_session,
)

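Call `transformer.transform` with the `batch_data_input` pipeline parameter defined earlier. Because the transformer uses a `PipelineSession`, the call returns step arguments instead of starting a job; pass those arguments to `TransformStep`.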


In [None]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


transform_step_args = transformer.transform(
    data=batch_data_input,
    content_type="text/csv",
    split_type="Line",
)

step_transform = TransformStep(
    name="MyBatchTransform", 
    step_args=transform_step_args
)

## Define a Pipeline of Parameters, Steps, and Conditions

In this section, combine the steps into a Pipeline so it can be executed.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.

Note:

* All the parameters used in the definitions must be present.
* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipelines service resolves the data dependency DAG to determine the order in which steps run.
* Step names must be unique across the pipeline step list and all condition step if/else lists.
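
The note about step order can be illustrated without SageMaker. The sketch below uses Python's `graphlib` module (Python 3.9+) to order three hypothetical steps by their dependencies, regardless of the order in which they are listed. The step names and dependencies are made up, and this is only an analogy for how a dependency DAG determines execution order, not the service's actual resolver.

```python
from graphlib import TopologicalSorter

# Hypothetical steps, deliberately listed out of order; each maps to the steps it depends on.
dependencies = {
    "Transform": {"CreateModel"},
    "Train": set(),
    "CreateModel": {"Train"},
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

The pipeline in this notebook has a single step, so there is nothing to reorder here; the point only matters once steps consume each other's outputs.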

In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "MyBatchPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        instance_type,
        batch_data_input,
    ],
    steps=[step_transform],
)

### (Optional) Examining the pipeline definition

The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.

In [None]:
import json


definition = json.loads(pipeline.definition())
definition
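
When the definition is large, it can help to pull out just the names. The sketch below runs on a hand-written, heavily trimmed stand-in for the definition; the real JSON has more fields, and its exact layout may differ between SDK versions. The helper `summarize_definition` is hypothetical.

```python
import json

# Hand-written, trimmed stand-in for pipeline.definition(); not real output.
sample_definition = json.loads(
    """
{
  "Parameters": [
    {"Name": "BatchTransformInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"},
    {"Name": "BatchDataInput", "Type": "String", "DefaultValue": "s3://example-bucket/batch"}
  ],
  "Steps": [
    {"Name": "MyBatchTransform", "Type": "Transform"}
  ]
}
"""
)


def summarize_definition(definition):
    """Return (parameter names, step names) from a pipeline definition dict."""
    params = [p["Name"] for p in definition.get("Parameters", [])]
    steps = [s["Name"] for s in definition.get("Steps", [])]
    return params, steps


print(summarize_definition(sample_definition))
```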

## Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.

In [None]:
pipeline.upsert(role_arn=role)

Start the pipeline and accept all the default parameters.

In [None]:
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

Describe the pipeline execution.

In [None]:
execution.describe()

Wait for the execution to complete.

In [None]:
execution.wait()

List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.

In [None]:
execution.list_steps()
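
`list_steps()` returns a list of dictionaries, one per step. Assuming each entry carries `StepName` and `StepStatus` keys, as in the `ListPipelineExecutionSteps` API response, a small hypothetical helper can flag steps that did not succeed. The sample data below is made up.

```python
def unsuccessful_steps(steps):
    """Return (name, status) for every step whose status is not 'Succeeded'."""
    return [
        (step["StepName"], step["StepStatus"])
        for step in steps
        if step.get("StepStatus") != "Succeeded"
    ]


sample_steps = [  # made-up entries shaped like list_steps() output
    {"StepName": "MyBatchTransform", "StepStatus": "Succeeded"},
    {"StepName": "AnotherStep", "StepStatus": "Failed"},
]
print(unsuccessful_steps(sample_steps))
```

In the notebook, the call would be `unsuccessful_steps(execution.list_steps())`.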

In [None]:
output_df = get_csv_output_from_s3(pipelines_output_location, batch_file_noID)
output_df.head(8)

### Start a pipeline with `ml.m5.2xlarge` instances

In [None]:
# Execute pipeline with explicit parameters
execution = pipeline.start(parameters=dict(BatchTransformInstanceType="ml.m5.2xlarge"))

In [None]:
execution.describe()

In [None]:
execution.wait()

In [None]:
execution.list_steps()