# Batch Predictions


Amazon SageMaker Batch Transform lets you make predictions on batches of data in S3 without setting up a REST endpoint. Batch predictions are also called “offline” predictions because they do not require an online REST endpoint. Batch prediction is meant for higher-throughput workloads that can tolerate higher latency and lower freshness, so batch prediction servers typically do not run 24 hours per day like real-time prediction servers. They run for a few hours on a batch of data, then shut down, hence the term “batch.” Batch Transform manages all of the resources needed to perform the inferences, including launching the cluster and terminating it after the job completes.

![](img/batch_transform_tensorflow.gif)

In [1]:
import boto3
import sagemaker
import pandas as pd

sess   = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sm = boto3.Session().client(service_name='sagemaker', region_name=region)

# Setup Batch Transform Model

In [2]:
%store -r training_job_name

In [3]:
print(training_job_name)

tensorflow-training-2020-07-25-18-45-23-722


In [4]:
!aws s3 cp s3://$bucket/$training_job_name/output/model.tar.gz ./model.tar.gz

download: s3://sagemaker-us-west-2-140773038493/tensorflow-training-2020-07-25-18-45-23-722/output/model.tar.gz to ./model.tar.gz


In [5]:
!tar -xvzf ./model.tar.gz

tensorflow/
tensorflow/saved_model/
tensorflow/saved_model/0/
tensorflow/saved_model/0/assets/
tensorflow/saved_model/0/variables/
tensorflow/saved_model/0/variables/variables.index
tensorflow/saved_model/0/variables/variables.data-00000-of-00001
tensorflow/saved_model/0/saved_model.pb
tensorboard/
transformers/
transformers/fine-tuned/
transformers/fine-tuned/config.json
transformers/fine-tuned/tf_model.h5
metrics/
metrics/confusion_matrix.png


In [6]:
!saved_model_cli show --all --dir ./tensorflow/saved_model/0/

2020-07-25 19:48:39.527435: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer.so.6'; dlerror: libnvinfer.so.6: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda-10.0/lib64:/usr/local/cuda-10.0/extras/CUPTI/lib64:/usr/local/cuda-10.0/lib:/usr/local/cuda-10.0/efa/lib:/opt/amazon/efa/lib:/opt/amazon/efa/lib64:/usr/lib64/openmpi/lib/:/usr/local/lib:/usr/lib:/usr/local/mpi/lib:/lib/:/usr/lib64/openmpi/lib/:/usr/local/lib:/usr/lib:/usr/local/mpi/lib:/lib/:/usr/lib64/openmpi/lib/:/usr/local/lib:/usr/lib:/usr/local/mpi/lib:/lib/:
2020-07-25 19:48:39.527516: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer_plugin.so.6'; dlerror: libnvinfer_plugin.so.6: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda-10.0/lib64:/usr/local/cuda-10.0/extras/CUPTI/lib64:/usr/local/cuda-10.0/lib:/usr/local/cuda-10.0/ef

In [7]:
!pygmentize ./src_batch_tsv/inference.py

[34mimport[39;49;00m [04m[36mjson[39;49;00m
[34mimport[39;49;00m [04m[36mtensorflow[39;49;00m [34mas[39;49;00m [04m[36mtf[39;49;00m
[34mfrom[39;49;00m [04m[36mtransformers[39;49;00m [34mimport[39;49;00m DistilBertTokenizer

review_body_column_idx_tsv = [34m13[39;49;00m

classes=[[34m1[39;49;00m, [34m2[39;49;00m, [34m3[39;49;00m, [34m4[39;49;00m, [34m5[39;49;00m]

max_seq_length=[34m128[39;49;00m

tokenizer = DistilBertTokenizer.from_pretrained([33m'[39;49;00m[33mdistilbert-base-uncased[39;49;00m[33m'[39;49;00m)

[34mdef[39;49;00m [32minput_handler[39;49;00m(data, context):
    transformed_instances = []

    [34mfor[39;49;00m instance [35min[39;49;00m data:

        data_str = instance.decode([33m'[39;49;00m[33mutf-8[39;49;00m[33m'[39;49;00m)

        data_str_split = data_str.split([33m'[39;49;00m[33m\t[39;49;00m[33m'[39;49;00m)
[37m#        print(len(data_str_split))[39;49;00m
        [34mif[39;49;

# Configure TensorFlow Serving for Batch Inference

In [8]:
from sagemaker.tensorflow.serving import Model

batch_env = {
  # Configures whether to enable record batching.
  'SAGEMAKER_TFS_ENABLE_BATCHING': 'true',

  # Name of the model - this is important in multi-model deployments
  'SAGEMAKER_TFS_DEFAULT_MODEL_NAME': 'saved_model',

  # Configures how long to wait for a full batch, in microseconds.
  'SAGEMAKER_TFS_BATCH_TIMEOUT_MICROS': '50000', # microseconds

  # Corresponds to "max_batch_size" in TensorFlow Serving.
  'SAGEMAKER_TFS_MAX_BATCH_SIZE': '10000',

  # Number of seconds for the SageMaker web server timeout
  'SAGEMAKER_MODEL_SERVER_TIMEOUT': '7200', # Seconds

  # Configures number of batches that can be enqueued.
  'SAGEMAKER_TFS_MAX_ENQUEUED_BATCHES': '10000'
}
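
Every value in `batch_env` above is written as a string, including the numeric settings. Container environment variables are passed as a string-to-string map, so an accidental integer is worth catching before the model is created. The helper below is a minimal sketch of such a check; `validate_container_env` and `example_env` are hypothetical names, not part of the SageMaker SDK.

```python
def validate_container_env(env):
    """Return the names of entries whose key or value is not a string."""
    return [
        name
        for name, value in env.items()
        if not isinstance(name, str) or not isinstance(value, str)
    ]


# Hypothetical example values, written in the same style as batch_env.
example_env = {
    'SAGEMAKER_TFS_ENABLE_BATCHING': 'true',
    'SAGEMAKER_TFS_MAX_BATCH_SIZE': 10000,  # an int by mistake, so it is flagged
}

bad_entries = validate_container_env(example_env)
print(bad_entries)
```

In this notebook the same check could be run as `validate_container_env(batch_env)` before constructing the `Model`.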

# Configure the Parallelism and Payload Size
To increase performance, you can increase the `max_concurrent_transforms` parameter. Tune this on a single instance before scaling out the number of instances. With a small file count in particular, multiple instances can be a big waste. Note that `max_concurrent_transforms * max_payload <= 100`, where `max_payload` is in megabytes.

In [9]:
max_concurrent_transforms=1
max_payload=1      # Megabytes (not number of records)
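
The constraint `max_concurrent_transforms * max_payload <= 100` can be checked before submitting a job. The sketch below assumes only that inequality as stated in the text; the helper names `within_payload_limit` and `max_concurrency_for` are hypothetical.

```python
MAX_TOTAL_PAYLOAD_MB = 100  # limit taken from the inequality in the text


def within_payload_limit(max_concurrent_transforms, max_payload_mb):
    """True if the combined in-flight payload stays within the limit."""
    return max_concurrent_transforms * max_payload_mb <= MAX_TOTAL_PAYLOAD_MB


def max_concurrency_for(max_payload_mb):
    """Largest max_concurrent_transforms allowed for a given payload size."""
    if max_payload_mb < 1:
        raise ValueError('max_payload_mb must be at least 1')
    return MAX_TOTAL_PAYLOAD_MB // max_payload_mb


print(within_payload_limit(1, 1))   # the values used in this notebook
print(within_payload_limit(20, 6))  # 120 MB in flight, over the limit
print(max_concurrency_for(6))
```

When tuning on a single instance, `max_concurrency_for(max_payload)` gives an upper bound for the concurrency values worth trying.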

# Setup Instance Type and Instance Count for Our Cluster

In [10]:
instance_type='ml.m5.xlarge'
instance_count=1

# Setup Input Data and Configuration
This includes the batch strategy (`SingleRecord` vs. `MultiRecord`), `compression_type`, `accept_type`, `content_type`, split types, etc.

In [11]:
strategy='MultiRecord'
compression_type='Gzip'
accept_type='text/csv'
content_type='text/csv'
assemble_with='Line'
split_type='Line'
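
To build intuition for how `strategy='MultiRecord'`, `split_type='Line'`, and `max_payload` interact, the sketch below packs newline-delimited records into mini-batches that each fit under a byte limit. It is a simplified illustration under stated assumptions, not SageMaker's actual batching logic; `pack_records` and the sample records are hypothetical.

```python
def pack_records(lines, max_payload_bytes):
    """Greedily group lines into batches no larger than max_payload_bytes."""
    batches, current, current_size = [], [], 0
    for line in lines:
        size = len(line.encode('utf-8')) + 1  # +1 for the newline delimiter
        if size > max_payload_bytes:
            raise ValueError('a single record exceeds the payload limit')
        if current and current_size + size > max_payload_bytes:
            # The current batch is full: close it and start a new one.
            batches.append(current)
            current, current_size = [], 0
        current.append(line)
        current_size += size
    if current:
        batches.append(current)
    return batches


# Hypothetical tab-separated records, kept tiny so the packing is easy to follow.
records = ['a\tb', 'cc\tdd', 'e\tf', 'gggg\thhhh']
batches = pack_records(records, max_payload_bytes=12)
print(len(batches))
```

With `SingleRecord`, by contrast, each line would be sent as its own request, which corresponds to one record per batch in this sketch.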

In [12]:
input_csv_s3_uri = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)
print(input_csv_s3_uri)

s3://sagemaker-us-west-2-140773038493/amazon-reviews-pds/tsv/


In [13]:
!aws s3 ls --recursive $input_csv_s3_uri

2020-07-25 17:24:19   18997559 amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz
2020-07-25 17:24:22   27442648 amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz


# Setup Batch Transformer 
We are using a previously-trained model specified at `model_s3_uri`.

In [14]:
model_s3_uri = 's3://{}/{}/output/model.tar.gz'.format(bucket, training_job_name)

batch_model = Model(entry_point='inference.py',
                    source_dir='src_batch_tsv',       
                    model_data=model_s3_uri,
                    role=role,
                    framework_version='2.1.0',
                    env=batch_env)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


In [15]:
batch_predictor = batch_model.transformer(strategy=strategy, 
                                          instance_type=instance_type,
                                          instance_count=instance_count,
                                          accept=accept_type,
                                          assemble_with=assemble_with,
                                          max_concurrent_transforms=max_concurrent_transforms,
                                          max_payload=max_payload, # This is in Megabytes (not number of records)
                                          env=batch_env)

'create_image_uri' will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.


# Start Batch Predictions

In [16]:
batch_predictor.transform(data=input_csv_s3_uri,
                          split_type=split_type,
                          compression_type=compression_type,
                          content_type=content_type,
#                          join_source='Input', # Mismatched line count between input and output
                          experiment_config=None,
                          wait=False)

In [17]:
from IPython.display import display, HTML

display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/transform-jobs/{}?region={}&tab=Monitor">Batch Prediction Job</a></b>'.format(region, batch_predictor.latest_transform_job.job_name, region)))


In [18]:
from IPython.display import display, HTML

display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/TransformJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a></b>'.format(region, batch_predictor.latest_transform_job.job_name)))


In [19]:
from IPython.display import display, HTML

display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/s3/buckets/{}/{}/?region={}">Batch Prediction S3 Output</a></b>'.format(bucket, batch_predictor.latest_transform_job.job_name, region)))


In [21]:
print('Waiting for batch prediction job: ' + batch_predictor.latest_transform_job.job_name)

batch_predictor.wait(logs=False)

Waiting for batch prediction job: tensorflow-inference-2020-07-25-19-49-2-2020-07-25-19-49-25-241
*


UnexpectedStatusException: Error for Transform job tensorflow-inference-2020-07-25-19-49-2-2020-07-25-19-49-25-241: Failed. Reason: AlgorithmError: See job logs for more information
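
When a job fails like this, the status and failure reason can also be read programmatically with the boto3 `describe_transform_job` call. The sketch below wraps that call in a hypothetical helper, `summarize_transform_job`, and runs it against a stub client so that it works without AWS credentials; the stub and its return values are assumptions for demonstration only.

```python
def summarize_transform_job(sm_client, job_name):
    """Return the status and failure reason (if any) of a transform job."""
    response = sm_client.describe_transform_job(TransformJobName=job_name)
    return {
        'status': response['TransformJobStatus'],
        'failure_reason': response.get('FailureReason'),  # absent unless the job failed
    }


class StubSageMakerClient:
    """Hypothetical stand-in for the boto3 SageMaker client, for the demo only."""

    def describe_transform_job(self, TransformJobName):
        return {
            'TransformJobName': TransformJobName,
            'TransformJobStatus': 'Failed',
            'FailureReason': 'AlgorithmError: See job logs for more information',
        }


summary = summarize_transform_job(StubSageMakerClient(), 'example-transform-job')
print(summary)
```

In this notebook the real client and job name would be `sm` and `batch_predictor.latest_transform_job.job_name`. The failure reason here only points to the job logs, so the CloudWatch link above remains the place to find the underlying error.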

# _Wait Until the ^^ Batch Transform Job ^^ Completes_

# Check Output Data

After the transform job has completed, download the output data from S3.

For each file in the input data, we have a corresponding file with a ".out" extension.  This .out file contains the predicted labels for each input row. 

In [22]:
# Download the output data from S3 to local filesystem
batch_prediction_output_s3_uri = batch_predictor.output_path

In [23]:
!aws s3 cp --recursive $batch_prediction_output_s3_uri/ batch_prediction_output/

In [24]:
!ls batch_prediction_output/
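
Once the files are downloaded, a quick sanity check is to count the rows in each `.out` file and compare them with the number of input rows. The sketch below assumes only that the output files end in `.out` and are line-delimited; `count_prediction_rows` is a hypothetical helper, and the demo writes made-up contents to a temporary directory rather than reading real predictions.

```python
import tempfile
from pathlib import Path


def count_prediction_rows(output_dir):
    """Map each .out file name in output_dir to its number of lines."""
    counts = {}
    for path in sorted(Path(output_dir).glob('*.out')):
        with path.open('r', encoding='utf-8') as f:
            counts[path.name] = sum(1 for _ in f)
    return counts


# Demo with made-up files; real predictions would live in batch_prediction_output/.
with tempfile.TemporaryDirectory() as tmp_dir:
    (Path(tmp_dir) / 'example.tsv.gz.out').write_text('5\n4\n', encoding='utf-8')
    (Path(tmp_dir) / 'notes.txt').write_text('ignored\n', encoding='utf-8')
    demo_counts = count_prediction_rows(tmp_dir)

print(demo_counts)
```

For the downloaded results, the call would be `count_prediction_rows('batch_prediction_output')`.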

In [None]:
%%javascript
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();