![logo](../imgs/MLU_Logo.png)

---

# Automate an End-to-end MLOps Pipeline

Now it's time to automate the ML pipeline using the MLOps environment.

We'll do that by uploading a zip file called **trainingjob.zip** to an S3 bucket. CodePipeline watches that bucket and starts a pipeline execution when the file arrives. The zip file has the following structure:
 - trainingjob.json (SageMaker training job descriptor)
 - deployment.json (instructions telling the environment how to deploy and prepare the endpoints)
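To make the archive layout concrete, here is a minimal sketch of how a consumer, such as the training Lambda function, might read the two descriptors back out of the zip. The helper name `read_job_descriptors` is hypothetical, and the file names follow what the upload cell later in this notebook writes (`trainingjob.json` and `deployment.json`). The real Lambda code may parse the archive differently.

```python
import io
import json
import zipfile

# File names inside trainingjob.zip, as written by the upload cell in this notebook
REQUIRED_FILES = ("trainingjob.json", "deployment.json")


def read_job_descriptors(zip_bytes):
    """Return (training_params, deployment_params) parsed from trainingjob.zip bytes.

    Hypothetical consumer-side helper; not the pipeline's actual Lambda code.
    """
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        missing = sorted(set(REQUIRED_FILES) - set(zf.namelist()))
        if missing:
            raise ValueError("trainingjob.zip is missing {}".format(missing))
        training = json.loads(zf.read("trainingjob.json"))
        deployment = json.loads(zf.read("deployment.json"))
    return training, deployment
```

For example, calling `read_job_descriptors(zip_buffer.getvalue())` on the buffer built in the upload cell would return `training_params` and `deployment_params` as plain dictionaries.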

## Configuring the Container Image

Let's start by defining the training image, the hyperparameters, and other attributes.


In [1]:
import sagemaker
import boto3

use_xgboost_builtin=True

sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity()["Account"]
region = boto3.session.Session().region_name
model_prefix='iris-model'
training_image = None
hyperparameters = None
if use_xgboost_builtin:
    # Built-in SageMaker XGBoost container (version 1.0-1) for the current region
    training_image = sagemaker.image_uris.retrieve('xgboost', region, version='1.0-1')
    hyperparameters = {
        "alpha": 0.42495142279951414,
        "eta": 0.4307531922567607,
        "gamma": 1.8028358018081714,
        "max_depth": 10,
        "min_child_weight": 5.925133573560345,
        "num_class": 3,
        "num_round": 30,
        "objective": "multi:softmax",
        "reg_lambda": 10,
        "silent": 0,
    }
else:
    # Custom image in this account's ECR repository named after model_prefix
    training_image = '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account_id, region, model_prefix)
    hyperparameters = {
        "max_depth": 11,
        "n_jobs": 5,
        "n_estimators": 120
    }
print(training_image)

683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3
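When `use_xgboost_builtin` is `False`, the cell above builds the image URI by hand in the form `<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>`. The sketch below, with hypothetical helper names, builds and splits such URIs, which can help when checking which image a training job is about to use. It assumes commercial AWS regions, whose ECR domain is `amazonaws.com`.

```python
import re

# <account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag> (commercial regions only)
ECR_URI_PATTERN = re.compile(
    r"^(?P<account>\d{12})\.dkr\.ecr\.(?P<region>[a-z0-9-]+)\.amazonaws\.com/"
    r"(?P<repository>[^:]+):(?P<tag>.+)$"
)


def build_ecr_uri(account_id, region, repository, tag="latest"):
    """Build an image URI the same way the custom-image branch above does."""
    return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account_id, region, repository, tag)


def parse_ecr_uri(uri):
    """Split an ECR image URI into account, region, repository, and tag."""
    match = ECR_URI_PATTERN.match(uri)
    if match is None:
        raise ValueError("not an ECR image URI: {}".format(uri))
    return match.groupdict()
```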


### Defining Parameters

Here, we define the parameters consumed by the two Lambda functions that train and deploy the model: `mlops-op-training` and `mlops-op-deployment`. You can inspect these functions in the [**Lambda**](https://console.aws.amazon.com/lambda/home?region=us-west-2#/functions) console.

#### Defining `training_params`

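Replace the value of `your_alias` below with the alias you used when you set up the CloudFormation pipeline. The notebook uses it as the name of the IAM role that SageMaker assumes (`arn:aws:iam::<account_id>:role/<your_alias>`).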

In [2]:
import time
import sagemaker
import boto3

your_alias = "rlhu"  # replace with your own alias
roleArn = "arn:aws:iam::{}:role/{}".format(account_id, your_alias)
timestamp = time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
job_name = model_prefix + timestamp
sagemaker_session = sagemaker.Session()

training_params = {}

# Docker image used for training (built-in XGBoost or your custom image), stored on ECR (https://aws.amazon.com/pt/ecr/)
training_params["AlgorithmSpecification"] = {
    "TrainingImage": training_image,
    "TrainingInputMode": "File"
}

# IAM role that SageMaker assumes; it grants the permissions the training job needs
training_params["RoleArn"] = roleArn

# S3 location where SageMaker stores the trained model artifacts
training_params["OutputDataConfig"] = {
    "S3OutputPath": 's3://{}/{}'.format(sagemaker_session.default_bucket(), model_prefix)
}

# This is the config of the instance that will execute the training
training_params["ResourceConfig"] = {
    "InstanceCount": 1,
    "InstanceType": "ml.m4.xlarge",
    "VolumeSizeInGB": 30
}

# The job name, shown under Training jobs in the SageMaker console
training_params["TrainingJobName"] = job_name

# The CreateTrainingJob API expects every hyperparameter value to be a string
for name in hyperparameters:
    hyperparameters[name] = str(hyperparameters[name])

# Hyperparameters used for training the model
training_params["HyperParameters"] = hyperparameters

# Training timeout: stop the job after at most 360000 seconds (100 hours)
training_params["StoppingCondition"] = {
    "MaxRuntimeInSeconds": 360000
}

# Input channels, appended in the next cell. Each uses FullyReplicated distribution,
# so the full dataset is copied to every training instance
training_params["InputDataConfig"] = []
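
Two details in the cell above are easy to get wrong: the training job name must be unique and follow SageMaker's naming rule, and every hyperparameter value must be a string. The following sketch packages both steps as small functions. The helper names are hypothetical, and the naming rule (at most 63 characters, alphanumerics optionally separated by hyphens, starting and ending with an alphanumeric) is our reading of the `TrainingJobName` constraint in the CreateTrainingJob API reference.

```python
import re
import time

# Assumed TrainingJobName constraint from the CreateTrainingJob API reference
TRAINING_JOB_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")
MAX_TRAINING_JOB_NAME_LENGTH = 63


def make_job_name(prefix, when=None):
    """Append a UTC timestamp to prefix, as the cell above does, and validate it."""
    when = time.gmtime() if when is None else when
    name = prefix + time.strftime("-%Y-%m-%d-%H-%M-%S", when)
    if len(name) > MAX_TRAINING_JOB_NAME_LENGTH or not TRAINING_JOB_NAME_PATTERN.match(name):
        raise ValueError("invalid training job name: {!r}".format(name))
    return name


def stringify_hyperparameters(hyperparameters):
    """Return a new dict whose values are strings, leaving the input untouched."""
    return {key: str(value) for key, value in hyperparameters.items()}
```

Unlike the in-place loop above, `stringify_hyperparameters` returns a copy, so rerunning the cell never converts already-converted values.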

Here we set the training and validation datasets.

In [3]:
training_params["InputDataConfig"].append({
    "ChannelName": "train",
    "DataSource": {
        "S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": 's3://{}/{}/input/train'.format(
                sagemaker_session.default_bucket(), 
                model_prefix),
            "S3DataDistributionType": "FullyReplicated"
        }
    },
    "ContentType": "text/csv",
    "CompressionType": "None"
})
training_params["InputDataConfig"].append({
    "ChannelName": "validation",
    "DataSource": {
        "S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": 's3://{}/{}/input/validation'.format(
                sagemaker_session.default_bucket(), 
                model_prefix),
            "S3DataDistributionType": "FullyReplicated"
        }
    },
    "ContentType": "text/csv",
    "CompressionType": "None"
})
training_params["Tags"] = []
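
The two channel definitions above differ only in their name and S3 prefix. A sketch of building them with one helper, assuming the same `<bucket>/<model_prefix>/input/<channel>` layout the notebook uses (the function names are hypothetical):

```python
def make_csv_channel(channel_name, s3_uri):
    """Build one InputDataConfig entry for uncompressed CSV data under an S3 prefix."""
    return {
        "ChannelName": channel_name,
        "DataSource": {
            "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": s3_uri,
                # Every training instance receives a full copy of the data
                "S3DataDistributionType": "FullyReplicated",
            }
        },
        "ContentType": "text/csv",
        "CompressionType": "None",
    }


def make_input_data_config(bucket, model_prefix):
    """Train and validation channels laid out as <bucket>/<model_prefix>/input/<channel>."""
    return [
        make_csv_channel(name, "s3://{}/{}/input/{}".format(bucket, model_prefix, name))
        for name in ("train", "validation")
    ]
```

With this helper, the cell above would reduce to `training_params["InputDataConfig"] = make_input_data_config(sagemaker_session.default_bucket(), model_prefix)`.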

#### Defining `deployment_params`

In [4]:
deployment_params = {
    "EndpointPrefix": model_prefix,
    "DevelopmentEndpoint": {
        # we want to enable the endpoint monitoring
        "InferenceMonitoring": True,
        # we will collect 100% of all the requests/predictions
        "InferenceMonitoringSampling": 100,
        "InferenceMonitoringOutputBucket": 's3://{}/{}/monitoring/dev'.format(
            sagemaker_session.default_bucket(), model_prefix),
        # we don't want to enable A/B tests in development
        "ABTests": False,
        # we'll use a basic instance for testing purposes
        "InstanceType": "ml.t2.large", # "ml.t3.medium" does not work
        "InitialInstanceCount": 1,
        # we don't want high availability/scalability for development
        "AutoScaling": None
    },
    "ProductionEndpoint": {
        # we want to enable the endpoint monitoring
        "InferenceMonitoring": True,
        # we will collect 100% of all the requests/predictions
        "InferenceMonitoringSampling": 100,
        "InferenceMonitoringOutputBucket": 's3://{}/{}/monitoring/prd'.format(
            sagemaker_session.default_bucket(), model_prefix),
        # we want to do A/B tests in production
        "ABTests": True,
        # we'll use a larger, compute-optimized instance for production
        "InstanceType": "ml.c5.xlarge",  # alternative: "ml.m4.xlarge"
        "InitialInstanceCount": 2,
        "InitialVariantWeight": 0.1,
        # we want elasticity: at least 2 and at most 10 instances behind the endpoint.
        # Target tracking aims to keep about 200 invocations per instance per minute,
        # adding or removing instances as traffic changes
        "AutoScaling": {
            "MinCapacity": 2,
            "MaxCapacity": 10,
            "TargetValue": 200.0,
            "ScaleInCooldown": 30,  # seconds
            "ScaleOutCooldown": 60,  # seconds
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
        }
    }
}
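
The `AutoScaling` block mirrors SageMaker's target-tracking autoscaling, which is configured through the Application Auto Scaling API. We have not seen the deployment Lambda's code, so the sketch below is only an assumption of how such a block could be translated into keyword arguments for `register_scalable_target` and `put_scaling_policy`. The endpoint name, variant name, and policy name are hypothetical.

```python
def autoscaling_requests(endpoint_name, variant_name, autoscaling):
    """Translate an AutoScaling block into Application Auto Scaling request kwargs.

    Returns (register_kwargs, policy_kwargs), or None when autoscaling is disabled.
    """
    if not autoscaling:
        return None
    # A SageMaker variant is addressed as endpoint/<endpoint>/variant/<variant>
    target = {
        "ServiceNamespace": "sagemaker",
        "ResourceId": "endpoint/{}/variant/{}".format(endpoint_name, variant_name),
        "ScalableDimension": "sagemaker:variant:DesiredInstanceCount",
    }
    register_kwargs = dict(
        target,
        MinCapacity=autoscaling["MinCapacity"],
        MaxCapacity=autoscaling["MaxCapacity"],
    )
    policy_kwargs = dict(
        target,
        PolicyName="{}-invocations-target".format(variant_name),  # hypothetical name
        PolicyType="TargetTrackingScaling",
        TargetTrackingScalingPolicyConfiguration={
            # Keep the average invocations per instance near this value
            "TargetValue": autoscaling["TargetValue"],
            "PredefinedMetricSpecification": {
                "PredefinedMetricType": autoscaling["PredefinedMetricType"],
            },
            # Cooldowns are in seconds
            "ScaleInCooldown": autoscaling["ScaleInCooldown"],
            "ScaleOutCooldown": autoscaling["ScaleOutCooldown"],
        },
    )
    return register_kwargs, policy_kwargs
```

With a boto3 `application-autoscaling` client, you would first call `client.register_scalable_target(**register_kwargs)` and then `client.put_scaling_policy(**policy_kwargs)`.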

#### Preparing and uploading the dataset

In [5]:
%%time

import numpy as np
import sagemaker
from sklearn import datasets
from sklearn.model_selection import train_test_split

sagemaker_session = sagemaker.Session()
iris = datasets.load_iris()
# Stratified 67/33 split keeps the class proportions equal in both sets
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.33, random_state=42, stratify=iris.target)
# Built-in XGBoost reads CSV with the label in the first column and no header row
np.savetxt("iris_train.csv", np.column_stack((y_train, X_train)), delimiter=",", fmt='%0.3f')
np.savetxt("iris_test.csv", np.column_stack((y_test, X_test)), delimiter=",", fmt='%0.3f')

# Upload both files to the default SageMaker bucket; the test split serves as the validation channel
input_train = sagemaker_session.upload_data(path='iris_train.csv', key_prefix='%s/input/train' % model_prefix)
input_test = sagemaker_session.upload_data(path='iris_test.csv', key_prefix='%s/input/validation' % model_prefix)

CPU times: user 419 ms, sys: 33 ms, total: 452 ms
Wall time: 608 ms
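The built-in XGBoost algorithm reads CSV training data with the target in the first column and no header row, which is why the cell above stacks `y` in front of `X`. This sketch (hypothetical helper names) writes and parses that layout in memory, so you can check the format before uploading.

```python
import io

import numpy as np


def to_label_first_csv(X, y):
    """Serialize features and labels as label-first CSV text with no header row."""
    buffer = io.StringIO()
    # Same layout as the cell above: label in column 0, then the features
    np.savetxt(buffer, np.column_stack((y, X)), delimiter=",", fmt="%0.3f")
    return buffer.getvalue()


def from_label_first_csv(text):
    """Parse label-first CSV text back into (features, integer labels)."""
    data = np.loadtxt(io.StringIO(text), delimiter=",", ndmin=2)
    return data[:, 1:], data[:, 0].astype(int)
```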


## Activating the Pipeline

Alright! Now it's time to start the end-to-end training/deployment process.

In [6]:
import boto3
import io
import zipfile
import json

s3 = boto3.client('s3')
sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity()["Account"]

session = boto3.session.Session()
region = session.region_name

bucket_name = "mlops-%s-%s" % (region, account_id)
print("bucket_name : {}".format(bucket_name))
key_name = "training_jobs/%s/trainingjob.zip" % model_prefix
print("key_name : {}".format(key_name))

bucket_name : mlops-us-east-1-576192184325
key_name : training_jobs/iris-model/trainingjob.zip


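Now let's activate the pipeline by uploading **trainingjob.zip** to the MLOps [**S3 bucket**](https://s3.console.aws.amazon.com/s3/buckets/mlops-us-west-2-901591081018?region=us-west-2). The console link points to the bucket, account, and region used when this notebook was written; your bucket is the `bucket_name` printed above.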

In [7]:
zip_buffer = io.BytesIO()
# 'w' creates a fresh archive in the in-memory buffer
with zipfile.ZipFile(zip_buffer, 'w') as zf:
    zf.writestr('trainingjob.json', json.dumps(training_params))
    zf.writestr('deployment.json', json.dumps(deployment_params))

# Uploading the archive to the watched key triggers the pipeline
s3.put_object(Bucket=bucket_name, Key=key_name, Body=zip_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'SMWVFHB9WC36MBD6',
  'HostId': 'uo7iIvIVT7hS0MLwEUdyYJGALVo699JmEx08h1fULDtCph0bJgVED9BF1JAv4n9SNf3ykHSqaCw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'uo7iIvIVT7hS0MLwEUdyYJGALVo699JmEx08h1fULDtCph0bJgVED9BF1JAv4n9SNf3ykHSqaCw=',
   'x-amz-request-id': 'SMWVFHB9WC36MBD6',
   'date': 'Mon, 08 Mar 2021 04:15:14 GMT',
   'x-amz-version-id': 'h6CG0hWwWV4vmXJLBglAUci7ULVXgsK9',
   'etag': '"76d4655fcd7dad56f07cf3e34ed40033"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"76d4655fcd7dad56f07cf3e34ed40033"',
 'VersionId': 'h6CG0hWwWV4vmXJLBglAUci7ULVXgsK9'}
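The `ETag` in the response above can serve as a quick integrity check. For objects uploaded in a single `put_object` call without SSE-KMS or SSE-C encryption, S3 sets the ETag to the MD5 digest of the object's bytes. A minimal sketch (hypothetical helper name):

```python
import hashlib


def etag_matches_md5(body, etag):
    """Compare an S3 ETag with the MD5 of the uploaded bytes.

    Only meaningful for single-part uploads that are unencrypted or use SSE-S3;
    multipart uploads and SSE-KMS/SSE-C objects have ETags that are not a plain MD5.
    """
    return hashlib.md5(body).hexdigest() == etag.strip('"')
```

For example, if you capture the result of the upload as `response`, `etag_matches_md5(zip_buffer.getvalue(), response["ETag"])` should return `True`.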

## Monitoring the MLOps Pipeline

Once the zip file lands in the S3 bucket, the MLOps pipeline starts automatically. It consists of the following stages:
- `TrainModel`: an AWS CloudFormation stack that trains the model (see the finished pipeline screenshot below);
- `DeployDev`: an AWS CloudFormation stack that deploys the trained model to the **Development** environment (also shown below);
- `DeployApproval`: a manual approval step, which you complete either by clicking `DeployApproval` in CodePipeline or from the next notebook;

<img src="../imgs/codepipeline_approval.png" alt="Drawing" style="width: 600px;"/>

- `DeployProd`: an AWS CloudFormation stack that deploys the trained model to the **Production** environment (also shown below).

While the pipeline runs, open the CodePipeline console to follow its status. After about 20 minutes, the completed pipeline looks like this:

<img src="../imgs/codepipeline.png" alt="Drawing" style="width: 400px;"/>
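Besides the console, you can follow the pipeline from Python with the CodePipeline `get_pipeline_state` API and approve the manual step with `put_approval_result`. The sketch below assumes the approval action is named `DeployApproval` and that you know the pipeline's name, which this notebook does not show, so `pipeline_name` is a placeholder. The helper names are hypothetical.

```python
def stage_statuses(pipeline_state):
    """Map each stage name to its latest execution status (None if it never ran)."""
    return {
        stage["stageName"]: stage.get("latestExecution", {}).get("status")
        for stage in pipeline_state.get("stageStates", [])
    }


def pending_approval(pipeline_state, action_name="DeployApproval"):
    """Return (stage_name, token) for an approval action awaiting a decision, else None."""
    for stage in pipeline_state.get("stageStates", []):
        for action in stage.get("actionStates", []):
            execution = action.get("latestExecution", {})
            if action.get("actionName") == action_name and execution.get("status") == "InProgress":
                return stage["stageName"], execution.get("token")
    return None


def approve(codepipeline_client, pipeline_name, action_name="DeployApproval"):
    """Approve the pending manual action, if any. Returns True when an approval was sent."""
    state = codepipeline_client.get_pipeline_state(name=pipeline_name)
    print(stage_statuses(state))
    pending = pending_approval(state, action_name)
    if pending is None:
        return False
    stage_name, token = pending
    codepipeline_client.put_approval_result(
        pipelineName=pipeline_name,
        stageName=stage_name,
        actionName=action_name,
        result={"summary": "Approved from notebook", "status": "Approved"},
        token=token,
    )
    return True


# Usage (placeholder pipeline name):
# import boto3
# approve(boto3.client("codepipeline"), pipeline_name)
```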


> **Action:** Now open [the next notebook](lab2b_Productionizing_End-to-end_Pipeline.ipynb) to follow the progress and test your endpoint.

# Other Resources

## A/B Tests

If you look at the **deployment** parameters, you'll see that we enabled A/B tests on the **Production** endpoint. To try this, first deploy the initial model into production, then rerun this notebook from **Configuring the Container Image** onward to start a new training session; rerunning the cells also produces a new, unique training job name. Feel free to change some hyperparameter values in that section before you do.

When the second model is published to **Development**, the endpoint is updated and the old model is replaced without interrupting the clients that call it. This is SageMaker's default behavior when you update an endpoint.

After you approve the deployment into **Production**, the endpoint is updated and the second model is added to it. Now it's time to run some **A/B tests**. In the next notebook (linked above), run the last cell (the test code) to see which model answered your request. Keep sending requests and you'll see the **Production** endpoint use both models A and B, in the proportion defined by **InitialVariantWeight** in the deployment parameters.
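SageMaker routes each request to a production variant with probability equal to that variant's weight divided by the sum of all variant weights. We don't know which weight the pipeline assigns to the existing model, so the sketch below treats the weights as inputs; the helper and variant names are hypothetical.

```python
def traffic_shares(weights):
    """Expected fraction of requests per variant: its weight over the sum of all weights."""
    total = float(sum(weights.values()))
    if total <= 0:
        raise ValueError("at least one variant needs a positive weight")
    return {name: weight / total for name, weight in weights.items()}
```

For instance, if the existing variant had weight `1.0` and the new one `0.1`, `traffic_shares({"model-a": 1.0, "model-b": 0.1})` gives the new model 0.1 / 1.1, or roughly 9% of requests.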

In a real-life scenario, you would monitor the performance of both models and then adjust each model's **Weight**, either to complete the transition to the new model (and remove the old one) or to roll back the new deployment.

To adjust the weight of each model (production variant, identified by its variant name) in an endpoint, call `update_endpoint_weights_and_capacities`: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.update_endpoint_weights_and_capacities
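A sketch of a gradual rollout with that call: build the `DesiredWeightsAndCapacities` list from a `{variant_name: weight}` mapping and pass it to the SageMaker client. The helper names, endpoint name, and variant names are hypothetical.

```python
def build_weight_updates(weights):
    """Turn {variant_name: weight} into the DesiredWeightsAndCapacities list."""
    return [
        {"VariantName": name, "DesiredWeight": float(weight)}
        for name, weight in sorted(weights.items())
    ]


def set_variant_weights(sagemaker_client, endpoint_name, weights):
    """Apply new variant weights; instance counts are not included, so they stay as they are."""
    updates = build_weight_updates(weights)
    sagemaker_client.update_endpoint_weights_and_capacities(
        EndpointName=endpoint_name,
        DesiredWeightsAndCapacities=updates,
    )
    return updates


# Example (hypothetical names): send all production traffic to the new model
# import boto3
# set_variant_weights(boto3.client("sagemaker"), "iris-model-prd", {"model-a": 0.0, "model-b": 1.0})
```

Setting the old variant's weight to `0` stops routing traffic to it; removing it from the endpoint entirely requires updating the endpoint with a new endpoint configuration.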
