#### PyTorch Complete Project Workflow in Amazon SageMaker
### Model Deployment
    
1. [Local Mode endpoint](#LocalModeEndpoint)
2. [SageMaker hosted endpoint](#SageMakerHostedEndpoint)
3. [Multi-Model endpoints](#MultiModelEndpoints)
4. [Production Variants with Model Monitor](#ProductionVariants)
5. [Invoking SageMaker endpoints](#InvokingSageMakerEndpoints)
6. [Clean up resources](#CleanUp)

## Local Mode endpoint <a class="anchor" id="LocalModeEndpoint">

While Amazon SageMaker’s Local Mode training is useful for checking that your training code works before you move on to full-scale training, it is also useful to test your model locally before incurring the time and expense of deploying it to production. One option is to fetch the PyTorch model artifact or a checkpoint saved in Amazon S3 and load it in your notebook for testing. An easier option is to let the SageMaker Python SDK do this work for you by setting up a Local Mode endpoint.

More specifically, you can deploy the model from the Local Mode training job locally, either through the Estimator object from that job or, as in this notebook, through a `PyTorchModel` built from its model artifacts. With one exception, this code is the same as the code you would use to deploy to production: you call the `deploy` method and, as with Local Mode training, set the instance type to `local_gpu` or `local`, depending on whether your notebook runs on a GPU instance or a CPU instance.
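
If the same notebook might run on either kind of instance, a small helper can pick the value for you. The sketch below is our own heuristic, not part of the SageMaker SDK: the function name `local_instance_type` is hypothetical, and it assumes that finding the NVIDIA `nvidia-smi` utility on the `PATH` means a GPU is available.

```python
import shutil


def local_instance_type() -> str:
    """Return the Local Mode instance type to pass to deploy().

    Heuristic (an assumption, not SageMaker behavior): treat the presence of
    the `nvidia-smi` utility on the PATH as a sign that a GPU is available.
    """
    return "local_gpu" if shutil.which("nvidia-smi") else "local"


instance_type = local_instance_type()
print(f"Local Mode instance type: {instance_type}")
```

You could then pass `instance_type=local_instance_type()` to `deploy`. On unusual setups (for example, a driver installed without a usable GPU) the heuristic can be wrong, so setting the value explicitly remains the safer default.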

First, we'll restore the variables stored by the previous notebooks.

In [None]:
%store -r

The following code wraps the model artifacts from our local training job in a `PyTorchModel` and deploys it locally in the SageMaker PyTorch serving container:

In [None]:
from sagemaker.pytorch import PyTorchModel

local_model = PyTorchModel(entry_point='train_deploy.py', source_dir='pytorch-model/train_model',
                           model_data=local_model_data, role=role, framework_version='1.5.1')
local_predictor = local_model.deploy(initial_instance_count=1, instance_type='local')

To get predictions from the Local Mode endpoint, configure the predictor to send and receive JSON, then invoke its `predict` method.

In [None]:
from sagemaker.predictor import json_deserializer, json_serializer
import json

local_predictor.content_type = "application/json"
local_predictor.accept = "application/json"
local_predictor.serializer = json_serializer
local_predictor.deserializer = json_deserializer

result = local_predictor.predict(x_test[0])
result

As a sanity check, the predictions can be compared against the actual target values.

In [None]:
local_results = [local_predictor.predict(x_test[i]) for i in range(0, 10)]
print(f'predictions: \t {local_results}')
print(f'target values: \t {y_test[:10]}')

We trained the model for only a few epochs, so there is much room for improvement, but the predictions should at least be in the right ballpark.
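
To put a number on "the right ballpark", you can summarize the errors with the mean absolute error (MAE) and root mean squared error (RMSE). The helper below is a hypothetical, dependency-free sketch; it assumes each prediction comes back either as a bare number or as a one-element list, since the exact response shape depends on how `train_deploy.py` formats its output.

```python
import math


def _as_float(value):
    # Predictions may arrive as a bare number or a one-element list,
    # depending on the inference script (an assumption in this sketch).
    if isinstance(value, (list, tuple)):
        if len(value) != 1:
            raise ValueError(f"expected a single prediction, got {value!r}")
        value = value[0]
    return float(value)


def regression_errors(predictions, targets):
    """Return (MAE, RMSE) for paired predictions and target values."""
    preds = [_as_float(p) for p in predictions]
    truth = [float(t) for t in targets]
    if not preds or len(preds) != len(truth):
        raise ValueError("predictions and targets must be non-empty and equal in length")
    diffs = [p - t for p, t in zip(preds, truth)]
    mae = sum(abs(d) for d in diffs) / len(diffs)
    rmse = math.sqrt(sum(d * d for d in diffs) / len(diffs))
    return mae, rmse


# Hypothetical values, for illustration only
mae, rmse = regression_errors([[22.0], 30.0, [18.5]], [24.0, 29.0, 18.5])
print(f"MAE={mae:.3f}  RMSE={rmse:.3f}")
```

In the notebook, `regression_errors(local_results, y_test[:10])` would summarize the ten predictions above.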

To avoid leaving the SageMaker PyTorch serving container running locally indefinitely, shut it down gracefully by calling the Predictor's `delete_endpoint` method.

In [None]:
local_predictor.delete_endpoint()

## SageMaker hosted endpoint <a class="anchor" id="SageMakerHostedEndpoint">

With the inference code validated locally, we can now deploy the model from the hosted training job to production. A convenient option is a SageMaker hosted endpoint, which serves real-time predictions from the trained model (Batch Transform jobs are also available for asynchronous, offline predictions on large datasets). The endpoint retrieves the PyTorch model artifact (`model.tar.gz`) created during training and deploys it within a SageMaker PyTorch serving container. This takes only a few lines of code.

More specifically, we wrap the hosted training job's artifacts (`remote_model_data`) in a `PyTorchModel` and call its `deploy` method; the next subsection deploys the best model from the tuning job by calling the `deploy` method of a `HyperparameterTuner`. Deploying to a hosted endpoint takes several minutes longer than deploying to the Local Mode endpoint, which is why Local Mode is more useful for fast prototyping of inference code.

In [None]:
from sagemaker.pytorch import PyTorchModel

model = PyTorchModel(entry_point='train_deploy.py', source_dir='pytorch-model/train_model',
                     model_data=remote_model_data, role=role, framework_version='1.5.1',
                     name='pytorch-model-from-hosted-endpoint')
predictor = model.deploy(initial_instance_count=1, instance_type='ml.t2.medium',
                         endpoint_name='pytorch-housing')

To get predictions from the hosted endpoint, simply invoke the Predictor's predict method.

In [None]:
from sagemaker.predictor import json_deserializer, json_serializer
import json

predictor.content_type = "application/json"
predictor.accept = "application/json"
predictor.serializer = json_serializer
predictor.deserializer = json_deserializer

predictor.predict(x_test[0])

We can compare the predictions generated by this endpoint with those generated locally by the Local Mode endpoint: 

In [None]:
hosted_results = [predictor.predict(x_test[i]) for i in range(0, 10)]
print(f'local predictions: \t {local_results}')
print(f'hosted predictions: \t {hosted_results}')

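### SageMaker hosted endpoint with autotuned parameters

To deploy the best model found by the hyperparameter tuning job, attach a `HyperparameterTuner` to the completed tuning job and call its `deploy` method: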

In [None]:
from sagemaker.tuner import HyperparameterTuner

# attach() is a classmethod: it rebuilds the tuner (and its PyTorch estimator)
# from the completed tuning job, so there is no need to construct one first
tuner = HyperparameterTuner.attach(tuning_job_name)

# Deploy the best training job's model to a new hosted endpoint
tuning_predictor = tuner.deploy(initial_instance_count=1, instance_type='ml.t2.medium',
                                endpoint_name='pytorch-housing-auto')

Configure the tuning endpoint's predictor to use JSON, then compare its predictions with those generated locally by the Local Mode endpoint:

In [None]:
tuning_predictor.content_type = "application/json"
tuning_predictor.accept = "application/json"
tuning_predictor.serializer = json_serializer
tuning_predictor.deserializer = json_deserializer

In [None]:
# Use a separate name so the hosted endpoint's results above are not overwritten
tuning_results = [tuning_predictor.predict(x_test[i]) for i in range(0, 10)]
print(f'local predictions: \t {local_results}')
print(f'tuner predictions: \t {tuning_results}')

## Multi-Model Endpoints <a class="anchor" id="MultiModelEndpoints">

Deploying hundreds or thousands of models to hundreds or thousands of endpoints can get costly. It's even more challenging when you don't need to access every model at the same time but still need all of them to be available at all times.

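SageMaker multi-model endpoints allow you to host multiple models in a single serving container behind one endpoint. Because the models share the endpoint's instances and are loaded from S3 on demand, this can drastically reduce your costs; the trade-off is that the first request to a model that isn't loaded yet takes longer. Read more about it [here](https://aws.amazon.com/blogs/machine-learning/save-on-inference-costs-by-using-amazon-sagemaker-multi-model-endpoints/).

Below, we collect the S3 locations of two model artifacts: one from our hosted training job and one from the best tuning job.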
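
SageMaker loads a model from S3 the first time it is invoked and, when instance memory runs low, unloads models that are not in use to make room for others. The toy class below (hypothetical name `ToyMultiModelHost`) mimics that behavior with a least-recently-used cache in pure Python, so the trade-off is easy to see: the first request to a model pays a loading cost, later ones do not. It is a conceptual sketch, not SageMaker's actual loading or eviction logic, and `capacity` simply stands in for instance memory.

```python
from collections import OrderedDict


class ToyMultiModelHost:
    """Lazily load models by name; evict the least recently used one when full."""

    def __init__(self, loader, capacity):
        self._loader = loader          # callable: model name -> callable model
        self._capacity = capacity      # max number of models kept in memory
        self._loaded = OrderedDict()   # name -> model, least recently used first
        self.loads = []                # record of cold loads, for inspection

    def invoke(self, target_model, payload):
        if target_model in self._loaded:
            self._loaded.move_to_end(target_model)    # mark as recently used
        else:
            if len(self._loaded) >= self._capacity:
                self._loaded.popitem(last=False)      # evict least recently used
            self._loaded[target_model] = self._loader(target_model)
            self.loads.append(target_model)
        return self._loaded[target_model](payload)

    def loaded_models(self):
        return list(self._loaded)


# Hypothetical "models": each one just tags the payload with its own name
host = ToyMultiModelHost(loader=lambda name: (lambda x: (name, x)), capacity=2)
host.invoke("model1.tar.gz", 1)
host.invoke("model2.tar.gz", 2)
host.invoke("model1.tar.gz", 3)   # already loaded: no cold load
host.invoke("model3.tar.gz", 4)   # evicts model2, the least recently used
print(host.loaded_models())       # ['model1.tar.gz', 'model3.tar.gz']
print(host.loads)                 # ['model1.tar.gz', 'model2.tar.gz', 'model3.tar.gz']
```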

In [None]:
model_1 = remote_model_data
model_2 = f's3://{bucket}/{tuner.best_training_job()}/output/model.tar.gz'

Upgrade the AWS CLI, then copy the two PyTorch model artifacts to the same S3 prefix, giving each a unique name.

In [None]:
!pip install --upgrade awscli==1.18.140

In [None]:
output_1 = f's3://{bucket}/{s3_prefix}/mme/model1.tar.gz'
output_2 = f's3://{bucket}/{s3_prefix}/mme/model2.tar.gz'

!aws s3 cp {model_1} {output_1}
!aws s3 cp {model_2} {output_2}

Deploy the Multi-Model Endpoint container.

In [None]:
import sagemaker
from sagemaker.multidatamodel import MultiDataModel
from sagemaker.predictor import json_serializer, json_deserializer

sess = sagemaker.Session()

# All models are located under this prefix, each with a unique name
model_data_prefix = f's3://{bucket}/{s3_prefix}/mme/'

# Reuse the PyTorchModel from the hosted endpoint section as a template:
# the container image, entry point and source directory are taken from it
mme = MultiDataModel(name='mme-pytorch',
                     model_data_prefix=model_data_prefix,
                     model=model,
                     sagemaker_session=sess)

mme_predictor = mme.deploy(initial_instance_count=1,
                           instance_type='ml.t2.medium',
                           endpoint_name='mme-pytorch')

# Send and receive JSON, as with the other endpoints
mme_predictor.serializer = json_serializer
mme_predictor.deserializer = json_deserializer
mme_predictor.content_type = 'application/json'
mme_predictor.accept = 'application/json'

Now we can call multiple models.

Here's a list of models we can choose from.

In [None]:
!aws s3 ls {model_data_prefix}

In [None]:
mme_predictor.predict(x_test[0], initial_args={'TargetModel': 'model1.tar.gz'})

In [None]:
mme_predictor.predict(x_test[0], initial_args={'TargetModel': 'model2.tar.gz'})

You can also add a model to this endpoint on the fly by copying an `x.tar.gz` package to the `model_data_prefix` location in S3. Just make sure it's a PyTorch model compatible with this endpoint's container and that its name is distinct from the others.

In [None]:
# Going to copy an existing model and just rename it for demo purposes

output_3 = f's3://{bucket}/{s3_prefix}/mme/model3.tar.gz'
!aws s3 cp {model_1} {output_3}

In [None]:
!aws s3 ls {model_data_prefix}

The third model is now listed, so we can call it:

In [None]:
mme_predictor.predict(x_test[0], initial_args={'TargetModel': 'model3.tar.gz'})
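
The `TargetModel` value is the artifact's path relative to `model_data_prefix`, which is why names must be unique under the prefix. The sketch below (hypothetical helper names and bucket) derives `TargetModel` names from S3 URIs and checks for collisions before you copy a new package into place; the `.tar.gz` check mirrors this notebook's convention of packaging each model as a `.tar.gz` archive.

```python
def target_model_name(artifact_uri: str, model_data_prefix: str) -> str:
    """Return the TargetModel value for an artifact stored under the
    multi-model endpoint's S3 prefix, i.e. its path relative to the prefix."""
    prefix = model_data_prefix if model_data_prefix.endswith("/") else model_data_prefix + "/"
    if not artifact_uri.startswith(prefix):
        raise ValueError(f"{artifact_uri} is not under {prefix}")
    relative = artifact_uri[len(prefix):]
    if not relative.endswith(".tar.gz"):
        raise ValueError("expected a .tar.gz model package")
    return relative


def check_unique(names):
    """Raise if two artifacts would map to the same TargetModel name."""
    seen = set()
    for name in names:
        if name in seen:
            raise ValueError(f"duplicate TargetModel name: {name}")
        seen.add(name)


# Hypothetical bucket and prefix, for illustration only
prefix = "s3://my-bucket/my-prefix/mme/"
uris = [prefix + f"model{i}.tar.gz" for i in (1, 2, 3)]
names = [target_model_name(u, prefix) for u in uris]
check_unique(names)
print(names)   # ['model1.tar.gz', 'model2.tar.gz', 'model3.tar.gz']
```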

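## Production Variants with Model Monitor <a class="anchor" id="ProductionVariants">

Production variants let a single endpoint host several models and split incoming traffic between them. Here we host the model trained with our own hyperparameters alongside the model from the tuning job, and enable Data Capture so that Model Monitor can record requests and responses.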

In [None]:
from sagemaker.session import production_variant

# Create the first SageMaker model from the hosted training job's artifacts.
# _create_sagemaker_model is a private SDK method: it registers the model
# without deploying an endpoint.
model = PyTorchModel(entry_point='train_deploy.py', source_dir='pytorch-model/train_model',
                     model_data=remote_model_data, role=role, framework_version='1.5.1',
                     name='pytorch-model', sagemaker_session=sess)
model._create_sagemaker_model(instance_type='ml.t2.medium')

# The second SageMaker model already exists: deploying the tuner created a model
# named after the best training job, so we refer to it by tuner.best_training_job()

# PyTorch model with our own hyperparameters
variant_1 = production_variant(model_name='pytorch-model',
                               instance_type='ml.t2.medium',
                               initial_instance_count=1,
                               variant_name='Variant1',
                               initial_weight=1)

# PyTorch model with autotuned hyperparameters
variant_2 = production_variant(model_name=tuner.best_training_job(),
                               instance_type='ml.t2.medium',
                               initial_instance_count=1,
                               variant_name='Variant2',
                               initial_weight=1)
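
Both variants above use `initial_weight=1`. SageMaker sends each variant a share of the traffic equal to its weight divided by the sum of all variant weights, so equal weights mean a 50/50 split. The small helper below (hypothetical name `traffic_shares`) makes that arithmetic explicit, including for the 25/75 weights used later in this notebook when traffic is rerouted.

```python
def traffic_shares(weights):
    """Map variant name -> expected fraction of requests, given variant weights.

    A variant's share is its weight divided by the sum of all weights."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("at least one weight must be positive")
    return {name: w / total for name, w in weights.items()}


print(traffic_shares({"Variant1": 1, "Variant2": 1}))    # {'Variant1': 0.5, 'Variant2': 0.5}
print(traffic_shares({"Variant1": 25, "Variant2": 75}))  # {'Variant1': 0.25, 'Variant2': 0.75}
```

Requests that name a variant explicitly, as the prediction loops below do with `target_variant`, bypass this weighted routing.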

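Set up Model Monitor's Data Capture for the production variants. With `sampling_percentage=100`, every request and response is captured to `s3_capture_upload_path`.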

In [None]:
from sagemaker.model_monitor import DataCaptureConfig

s3_capture_upload_path = f's3://{bucket}/{s3_prefix}/model_monitor'

data_capture_config = DataCaptureConfig(
                        enable_capture=True,
                        sampling_percentage=100,
                        destination_s3_uri=s3_capture_upload_path)

data_capture_config_dict = data_capture_config._to_request_dict()

Now create the Production Variant endpoint.

In [None]:
endpoint_name = 'pytorch-production-variants'
sess.endpoint_from_production_variants(name=endpoint_name,
                                       production_variants=[variant_1, variant_2],
                                       data_capture_config_dict=data_capture_config_dict)

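Create a baseline. Model Monitor profiles the training data and suggests statistics and constraints that captured traffic will later be compared against.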

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
import numpy as np

# Upload our training data to S3 as a CSV
local_train_data = np.load('./data/train/x_train.npy')
np.savetxt('./data/train.csv', local_train_data, delimiter=',', fmt='%f')
out = f's3://{bucket}/{s3_prefix}/data/train.csv'
!aws s3 cp ./data/train.csv {out}

# Baseline data is the training data that we saved as CSV
baseline_data_uri = f's3://{bucket}/{s3_prefix}/data/train.csv'
baseline_results_uri = f's3://{bucket}/{s3_prefix}/model_monitor/baseline_output'

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=baseline_results_uri,
    wait=True
)
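
`suggest_baseline` runs a processing job that profiles the training data and writes suggested statistics and constraints (`statistics.json` and `constraints.json`) to `baseline_results_uri`; the monitoring schedule then checks captured traffic against them. The pure-Python toy below (hypothetical function names) illustrates that baseline-then-check idea with two simplified per-column checks, completeness and non-negativity, loosely modeled on kinds of constraints Model Monitor suggests. It is not Model Monitor's algorithm or output format.

```python
def toy_baseline(rows):
    """Derive simple per-column constraints from training rows (None = missing)."""
    n_cols = len(rows[0])
    constraints = []
    for c in range(n_cols):
        values = [row[c] for row in rows]
        present = [v for v in values if v is not None]
        constraints.append({
            "completeness": len(present) / len(values),      # fraction non-missing
            "is_non_negative": all(v >= 0 for v in present),  # all observed values >= 0
        })
    return constraints


def toy_violations(constraints, rows):
    """Return (column, check) pairs that the new rows violate."""
    violations = []
    for c, rule in enumerate(constraints):
        values = [row[c] for row in rows]
        present = [v for v in values if v is not None]
        if len(present) / len(values) < rule["completeness"]:
            violations.append((c, "completeness"))
        if rule["is_non_negative"] and any(v < 0 for v in present):
            violations.append((c, "is_non_negative"))
    return violations


# Hypothetical data: two numeric feature columns
baseline = toy_baseline([[0.1, 5.0], [0.3, 7.0], [0.2, 6.0]])
print(toy_violations(baseline, [[0.2, 6.5], [-0.4, None]]))
# [(0, 'is_non_negative'), (1, 'completeness')]
```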

Create a monitoring schedule that runs hourly and compares captured traffic against the baseline.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator

baseline_violations_uri = f's3://{bucket}/{s3_prefix}/model_monitor/violations'

monitor_schedule_name = 'pytorch-boston-housing-model-monitor-schedule'

my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=monitor_schedule_name,
    endpoint_input=endpoint_name,
    output_s3_uri=baseline_violations_uri,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

Let's invoke the production variant endpoint so that SageMaker Model Monitor can capture some traffic.

In [None]:
from sagemaker.predictor import RealTimePredictor, json_serializer, json_deserializer

prodvar_predictor = RealTimePredictor(endpoint=endpoint_name,
                                      sagemaker_session=sess,
                                      serializer=json_serializer,
                                      deserializer=json_deserializer,
                                      content_type='application/json',
                                      accept='application/json')

In [None]:
variant1_predictions = []
for i in range(len(x_test)):
    variant1_predictions.append(prodvar_predictor.predict(x_test[i], target_variant='Variant1'))
variant1_predictions

In [None]:
variant2_predictions = []
for i in range(len(x_test)):
    variant2_predictions.append(prodvar_predictor.predict(x_test[i], target_variant='Variant2'))
variant2_predictions

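After some time has passed, you can retrieve the requests and predictions that Data Capture recorded and compare them with what actually happened. This lets you check for model drift and even run a simple A/B test. Captured records are written as JSON Lines files under `{s3_capture_upload_path}/{endpoint_name}/{variant name}/`, organized into date and hour folders; they may take a few minutes to appear after the requests are made.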

In [None]:
s3_capture_upload_path

In [None]:
variant1_capture_uri = f'{s3_capture_upload_path}/{endpoint_name}/Variant1'
!aws s3 cp --recursive {variant1_capture_uri} ./data/model_monitor/Variant1

In [None]:
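import os
import json

# Collect every captured record for Variant1. Capture files are JSON Lines and
# can be spread over several hourly folders, so read all of them in sorted
# (roughly chronological) order rather than keeping only the last file read.
variant1_predictions = []
for root, dirs, files in sorted(os.walk('./data/model_monitor/Variant1')):
    for file in sorted(files):
        if file.startswith('.'):  # skip hidden files such as .DS_Store
            continue
        with open(os.path.join(root, file), "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    record = json.loads(line)
                    variant1_predictions.append(float(record['captureData']['endpointOutput']['data']))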
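
Each line of a capture file is one JSON record. Below is a minimal sketch of parsing a single line; the record is made up for illustration and follows the `captureData` / `endpointInput` / `endpointOutput` layout that the cell above reads. The hypothetical helper also handles a payload stored with `BASE64` encoding, and assumes the endpoint's output is a JSON number (the same assumption as the `float(...)` call above).

```python
import base64
import json

# Hypothetical capture record; all values are made up for illustration
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "application/json", "mode": "INPUT",
                          "data": "[0.1, 0.2, 0.3]", "encoding": "JSON"},
        "endpointOutput": {"observedContentType": "application/json", "mode": "OUTPUT",
                           "data": "24.5", "encoding": "JSON"},
    },
    "eventMetadata": {"eventId": "example-id", "inferenceTime": "2020-01-01T00:00:00Z"},
    "eventVersion": "0",
})


def _decode(part):
    # Payloads may be stored as text or base64; decode both to a JSON value
    data = part["data"]
    if part.get("encoding") == "BASE64":
        data = base64.b64decode(data).decode("utf-8")
    return json.loads(data)


def parse_capture_line(line):
    """Return (features, prediction) from one JSON Lines capture record."""
    capture = json.loads(line)["captureData"]
    features = _decode(capture["endpointInput"])
    prediction = float(_decode(capture["endpointOutput"]))
    return features, prediction


print(parse_capture_line(sample_line))   # ([0.1, 0.2, 0.3], 24.5)
```

Because it returns the captured features as well as the prediction, you could match each prediction to its row in `x_test` instead of relying on the order in which records were written.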

In [None]:
variant2_capture_uri = f'{s3_capture_upload_path}/{endpoint_name}/Variant2'
!aws s3 cp --recursive {variant2_capture_uri} ./data/model_monitor/Variant2

In [None]:
# Collect every captured record for Variant2 from all capture files, in sorted
# (roughly chronological) order, rather than keeping only the last file read
variant2_predictions = []
for root, dirs, files in sorted(os.walk('./data/model_monitor/Variant2')):
    for file in sorted(files):
        if file.startswith('.'):  # skip hidden files such as .DS_Store
            continue
        with open(os.path.join(root, file), "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    record = json.loads(line)
                    variant2_predictions.append(float(record['captureData']['endpointOutput']['data']))

In [None]:
ground_truth = y_test.tolist()
list(zip(ground_truth, variant1_predictions))[0:10]

In [None]:
list(zip(ground_truth, variant2_predictions))[0:10]

We can use Levene's test to assess whether the prediction errors of the two variants have equal variances.

In [None]:
from scipy.stats import levene

errors_var_1 = [ground_truth[i] - variant1_predictions[i] for i in range(len(ground_truth))]
errors_var_2 = [ground_truth[i] - variant2_predictions[i] for i in range(len(ground_truth))]

stat, p = levene(errors_var_1, errors_var_2)
p
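
For intuition about what `levene` computes: each observation $Y_{ij}$ in group $i$ is replaced by its absolute deviation from the group's median, $Z_{ij} = |Y_{ij} - \tilde{Y}_i|$, and the statistic is the one-way ANOVA F statistic on those deviations, $W = \frac{N-k}{k-1}\cdot\frac{\sum_i n_i(\bar{Z}_{i\cdot}-\bar{Z}_{\cdot\cdot})^2}{\sum_i\sum_j (Z_{ij}-\bar{Z}_{i\cdot})^2}$, for $k$ groups and $N$ observations in total. The pure-Python sketch below (hypothetical function name) follows that definition with median centering, the variant scipy uses by default (`center='median'`, also known as the Brown-Forsythe test). It computes only the statistic, not the p-value.

```python
from statistics import mean, median


def levene_statistic(*groups):
    """Levene's W statistic with median centering (Brown-Forsythe variant)."""
    k = len(groups)
    if k < 2:
        raise ValueError("need at least two groups")
    # Absolute deviations from each group's median
    z = []
    for g in groups:
        center = median(g)
        z.append([abs(y - center) for y in g])
    n_total = sum(len(zi) for zi in z)
    group_means = [mean(zi) for zi in z]
    grand_mean = sum(sum(zi) for zi in z) / n_total
    between = sum(len(zi) * (m - grand_mean) ** 2 for zi, m in zip(z, group_means))
    within = sum((v - m) ** 2 for zi, m in zip(z, group_means) for v in zi)
    if within == 0:
        raise ValueError("W is undefined when deviations are constant within every group")
    return (n_total - k) / (k - 1) * between / within


w = levene_statistic([1, 2, 3], [0, 2, 4])
print(round(w, 6))   # 0.8
```

With scipy available, `scipy.stats.f.sf(W, k - 1, N - k)` gives the corresponding p-value.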

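If the p-value is above the chosen significance level (for example, 0.05), there is not enough evidence to conclude that the two variants' error variances differ.

If there were such evidence, and one variant clearly performed better, you could shift more traffic to it by updating the variant weights. For example, the following call sends about 25% of requests to Variant1 and 75% to Variant2: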

In [None]:
import boto3

sm = boto3.client('sagemaker')

response = sm.update_endpoint_weights_and_capacities(
    EndpointName=endpoint_name,
    DesiredWeightsAndCapacities=[
        {
            'DesiredWeight': 25,
            'VariantName': variant_1['VariantName']
        },
        {
            'DesiredWeight': 75,
            'VariantName': variant_2['VariantName']
        },
    ]
)

## Invoking SageMaker Endpoints <a class="anchor" id="InvokingSageMakerEndpoints">

In the code so far, we've trained a model, deployed it to an endpoint, and used the returned predictor object to make predictions. But what if we want to call an existing SageMaker endpoint? There are a couple of ways to do this: with the SageMaker Python SDK, or with boto3.

Calling an endpoint with SageMaker's Python SDK:

In [None]:
import sagemaker
from sagemaker.predictor import RealTimePredictor, json_serializer, json_deserializer

sess = sagemaker.Session()

predictor = RealTimePredictor(endpoint='pytorch-housing',
                              sagemaker_session=sess,
                              serializer=json_serializer,
                              deserializer=json_deserializer)

predictor.predict(x_test[0])

Alternatively, call the endpoint with the boto3 `sagemaker-runtime` client:

In [None]:
import json
import boto3

sm_runtime = boto3.client('sagemaker-runtime')
# Serialize the NumPy feature vector as a JSON array
payload = json.dumps(x_test[0].tolist())
response = sm_runtime.invoke_endpoint(EndpointName='pytorch-housing',
                                      ContentType='application/json',
                                      Accept='application/json',
                                      Body=payload)
# The response body is a stream: read it and parse the JSON prediction
prediction = json.loads(response['Body'].read())
prediction
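
With boto3 you handle serialization yourself: the request `Body` is a string or bytes, and the response's `Body` is a file-like stream you read and decode. The sketch below isolates those two steps in hypothetical helpers and exercises them offline, using `io.BytesIO` as a stand-in for the real streaming body (which also provides `read()`); the response value is made up.

```python
import io
import json


def build_payload(features):
    """Serialize a feature vector as a JSON array string for invoke_endpoint."""
    return json.dumps([float(x) for x in features])


def read_prediction(response):
    """Decode the JSON body of an invoke_endpoint-style response dict."""
    return json.loads(response["Body"].read().decode("utf-8"))


# Offline stand-in for a real response; the value 24.5 is made up
fake_response = {"Body": io.BytesIO(b"24.5"), "ContentType": "application/json"}

print(build_payload([0.5, 1, 2.25]))   # [0.5, 1.0, 2.25]
print(read_prediction(fake_response))  # 24.5
```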

## Clean Up <a class="anchor" id="CleanUp">

To avoid billing charges from stray resources, delete the endpoints (and their endpoint configurations) to release the associated instances.

In [None]:
predictor.delete_endpoint(delete_endpoint_config=True)
tuning_predictor.delete_endpoint(delete_endpoint_config=True)
mme_predictor.delete_endpoint(delete_endpoint_config=True)
!aws sagemaker delete-monitoring-schedule --monitoring-schedule-name pytorch-boston-housing-model-monitor-schedule
# Once the monitoring schedule has been deleted, delete the production variant
# endpoint ('pytorch-production-variants') manually (for now)