# MNIST classification using the XGBoost algorithm on SageMaker

---
## Contents

1. [Introduction](#Introduction)
2. [Data ingestion](#Data-ingestion)
3. [Training the XGBoost model](#Training-the-XGBoost-model)
4. [Hosting the model](#Hosting-model)
   1. [Create model](#Create-model)
   2. [Create endpoint configuration](#Create-endpoint-configuration)
   3. [Create endpoint](#Create-endpoint)
5. [Validate the model for use](#Validate-the-model-for-use)

---
## Introduction

This notebook uses the XGBoost algorithm to train a classifier on the MNIST dataset and hosts the resulting model on a SageMaker endpoint. The MNIST dataset has a total of 70,000 images, split into a training set of 60,000 examples and a test set of 10,000 examples.


In [None]:
# Update the SageMaker SDK
! pip install --upgrade sagemaker

In [None]:
%%time

import os
import boto3
import re
import copy
import time
from time import gmtime, strftime
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()

region = boto3.Session().region_name

sess = sagemaker.Session()

# S3 bucket where the original mnist data is downloaded and stored.
downloaded_data_bucket = "sagemaker-sample-files"
downloaded_data_prefix = "datasets/image/MNIST"

# S3 bucket for saving code and model artifacts.
# Feel free to specify a different bucket and prefix
bucket = sess.default_bucket()

In [None]:
prefix = "sagemaker/MNIST-xgboost-multiclass-classification"
# customize to your bucket where you have stored the data
bucket_path = f"s3://{bucket}"

## Data ingestion

Download the data from the public S3 bucket.

In [None]:
%%time
import pickle, gzip, numpy, json

# Load the dataset
s3 = boto3.client("s3")
s3.download_file(downloaded_data_bucket, f"{downloaded_data_prefix}/mnist.pkl.gz", "mnist.pkl.gz")
with gzip.open("mnist.pkl.gz", "rb") as f:
    train_set, valid_set, test_set = pickle.load(f, encoding="latin1")

### Data processing

The XGBoost model consumes training data from S3 in libsvm format. The data therefore needs to be converted from the pickled NumPy arrays to libsvm before it is uploaded to S3. The following cell provides functions for data conversion, upload to S3, and download from S3.

In [None]:
%%time

import io

import boto3
import botocore


def to_libsvm(f, labels, values):
    f.write(
        bytes(
            "\n".join(
                [
                    "{} {}".format(
                        label, " ".join(["{}:{}".format(i + 1, el) for i, el in enumerate(vec)])
                    )
                    for label, vec in zip(labels, values)
                ]
            ),
            "utf-8",
        )
    )
    return f


def write_to_s3(fobj, bucket, key):
    return (
        boto3.Session(region_name=region)
        .resource("s3")
        .Bucket(bucket)
        .Object(key)
        .upload_fileobj(fobj)
    )


def get_dataset():
    import pickle
    import gzip

    with gzip.open("mnist.pkl.gz", "rb") as f:
        # latin1 decoding matches the load used in the data ingestion cell.
        return pickle.load(f, encoding="latin1")


def upload_to_s3(partition_name, partition):
    labels = [t.tolist() for t in partition[1]]
    vectors = [t.tolist() for t in partition[0]]
    num_partition = 5  # split each dataset into 5 part files
    # Integer division: records left over after 5 equal parts are not uploaded.
    partition_bound = len(labels) // num_partition
    for i in range(num_partition):
        f = io.BytesIO()
        to_libsvm(
            f,
            labels[i * partition_bound : (i + 1) * partition_bound],
            vectors[i * partition_bound : (i + 1) * partition_bound],
        )
        f.seek(0)
        key = f"{prefix}/{partition_name}/examples{i}"
        url = f"s3://{bucket}/{key}"
        print(f"Writing to {url}")
        write_to_s3(f, bucket, key)
        print(f"Done writing to {url}")


def download_from_s3(partition_name, number, filename):
    from botocore.exceptions import ClientError

    key = f"{prefix}/{partition_name}/examples{number}"
    url = f"s3://{bucket}/{key}"
    print(f"Reading from {url}")
    s3 = boto3.resource("s3", region_name=region)
    try:
        # Download once, to the filename supplied by the caller.
        s3.Bucket(bucket).download_file(key, filename)
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            print(f"The object does not exist at {url}.")
        else:
            raise


def convert_data():
    train_set, valid_set, test_set = get_dataset()
    partitions = [("train", train_set), ("validation", valid_set), ("test", test_set)]
    for partition_name, partition in partitions:
        print(f"{partition_name}: {partition[0].shape} {partition[1].shape}")
        upload_to_s3(partition_name, partition)

In [None]:
%%time

convert_data()
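
To make the target format concrete: each libsvm line is a label followed by space-separated `index:value` pairs, and `to_libsvm` above numbers the features from 1. The following is a minimal sketch on a made-up three-feature record. `to_libsvm_line` and `parse_libsvm_line` are illustrative helpers, not part of the notebook or of any library.

```python
def to_libsvm_line(label, vec):
    """Format one record the way to_libsvm does: 'label 1:v1 2:v2 ...'."""
    features = " ".join(f"{i + 1}:{el}" for i, el in enumerate(vec))
    return f"{label} {features}"


def parse_libsvm_line(line):
    """Split a libsvm line back into (label, {index: value})."""
    label, *pairs = line.split(" ")
    features = {}
    for pair in pairs:
        index, value = pair.split(":")
        features[int(index)] = float(value)
    return int(label), features


# A made-up record with label 7 and three pixel values.
line = to_libsvm_line(7, [0.0, 0.5, 1.0])
print(line)
print(parse_libsvm_line(line))
```

The notebook writes every pixel of each image this way, including zero-valued ones.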

## Training the XGBoost model

Now that we have our data in S3, we can begin training. We'll use the Amazon SageMaker XGBoost algorithm.

In [None]:
from sagemaker.image_uris import retrieve

container = retrieve(framework="xgboost", region=region, version="1.7-1")

In [None]:
container

In [None]:
# Ensure that the train and validation data folders generated above are reflected in the "InputDataConfig" parameter below.
common_training_params = {
    "AlgorithmSpecification": {"TrainingImage": container, "TrainingInputMode": "File"},
    "RoleArn": role,
    "OutputDataConfig": {"S3OutputPath": f"{bucket_path}/{prefix}/xgboost"},
    "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.m4.10xlarge", "VolumeSizeInGB": 5},
    "HyperParameters": {
        "max_depth": "5",
        "eta": "0.2",
        "gamma": "4",
        "min_child_weight": "6",
        "verbosity": "0",
        "objective": "multi:softmax",
        "num_class": "10",
        "num_round": "10",
    },
    "StoppingCondition": {"MaxRuntimeInSeconds": 86400},
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{bucket_path}/{prefix}/train/",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None",
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{bucket_path}/{prefix}/validation/",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None",
        },
    ],
}
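
With `objective` set to `multi:softmax` and `num_class` set to 10, the trained model returns a single class index per record rather than ten probabilities (XGBoost's `multi:softprob` objective returns the latter). Below is a rough pure-Python sketch of that final step. The raw scores are made up for illustration and are not real model output, and `softmax` and `predict_class` are hypothetical helper names.

```python
import math


def softmax(scores):
    """Turn raw per-class scores into probabilities that sum to 1."""
    shift = max(scores)  # subtract the max for numerical stability
    exps = [math.exp(s - shift) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def predict_class(scores):
    """Return the index of the class with the highest probability."""
    probs = softmax(scores)
    return max(range(len(probs)), key=probs.__getitem__)


# Hypothetical raw scores for the ten digit classes 0-9.
raw_scores = [0.1, -1.2, 0.3, 2.5, 0.0, -0.7, 0.4, 1.9, -0.2, 0.6]
probs = softmax(raw_scores)
print(f"sum of probabilities: {sum(probs):.3f}")
print(f"predicted digit: {predict_class(raw_scores)}")
```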

Now we'll create a single-instance training job.

### Training on a single instance

In [None]:
# single machine job params
single_machine_job_name = f'DEMO-xgboost-classification-{strftime("%Y-%m-%d-%H-%M-%S", gmtime())}'
print("Job name is:", single_machine_job_name)

single_machine_job_params = copy.deepcopy(common_training_params)
single_machine_job_params["TrainingJobName"] = single_machine_job_name
single_machine_job_params["OutputDataConfig"][
    "S3OutputPath"
] = f"{bucket_path}/{prefix}/xgboost-single"
single_machine_job_params["ResourceConfig"]["InstanceCount"] = 1
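
Training job names must be unique within an AWS Region and account, which is why the notebook appends a timestamp. The SageMaker API also restricts the characters and length of the name. The sketch below builds a timestamped name and checks it locally before any API call. `make_job_name` is a hypothetical helper, and the pattern and 63-character limit reflect my reading of the `CreateTrainingJob` constraints, so treat them as assumptions to verify.

```python
import re
from time import gmtime, strftime

# Assumed constraints: at most 63 characters; letters and digits,
# with hyphens allowed only between them.
JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9])*")
MAX_JOB_NAME_LENGTH = 63


def make_job_name(base, when=None):
    """Append a UTC timestamp to base and validate the resulting name."""
    stamp = strftime("%Y-%m-%d-%H-%M-%S", gmtime() if when is None else when)
    name = f"{base}-{stamp}"
    if len(name) > MAX_JOB_NAME_LENGTH or not JOB_NAME_PATTERN.fullmatch(name):
        raise ValueError(f"Invalid training job name: {name!r}")
    return name


# A fixed timestamp (the Unix epoch) keeps this example reproducible.
example = make_job_name("DEMO-xgboost-classification", gmtime(0))
print(example)
```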

In [None]:
%%time

sm = boto3.Session(region_name=region).client("sagemaker")

sm.create_training_job(**single_machine_job_params)

status = sm.describe_training_job(TrainingJobName=single_machine_job_name)["TrainingJobStatus"]
print(status)
sm.get_waiter("training_job_completed_or_stopped").wait(TrainingJobName=single_machine_job_name)

# Refresh the status after waiting; the value read before the wait is stale.
job_description = sm.describe_training_job(TrainingJobName=single_machine_job_name)
status = job_description["TrainingJobStatus"]
print(status)

if status == "Failed":
    message = job_description["FailureReason"]
    print(f"Training failed with the following error: {message}")
    raise Exception("Training job failed")

In [None]:
# Confirm the training job has finished.

In [None]:
print(
    "Single Machine:",
    sm.describe_training_job(TrainingJobName=single_machine_job_name)["TrainingJobStatus"],
)

## Hosting model
To set up hosting, we will create a model, create an endpoint configuration, and then create an endpoint. The steps below host the model from the single-machine job.


### Create model
First, register the model with hosting. This step also gives you the flexibility to import models trained elsewhere.

In [None]:
model_name = f"{single_machine_job_name}-model"
print(model_name)

info = sm.describe_training_job(TrainingJobName=single_machine_job_name)

model_data = info["ModelArtifacts"]["S3ModelArtifacts"]
print(model_data)

primary_container = {"Image": container, "ModelDataUrl": model_data}

create_model_response = sm.create_model(
    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
)

print(create_model_response["ModelArn"])

### Create endpoint configuration
SageMaker supports configuring REST endpoints in hosting with multiple models, e.g. for A/B testing purposes. To support this, you create an endpoint configuration that describes the distribution of traffic across the models, whether split, shadowed, or sampled in some way. In addition, the endpoint configuration describes the instance type required for model deployment.

In [None]:
from time import gmtime, strftime

endpoint_config_name = f'DEMO-XGBoostEndpointConfig-{strftime("%Y-%m-%d-%H-%M-%S", gmtime())}'
print(endpoint_config_name)
create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "InstanceType": "ml.m4.xlarge",
            "InitialVariantWeight": 1,
            "InitialInstanceCount": 1,
            "ModelName": model_name,
            "VariantName": "AllTraffic",
        }
    ],
)

print(f'Endpoint Config Arn: {create_endpoint_config_response["EndpointConfigArn"]}')
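
The single `AllTraffic` variant above receives every request. When an endpoint configuration lists several production variants, each variant's share of traffic is its weight divided by the sum of all weights. Here is a minimal sketch of that arithmetic. The variant and model names are hypothetical, `traffic_shares` is an illustrative helper, and nothing here calls AWS.

```python
def traffic_shares(production_variants):
    """Map each variant name to its fraction of traffic: weight / sum of weights."""
    total = sum(v["InitialVariantWeight"] for v in production_variants)
    if total <= 0:
        raise ValueError("At least one variant needs a positive weight")
    return {v["VariantName"]: v["InitialVariantWeight"] / total for v in production_variants}


# Hypothetical A/B setup: 90% of traffic to the current model, 10% to a candidate.
variants = [
    {"VariantName": "Current", "ModelName": "model-a", "InitialVariantWeight": 9},
    {"VariantName": "Candidate", "ModelName": "model-b", "InitialVariantWeight": 1},
]
shares = traffic_shares(variants)
print(shares)
```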

### Create endpoint
Lastly, create the endpoint that serves the model by specifying the name and configuration defined above. The end result is an endpoint that can be validated and incorporated into production applications. This takes 9-11 minutes to complete.

In [None]:
%%time
import time

endpoint_name = f'DEMO-XGBoostEndpoint-{strftime("%Y-%m-%d-%H-%M-%S", gmtime())}'
print(endpoint_name)
create_endpoint_response = sm.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)
print(create_endpoint_response["EndpointArn"])

resp = sm.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
print(f"Status: {status}")

while status == "Creating":
    time.sleep(60)
    resp = sm.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
    print(f"Status: {status}")

print(f'Arn: {resp["EndpointArn"]}')
print(f"Status: {status}")
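
The loop above polls until the status leaves `Creating`, with no upper bound on the wait. One possible refinement is a generic poller with a timeout. The sketch below accepts any zero-argument callable that returns a status string, so it could wrap `sm.describe_endpoint`. `wait_while` and its parameters are hypothetical names, and the fake status source stands in for the real API.

```python
import time


def wait_while(get_status, in_progress="Creating", poll_seconds=60, timeout_seconds=1800):
    """Poll get_status() until it returns something other than in_progress."""
    deadline = time.monotonic() + timeout_seconds
    status = get_status()
    while status == in_progress:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {in_progress!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)
        status = get_status()
    return status


# Stand-in for repeated describe_endpoint calls: two "Creating" responses, then "InService".
fake_responses = iter(["Creating", "Creating", "InService"])
final_status = wait_while(lambda: next(fake_responses), poll_seconds=0)
print(final_status)
```

With the real client, the callable might be `lambda: sm.describe_endpoint(EndpointName=endpoint_name)["EndpointStatus"]`. boto3 also offers an `endpoint_in_service` waiter that serves a similar purpose.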

## Validate the model for use
Finally, validate the model for use.


In [None]:
runtime_client = boto3.client("sagemaker-runtime", region_name=region)

In order to evaluate the model, we'll use the test dataset previously generated. Let us first download the data from S3 to the local host.

In [None]:
download_from_s3("test", 0, "mnist.local.test")  # reading the first part file within test

Start with a single prediction. Let's use the first record from the test file.

In [None]:
!head -1 mnist.local.test > mnist.single.test

In [None]:
%%time
import json

file_name = "mnist.single.test"  # change this if you are using your own test file

with open(file_name, "r") as f:
    payload = f.read()

response = runtime_client.invoke_endpoint(
    EndpointName=endpoint_name, ContentType="text/x-libsvm", Body=payload
)
result = response["Body"].read().decode("ascii")
print(f"Predicted label is {result}.")

OK, a single prediction works.
Let's run a whole batch and see how accurate the predictions are.

In [None]:
import sys


def do_predict(data, endpoint_name, content_type):
    payload = "\n".join(data)
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name, ContentType=content_type, Body=payload
    )
    result = response["Body"].read().decode("ascii")
    preds = [float(num) for num in result.split("\n")[:-1]]
    return preds


def batch_predict(data, batch_size, endpoint_name, content_type):
    items = len(data)
    arrs = []
    for offset in range(0, items, batch_size):
        arrs.extend(
            do_predict(data[offset : min(offset + batch_size, items)], endpoint_name, content_type)
        )
        sys.stdout.write(".")
    return arrs
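
`batch_predict` slices the records into consecutive chunks of `batch_size` and sends one request per chunk, so the final chunk may be shorter than the rest. Below is a sketch of just the slicing logic, using a hypothetical `chunks` helper and placeholder records in place of libsvm lines.

```python
def chunks(data, batch_size):
    """Yield consecutive slices of data, each at most batch_size long."""
    for offset in range(0, len(data), batch_size):
        # Slicing past the end of a list is safe, so no explicit min() is needed.
        yield data[offset : offset + batch_size]


# Seven placeholder records split into batches of three.
records = [f"record-{i}" for i in range(7)]
batch_sizes = [len(batch) for batch in chunks(records, 3)]
print(batch_sizes)
```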

The following cell calculates the error rate on the batch dataset.

In [None]:
%%time
import json

file_name = "mnist.local.test"
with open(file_name, "r") as f:
    payload = f.read().strip()

labels = [float(line.split(" ")[0]) for line in payload.split("\n")]
test_data = payload.split("\n")
preds = batch_predict(test_data, 100, endpoint_name, "text/x-libsvm")

print(
    "\nerror rate=%f"
    % (sum(1 for i in range(len(preds)) if preds[i] != labels[i]) / float(len(preds)))
)

Here are the first ten predictions:

In [None]:
preds[0:10]

And here are the corresponding labels:

In [None]:
labels[0:10]

The following helps us visualize the errors that the XGBoost classifier is making.
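
One way to do so is a confusion matrix, which counts how often each true digit is predicted as each possible digit. The sketch below is a plain-Python version run on made-up values for a three-class problem. In the notebook, the `labels` and `preds` lists from the batch prediction step would take the place of the sample lists, with `num_classes=10`. `confusion_matrix` here is an illustrative helper, not a library call.

```python
from collections import Counter


def confusion_matrix(true_labels, predictions, num_classes):
    """Return a num_classes x num_classes grid; rows are true labels, columns are predictions."""
    counts = Counter((int(t), int(p)) for t, p in zip(true_labels, predictions))
    return [[counts[(t, p)] for p in range(num_classes)] for t in range(num_classes)]


# Made-up labels and predictions, stored as floats like the notebook's lists.
sample_labels = [0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0]
sample_preds = [0.0, 1.0, 1.0, 1.0, 2.0, 0.0, 2.0]

matrix = confusion_matrix(sample_labels, sample_preds, num_classes=3)
for true_label, row in enumerate(matrix):
    print(true_label, row)

errors = sum(1 for t, p in zip(sample_labels, sample_preds) if t != p)
print(f"error rate = {errors / len(sample_labels):.3f}")
```

Off-diagonal entries are the misclassifications: row `t`, column `p` counts records whose true label is `t` but which were predicted as `p`.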

### Delete Endpoint
Once you are done using the endpoint, you can use the following to delete it. 

In [None]:
sm.delete_endpoint(EndpointName=endpoint_name)