## XGBoost Complete Project Workflow in Amazon SageMaker
### Data Preprocessing
    
1. [Introduction](#Introduction)
2. [SageMaker Processing for dataset transformation](#SageMakerProcessing)
    
## Introduction <a class="anchor" id="Introduction">

If you are using XGBoost, you can use the Amazon SageMaker prebuilt XGBoost container. Using SageMaker's XGBoost and other SageMaker features, you can build a complete workflow for an XGBoost project. These notebooks present such a workflow, including all key steps such as preprocessing data with SageMaker Processing, code prototyping with SageMaker Local Mode training and inference, and production-ready model training and deployment with SageMaker hosted training and inference. Automatic Model Tuning in SageMaker is used to tune the model's hyperparameters.  Additionally, the [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/readmelink.html) is used to automate the main training and deployment steps for use in a production workflow outside notebooks.    

To enable you to run these notebooks within a reasonable time (typically less than an hour), the use case is a straightforward regression task:  predicting house prices based on the well-known Boston Housing dataset. This public dataset contains 13 features regarding housing stock of towns in the Boston area.  Features include average number of rooms, accessibility to radial highways, adjacency to the Charles River, etc.  

To begin, we'll import some necessary packages and set up directories for local training and test data.  We'll also set up a SageMaker Session to perform various operations, and specify an Amazon S3 bucket to hold input data and output.  The default bucket used here is created by SageMaker if it doesn't already exist, and named in accordance with the AWS account ID and AWS Region.  

In [1]:
import os
import sagemaker
import random
import boto3

# seed used by Tensorflow's boston housing data set
random.seed(113)

sess = sagemaker.Session()
bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='AmazonSageMaker-ExecutionRole-20200630T141851')['Role']['Arn']

data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(os.getcwd(), 'data/train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(os.getcwd(), 'data/test')
os.makedirs(test_dir, exist_ok=True)

train_dir_csv = os.path.join(os.getcwd(), 'data/train_csv')
os.makedirs(train_dir_csv, exist_ok=True)

test_dir_csv = os.path.join(os.getcwd(), 'data/test_csv')
os.makedirs(test_dir_csv, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), 'data/raw')
os.makedirs(raw_dir, exist_ok=True)

model_dir = os.path.join(os.getcwd(), 'model')
os.makedirs(model_dir, exist_ok=True)

We'll also import and instantiate a helper class to help us store variables as we go through these notebooks so we can pick up where we left off, even if the notebook kernels get restarted.

In [2]:
from parameter_store import ParameterStore
ps = ParameterStore()

# SageMaker Processing for dataset transformation <a class="anchor" id="SageMakerProcessing">

Next, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances.  SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks.    

First we'll load the Boston Housing dataset, save the raw feature data and upload it to Amazon S3 for transformation by SageMaker Processing.  We'll also save the labels for training and testing.

In [3]:
import numpy as np
import math
from sklearn.datasets import load_boston
from sklearn.preprocessing import StandardScaler


boston = load_boston()
x, y = boston['data'], boston['target']
training_index = math.floor(.8 * boston['data'].shape[0])
x_train, y_train = x[:training_index], y[:training_index]
x_test, y_test = x[training_index:], y[training_index:]

np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
np.save(os.path.join(train_dir, 'y_train.npy'), y_train)
np.save(os.path.join(test_dir, 'y_test.npy'), y_test)
s3_prefix = 'xgboost-workflow'
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)

To use SageMaker Processing, simply supply a Python data preprocessing script as shown below.  For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data.  There are few limitations on what kinds of code and operations you can run, and only a minimal contract:  input and output data must be placed in specified directories.  If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.

In [4]:
%%writefile preprocessing.py

import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__=='__main__':
    
    input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    scaler = StandardScaler()
    for file in input_files:
        raw = np.load(file)
        transformed = scaler.fit_transform(raw)
        if 'train' in file:
            output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
            np.save(output_path, transformed)
            print('SAVED TRANSFORMED TRAINING DATA FILE\n')
        else:
            output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
            np.save(output_path, transformed)
            print('SAVED TRANSFORMED TEST DATA FILE\n')

Overwriting preprocessing.py


Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object.  This object allows you to specify the instance type to use in the job, as well as how many instances.  Although the Boston Housing dataset is quite small, we'll use two instances to showcase how easy it is to spin up a cluster for SageMaker Processing.  

In [5]:
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2)

We're now ready to run the Processing job.  To enable distributing the data files equally among the instances, we specify the `ShardedByS3Key` distribution type in the `ProcessingInput` object.  This ensures that if we have `n` instances, each instance will receive `1/n` files from the specified S3 bucket.  It may take around 3 minutes for the following code cell to run, mainly to set up the cluster.  At the end of the job, the cluster automatically will be torn down by SageMaker.  

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from time import gmtime, strftime 

processing_job_name = "xgboost-workflow-preprocessing-{}".format(strftime("%d-%H-%M-%S", gmtime()))
output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)

# code=can be a s3 uri for the input script
sklearn_processor.run(code='preprocessing.py',
                      job_name=processing_job_name,
                      inputs=[ProcessingInput(
                        source=raw_s3,
                        destination='/opt/ml/processing/input',
                        s3_data_distribution_type='ShardedByS3Key')],
                      outputs=[ProcessingOutput(output_name='train',
                                                destination='{}/train'.format(output_destination),
                                                source='/opt/ml/processing/train'),
                               ProcessingOutput(output_name='test',
                                                destination='{}/test'.format(output_destination),
                                                source='/opt/ml/processing/test')])

preprocessing_job_description = sklearn_processor.jobs[-1].describe()

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  xgboost-workflow-preprocessing-23-15-57-58
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-781297242028/xgboost-workflow/data/raw', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-781297242028/xgboost-workflow-preprocessing-23-15-57-58/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-781297242028/xgboost-workflow/data/train', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test', 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-781297242028/xgboost-workflow/data/test', 'LocalPath': '/opt/ml

In the log output of the SageMaker Processing job above, you should be able to see logs in two different colors for the two different instances, and that each instance received different files.  Without the `ShardedByS3Key` distribution type, each instance would have received a copy of **all** files.  By spreading the data equally among `n` instances, you should receive a speedup by approximately a factor of `n` for most stateless data transformations.  After saving the job results locally, we'll move on to prototyping training and inference code with Local Mode.

In [None]:
train_in_s3 = '{}/train/x_train.npy'.format(output_destination)
test_in_s3 = '{}/test/x_test.npy'.format(output_destination)
!aws s3 cp {train_in_s3} ./data/train/x_train.npy
!aws s3 cp {test_in_s3} ./data/test/x_test.npy

download: s3://sagemaker-us-west-2-781297242028/xgboost-workflow/data/train/x_train.npy to data/train/x_train.npy
download: s3://sagemaker-us-west-2-781297242028/xgboost-workflow/data/test/x_test.npy to data/test/x_test.npy


We'll use the data and artifacts created in this notebook in downstream notebooks for model training and deployment. Store them here for later retrieval.

In [None]:
ps.update({'bucket': bucket, 's3_prefix': s3_prefix, 'raw_s3': raw_s3, 'train_dir': train_dir,
          'test_dir': test_dir, 'role': role, 'train_dir_csv': train_dir_csv,
          'test_dir_csv': test_dir_csv})

In [None]:
ps.store()