# SageMaker Image Classification Built-In Algorithm

## Introduction 
The Amazon SageMaker image classification algorithm is a supervised learning algorithm that supports multi-label classification. It takes an image as input and outputs one or more labels assigned to that image. It uses a convolutional neural network (ResNet) that can be trained from scratch, or trained using transfer learning when a large number of training images is not available.

This notebook covers the following steps:

1. Prepare images into RecordIO format

2. Train the SageMaker Image Classification built-in algorithm 

3. Create and deploy the model to an endpoint for doing inference 

4. Test realtime inference with the endpoint

5. Do batch inference using SageMaker Batch Transform

Let's start by importing some base libraries and setting some initial variables.

In the cell below, replace **'your-unique-bucket-name'** with the name of the bucket you created in the data-prep notebook.

In [None]:
%%time
import boto3
import os
import re
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()

bucket = 'your-unique-bucket-name'

training_image = sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='image-classification')

Clone the MXNet repository so we can use its tools to create RecordIO format datasets.

In [None]:
! git clone https://github.com/apache/incubator-mxnet.git

## Data Preparation

Let's first list the folders in our data folder.

In [None]:
! ls -1 ../data

Now we create a folder to store our RecordIO files

In [None]:
! mkdir recordio_dataset

We will now build our train and validation datasets in RecordIO format.<br>
First we generate list files using `im2rec.py` from MXNet.<br>
The output shows each class label and its assigned number (inferred from the folder structure),<br>
for example:<br>
Priority 0<br>
Roundabout 1<br>
Signal 2
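
To make the label assignment concrete, here is a minimal sketch of how class numbers could be derived from a folder structure. It mimics the behaviour described above (one sub-folder per class, numbered in sorted order) but is not the `im2rec.py` implementation. The helper name `labels_from_folders` and the temporary folder layout are assumptions made for illustration.

```python
import os
import tempfile


def labels_from_folders(root):
    """Map each class sub-folder of `root` to an integer label, in sorted order."""
    class_dirs = sorted(
        d for d in os.listdir(root) if os.path.isdir(os.path.join(root, d))
    )
    return {name: index for index, name in enumerate(class_dirs)}


# Build a throwaway folder tree that stands in for ../data/train
with tempfile.TemporaryDirectory() as root:
    for class_name in ["Signal", "Priority", "Roundabout"]:
        os.makedirs(os.path.join(root, class_name))
    label_map = labels_from_folders(root)

for name, index in label_map.items():
    print(name, index)
```

The same mapping is needed again later in the notebook, when class numbers in the predictions are translated back to label names.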

In [None]:
! python incubator-mxnet/tools/im2rec.py recordio_dataset/train ../data/train --recursive --list --num-thread 8

In [None]:
! python incubator-mxnet/tools/im2rec.py recordio_dataset/validation ../data/val --recursive --list --num-thread 8

Now that we have generated the list files, we will use them to generate the respective training and validation RecordIO files.

In [None]:
! python incubator-mxnet/tools/im2rec.py recordio_dataset/train.lst ../data/train 

In [None]:
! python incubator-mxnet/tools/im2rec.py recordio_dataset/validation.lst ../data/val

Now that we have the train and validation datasets in RecordIO format, we will copy them to our S3 bucket.

In [None]:
s3_train_key = "recordio_dataset/train"
s3_validation_key = "recordio_dataset/validation"
s3_train = 's3://{}/{}/'.format(bucket, s3_train_key)
s3_validation = 's3://{}/{}/'.format(bucket, s3_validation_key)
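
The S3 locations above are plain `s3://bucket/key` strings. As a small illustration of how such a URI breaks down into its bucket and key prefix, here is a sketch using only the standard library. The helper name `split_s3_uri` is hypothetical, and the bucket name is the same placeholder used earlier in this notebook.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3://bucket/key URI into a (bucket, key) tuple."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {}".format(uri))
    return parsed.netloc, parsed.path.lstrip("/")


example_bucket = "your-unique-bucket-name"  # placeholder, as in the notebook
example_uri = "s3://{}/{}/".format(example_bucket, "recordio_dataset/train")
bucket_part, key_part = split_s3_uri(example_uri)
print(bucket_part, key_part)
```

The same `urlparse` approach is used near the end of this notebook to recover the key prefix of the batch input location.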

In [None]:
! aws s3 cp recordio_dataset/train.lst {s3_train}
! aws s3 cp recordio_dataset/train.rec {s3_train}
! aws s3 cp recordio_dataset/train.idx {s3_train}
! aws s3 cp recordio_dataset/validation.lst {s3_validation}
! aws s3 cp recordio_dataset/validation.rec {s3_validation}
! aws s3 cp recordio_dataset/validation.idx {s3_validation}

## Training
Let's now define our hyperparameter values for training the Image Classification algorithm.

In [None]:
# For this training, we will use an 18-layer network
num_layers = "18" 
# we need to specify the input image shape for the training data
image_shape = "3,640,640"
# we also need to specify the number of training samples in the training set
# for this dataset it is 1334
num_training_samples = "1334"
# specify the number of output classes
num_classes = "3"
# batch size for training
mini_batch_size =  "64"
# number of epochs
epochs = "50"
# learning rate
learning_rate = "0.01"
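
As a rough sanity check on these values, the sketch below derives the image dimensions and the number of mini-batches implied by the sample count and batch size. The helper name `summarize_training_setup` is hypothetical, and treating a final partial batch as a full step is an assumption, not a statement about how the built-in algorithm schedules its batches.

```python
import math


def summarize_training_setup(image_shape, num_training_samples, mini_batch_size, epochs):
    """Derive a few quantities from the string-valued hyperparameters."""
    # image_shape is given as "channels,height,width"
    channels, height, width = (int(v) for v in image_shape.split(","))
    # Assumption: a final, partially filled batch still counts as one batch
    batches_per_epoch = math.ceil(int(num_training_samples) / int(mini_batch_size))
    return {
        "channels": channels,
        "height": height,
        "width": width,
        "batches_per_epoch": batches_per_epoch,
        "total_batches": batches_per_epoch * int(epochs),
    }


summary = summarize_training_setup("3,640,640", "1334", "64", "50")
print(summary)
```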

We will now set up the hyperparameters and define our training and validation channels.

In [None]:
%%time
import time
from time import gmtime, strftime


s3 = boto3.client('s3')
# create unique job name
job_name_prefix = 'traffic-image-classification'
job_name = job_name_prefix + '-' + time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
training_params = \
{
    # specify the training docker image
    "AlgorithmSpecification": {
        "TrainingImage": training_image,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "OutputDataConfig": {
        "S3OutputPath": 's3://{}/{}/output'.format(bucket, job_name_prefix)
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        #"InstanceType": "ml.m5.12xlarge",
        "InstanceType": "ml.p3.2xlarge",
        "VolumeSizeInGB": 50
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        "image_shape": image_shape,
        "num_layers": str(num_layers),
        "num_training_samples": str(num_training_samples),
        "num_classes": str(num_classes),
        "mini_batch_size": str(mini_batch_size),
        "epochs": str(epochs),
        "learning_rate": str(learning_rate)
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 360000
    },
    # Training data should be inside a subdirectory called "train"
    # Validation data should be inside a subdirectory called "validation"
    # The algorithm currently only supports the FullyReplicated distribution type (where data is copied onto each machine)
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_train,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "application/x-recordio",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": s3_validation,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "application/x-recordio",
            "CompressionType": "None"
        }
    ]
}
print('Training job name: {}'.format(job_name))
print('\nInput Data Location: {}'.format(training_params['InputDataConfig'][0]['DataSource']['S3DataSource']))

We now run the training job and wait until it completes. This takes about 15-20 minutes.

In [None]:
%%time
# create the Amazon SageMaker training job
sagemaker = boto3.client(service_name='sagemaker')
sagemaker.create_training_job(**training_params)

# confirm that the training job has started
status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print('Training job current status: {}'.format(status))

try:
    # wait for the job to finish and report the ending status
    sagemaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)
    training_info = sagemaker.describe_training_job(TrainingJobName=job_name)
    status = training_info['TrainingJobStatus']
    print("Training job ended with status: " + status)
except Exception as e:
    # the waiter raises if the job failed or the wait timed out
    print('Training did not complete successfully: {}'.format(e))
    message = sagemaker.describe_training_job(TrainingJobName=job_name).get('FailureReason', 'No failure reason reported')
    print('Training failed with the following error: {}'.format(message))

The training job is launched asynchronously. The waiter returned by `get_waiter` blocks until the job finishes; we then call `describe_training_job` to get the final status of the job.

In [None]:
training_info = sagemaker.describe_training_job(TrainingJobName=job_name)
status = training_info['TrainingJobStatus']
print("Training job ended with status: " + status)
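
Conceptually, the waiter used above polls the job status until it reaches a terminal state. The sketch below illustrates that idea with a stand-in status function, so it runs without any AWS call. It is not the boto3 waiter implementation, which has its own delay, attempt limit, and failure handling; the helper name `wait_for_terminal_status` is hypothetical.

```python
import time


def wait_for_terminal_status(describe, terminal_states, delay_seconds=0.0, max_attempts=10):
    """Call `describe()` until it returns a terminal status or attempts run out."""
    for _ in range(max_attempts):
        status = describe()
        if status in terminal_states:
            return status
        time.sleep(delay_seconds)
    raise TimeoutError("status did not become terminal after {} attempts".format(max_attempts))


# Stand-in for describe_training_job(...)['TrainingJobStatus']; no AWS call is made
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = wait_for_terminal_status(
    lambda: next(fake_statuses),
    terminal_states={"Completed", "Failed", "Stopped"},
)
print(final_status)
```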

## Inference

***

A trained model does nothing on its own. We now want to use the model to perform inference. For this example, that means predicting the class label of a given image.

This section involves several steps:

1. [Create Model](#CreateModel) - Create model for the training output
1. [Create Endpoint Configuration](#CreateEndpointConfiguration) - Create a configuration defining an endpoint.
1. [Create Endpoint](#CreateEndpoint) - Use the configuration to create an inference endpoint.
1. [Perform Inference](#PerformInference) - Perform inference on some input data using the endpoint.

## Create Model

We now create a SageMaker Model from the training output. Using the model we can create an Endpoint Configuration.

In [None]:
%%time
import boto3
from time import gmtime, strftime
sage = boto3.Session().client(service_name='sagemaker') 

model_name="sm5-full-image-classification-model"
print(model_name)
#job_name = 'traffic-image-classification--2022-05-09-21-16-28'
info = sage.describe_training_job(TrainingJobName=job_name)

#model_data = info['ModelArtifacts']['S3ModelArtifacts']
model_data = 's3://ml-materials/sm_image_class/model.tar.gz'
print(model_data)

containers = {'us-west-2': '433757028032.dkr.ecr.us-west-2.amazonaws.com/image-classification:latest',
              'us-east-1': '811284229777.dkr.ecr.us-east-1.amazonaws.com/image-classification:latest',
              'us-east-2': '825641698319.dkr.ecr.us-east-2.amazonaws.com/image-classification:latest',
              'eu-west-1': '685385470294.dkr.ecr.eu-west-1.amazonaws.com/image-classification:latest'}
hosting_image = containers[boto3.Session().region_name]
primary_container = {
    'Image': hosting_image,
    'ModelDataUrl': model_data,
}

create_model_response = sage.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

### Create Endpoint Configuration
SageMaker hosting supports REST endpoints that serve multiple models, e.g. for A/B testing purposes. To support this, we create an endpoint configuration that describes how traffic is distributed across the models.

In addition, the endpoint configuration specifies the instance type and initial instance count required for model deployment.

In [None]:
from time import gmtime, strftime

timestamp = time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
endpoint_config_name = job_name_prefix + '-epc-' + timestamp
endpoint_config_response = sage.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m4.xlarge',
        'InitialInstanceCount':1,
        'ModelName':model_name,
        'VariantName':'AllTraffic'}])

print('Endpoint configuration name: {}'.format(endpoint_config_name))
print('Endpoint configuration arn:  {}'.format(endpoint_config_response['EndpointConfigArn']))
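
The configuration above uses a single variant that receives all traffic. To illustrate the A/B testing case, here is a sketch of a two-variant `ProductionVariants` list and the traffic share each variant would receive, computed as its weight divided by the sum of all weights. The variant names, model names, and weights are made up for illustration, and no endpoint configuration is created.

```python
def traffic_shares(production_variants):
    """Return the fraction of traffic each variant receives, based on its weight."""
    total = sum(v["InitialVariantWeight"] for v in production_variants)
    return {
        v["VariantName"]: v["InitialVariantWeight"] / total
        for v in production_variants
    }


# Hypothetical variants; the model names do not refer to real models
example_variants = [
    {
        "VariantName": "VariantA",
        "ModelName": "model-a",
        "InstanceType": "ml.m4.xlarge",
        "InitialInstanceCount": 1,
        "InitialVariantWeight": 3.0,
    },
    {
        "VariantName": "VariantB",
        "ModelName": "model-b",
        "InstanceType": "ml.m4.xlarge",
        "InitialInstanceCount": 1,
        "InitialVariantWeight": 1.0,
    },
]

shares = traffic_shares(example_variants)
print(shares)
```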

### Create Endpoint
Lastly, we create the endpoint that serves the model by specifying the name and configuration defined above. The end result is an endpoint that can be validated and incorporated into production applications. This takes 9-11 minutes to complete.

In [None]:
%%time
import time

timestamp = time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
endpoint_name = job_name_prefix + '-ep-' + timestamp
print('Endpoint name: {}'.format(endpoint_name))

endpoint_params = {
    'EndpointName': endpoint_name,
    'EndpointConfigName': endpoint_config_name,
}
endpoint_response = sagemaker.create_endpoint(**endpoint_params)
print('EndpointArn = {}'.format(endpoint_response['EndpointArn']))

The endpoint creation has now been requested. It may take some time for the endpoint to become available; the cell below waits until it is in service.

In [None]:
# get the status of the endpoint
response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
status = response['EndpointStatus']
print('EndpointStatus = {}'.format(status))


# wait until the status has changed
sagemaker.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)


# print the status of the endpoint
endpoint_response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
status = endpoint_response['EndpointStatus']
print('Endpoint creation ended with EndpointStatus = {}'.format(status))

if status != 'InService':
    raise Exception('Endpoint creation failed.')

If you see the message,

> `Endpoint creation ended with EndpointStatus = InService`

then congratulations! You now have a functioning inference endpoint. You can confirm the endpoint configuration and status by navigating to the "Endpoints" tab in the AWS SageMaker console.

We will finally create a runtime object from which we can invoke the endpoint.

## Perform Inference
Finally, we can validate the model for use. We obtain the endpoint from the client library using the result of the previous operations, and generate classifications from the trained model using that endpoint.


In [None]:
import boto3
runtime = boto3.Session().client(service_name='sagemaker-runtime')

### Download test image

In [None]:
file_name = '../data/test/Roundabout/R2.png'
# test image
from IPython.display import Image
Image(file_name)  

### Evaluation

Evaluate the image through the network for inference. The network outputs class probabilities, and typically one selects the class with the maximum probability as the final class output.

**Note:** The output class detected by the network may not always be accurate in this example. To limit the time taken and cost of training, the model was trained on a small dataset for a limited number of epochs. Training on more data or for more epochs will generally make the output class more accurate.

In [None]:
import json
import numpy as np

with open(file_name, 'rb') as f:
    payload = f.read()
    payload = bytearray(payload)
response = runtime.invoke_endpoint(EndpointName=endpoint_name, 
                                   ContentType='application/x-image', 
                                   Body=payload)
result = response['Body'].read()
# the result is a JSON list of class probabilities; parse it into a Python list
result = json.loads(result)
print(result)
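
Selecting the class with the maximum probability, as described above, can be sketched as follows. The probabilities are made-up values standing in for an endpoint response, and the helper name `top_prediction` is hypothetical. The category order follows the label numbering from the data preparation step.

```python
object_categories = ["Priority", "Roundabout", "Signal"]


def top_prediction(probabilities, categories):
    """Return the (label, probability) pair with the highest probability."""
    index = max(range(len(probabilities)), key=probabilities.__getitem__)
    return categories[index], probabilities[index]


# Made-up probabilities, in the same order as object_categories
example_result = [0.05, 0.90, 0.05]
label, probability = top_prediction(example_result, object_categories)
print("label: {}, probability: {}".format(label, probability))
```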

### Clean up

When we're done with the endpoint, we can just delete it and the backing instances will be released.  Run the following cell to delete the endpoint.

In [None]:
sage.delete_endpoint(EndpointName=endpoint_name)

## Batch Inference
We are going to use SageMaker Batch Transform to run batch inference on the test dataset provided.

In [None]:
%%time
import sagemaker

sage = boto3.Session().client(service_name='sagemaker') 

model_name="traffic-full-image-classification-model" + time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
print(model_name)
#info = sage.describe_training_job(TrainingJobName=job_name)
#model_data = info['ModelArtifacts']['S3ModelArtifacts']
model_data = 's3://ml-materials/sm_image_class/model.tar.gz'
print(model_data)

hosting_image = sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='image-classification')

primary_container = {
    'Image': hosting_image,
    'ModelDataUrl': model_data,
}

create_model_response = sage.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

Copy the test images to your S3 bucket

In [None]:
batch_input = f's3://{bucket}/test/'

In [None]:
! aws s3 cp ../data/test/ {batch_input} --recursive

Set up the parameters for this batch transform job.

In [None]:
timestamp = time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
batch_job_name = "traffic-image-classification-model" + timestamp
request = \
{
    "TransformJobName": batch_job_name,
    "ModelName": model_name,
    "MaxConcurrentTransforms": 16,
    "MaxPayloadInMB": 6,
    "BatchStrategy": "SingleRecord",
    "TransformOutput": {
        "S3OutputPath": 's3://{}/{}/output'.format(bucket, batch_job_name)
    },
    "TransformInput": {
        "DataSource": {
            "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": batch_input
            }
        },
        "ContentType": "application/x-image",
        "SplitType": "None",
        "CompressionType": "None"
    },
    "TransformResources": {
            "InstanceType": "ml.m5.12xlarge",
            "InstanceCount": 1
    }
}

print('Transform job name: {}'.format(batch_job_name))
print('\nInput Data Location: {}'.format(batch_input))

Now let's run the batch transform job and wait for completion. This takes 5-10 minutes.

In [None]:
%%time
sagemaker = boto3.client('sagemaker')
sagemaker.create_transform_job(**request)

print("Created Transform job with name: ", batch_job_name)

while True:
    response = sagemaker.describe_transform_job(TransformJobName=batch_job_name)
    status = response['TransformJobStatus']
    if status == 'Completed':
        print("Transform job ended with status: " + status)
        break
    # stop polling on any other terminal status instead of looping forever
    if status in ('Failed', 'Stopped'):
        message = response.get('FailureReason', 'No failure reason reported')
        print('Transform job ended with status {}: {}'.format(status, message))
        raise Exception('Transform job did not complete')
    time.sleep(30)

Let us now look at the predictions for each image together with their confidence scores.
Note that we have to map the class numbers back to the label assignments.

In [None]:
import json
import numpy as np
from urllib.parse import urlparse

s3_client = boto3.client('s3')
object_categories = ['Priority','Roundabout','Signal'] 

def list_objects(s3_client, bucket, prefix):
    response = s3_client.list_objects(Bucket=bucket, Prefix=prefix)
    objects = [content['Key'] for content in response['Contents']]
    return objects

def get_label(s3_client, bucket, prefix):
    filename = prefix.split('/')[-1]
    s3_client.download_file(bucket, prefix, filename)
    with open(filename) as f:
        data = json.load(f)
        index = np.argmax(data['prediction'])
        probability = data['prediction'][index]
    print("Filename: " + filename + " Result: label - " + object_categories[index] + ", probability - " + str(probability))
    return object_categories[index], probability

inputs = list_objects(s3_client, bucket, urlparse(batch_input).path.lstrip('/'))
print("Sample inputs: " + str(inputs[:2]))

outputs = list_objects(s3_client, bucket, batch_job_name + "/output")
print("Sample output: " + str(outputs[:2]))

# Check the prediction results of the first 10 images
[get_label(s3_client, bucket, prefix) for prefix in outputs[0:10]]
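
Once labels and probabilities are available, one possible follow-up is to flag predictions whose confidence falls below a threshold for manual review. The sketch below shows this on made-up data. The file names, probabilities, the 0.6 threshold, and the helper name `flag_low_confidence` are all assumptions for illustration, not results from this notebook.

```python
def flag_low_confidence(predictions, threshold=0.6):
    """Return the (filename, label, probability) entries below `threshold`."""
    return [entry for entry in predictions if entry[2] < threshold]


# Made-up predictions in the form (output file name, label, probability)
example_predictions = [
    ("P1.png.out", "Priority", 0.97),
    ("R2.png.out", "Roundabout", 0.55),
    ("S3.png.out", "Signal", 0.81),
]

needs_review = flag_low_confidence(example_predictions)
for filename, label, probability in needs_review:
    print("Review {}: {} ({})".format(filename, label, probability))
```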