#### XGBoost Complete Project Workflow in Amazon SageMaker
### Model Deployment
    
1. [Local Mode endpoint](#LocalModeEndpoint)
2. [SageMaker hosted endpoint with model monitoring](#SageMakerHostedEndpoint)
3. [Invoking SageMaker endpoints](#InvokingSageMakerEndpoints)
4. [Clean up resources](#CleanUp)

## Local Mode endpoint <a class="anchor" id="LocalModeEndpoint">

Amazon SageMaker's Local Mode training is useful for checking that your training code works before moving on to full-scale training. It is equally useful to have a convenient way to test your model locally before incurring the time and expense of deploying it to production. One possibility is to fetch the XGBoost artifact or a model checkpoint saved in Amazon S3 and load it in your notebook for testing. An easier way is to let the SageMaker Python SDK do this work for you by setting up a Local Mode endpoint.

More specifically, the model artifacts from the Local Mode training job can be used to deploy a model locally, either through the local Estimator's `deploy` method or, as in this notebook, through a model object built from those artifacts. With one exception, the code is the same as the code you would use to deploy to production: as with Local Mode training, you specify the instance type as either `local_gpu` or `local`, depending on whether your notebook is on a GPU instance or a CPU instance.
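
One way to pick between `local` and `local_gpu` programmatically is to check whether the NVIDIA driver tooling is present on the notebook instance. The sketch below assumes that a successful `nvidia-smi` call implies a usable GPU; the helper name `pick_local_instance_type` is hypothetical and not part of the SageMaker SDK.

```python
import shutil
import subprocess


def pick_local_instance_type() -> str:
    """Return 'local_gpu' if nvidia-smi runs successfully, else 'local'."""
    # Assumption: a zero exit code from nvidia-smi means a GPU is usable.
    if shutil.which('nvidia-smi') is None:
        return 'local'
    try:
        result = subprocess.run(['nvidia-smi'], capture_output=True, timeout=30)
    except (OSError, subprocess.TimeoutExpired):
        return 'local'
    return 'local_gpu' if result.returncode == 0 else 'local'


instance_type = pick_local_instance_type()
print(instance_type)
```

The returned value could then be passed as the `instance_type` argument of `deploy`.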

First, we'll import the variables stored from previous notebooks.

In [None]:
from parameter_store import ParameterStore
import sagemaker
from sagemaker.inputs import TrainingInput
import numpy as np

ps = ParameterStore()
parameters = ps.read()

bucket = parameters['bucket']
s3_prefix = parameters['s3_prefix']
raw_s3 = parameters['raw_s3']
train_dir = parameters['train_dir']
test_dir = parameters['test_dir']
train_dir_csv = parameters['train_dir_csv']
test_dir_csv = parameters['test_dir_csv']
local_model_data = parameters['local_model_data']
remote_model_data = parameters['remote_model_data']
training_job_name = parameters['training_job_name']
tuning_job_name = parameters['tuning_job_name']
s3_input_train_uri = parameters['s3_input_train_uri']
s3_input_test_uri = parameters['s3_input_test_uri']
role = parameters['role']
sess = sagemaker.Session()

s3_input_train = TrainingInput(s3_input_train_uri, content_type='csv')
s3_input_test = TrainingInput(s3_input_test_uri, content_type='csv')
inputs = {'train': s3_input_train, 'test': s3_input_test}

x_test = np.load('./data/test/x_test.npy')
y_test = np.load('./data/test/y_test.npy')

The following code deploys the model locally in the SageMaker XGBoost container using the model artifacts from our local training job:

In [None]:
from sagemaker.xgboost.model import XGBoostModel

local_model = XGBoostModel(entry_point='train_deploy.py', model_data=local_model_data, role=role, framework_version='1.0-1')
local_predictor = local_model.deploy(initial_instance_count=1, instance_type='local')

To get predictions from the Local Mode endpoint, simply invoke the Predictor's predict method.

In [None]:
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

local_predictor.serializer = CSVSerializer()
local_predictor.deserializer = JSONDeserializer()

local_predictor.predict(x_test[0], initial_args={'Accept': 'text/csv'})

As a sanity check, the predictions can be compared against the actual target values.

In [None]:
local_results = [local_predictor.predict(x_test[i], initial_args={'Accept': 'text/csv'}) for i in range(0, 10)]
print(f'predictions: \t {local_results}')
print(f'target values: \t {y_test[:10]}')

We only trained the model for a few rounds, but the predictions should at least be in the right ballpark.
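
To put a number on "in the ballpark", one option is the root mean squared error (RMSE) between predictions and targets. The sketch below uses made-up values, not output from this model. `rmse` is a hypothetical helper, and it assumes each prediction has already been reduced to a single float.

```python
import math


def rmse(predictions, targets):
    """Root mean squared error between two equal-length, non-empty sequences."""
    if len(predictions) != len(targets) or len(predictions) == 0:
        raise ValueError('inputs must be non-empty and have the same length')
    squared_errors = [(p - t) ** 2 for p, t in zip(predictions, targets)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up values for illustration only.
example_predictions = [22.0, 30.0, 18.0]
example_targets = [24.0, 30.0, 16.0]
example_rmse = rmse(example_predictions, example_targets)
print(f'RMSE: {example_rmse:.3f}')
```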

To avoid leaving the SageMaker XGBoost container running locally indefinitely, shut it down gracefully by calling the `delete_endpoint` method of the Predictor object.

In [None]:
local_predictor.delete_endpoint()

## SageMaker hosted endpoint with model monitoring <a class="anchor" id="SageMakerHostedEndpoint">

Assuming the best model from the tuning job is better than the model produced by the individual Hosted Training job above, we could now easily deploy that model to production.  A convenient option is to use a SageMaker hosted endpoint, which serves real time predictions from the trained model (Batch Transform jobs also are available for asynchronous, offline predictions on large datasets). The endpoint will retrieve the XGBoost saved model created during training and deploy it within a SageMaker XGBoost Serving container. This all can be accomplished with one line of code.  

More specifically, calling the `deploy` method of a HyperparameterTuner object attached to the tuning job deploys the best model from that job directly to a SageMaker hosted endpoint; this is shown in the autotuned-parameters section later in this notebook. Deploying to a hosted endpoint takes several minutes longer than deploying to a Local Mode endpoint, which is more useful for fast prototyping of inference code.
    
In this example, we'll also be including Model Monitor to monitor the requests to the hosted endpoint for data drift.

First, set up Model Monitor's data capture for the endpoint.

In [None]:
from sagemaker.model_monitor import DataCaptureConfig

s3_capture_upload_path = f's3://{bucket}/{s3_prefix}/model_monitor'

data_capture_config = DataCaptureConfig(
                        enable_capture=True,
                        sampling_percentage=100,
                        destination_s3_uri=s3_capture_upload_path)

Now deploy the hosted endpoint.

In [None]:
from sagemaker.xgboost import XGBoostModel

model = XGBoostModel(entry_point='train_deploy.py', model_data=remote_model_data,
                     role=role, framework_version='1.0-1',
                     name='xgboost-model-from-hosted-endpoint')

predictor = model.deploy(initial_instance_count=1, instance_type='ml.t2.medium',
                         endpoint_name='xgboost-housing', data_capture_config=data_capture_config)

To get predictions from the hosted endpoint, simply invoke the Predictor's predict method.

In [None]:
predictor.serializer = CSVSerializer()
predictor.deserializer = JSONDeserializer()

predictor.predict(x_test[0], initial_args={'Accept': 'text/csv'})

We can compare the predictions generated by this endpoint with those generated locally by the Local Mode endpoint: 

In [None]:
hosted_results = [predictor.predict(x_test[i], initial_args={'Accept': 'text/csv'}) for i in range(0, 10)]
print(f'local predictions: \t {local_results}')
print(f'hosted predictions: \t {hosted_results}')
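
Rather than comparing the two lists by eye, the largest element-wise gap can be computed directly. This is a minimal sketch with made-up numbers; `max_abs_difference` is a hypothetical helper, and it assumes each prediction is a single float. Since the local and hosted endpoints here are built from different artifacts (`local_model_data` and `remote_model_data`), some gap is to be expected.

```python
def max_abs_difference(first, second):
    """Largest absolute element-wise gap between two equal-length sequences."""
    if len(first) != len(second):
        raise ValueError('sequences must have the same length')
    return max((abs(a - b) for a, b in zip(first, second)), default=0.0)


# Made-up values for illustration only.
example_local = [21.5, 33.0, 17.25]
example_hosted = [21.0, 33.0, 18.0]
gap = max_abs_difference(example_local, example_hosted)
print(f'largest gap: {gap}')
```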

### Model Monitor

Amazon SageMaker Model Monitor continuously monitors the quality of Amazon SageMaker machine learning models in production. With Model Monitor, you can set alerts that notify you when there are deviations in the model quality. Early and proactive detection of these deviations enables you to take corrective actions, such as retraining models, auditing upstream systems, or fixing quality issues without having to monitor models manually or build additional tooling. You can use Model Monitor prebuilt monitoring capabilities that do not require coding. You also have the flexibility to monitor models by coding to provide custom analysis.

First, we need to create a baseline so Model Monitor can generate baseline statistics for the training data.

These [statistics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-statistics.html) include mean, standard deviation, min, max, distribution, and percentage of missing data. We can see some of those statistics in our training data below:

In [None]:
import pandas as pd

x_train = np.load('./data/train/x_train.npy')
x_train_df = pd.DataFrame(x_train)
x_train_df.describe()

And here's the distribution of our features.

In [None]:
# Plot distribution of all features
x_train_df.plot.kde(subplots=True, layout=(4,4), figsize=(20, 9));

Ok, let's generate the full set of statistics and constraints now.

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
import numpy as np

# Baseline data of the training data that we saved as CSV
baseline_data_uri = f's3://{bucket}/{s3_prefix}/data/train/train.csv'
baseline_results_uri = f's3://{bucket}/{s3_prefix}/model_monitor/baseline_output'

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=baseline_results_uri,
    wait=True
)

The constraints are generated automatically from the statistics calculated on the training data. They represent thresholds around distribution parameters, missing value percentages, and other statistics. More information on constraints can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html).

With constraints and statistics in hand, we can create a monitoring schedule so that Model Monitor will monitor the statistics of the incoming request data and compare it to the baseline training statistics and constraints in order to detect violations. Violations include:

- `data_type_check`
- `completeness_check`
- `baseline_drift_check` (distance is calculated by getting the maximum absolute difference between the cumulative distribution functions of two distributions and compared to a threshold)
- `missing_column_check`
- `extra_column_check`
- `categorical_values_check`
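
The distance described for `baseline_drift_check` can be sketched in plain Python as the maximum absolute difference between two empirical cumulative distribution functions. This illustrates the description above on made-up samples; it is not Model Monitor's actual implementation, and `max_cdf_distance` is a hypothetical helper name.

```python
from bisect import bisect_right


def max_cdf_distance(baseline, current):
    """Maximum absolute difference between two empirical CDFs."""
    if len(baseline) == 0 or len(current) == 0:
        raise ValueError('both samples must be non-empty')
    base_sorted = sorted(baseline)
    curr_sorted = sorted(current)
    distance = 0.0
    # Empirical CDFs only change at observed values, so checking those is enough.
    for value in set(base_sorted) | set(curr_sorted):
        base_cdf = bisect_right(base_sorted, value) / len(base_sorted)
        curr_cdf = bisect_right(curr_sorted, value) / len(curr_sorted)
        distance = max(distance, abs(base_cdf - curr_cdf))
    return distance


# Made-up samples for illustration only.
baseline_sample = [1, 2, 3, 4]
request_sample = [3, 4, 5, 6]
drift_distance = max_cdf_distance(baseline_sample, request_sample)
threshold = 0.1  # illustrative threshold, chosen for this example
print(drift_distance, drift_distance > threshold)
```

A distance above the threshold would correspond to a drift violation for that feature.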

In [None]:
# Oversample training data to skew distributions of features
# Sampling with respect to the distribution of one of our features
oversampled_requests_df = x_train_df.sample(frac=.4, replace=True, random_state=123, weights=x_train_df.groupby(6)[6].transform('count'))
print(oversampled_requests_df.describe())

# Plot distribution of features
x_train_df.plot.kde(subplots=True, layout=(4,4), figsize=(20, 9), title='(BEFORE) Distribution of training features');
oversampled_requests_df.plot.kde(subplots=True, layout=(4,4), figsize=(20, 9), title='(AFTER) Distribution of features across request data');
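
The `weights` argument above gives each row a weight equal to the number of rows that share its value in column 6, so values that are already common become even more common in the sample. The same mechanism is sketched below in isolation, using only the standard library and a made-up categorical column. With 8 rows of `'a'` and 2 rows of `'b'`, the chance of drawing an `'a'` row is 8·8 / (8·8 + 2·2) = 64/68, or about 0.94, up from 0.8 in the original data.

```python
import random
from collections import Counter

# Made-up column: 'a' appears 8 times, 'b' appears 2 times.
column = ['a'] * 8 + ['b'] * 2
counts = Counter(column)

# Each row is weighted by how often its value occurs,
# analogous to groupby(...).transform('count') in pandas.
row_weights = [counts[value] for value in column]

rng = random.Random(123)
sample = rng.choices(column, weights=row_weights, k=10_000)  # with replacement

original_share = counts['a'] / len(column)
sampled_share = sample.count('a') / len(sample)
print(f"share of 'a': original={original_share:.2f}, sampled={sampled_share:.2f}")
```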

As there is a considerable difference between some of the training feature distributions and some of the request distributions, Model Monitor would likely trigger a violation and output a `constraint_violations.csv` file in the following location: `s3://{bucket}/{s3_prefix}/model_monitor/violations`. The contents of the file might look something like this:

```json
{
  "violations" : [ {
    "feature_name" : "_c1",
    "constraint_check_type" : "baseline_drift_check",
    "description" : "Baseline drift distance: 0.18977610005771073 exceeds threshold: 0.1"
  }, {
    "feature_name" : "_c8",
    "constraint_check_type" : "baseline_drift_check",
    "description" : "Baseline drift distance: 0.32592205342200775 exceeds threshold: 0.1"
  } ]
}
```
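
A report like this can be reduced to a per-feature summary with a few lines of Python. The sketch below parses the sample report shown above and extracts the drift distance from each description. It assumes the description wording matches the sample; `drifted_features` is a hypothetical helper name.

```python
import json
import re

# Example report copied from the sample above.
report_text = '''
{
  "violations" : [ {
    "feature_name" : "_c1",
    "constraint_check_type" : "baseline_drift_check",
    "description" : "Baseline drift distance: 0.18977610005771073 exceeds threshold: 0.1"
  }, {
    "feature_name" : "_c8",
    "constraint_check_type" : "baseline_drift_check",
    "description" : "Baseline drift distance: 0.32592205342200775 exceeds threshold: 0.1"
  } ]
}
'''


def drifted_features(report):
    """Map feature name -> drift distance for each baseline_drift_check violation."""
    result = {}
    for violation in report.get('violations', []):
        if violation.get('constraint_check_type') != 'baseline_drift_check':
            continue
        # Assumption: the description follows the wording shown in the sample.
        match = re.search(r'distance:\s*([0-9.]+)', violation.get('description', ''))
        if match:
            result[violation['feature_name']] = float(match.group(1))
    return result


drift = drifted_features(json.loads(report_text))
print(drift)
```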

So let's create the monitoring job that will monitor the statistics of the incoming request data and compare it to the baseline training statistics to detect violations.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator

baseline_violations_uri = f's3://{bucket}/{s3_prefix}/model_monitor/violations'

monitor_schedule_name = 'xgboost-boston-housing-model-monitor-schedule'

my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=monitor_schedule_name,
    endpoint_input='xgboost-housing',
    output_s3_uri=baseline_violations_uri,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

While you're testing, you can simulate a violation by uploading an empty `constraint_violations.csv` file. The upload could, for example, trigger a Lambda function that kicks off a re-training pipeline.

In [None]:
# If you want to simulate a violation happening
# !touch constraint_violations.csv
# out = f's3://{bucket}/{s3_prefix}/model_monitor/violations/constraint_violations.csv'
# !aws s3 cp ./constraint_violations.csv {out}
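
A Lambda function reacting to that upload could look roughly like the sketch below. It only inspects the S3 event notification and hands matching object URIs to a callback. The `start_retraining` parameter is a placeholder for whatever starts your pipeline, and the function names, bucket name, and keys are hypothetical.

```python
from urllib.parse import unquote_plus


def find_violation_reports(event, suffix='constraint_violations.csv'):
    """Return s3:// URIs of violation reports referenced by an S3 event."""
    uris = []
    for record in event.get('Records', []):
        s3_info = record.get('s3', {})
        bucket_name = s3_info.get('bucket', {}).get('name')
        # Object keys in S3 event notifications are URL-encoded.
        key = unquote_plus(s3_info.get('object', {}).get('key', ''))
        if bucket_name and key.endswith(suffix):
            uris.append(f's3://{bucket_name}/{key}')
    return uris


def lambda_handler(event, context, start_retraining=print):
    """Entry point sketch: call start_retraining once per violation report."""
    reports = find_violation_reports(event)
    for uri in reports:
        # Placeholder: replace with a call that starts your pipeline.
        start_retraining(uri)
    return {'reports': reports}


# Hypothetical event with made-up bucket and key names.
sample_event = {'Records': [
    {'s3': {'bucket': {'name': 'example-bucket'},
            'object': {'key': 'prefix/model_monitor/violations/constraint_violations.csv'}}},
    {'s3': {'bucket': {'name': 'example-bucket'},
            'object': {'key': 'prefix/other.txt'}}},
]}
handler_result = lambda_handler(sample_event, None)
```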

### SageMaker hosted endpoint with autotuned parameters

In [None]:
from sagemaker.tuner import HyperparameterTuner

# Re-attach to the tuning job from the previous notebook. `attach` is a classmethod,
# so the tuner's settings are restored from the job itself.
tuner = HyperparameterTuner.attach(tuning_job_name)

# Deploy the best model found by the tuning job
tuning_predictor = tuner.deploy(initial_instance_count=1, instance_type='ml.t2.medium',
                                endpoint_name='xgboost-housing-auto')

We can compare the predictions generated by this endpoint with those generated locally by the Local Mode endpoint: 

In [None]:
# CSVSerializer and JSONDeserializer were imported in the Local Mode section above
tuning_predictor.serializer = CSVSerializer()
tuning_predictor.deserializer = JSONDeserializer()

In [None]:
tuner_results = [tuning_predictor.predict(x_test[i], initial_args={'Accept': 'text/csv'}) for i in range(0, 10)]
print(f'local predictions: \t {local_results}')
print(f'tuner predictions: \t {tuner_results}')

## Invoking SageMaker Endpoints <a class="anchor" id="InvokingSageMakerEndpoints">

Let's restore the endpoint names we created from our parameters file just in case you decided to shut down the kernel or notebook.

In the code so far, we've seen examples of training a model, deploying it as an endpoint, and then using the returned predictor object to make predictions. But what if we want to call an existing SageMaker endpoint? There are a couple of ways to do this: the first is with SageMaker's Python SDK, and the second is with boto3.

Calling an endpoint with SageMaker's Python SDK:

In [None]:
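import boto3
import sagemaker
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

sess = sagemaker.Session()

# Attach a predictor to the existing endpoint by name
predictor = Predictor(endpoint_name='xgboost-housing',
                      sagemaker_session=sess,
                      serializer=CSVSerializer(),
                      deserializer=JSONDeserializer())

predictor.predict(x_test[0], initial_args={'Accept': 'text/csv'})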

Or call an endpoint using boto3:

In [None]:
import json

sm_runtime = boto3.client('sagemaker-runtime')
# Create a CSV string from the numpy array
payload = ', '.join([str(each) for each in x_test[0]])
prediction = sm_runtime.invoke_endpoint(EndpointName='xgboost-housing',
                                        ContentType='text/csv',
                                        Body=payload)
prediction = json.loads(prediction['Body'].read())
prediction
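
Joining values by hand works for a single numeric row. For several rows, the standard library's `csv` module is a more robust way to build the request body. The following is a sketch, with `rows_to_csv` as a hypothetical helper name; whether the endpoint accepts multiple rows per request depends on the inference code in the container.

```python
import csv
import io


def rows_to_csv(rows):
    """Serialize rows to a CSV string, one line per row, no trailing newline."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    writer.writerows(rows)
    return buffer.getvalue().rstrip('\n')


# Made-up rows for illustration only.
payload_example = rows_to_csv([[0.1, 2, 3.5], [4, 5.25, 6]])
print(payload_example)
```

With the notebook's data, this could be called as `rows_to_csv(x_test[:2].tolist())` and the result passed as `Body`.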

## Clean Up <a class="anchor" id="CleanUp">

To avoid billing charges from stray resources, delete the monitoring schedule and the prediction endpoints to release their associated instance(s).

In [None]:
# Remove the monitoring schedule before the endpoint it monitors
!aws sagemaker delete-monitoring-schedule --monitoring-schedule-name xgboost-boston-housing-model-monitor-schedule
predictor.delete_endpoint(delete_endpoint_config=True)
tuning_predictor.delete_endpoint(delete_endpoint_config=True)