# Customizing the Build/Train/Deploy MLOps Project Template

At re:Invent 2020, AWS announced [Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/), the first 
purpose-built, easy-to-use Continuous Integration and Continuous Delivery (CI/CD) service for machine learning. 
SageMaker Pipelines has three main components that improve the operational resilience and reproducibility of your 
workflows: Pipelines, Model Registry, and Projects. 

SageMaker Projects introduce MLOps templates that automatically provision the underlying resources needed to enable 
CI/CD capabilities for your Machine Learning Development Lifecycle (MLDC). You can use one of the built-in 
templates or create your own custom templates.

This example focuses on using one of the MLOps templates to bootstrap your ML project and establish a CI/CD 
pattern from seed code. We’ll show how to use the built-in Build/Train/Deploy Project template as a base for a 
credit default classification example. This base template enables CI/CD for training machine learning models, 
registering model artifacts to the Model Registry, and automating model deployment with manual approval and automated 
testing.

## Deploy the MLOps template for build, train, and deploy

We will package what we did in the previous session (data processing, model training, model evaluation, and model deployment) as an automated pipeline. Our starting point will be the output of the Data Wrangler job we defined previously.

We’ll start by taking a detailed look at which AWS services are provisioned when this build, train, and deploy MLOps 
template is launched. Later, we’ll discuss how the skeleton can be modified for a custom use case. 

To get started with SageMaker Projects, you must first [enable them in the SageMaker Studio console](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-studio-updates.html). 
This can be done for existing users or while creating new ones:

<img src="img/enable_projects.png">

Within Amazon SageMaker Studio, you can now select “Projects” from a drop-down menu on the “Components and registries” 
tab as shown below:

<img src="img/select_projects.png">

From the projects page you’ll have the option to launch a pre-configured SageMaker MLOps template. We'll select the build, train and deploy template:

<img src="img/create_project.png">

> **NOTE:** Launching this template will kick off a model building pipeline by default and will train a regression model. This will incur a small cost.

Once the project is created from the MLOps template, the following architecture will be deployed:

<img src="img/deep_dive.png">


## Modify the seed code for our custom use case

The SageMaker-provided template is initialized with seed code for a generic demo use case (the Abalone dataset).

Once the project has been created, you'll be able to see:

- The visualization of the SageMaker Pipeline, from the "Pipelines" drop-down menu within SageMaker Studio
- The CI/CD pipelines in [AWS CodePipeline](https://console.aws.amazon.com/codesuite/codepipeline/pipelines) that define the overall process flow, within which the SageMaker Pipeline execution is one step.

In order to modify the seed code from this launched template to match our own (credit default) example, we’ll first need to clone the AWS CodeCommit repositories to our local SageMaker Studio instance.

▶️ **Select** the SageMaker Project that was just created from the list of projects. 

▶️ **Clone both** the AWS CodeCommit repositories to your notebook, by clicking the hyperlinks in the “Repositories” tab of the project.

▶️ **Copy** the `Repository local path` from the dialog for each repo and paste it into the cell below.

<img src="img/clone_repos.png">

In [None]:
model_build_repo_local_path = ""  # TODO: Update
model_deploy_repo_local_path = ""  # TODO: Update

### ModelBuild Repo

The “ModelBuild” repository contains the code for preprocessing, training, and evaluating the model. 

<img src="img/repo_directory.png">

First, we'll rename the `abalone` directory to `credit_default`:

In [None]:
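import os

# Absolute path of the cloned ModelBuild repository inside SageMaker Studio
model_build_repo_abs_path = f"/root/{model_build_repo_local_path}"

if not os.path.isdir(model_build_repo_abs_path):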
    raise ValueError(
        "Couldn't find repository at {}. Did you clone the repo and update the cell?".format(
            model_build_repo_abs_path,
        )
    )

if os.path.isdir(f"{model_build_repo_abs_path}/pipelines/abalone"):
    os.rename(
        f"{model_build_repo_abs_path}/pipelines/abalone",
        f"{model_build_repo_abs_path}/pipelines/credit_default",
    )

Next, we'll update the `codebuild-buildspec.yml` (which defines the commands run to define and execute the SageMaker Pipeline):

In [None]:
import os

model_build_repo_abs_path = f"/root/{model_build_repo_local_path}"

with open(f"{model_build_repo_abs_path}/codebuild-buildspec.yml", "r") as f:
    newspec = f.read().replace("abalone", "credit_default")

with open(f"{model_build_repo_abs_path}/codebuild-buildspec.yml", "w") as f:
    f.write(newspec)
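
The replacement above is a plain string substitution on a single file, so it can be worth checking whether any other file in the repository still mentions the old package name. The following is a minimal sketch rather than part of the template: the helper name `find_references` and the list of file extensions are assumptions made for illustration.

```python
import os


def find_references(root_dir, needle, extensions=(".py", ".yml", ".yaml", ".cfg", ".md")):
    """Return sorted relative paths of files under root_dir whose text contains needle."""
    hits = []
    for dirpath, dirnames, filenames in os.walk(root_dir):
        # Prune git metadata in place so os.walk does not descend into it
        dirnames[:] = [d for d in dirnames if d != ".git"]
        for filename in filenames:
            if not filename.endswith(extensions):
                continue
            path = os.path.join(dirpath, filename)
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                if needle in f.read():
                    hits.append(os.path.relpath(path, root_dir))
    return sorted(hits)
```

In this notebook you could call `find_references(model_build_repo_abs_path, "abalone")`. Any hits need to be reviewed case by case, since some references may be harmless.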

Finally, we'll overwrite the project's pipeline definition with the modified files from this local folder.

First though, you'll need to manually update one of the files with the specific location of our project's data from the last notebook, as shown below:

In [None]:
%store -r flow_output_s3uri
print(flow_output_s3uri)

▶️ **Open** `modelbuild/pipelines/pipeline.py` from this notebook's folder

▶️ **Edit** the `default_value` of the *InputDataUrl parameter* to match the S3 URI above. The final result will look something like:

```python
    input_data = ParameterString(
        name="InputDataUrl",
        default_value=f"s3://creditmodel-myname-mlsandbox/data-wrangler/credit-flow-2048-01-01/.../default/",  # Change this to point to the s3 location of your raw input data.
    )
```

▶️ **Confirm** that you've done it by editing the cell below to continue!

In [None]:
I_UPDATED_THE_INPUT_DATA_URL_DEFAULT_VALUE = False

if not I_UPDATED_THE_INPUT_DATA_URL_DEFAULT_VALUE:
    raise ValueError(
        "Follow the instructions above to edit pipeline.py and then confirm here to continue!"
    )
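
The confirmation flag above relies on you remembering to make the edit. As an optional alternative, the sketch below reads the `default_value` of the `InputDataUrl` parameter back out of the source text so it can be compared with `flow_output_s3uri`. The helper name `find_input_data_default` is hypothetical, and the regular expression assumes the parameter is written in the same layout as the snippet shown earlier (`name=` immediately followed by `default_value=`).

```python
import re

# Matches: name="InputDataUrl", default_value="..." (with an optional f prefix)
_INPUT_URL_PATTERN = re.compile(
    r'name\s*=\s*"InputDataUrl"\s*,\s*default_value\s*=\s*[fF]?"([^"]*)"'
)


def find_input_data_default(pipeline_source):
    """Return the InputDataUrl default_value found in the source text, or None."""
    match = _INPUT_URL_PATTERN.search(pipeline_source)
    return match.group(1) if match else None
```

For example, after reading your edited `pipeline.py` into a string called `source`, you could check `find_input_data_default(source) == flow_output_s3uri` before continuing.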

You should now be ready to run the below to copy the modified files into your local copy of the model-build repository:

In [None]:
import shutil

for item in os.listdir("modelbuild/pipelines/credit_default"):
    their_item_path = f"{model_build_repo_abs_path}/pipelines/credit_default/{item}"
    if os.path.isdir(their_item_path):
        shutil.rmtree(their_item_path)
    elif os.path.isfile(their_item_path):
        os.remove(their_item_path)

    our_item_path = f"modelbuild/pipelines/credit_default/{item}"
    if os.path.isdir(our_item_path):
        shutil.copytree(our_item_path, their_item_path)
    else:
        shutil.copyfile(our_item_path, their_item_path)
    print(f"Copied {our_item_path}")

## Trigger a new Pipeline execution through git commit

In the previous section we updated your local copy of the model build repository - but didn't yet commit and push these changes to source control.

By committing and pushing these changes to the AWS CodeCommit repository, a new pipeline execution will be triggered ([via Amazon EventBridge](https://docs.aws.amazon.com/codecommit/latest/userguide/monitoring-events.html)).

▶️ **Navigate** to your local copy of the `modelbuild` repository using the folder tab in SageMaker Studio, then **switch** to the source control tab as shown before (to see git status for the correct repository).

▶️ **Stage**, **Commit** and **push** the changes as shown below:

<img src="img/git_push.png">
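
If you prefer to script the same stage, commit, and push steps instead of using the source control tab, a minimal sketch could look like the following. The function names are hypothetical, and the sketch assumes `git` is on the path and that the clone already has its CodeCommit remote and credentials configured (which is the case for repositories cloned through the project's "Repositories" tab). By default it only returns the commands without running them.

```python
import subprocess


def build_git_commands(message, paths=(".",)):
    """Return the argv lists for staging, committing, and pushing changes."""
    return [
        ["git", "add", *paths],
        ["git", "commit", "-m", message],
        ["git", "push"],
    ]


def commit_and_push(repo_dir, message, dry_run=True):
    """Run the git commands inside repo_dir, or only return them when dry_run is True."""
    commands = build_git_commands(message)
    if not dry_run:
        for argv in commands:
            # check=True raises CalledProcessError as soon as a git step fails
            subprocess.run(argv, cwd=repo_dir, check=True)
    return commands
```

Calling `commit_and_push(model_build_repo_abs_path, "Switch to credit default pipeline", dry_run=False)` would then push the changes and trigger the pipeline in the same way as the UI steps.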

After a few moments, a new execution of the SageMaker pipeline will be triggered. You can monitor it by selecting your pipeline inside the SageMaker Project:

<img src="img/execute_pipeline.png">

<img src="img/dag.png">

## Deploy the registered model

When the pipeline above completes, you'll have a trained and evaluated model stored in the SageMaker Model Registry (refer to the "Model groups" section in the SageMaker Studio Components & Registries tab).

However, before we approve the model to trigger deployment, we'll also customize the deployment infrastructure.

### Customizing the deployment infrastructure

We'd like to enable data capture on our deployed endpoints, but will need to configure a location in Amazon S3.

▶️ **Run** the cell below to generate a data capture URI using the default SageMaker bucket, and **copy** the output:

In [None]:
import sagemaker

data_capture_s3uri = f"s3://{sagemaker.Session().default_bucket()}/mlopsdemo/capture"
data_capture_s3uri

▶️ **Edit** the `modeldeploy/endpoint-config-template.yml` file to set the DestinationS3Uri for data capture to the value above
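
For orientation, the sketch below builds the kind of data capture settings that the SageMaker `CreateEndpointConfig` API accepts under `DataCaptureConfig`. The field names are those of the API; whether the template file uses exactly these values is an assumption, and the helper name `build_data_capture_config` and the 100% default sampling rate are hypothetical choices for illustration.

```python
def build_data_capture_config(destination_s3uri, sampling_percentage=100):
    """Return a DataCaptureConfig dict capturing both requests and responses as CSV."""
    if not destination_s3uri.startswith("s3://"):
        raise ValueError(f"Expected an s3:// URI, got: {destination_s3uri}")
    if not 0 <= sampling_percentage <= 100:
        raise ValueError("sampling_percentage must be between 0 and 100")
    return {
        "EnableCapture": True,
        "InitialSamplingPercentage": sampling_percentage,
        "DestinationS3Uri": destination_s3uri,
        # Capture the request payload as well as the model's response
        "CaptureOptions": [{"CaptureMode": "Input"}, {"CaptureMode": "Output"}],
        # Store CSV payloads as plain text rather than base64
        "CaptureContentTypeHeader": {"CsvContentTypes": ["text/csv"]},
    }
```

Calling `build_data_capture_config(data_capture_s3uri)` shows the structure that the `DestinationS3Uri` value slots into.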

When you're done, update the below to confirm and run the cell to copy this file to your local copy of the ModelDeploy repository.

In [None]:
I_UPDATED_THE_DATA_CAPTURE_URI = False  # Set to True once you have edited the file

if not I_UPDATED_THE_DATA_CAPTURE_URI:
    raise ValueError(
        "Follow the instructions above to edit endpoint-config-template.yml and then confirm here to continue!"
    )


shutil.copyfile(
    "modeldeploy/endpoint-config-template.yml",
    f"/root/{model_deploy_repo_local_path}/endpoint-config-template.yml"
)
print("Copied modeldeploy/endpoint-config-template.yml")

▶️ **Commit and push** your change to the `modeldeploy` repository like we previously did for the `modelbuild` repository.

### Approving the model

Although pushing the change should already trigger the ModelDeploy CodePipeline, we'll also need to **approve the model**, which re-triggers the pipeline and actually enables deployment.

▶️ **Open** the “Model groups” section in the SageMaker Studio UI and inspect the metadata attached to the model artifacts. From there, approve the model for deployment:

<img src="img/model_metrics.png">

<img src="img/approve_model.png">

This approval will trigger the ModelDeploy [CodePipeline](https://console.aws.amazon.com/codesuite/codepipeline/pipelines) and create a `staging` endpoint for real-time inference.
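
The same approval can also be made programmatically. The sketch below is one possible way to do it, not the template's own mechanism: it uses the `list_model_packages` and `update_model_package` operations of the SageMaker API through a client object passed in by the caller. The function name `approve_latest_model` is hypothetical, and it assumes the newest package in the group is the one you want to approve.

```python
def approve_latest_model(sm_client, model_package_group_name):
    """Approve the most recently created model package in a group and return its ARN."""
    response = sm_client.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response["ModelPackageSummaryList"]
    if not summaries:
        raise ValueError(f"No model packages found in group {model_package_group_name}")
    arn = summaries[0]["ModelPackageArn"]
    # Changing the status to "Approved" is the event the deployment pipeline reacts to
    sm_client.update_model_package(ModelPackageArn=arn, ModelApprovalStatus="Approved")
    return arn
```

In a notebook, `sm_client` would typically be `boto3.client("sagemaker")`, and the group name is the one shown in the "Model groups" view.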

After verifying the endpoint, you can approve *promotion* of the model via the ***ModelDeploy CodePipeline***, to trigger deployment of a `prod` endpoint as well:

<img src="img/endpoints.png">



## Trigger a new Pipeline Execution through SDK

Alternatively, you can retrieve and execute an existing pipeline through the SageMaker Python SDK. The template's 
`pipeline.py` file defines a `get_pipeline` function, which you can use to trigger an execution from your own notebook:


```
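import boto3
import sagemaker

# Import get_pipeline from the module path of your pipeline.py file.
from pipelines.credit_default.pipeline import get_pipeline

# Example names: replace these with the values from your own project.
model_package_group_name = "CustomerChurnPackageGroup"
pipeline_name = "CustomerChurnDemo-p-ewf8t7lvhivm"

# Session details, assumed here to come from the current notebook environment.
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker.Session().default_bucket()

pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
)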
```

### Submit the pipeline to SageMaker and start execution

Let's submit our pipeline definition to the workflow service. The role passed in will be used by the workflow service to create all the jobs defined in the steps.

```
pipeline.upsert(role_arn=role)
execution = pipeline.start()

execution.describe()
execution.wait()
```

### Parametrized Executions

We can run additional executions of the pipeline with different pipeline parameters. The `parameters` argument is a 
dictionary whose keys are the parameter names, and whose values are the primitive values to use as overrides of the defaults.

Of particular note, based on the performance of the model, we may want to kick off another pipeline execution, but this 
time on a compute-optimized instance type and with the model approval status automatically set to "Approved". This means 
that the model package version generated by the `RegisterModel` step will automatically be ready for deployment through 
CI/CD pipelines, such as with SageMaker Projects.

```
# Note: You can change the ModelApprovalStatus to "PendingManualApproval". This is the default set in the pipeline.py file.

execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
    )
)


execution.wait()
execution.list_steps()
```
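
A typo in an override key is easy to make, so a small local check before starting an execution can help. The sketch below is a hypothetical helper (`unknown_overrides` is not part of the SDK) that compares the override keys against a list of declared parameter names.

```python
def unknown_overrides(declared_names, overrides):
    """Return the override keys that do not match any declared parameter name."""
    declared = set(declared_names)
    return sorted(key for key in overrides if key not in declared)
```

Assuming the pipeline object exposes its parameters as a list of objects with a `name` attribute, the declared names could be collected with `[p.name for p in pipeline.parameters]` and any non-empty result treated as an error.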

## Real-time inference

Once the staging endpoint is deployed, we can query it for real-time inference requests using the same data we did for batch transform testing in notebook 1.

First, we'll re-load the data from the previous notebook:

In [None]:
%store -r test_result_df

# Optionally drop any fields that might have snuck in but shouldn't be there:
test_result_df = test_result_df.drop(columns=["credit_default_staging"], errors="ignore")
# Drop the previous testing results:
input_df = test_result_df.drop(columns=["credit_default", "credit_default_pred"])

# Create an alternative *biased* set by selecting only defaulted records:
skewed_df = test_result_df[
    test_result_df["credit_default"] == 1
].drop(columns=["credit_default", "credit_default_pred"])

input_df.head()

To conveniently send data through the endpoint from Python, we can use the `Predictor` class from the high-level SageMaker Python SDK:

In [None]:
predictor_staging = sagemaker.predictor.Predictor(
    "mlopsdemo-staging",  # Replace with your 'endpoint name' from above
    serializer=sagemaker.serializers.CSVSerializer(),
    deserializer=sagemaker.deserializers.CSVDeserializer(),
)

With this Predictor, we can send the records to the endpoint one by one (or in manageable batches) - and then plot a classification accuracy report similarly to notebook 1:

In [None]:
staging_results = [
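    # df.iterrows would mangle the data types, so we use itertuples instead. The first
    # element of each row tuple is the index, so we skip it with [1:]. The endpoint
    # returns a 2D array containing a single string element, so we take [0][0].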
    float(predictor_staging.predict(rowtuple[1:])[0][0])
    for rowtuple in input_df.itertuples()
]
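
Sending one record per request is simple but slow for larger datasets. As a sketch of the "manageable batches" option, the helper below groups rows into CSV payload strings of a fixed maximum size. The function names are hypothetical, and how a multi-row response is laid out depends on the model container, so parsing the predictions back out is deliberately left out here.

```python
import csv
import io


def _rows_to_csv(rows):
    """Serialize a list of row tuples to CSV text without a trailing newline."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue().rstrip("\n")


def iter_csv_batches(rows, batch_size):
    """Yield CSV payload strings containing at most batch_size rows each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield _rows_to_csv(batch)
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch
        yield _rows_to_csv(batch)
```

With the data above, the rows could come from `input_df.itertuples(index=False, name=None)`, and each payload string could then be passed to `predictor_staging.predict(...)`.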

In [None]:
import util

output_df = test_result_df.copy()
output_df["credit_default_staging"] = staging_results

util.plotting.generate_classification_report(
    y_real=output_df["credit_default"],
    y_predict_proba=output_df["credit_default_staging"],
    decision_threshold=0.5,
    class_names_list=["good", "default"],
    title="Staging pipeline credit risk model",
)
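
To make the role of the `decision_threshold` concrete, the sketch below counts the confusion matrix cells for a set of scores at a given threshold. It is an illustration only and not the implementation of `util.plotting.generate_classification_report`; the function names are hypothetical, and treating scores equal to the threshold as positive is an assumption.

```python
def confusion_counts(y_real, y_proba, threshold=0.5):
    """Count true/false positives and negatives for binary labels and scores."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for actual, score in zip(y_real, y_proba):
        predicted = 1 if score >= threshold else 0
        if predicted == 1:
            counts["tp" if actual == 1 else "fp"] += 1
        else:
            counts["tn" if actual == 0 else "fn"] += 1
    return counts


def accuracy(counts):
    """Return the fraction of correct predictions from confusion counts."""
    total = sum(counts.values())
    return (counts["tp"] + counts["tn"]) / total if total else 0.0
```

For instance, `confusion_counts(output_df["credit_default"], output_df["credit_default_staging"])` would give the counts behind the report at the default threshold of 0.5.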

## Data drift monitoring

Because our endpoint has data capture enabled, you'll be able to configure *Data Quality Monitoring* via the SageMaker Studio UI.

First, we need a representative **baseline dataset** which future captured data will be compared to.

While we could use the training data for this, the training data has a very unrealistic distribution of output variables (ground truth values '1' or '0') as compared to real inference data (where a floating point score e.g. '0.23' would be output).

It is possible to baseline from live capture data, but for this example let's create and upload a simple CSV:

In [None]:
baseline_df = input_df.copy()
baseline_df["credit_default"] = staging_results
baseline_df.head()

In [None]:
drift_baseline_s3uri = f"s3://{sagemaker.Session().default_bucket()}/mlopsdemo/data-drift-baseline/baseline.csv"

baseline_df.to_csv(drift_baseline_s3uri, index=False)
print(f"Uploaded baseline file to {drift_baseline_s3uri}")

▶️ **Open** your `staging` endpoint from the *Endpoints* view in SageMaker Studio and click "Create monitoring schedule" for data quality:

![](img/create_monitoring_schedule.png "Screenshot of SMStudio Create Model Monitoring option")

▶️ **Configure** the schedule as follows:

- Set *Schedule Expression* to `hourly`
- Set the *S3 Output Configuration* bucket name and prefix as per the output of the notebook cell below
- Set the *Baseline dataset S3 location* bucket name and prefix as per the output below
- Set the *Baseline S3 output location* bucket name and prefix as per the output below
- Leave other parameters as default

In [None]:
drift_output_s3uri = f"s3://{sagemaker.Session().default_bucket()}/mlopsdemo/data-drift"
print(f"S3 Output Configuration:\n{drift_output_s3uri}\n")

print(f"Baseline dataset S3 location:\n{drift_baseline_s3uri}\n")

drift_baseline_output_s3uri = f"s3://{sagemaker.Session().default_bucket()}/mlopsdemo/data-drift-baseline/output"
print(f"Baseline S3 Output:\n{drift_baseline_output_s3uri}\n")
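
Because the form asks for the bucket name and the prefix separately, it can help to split each URI printed above into those two parts. The following is a minimal sketch using only the standard library; the helper name `split_s3_uri` is hypothetical.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split an s3://bucket/prefix URI into a (bucket, prefix) tuple."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not a valid S3 URI: {s3_uri}")
    # The path starts with a slash, which is not part of the S3 key prefix
    return parsed.netloc, parsed.path.lstrip("/")
```

For example, `split_s3_uri(drift_output_s3uri)` returns the bucket name and the `mlopsdemo/data-drift` prefix as separate strings.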

Once monitoring is enabled on your endpoint, reports will be generated every hour, with generation kicked off between 0 and 20 minutes after the hour.
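
To plan when to send traffic, it can be useful to know when the next job is expected to start. The sketch below simply applies the timing described above (a start between 0 and 20 minutes after the top of the hour) to a given timestamp; the function name `next_report_window` is hypothetical.

```python
from datetime import datetime, timedelta


def next_report_window(now):
    """Return (earliest, latest) expected start of the next hourly monitoring job."""
    # Round down to the current hour, then move to the top of the following hour
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return next_hour, next_hour + timedelta(minutes=20)
```

Calling `next_report_window(datetime.utcnow())` gives the window that follows the current hour.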

To explore the results you can:

▶️ **In the initial hour's window**, repeat the "Real-time inference" steps above to generate normal traffic to the endpoint

▶️ **In the next hour's window**, use code like the below to generate biased traffic using only the defaulted records (`skewed_df`):

In [None]:
for _ in range(20):
    skewed_results = [
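        # df.iterrows would mangle the data types, so we use itertuples instead. The first
        # element of each row tuple is the index, so we skip it with [1:]. The endpoint
        # returns a 2D array containing a single string element, so we take [0][0].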
        float(predictor_staging.predict(rowtuple[1:])[0][0])
        for rowtuple in skewed_df.itertuples()
    ]

The monitoring results can be viewed from the Endpoint's detail page in SageMaker Studio.
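
The status of recent monitoring jobs can also be listed from code. The sketch below is one possible approach using the `list_monitoring_executions` operation of the SageMaker API through a client passed in by the caller; the function name `recent_monitoring_statuses` is hypothetical, and the schedule name is whatever was assigned when you created the schedule.

```python
def recent_monitoring_statuses(sm_client, schedule_name, max_results=5):
    """Return (scheduled_time, status) pairs for the latest monitoring executions."""
    response = sm_client.list_monitoring_executions(
        MonitoringScheduleName=schedule_name,
        SortBy="ScheduledTime",
        SortOrder="Descending",
        MaxResults=max_results,
    )
    return [
        (summary["ScheduledTime"], summary["MonitoringExecutionStatus"])
        for summary in response["MonitoringExecutionSummaries"]
    ]
```

In a notebook, `sm_client` would typically be `boto3.client("sagemaker")`. A status of `CompletedWithViolations` indicates that the job finished and found deviations from the baseline.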