# MLOps workshop with Amazon SageMaker

## Module 02: Transform the data and train a model using SageMaker managed training job.

In this module we will use the same dataset and model, but will update the model to use features of SageMaker to scale dataset transformation and model training beyond Jupyter notebook.

This notebook includes all key steps such as preprocessing data with SageMaker Processing, and model training and deployment with SageMaker hosted training and inference. Automatic Model Tuning in SageMaker is used to tune the model's hyperparameters. If you are using TensorFlow 2, you can use the Amazon SageMaker prebuilt TensorFlow 2 framework container with training scripts similar to those you would use outside SageMaker.

In [None]:
!pip install --upgrade sagemaker -q

In [None]:
import boto3
import os
import sagemaker
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 
region = boto3.Session().region_name

data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(os.getcwd(), 'data/train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(os.getcwd(), 'data/test')
os.makedirs(test_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), 'data/raw')
os.makedirs(raw_dir, exist_ok=True)

batch_dir = os.path.join(os.getcwd(), 'data/batch')
os.makedirs(batch_dir, exist_ok=True)

print(f'SageMaker Version: {sagemaker.__version__}')

# SageMaker Processing for dataset transformation <a class="anchor" id="SageMakerProcessing">

Next, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances.  SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks.  An alternative to SageMaker Processing is [SageMaker Data Wrangler](https://aws.amazon.com/sagemaker/data-wrangler/), a visual data preparation tool integrated with the SageMaker Studio UI.    

To work with SageMaker Processing, first we'll load the California Housing dataset, save the raw feature data and upload it to Amazon S3 so it can be accessed by SageMaker Processing.  We'll also save the labels for training and testing.
    
More info on the dataset:

This dataset was obtained from the StatLib repository. http://lib.stat.cmu.edu/datasets/

The target variable is the median house value for California districts.

This dataset was derived from the 1990 U.S. census, using one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people).

In [None]:
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/california_housing/cal_housing.tgz .

In [None]:
!tar -zxf cal_housing.tgz 2>/dev/null

In [None]:
columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue",
]
df = pd.read_csv("CaliforniaHousing/cal_housing.data", names=columns, header=None)

In [None]:
df.head()

In [None]:
X = df[['longitude','latitude','housingMedianAge','totalRooms','totalBedrooms','population','households','medianIncome']]
Y = df[['medianHouseValue']]

In [None]:
print("Features:", list(X.columns))
print("Dataset shape:", X.shape)
print("Dataset Type:", type(X))
print("Label set shape:", Y.shape)
print("Label set Type:", type(X))

# We partition the dataset into 2/3 training and 1/3 test set.
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.33)

np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
np.save(os.path.join(raw_dir, 'y_train.npy'), y_train)
np.save(os.path.join(raw_dir, 'y_test.npy'), y_test)
s3_prefix = 'tf-2-workflow'
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)

# Upload the raw dataset to S3 SageMaker default bucket, files will be used to start as input in the preprocessing job.
raw_s3 = sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)
print(f"Uploaded raw dataset to: {raw_s3}")

In [None]:
!aws s3 ls {raw_s3} --recursive

Next, simply supply an ordinary Python data preprocessing script as shown below.  For this example, we're using a SageMaker prebuilt Scikit-learn framework container, which includes many common functions for processing data.  There are few limitations on what kinds of code and operations you can run, and only a minimal API contract:  input and output data must be placed in specified directories.  If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.

## SageMaker Local Mode
To help you perform the code changes required to take your script and adapt it to SageMaker, you can use the [SageMaker local mode](https://aws.amazon.com/blogs/machine-learning/use-the-amazon-sagemaker-local-mode-to-train-on-your-notebook-instance/) to process, train or inference on your local machine.

The local mode in the Amazon SageMaker Python SDK can emulate CPU (single and multi-instance) and GPU (single instance) SageMaker training jobs by changing a single argument in the TensorFlow, PyTorch or MXNet estimators. To do this, it uses Docker compose and NVIDIA Docker. It will also pull the Amazon SageMaker TensorFlow, PyTorch or MXNet containers from Amazon ECR, so you’ll need to be able to access a public Amazon ECR repository from your local environment.


In this workshop we will not use SageMaker Local mode, as we have processing, training and evaluation scripts ready. You can browse [this GitHub repository](https://github.com/aws-samples/amazon-sagemaker-local-mode) which contains examples and related resources showing you how to preprocess, train, debug your training script with breakpoints, and serve on your local machine using Amazon SageMaker Local mode for processing jobs, training and serving.

In [None]:
%%writefile preprocessing.py

import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__=='__main__':
    
    input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    scaler = StandardScaler()
    x_train = np.load(os.path.join('/opt/ml/processing/input', 'x_train.npy'))
    scaler.fit(x_train)
    for file in input_files:
        raw = np.load(file)
        # only transform feature columns
        if 'y_' not in file:
            transformed = scaler.transform(raw)
        if 'train' in file:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/train', 'y_train.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TRAINING DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TRAINING DATA FILE\n')
        else:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/test', 'y_test.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TEST DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TEST DATA FILE\n')

In [None]:
from sagemaker import get_execution_role
import boto3


def get_sagemaker_execution_role():
    try:
        role = get_execution_role()
    except ValueError:
        iam = boto3.client('iam')
        role = iam.get_role(RoleName='AmazonSageMaker-ExecutionRole-20210510T183593')['Role']['Arn'] # When updating locally update your sagemaker execution code, to get it working locally
    
    return role

Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object.  This object allows you to specify the instance type to use in the job, as well as how many instances.  Spinning a cluster is just a matter of setting `instance_count` to 2 or more, but our transformation has a `StandardScaler` which must be run over all training data and applied equally to train and test data. That can't be parallelized with `scikit-learn`, but since the dataset is small, that is not a problem.

In [None]:

from sagemaker.sklearn.processing import SKLearnProcessor


role = get_sagemaker_execution_role()
sklearn_processor1 = SKLearnProcessor(framework_version='0.23-1',
                                      role=role,
                                      instance_type='ml.m5.xlarge',
                                      instance_count=1)

We're now ready to run the Processing job.

To enable distributing the data files equally among the instances, you could have specified the `ShardedByS3Key` distribution type in the `ProcessingInput` object.  This would have ensured that if you have `n` instances, each instance will receive `1/n` files from the specified S3 bucket.
This is not needed in this case since the dataset is fairly small.

It may take around 3 minutes for the following code cell to run, mainly to set up the cluster.  At the end of the job, the cluster automatically will be torn down by SageMaker.

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from time import gmtime, strftime 

processing_job_name = "tf-2-workflow-{}".format(strftime("%d-%H-%M-%S", gmtime()))
output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)

sklearn_processor1.run(
    code='preprocessing.py',
    job_name=processing_job_name,
    inputs=[ProcessingInput(
        source=raw_s3,
        destination='/opt/ml/processing/input'
    )],
    outputs=[
        ProcessingOutput(output_name='train',
            destination='{}/train'.format(output_destination),
            source='/opt/ml/processing/train'),
        ProcessingOutput(output_name='test',
            destination='{}/test'.format(output_destination),
            source='/opt/ml/processing/test')
    ]
)

preprocessing_job_description = sklearn_processor1.jobs[-1].describe()

In the log output of the SageMaker Processing job above, you should be able to see logs in two different colors for the two different instances, and that each instance received different files.  Without the `ShardedByS3Key` distribution type, each instance would have received a copy of **all** files.  By spreading the data equally among `n` instances, you should receive a speedup by approximately a factor of `n` for most stateless data transformations.  After saving the job results locally, we'll move on to training and inference code.

### Validation for processing job

We will validate that that the processing job was successful. we will download them from S3 `output_destination` that we configured in the processing job before

>This is optional for validation purposes.

In [None]:
x_train_in_s3 = '{}/train/x_train.npy'.format(output_destination)
y_train_in_s3 = '{}/train/y_train.npy'.format(output_destination)
x_test_in_s3 = '{}/test/x_test.npy'.format(output_destination)
y_test_in_s3 = '{}/test/y_test.npy'.format(output_destination)

!aws s3 cp {x_train_in_s3} ./data/train/x_train.npy
!aws s3 cp {y_train_in_s3} ./data/train/y_train.npy
!aws s3 cp {x_test_in_s3} ./data/test/x_test.npy
!aws s3 cp {y_test_in_s3} ./data/test/y_test.npy

#  SageMaker hosted training <a class="anchor" id="SageMakerHostedTraining">

Now that we've prepared a dataset, we can move on to SageMaker's model training functionality. With SageMaker hosted training the actual training itself occurs not on the notebook instance, but on a separate cluster of machines managed by SageMaker. Before starting hosted training, the data must be in S3, or an EFS or FSx for Lustre file system. We'll upload to S3 now, and confirm the upload was successful.

In [None]:
s3_prefix = 'tf-2-workflow'

traindata_s3_prefix = '{}/data/train'.format(s3_prefix)
testdata_s3_prefix = '{}/data/test'.format(s3_prefix)

In [None]:
train_s3 = sess.upload_data(path='./data/train/', key_prefix=traindata_s3_prefix)
test_s3 = sess.upload_data(path='./data/test/', key_prefix=testdata_s3_prefix)

inputs = {'train':train_s3, 'test': test_s3}

print(inputs)

In [None]:
!aws s3 ls {train_s3} --recursive

In [None]:
!aws s3 ls {test_s3} --recursive

We're now ready to set up an Estimator object for hosted training. We simply call `fit` to start the actual hosted training.

In [None]:
from sagemaker.tensorflow import TensorFlow

train_instance_type = 'ml.c5.xlarge'
hyperparameters = {'epochs': 70, 'batch_size': 128, 'learning_rate': 0.01}

role = get_sagemaker_execution_role()
hosted_estimator = TensorFlow(
                       source_dir='code',
                       entry_point='train.py',
                       instance_type=train_instance_type,
                       instance_count=1,
                       hyperparameters=hyperparameters,
                       role=role,
                       base_job_name='tf-2-workflow',
                       framework_version='2.6',
                       py_version='py38')

After starting the hosted training job with the `fit` method call below, you should observe the valication loss converge with each epoch.  Can we do better? We'll look into a way to do so in the **Automatic Model Tuning** section below. In the meantime, the hosted training job should take about 3 minutes to complete.  

In [None]:
hosted_estimator.fit(inputs)

The training job produces a model saved in S3 that we can retrieve.  This is an example of the modularity of SageMaker: having trained the model in SageMaker, you can now take the model out of SageMaker and run it anywhere else.  Alternatively, you can deploy the model into a production-ready environment using SageMaker's hosted endpoints functionality, as shown in the **SageMaker hosted endpoint** section below.

Retrieving the model from S3 is very easy:  the hosted training estimator you created above stores a reference to the model's location in S3.  You simply copy the model from S3 using the estimator's `model_data` property and unzip it to inspect the contents.

In [None]:
!aws s3 cp {hosted_estimator.model_data} ./model/model.tar.gz

The unzipped archive should include the assets required by TensorFlow Serving to load the model and serve it, including a .pb file:  

In [None]:
!tar -xvzf ./model/model.tar.gz -C ./model

In [None]:
!ls ./model/ --recursive

# Automatic Model Tuning <a class="anchor" id="AutomaticModelTuning">

So far we have simply run one Hosted Training job without any real attempt to tune hyperparameters to produce a better model.  Selecting the right hyperparameter values to train your model can be difficult, and typically is very time consuming if done manually. The right combination of hyperparameters is dependent on your data and algorithm; some algorithms have many different hyperparameters that can be tweaked; some are very sensitive to the hyperparameter values selected; and most have a non-linear relationship between model fit and hyperparameter values.  SageMaker Automatic Model Tuning helps automate the hyperparameter tuning process:  it runs multiple training jobs with different hyperparameter combinations to find the set with the best model performance.

We begin by specifying the hyperparameters we wish to tune, and the range of values over which to tune each one.  We also must specify an objective metric to be optimized:  in this use case, we'd like to minimize the validation loss.

In [None]:
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

hyperparameter_ranges = {
  'learning_rate': ContinuousParameter(0.001, 0.2, scaling_type="Logarithmic"),
  'epochs': IntegerParameter(10, 50),
  'batch_size': IntegerParameter(64, 256),
}

metric_definitions = [{'Name': 'loss',
                       'Regex': ' loss: ([0-9\\.]+)'},
                     {'Name': 'val_loss',
                       'Regex': ' val_loss: ([0-9\\.]+)'}]

objective_metric_name = 'val_loss'
objective_type = 'Minimize'
strategy='Bayesian' # Default to Bayesian, can be Random, Grid, Hyperband. may require additional parameters

Next we specify a HyperparameterTuner object that takes the above definitions as parameters.  Each tuning job must be given a budget:  a maximum number of training jobs.  A tuning job will complete after that many training jobs have been executed.  

We also can specify how much parallelism to employ, in this case three jobs, meaning that the tuning job will complete after two series of three jobs in parallel have completed.  For the default Bayesian Optimization tuning strategy used here, the tuning search is informed by the results of previous groups of training jobs, so we don't run all of the jobs in parallel, but rather divide the jobs into groups of parallel jobs.  There is a trade-off: using more parallel jobs will finish tuning sooner, but likely will sacrifice tuning search accuracy. 

Now we can launch a hyperparameter tuning job by calling the `fit` method of the HyperparameterTuner object.  The tuning job may take around 10 minutes to finish.  While you're waiting, the status of the tuning job, including metadata and results for invidual training jobs within the tuning job, can be checked in the SageMaker console in the **Hyperparameter tuning jobs** panel.  

In [None]:
tuner = HyperparameterTuner(hosted_estimator,
                            objective_metric_name,
                            hyperparameter_ranges,
                            metric_definitions,
                            max_jobs=6,
                            max_parallel_jobs=3,
                            objective_type=objective_type,
                            strategy=strategy)

tuning_job_name = "tf-2-workflow-{}".format(strftime("%d-%H-%M-%S", gmtime()))
tuner.fit(inputs, job_name=tuning_job_name)

# Track progress of the tuning job with wait() function
tuner.wait()

After the tuning job is finished, we can use the `HyperparameterTuningJobAnalytics` object from the SageMaker Python SDK to list the top 5 tuning jobs with the best performance. Although the results vary from tuning job to tuning job, the best validation loss from the tuning job (under the FinalObjectiveValue column) likely will be substantially lower than the validation loss from the hosted training job above, where we did not perform any tuning other than manually increasing the number of epochs once.  

In [None]:
tuner_metrics = sagemaker.HyperparameterTuningJobAnalytics(tuning_job_name)
tuner_metrics.dataframe().sort_values(['FinalObjectiveValue'], ascending=True).head(5)

The total training time and training jobs status can be checked with the following lines of code. Because automatic early stopping is by default off, all the training jobs should be completed normally.  For an example of a more in-depth analysis of a tuning job, see the SageMaker official sample [HPO_Analyze_TuningJob_Results.ipynb](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/hyperparameter_tuning/analyze_results/HPO_Analyze_TuningJob_Results.ipynb) notebook.

In [None]:
total_time = tuner_metrics.dataframe()['TrainingElapsedTimeSeconds'].sum() / 3600
print("The total training time is {:.2f} hours".format(total_time))
tuner_metrics.dataframe()['TrainingJobStatus'].value_counts()

#  SageMaker hosted endpoint <a class="anchor" id="SageMakerHostedEndpoint">

Assuming the best model from the tuning job is better than the model produced by the individual hosted training job above, we could now easily deploy that model to production.  A convenient option is to use a SageMaker hosted endpoint, which serves real time predictions from the trained model (For asynchronous, offline predictions on large datasets, you can use either SageMaker Processing or SageMaker Batch Transform.). The endpoint will retrieve the TensorFlow SavedModel created during training and deploy it within a SageMaker TensorFlow Serving container. This all can be accomplished with one line of code.  

More specifically, by calling the `deploy` method of the HyperparameterTuner object we instantiated above, we can directly deploy the best model from the tuning job to a SageMaker hosted endpoint.

In [None]:
tuning_predictor = tuner.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')

We can compare the predictions generated by this endpoint with the actual target values: 

In [None]:
x_test = np.load('./data/test/x_test.npy')
y_test = np.load('./data/test/y_test.npy')

In [None]:
results = tuning_predictor.predict(x_test[:10])['predictions'] 
flat_list = [float('%.1f'%(item)) for sublist in results for item in sublist]
print('predictions: \t{}'.format(np.array(flat_list)))
print('target values: \t{}'.format(y_test[:10].round(decimals=1)))

To avoid billing charges from stray resources, you can delete the prediction endpoint to release its associated instance(s).

In [None]:
sess.delete_endpoint(tuning_predictor.endpoint_name)

# Batch Scoring Step <a class="anchor" id="BatchScoringStep">
    
The final step in this pipeline is offline, batch scoring (inference/prediction).  The inputs to this step will be the model we trained earlier, and the test data.  A simple, ordinary Python script is all we need to do the actual batch inference.

In [None]:
%%writefile batch-score.py

import os
import subprocess
import sys
import numpy as np
import pathlib
import tarfile


if __name__ == "__main__":
    
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, 'r:gz') as tar:
        tar.extractall('./model')
    import tensorflow as tf
    model = tf.keras.models.load_model('./model/1')
    test_path = "/opt/ml/processing/test/"
    x_test = np.load(os.path.join(test_path, 'x_test.npy'))
    y_test = np.load(os.path.join(test_path, 'y_test.npy'))
    scores = model.evaluate(x_test, y_test, verbose=2)
    print("\nTest MSE :", scores)
    
    output_dir = "/opt/ml/processing/batch"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/score-report.txt"
    with open(evaluation_path, 'w') as writer:
         writer.write(f"Test MSE : {scores}")

We'll use SageMaker Processing here to perform batch scoring.

In [None]:
!aws s3 ls {sklearn_processor1.latest_job.outputs[1].destination}/

In [None]:
from sagemaker.tensorflow import TensorFlowProcessor

batch_instance_type = "ml.c5.xlarge"
batch_instance_count = 1

batch_scorer = TensorFlowProcessor(
                    framework_version='2.6',
                    role=role,
                    instance_type=batch_instance_type,
                    instance_count=batch_instance_count,
                    base_job_name="tf-2-workflow-batch",
                    py_version='py38'
)

In [None]:
batch_scorer.run(
    inputs=[
        ProcessingInput(
            source=tuner.best_estimator().model_data,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=sklearn_processor1.latest_job.outputs[1].destination,    # [0] is train, [1] is test
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[ProcessingOutput(output_name="batch", source="/opt/ml/processing/batch"),],
    code="./batch-score.py" )

In [None]:
report_path = f"{batch_scorer.latest_job.outputs[0].destination}/score-report.txt"
!aws s3 cp {report_path} ./score-report.txt --quiet && cat score-report.txt