# Distributed training with Amazon SageMaker built-in algorithm XGBoost 

This notebook shows how to use the SageMaker built-in XGBoost algorithm for distributed training, and how to use SageMaker automatic model tuning to tune the model's hyperparameters.

This notebook was tested in Amazon SageMaker Studio on an ml.t3.medium instance with the Python 3 (Data Science) kernel.

### Setup variables and define functions

In [None]:
!pip3 install -U sagemaker

In [None]:
%%time

import os
import boto3
import re
import sagemaker

# Get a SageMaker-compatible role used by this Notebook Instance.
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

### update below values appropriately ###
bucket = sagemaker.Session().default_bucket()
prefix = 'sagemaker/xgboost-dist-builtin'

print(region)

### Download and prepare data

Data preparation consists of converting the categorical column to numeric codes, moving the label column to the first position, splitting the dataset into training and validation sets, and saving the data in CSV format.
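
To make the split proportions concrete, here is a minimal sketch of the row counts produced by an 80/20 train/validation split followed by halving the training set into two files. It uses the same truncating `int(...)` arithmetic as the split cell in this notebook. The function name `split_sizes` is hypothetical, and the row count 4177 is the size of the UCI Abalone dataset, assumed here rather than read from the downloaded file.

```python
def split_sizes(n_rows, train_frac=0.8, shard_frac=0.5):
    """Return (train_0, train_1, validation) row counts for the splits used here."""
    n_train = int(train_frac * n_rows)      # truncates toward zero, like int(.8 * len(data))
    n_validation = n_rows - n_train         # the remainder goes to validation
    n_train_0 = int(shard_frac * n_train)   # first training file
    n_train_1 = n_train - n_train_0         # second training file
    return n_train_0, n_train_1, n_validation


# 4177 is the assumed number of rows in the UCI Abalone dataset
print(split_sizes(4177))
```

With two training files of nearly equal size, each of the two training instances can be given one file when the training channel is sharded by S3 key.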

In [None]:
%%time

import numpy as np
import pandas as pd

s3 = boto3.client("s3")
# Download the dataset and load into a pandas dataframe
FILE_NAME = 'abalone.csv'
s3.download_file("sagemaker-sample-files", "datasets/tabular/uci_abalone/abalone.csv", FILE_NAME)

feature_names=['Sex', 
               'Length', 
               'Diameter', 
               'Height', 
               'Whole weight', 
               'Shucked weight', 
               'Viscera weight', 
               'Shell weight', 
               'Rings']

data = pd.read_csv(FILE_NAME, 
                   header=None, 
                   names=feature_names)
data["Sex"] = data["Sex"].astype("category").cat.codes

data.head()

In [None]:
# SageMaker XGBoost has the convention of label in the first column
data = data[feature_names[-1:] + feature_names[:-1]]
data.head()

In [None]:
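# Shuffle, then split 80/20 into train and validation dataframes
# (iloc slicing avoids np.split on a DataFrame, which triggers a deprecation warning in recent pandas releases)
shuffled = data.sample(frac=1)
n_train = int(.8 * len(shuffled))
train, validation = shuffled.iloc[:n_train], shuffled.iloc[n_train:]
train_0, train_1 = train.iloc[:n_train // 2], train.iloc[n_train // 2:]  # two training files, one per instance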

# For CSV input, the SageMaker built-in XGBoost algorithm requires files without a header row
train_0.to_csv('abalone_train_0.csv', index=False, header=False)
train_1.to_csv('abalone_train_1.csv', index=False, header=False)
validation.to_csv('abalone_validation.csv', index=False, header=False)

Upload the training and validation data to the S3 bucket.

In [None]:
%%time

sagemaker.Session().upload_data('abalone_train_0.csv', 
                                bucket=bucket, 
                                key_prefix=prefix+'/'+'train')

sagemaker.Session().upload_data('abalone_train_1.csv', 
                                bucket=bucket, 
                                key_prefix=prefix+'/'+'train')

sagemaker.Session().upload_data('abalone_validation.csv', 
                                bucket=bucket, 
                                key_prefix=prefix+'/'+'validation')

### Obtaining the latest XGBoost container
We obtain the new container by specifying the framework version (1.5-1). This version specifies the upstream XGBoost framework version (1.5) and an additional SageMaker version (1). If you have an existing XGBoost workflow based on the previous (1.0-1, 1.2-2 or 1.3-1) container, this would be the only change necessary to get the same workflow working with the new container.
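
As a small illustration of the version scheme described above, the sketch below splits a container version string into its upstream XGBoost version and its SageMaker revision. The helper name `parse_container_version` is hypothetical and is not part of the SageMaker SDK.

```python
def parse_container_version(version):
    """Split a string such as '1.5-1' into (upstream XGBoost version, SageMaker revision)."""
    framework_version, sagemaker_revision = version.split("-", 1)
    return framework_version, int(sagemaker_revision)


for v in ["1.5-1", "1.3-1", "1.2-2", "1.0-1"]:
    print(v, "->", parse_container_version(v))
```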

In [None]:
container = sagemaker.image_uris.retrieve("xgboost", region, "1.5-1")
print(container)

### Training the XGBoost model

After setting the training parameters, we start the training job and poll for its status until training completes, which takes a few minutes in this example.

To train a SageMaker built-in algorithm, we construct a `sagemaker.estimator.Estimator` object, which accepts several constructor arguments:

* __role__: The ARN of the IAM role used by the training job.
* __image_uri__: The URI of the built-in algorithm container image in Amazon ECR (the `container` variable above).
* __hyperparameters__: A dictionary of hyperparameters passed to the training algorithm.
* __instance_type__: The type of SageMaker instance used for training.
* __instance_count__: The number of SageMaker instances used for training.
* __sagemaker_session__: The session used to train on SageMaker.

In [None]:
hyperparameters = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.7",
    "objective": "reg:squarederror",
    "num_round": "50",
    "verbosity": "2",
}

instance_type = "ml.m5.2xlarge"
instance_count = 2
output_path = "s3://{}/{}/{}/output".format(bucket, prefix, "abalone-xgb")
content_type = "csv"

If Spot instances are used, the training job can be interrupted, causing it to take longer to start or finish. If a training job is interrupted, a checkpointed snapshot can be used to resume from a previously saved point and can save training time (and cost).

To enable checkpointing for Managed Spot Training using SageMaker XGBoost we need to configure three things: 

1. Enable the `use_spot_instances` constructor arg - a simple self-explanatory boolean. 

2. Set the `max_wait` constructor arg - this is an int arg representing the amount of time you are willing to wait for Spot infrastructure to become available. Some instance types are harder to get at Spot prices and you may have to wait longer. You are not charged for time spent waiting for Spot infrastructure to become available; you are only charged for actual compute time spent once Spot instances have been successfully procured. 

3. Set up a `checkpoint_s3_uri` constructor arg - this arg tells SageMaker the S3 location where checkpoints are saved. While not strictly necessary, checkpointing is highly recommended for Managed Spot Training jobs, because Spot instances can be interrupted with short notice, and resuming from the last checkpoint means you don't lose the progress made before the interruption.

Feel free to toggle the `use_spot_instances` variable to see the effect of running the same job using regular (a.k.a. "On Demand") infrastructure.

Note that `max_wait` can be set if and only if `use_spot_instances` is enabled, and it must be greater than or equal to `max_run`.
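
The constraint on `max_wait` can be checked locally before an estimator is constructed. The sketch below encodes only the rule stated above. The function name `check_spot_settings` is hypothetical and is not part of the SageMaker SDK, which performs its own validation when the job is created.

```python
def check_spot_settings(use_spot_instances, max_run, max_wait=None):
    """Raise ValueError if max_wait is inconsistent with use_spot_instances and max_run."""
    if not use_spot_instances:
        # max_wait is only meaningful for Managed Spot Training
        if max_wait is not None:
            raise ValueError("max_wait can only be set when use_spot_instances is True")
        return
    if max_wait is None:
        raise ValueError("max_wait must be set when use_spot_instances is True")
    if max_wait < max_run:
        raise ValueError("max_wait must be greater than or equal to max_run")


# The values used in this notebook pass the check in both modes
check_spot_settings(use_spot_instances=True, max_run=3600, max_wait=7200)
check_spot_settings(use_spot_instances=False, max_run=3600, max_wait=None)
print("spot settings are consistent")
```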

In [None]:
import time
from sagemaker.inputs import TrainingInput

job_name = "DEMO-xgboost-builtin-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
print("Training job", job_name)

use_spot_instances = False
max_run = 3600
max_wait = 7200 if use_spot_instances else None
checkpoint_s3_uri = (
    "s3://{}/{}/checkpoints/{}".format(bucket, prefix, job_name) if use_spot_instances else None
)
print("Checkpoint path:", checkpoint_s3_uri)

xgb_estimator = sagemaker.estimator.Estimator(
    container,
    role,
    hyperparameters=hyperparameters,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size=5,  # 5 GB
    output_path=output_path,
    sagemaker_session=sagemaker.Session(),
    use_spot_instances=use_spot_instances,
    max_run=max_run,
    max_wait=max_wait,
    checkpoint_s3_uri=checkpoint_s3_uri,
)

train_input = TrainingInput(
    "s3://{}/{}/{}/".format(bucket, prefix, "train"), 
    distribution='ShardedByS3Key', 
    content_type=content_type)

validation_input = TrainingInput(
    "s3://{}/{}/{}/".format(bucket, prefix, "validation"), 
    distribution='FullyReplicated', 
    content_type=content_type)

In [None]:
xgb_estimator.fit({'train': train_input, 'validation': validation_input}, job_name=job_name)

In [None]:
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

predictor = xgb_estimator.deploy(
    initial_instance_count=1, 
    instance_type="ml.m5.2xlarge",
    serializer=CSVSerializer(),
    deserializer=CSVDeserializer(),
)

In [None]:
array = data.iloc[:5, 1:].to_numpy() 
array

In [None]:
prediction = predictor.predict(array)
prediction

#### Clean-up

In [None]:
predictor.delete_endpoint(delete_endpoint_config=True)

## SageMaker automatic model tuning

Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. It then chooses the hyperparameter values that result in the model that performs best, as measured by a metric that you choose.

For example, suppose that you want to solve a binary classification problem. Your goal is to maximize the area under the curve (AUC) metric by training an XGBoost model, but you don't know which values of the eta, alpha, min_child_weight, and max_depth hyperparameters produce the best model. You can specify ranges of values for these hyperparameters, and hyperparameter tuning searches those ranges for the combination that performs best as measured by the objective metric that you chose. Hyperparameter tuning launches training jobs that use hyperparameter values in the ranges that you specified, and returns the training job with the best value of the objective metric.

SageMaker automatic model tuning works with all model-building options, including built-in algorithms, bring your own script (BYOS), and bring your own container (BYOC). To use SageMaker automatic model tuning with BYOS or BYOC, remember to log the objective metric from your training script so that it reaches CloudWatch, e.g., `logger.info(f"Test Loss: {test_loss}")`. Then, in `HyperparameterTuner`, provide `metric_definitions` as a list of dictionaries, each containing a metric name and a regular expression that extracts its value, e.g.:

```
metric_definitions = [
    {
        "Name": "average test loss",
        "Regex": "Test Loss: ([0-9\\.]+)",
    }
]
```
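
A metric regular expression like the one above can be checked locally with Python's `re` module before launching any jobs. In this sketch the log line is a made-up example of what a training script might emit. The captured group is what would be read as the metric value.

```python
import re

metric_regex = "Test Loss: ([0-9\\.]+)"

# Hypothetical log line, as a training script might write it
log_line = "INFO:__main__:Test Loss: 0.4321"

match = re.search(metric_regex, log_line)
value = float(match.group(1)) if match else None
print(value)
```

Note that this particular pattern captures only digits and dots, so a value logged in scientific notation (for example `1e-05`) or with a minus sign would not be captured in full. If your script can log such values, the pattern needs to be widened accordingly.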

For more examples, refer to the [sagemaker-examples GitHub repo](https://github.com/aws/amazon-sagemaker-examples/tree/f1fe550777162c298a544b0f6955dcb078235abb/hyperparameter_tuning).

There are three steps to complete before launching a hyperparameter tuning job:

### Step 1: Specify hyperparameters that you want to tune, and their types and ranges. 

Three hyperparameter range types are available: `IntegerParameter`, `CategoricalParameter`, and `ContinuousParameter`.
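
To build intuition for the three range types, the following sketch draws candidate values from a search space using only the standard library. It is an illustration of what each type describes, not how SageMaker searches: the tuner's default strategy is Bayesian rather than uniform random sampling. The tuple-based `search_space` format, the `sample_candidate` function, and the categorical `booster` entry are all hypothetical, and integer bounds are treated as inclusive here.

```python
import random

# Illustrative stand-ins for ContinuousParameter, IntegerParameter and CategoricalParameter
search_space = {
    "eta": ("continuous", 0.1, 0.9),
    "max_depth": ("integer", 2, 8),
    "booster": ("categorical", ["gbtree", "dart"]),  # example only, not tuned in this notebook
}


def sample_candidate(space, rng):
    """Draw one value per hyperparameter according to its range type."""
    candidate = {}
    for name, spec in space.items():
        kind = spec[0]
        if kind == "continuous":
            candidate[name] = rng.uniform(spec[1], spec[2])   # any float in the range
        elif kind == "integer":
            candidate[name] = rng.randint(spec[1], spec[2])   # whole numbers, both ends included
        elif kind == "categorical":
            candidate[name] = rng.choice(spec[1])             # one of a fixed set of values
        else:
            raise ValueError(f"unknown range type: {kind}")
    return candidate


rng = random.Random(0)  # seeded so repeated runs draw the same candidates
for _ in range(3):
    print(sample_candidate(search_space, rng))
```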

In [None]:
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

hyperparameter_ranges = {
    'eta': ContinuousParameter(0.1, 0.9),
    'max_depth': IntegerParameter(2, 8)
}

### Step 2: Define objective metric

In [None]:
objective_metric_name = 'validation:rmse'

### Step 3: Initiate `HyperparameterTuner` class object

In `HyperparameterTuner`, you need to pass the model `Estimator`, the `objective_metric_name`, and the `hyperparameter_ranges` specified above. You also need to specify the total number of training jobs to run in `max_jobs` and the number of training jobs to run in parallel in `max_parallel_jobs`.
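
The relationship between `max_jobs` and `max_parallel_jobs` gives a rough sense of how long tuning takes. The sketch below computes the minimum number of sequential rounds needed, under the simplifying assumption that jobs run in lockstep batches. In practice a new job can start as soon as a slot frees up, so this is only an estimate. The function name `tuning_rounds` is hypothetical.

```python
import math


def tuning_rounds(max_jobs, max_parallel_jobs):
    """Minimum number of sequential batches needed to run max_jobs training jobs."""
    return math.ceil(max_jobs / max_parallel_jobs)


# Values used in this notebook: 20 jobs in total, 3 at a time
print(tuning_rounds(20, 3))
```

A higher `max_parallel_jobs` shortens the wall-clock time, but it also leaves fewer completed results for a sequential search strategy to learn from before choosing the next candidates.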

In [None]:
tuner = HyperparameterTuner(
    xgb_estimator,
    objective_metric_name,
    hyperparameter_ranges,
    objective_type='Minimize',
    max_jobs=20,
    max_parallel_jobs=3
)

### Launch hyperparameter tuning job

Finally, we start the hyperparameter tuning job by calling `.fit()` and passing in the S3 paths of the training and validation datasets.

After the hyperparameter tuning job is created, you can describe the tuning job to see its progress, and you can also go to SageMaker console->Jobs to check the progress of the hyperparameter tuning job.
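
One way to describe the tuning job programmatically is to poll its status through the SageMaker API. The sketch below assumes a boto3 SageMaker client is passed in and uses its `describe_hyper_parameter_tuning_job` call. The helper name `wait_for_tuning_job`, the polling interval, and the set of statuses treated as final are choices made for this example.

```python
import time

# Statuses treated as final in this sketch
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_tuning_job(sm_client, tuning_job_name, poll_seconds=60, max_polls=120):
    """Poll a hyperparameter tuning job until it reaches a final status, then return it."""
    for _ in range(max_polls):
        response = sm_client.describe_hyper_parameter_tuning_job(
            HyperParameterTuningJobName=tuning_job_name
        )
        status = response["HyperParameterTuningJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"{tuning_job_name} did not finish after {max_polls} polls")


# Example usage (requires AWS credentials, so it is left commented out):
# import boto3
# status = wait_for_tuning_job(boto3.client("sagemaker"), tuner.latest_tuning_job.name)
```

Polling like this is mainly useful when the tuning job is started without blocking the notebook, for example by passing `wait=False` to `tuner.fit()`.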

In [None]:
tuner.fit({'train': train_input, 'validation': validation_input})

Get the name of the best training job.

In [None]:
tuner.best_training_job()

Deploy the best tuned model to a SageMaker endpoint. You can also deploy a specified model among all tuned models.

In [None]:
tuner_predictor = tuner.deploy(
    initial_instance_count=1, 
    instance_type="ml.m5.2xlarge",
    serializer=CSVSerializer(),
    deserializer=CSVDeserializer(),
)

In [None]:
array = data.iloc[:5, 1:].to_numpy() 
prediction = tuner_predictor.predict(array)
prediction

#### Clean-up

In [None]:
tuner_predictor.delete_endpoint(delete_endpoint_config=True)