# Simple SageMaker Pipelines for Traffic Dataset

This notebook illustrates how to take different actions based on model performance in a SageMaker Pipeline.

The steps in this pipeline include:
* Preprocessing the California Housing dataset.
* Train a TensorFlow2 Artificial Neural Network (ANN) Model.
* Evaluate the model performance - mean square error (MSE).
* If MSE is higher than threshold, use a Lambda step to send an E-Mail to the Data Science team.
* If MSE is lower than threshold, register the model into the Model Registry, and use a Lambda step to deploy the model to SageMaker Endpoint.

## Prerequisites

#### Add `AmazonSageMakerPipelinesIntegrations` policy

The notebook execution role should have policies which enable the notebook to create a Lambda function. The Amazon managed policy `AmazonSageMakerPipelinesIntegrations` can be added to the notebook execution role. 

The policy description is:

```

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:CreateFunction",
                "lambda:DeleteFunction",
                "lambda:InvokeFunction",
                "lambda:UpdateFunctionCode"
            ],
            "Resource": [
                "arn:aws:lambda:*:*:function:*sagemaker*",
                "arn:aws:lambda:*:*:function:*sageMaker*",
                "arn:aws:lambda:*:*:function:*SageMaker*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:CreateQueue",
                "sqs:SendMessage"
            ],
            "Resource": [
                "arn:aws:sqs:*:*:*sagemaker*",
                "arn:aws:sqs:*:*:*sageMaker*",
                "arn:aws:sqs:*:*:*SageMaker*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": "arn:aws:iam::*:role/*",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": [
                        "lambda.amazonaws.com"
                    ]
                }
            }
        }
    ]
}
    
```

#### Add inline policy to enable creation of IAM role required for the Lambda Function

The notebook execution role should have an inline policy which enable the notebook to create the IAM role required for the Lambda function. An inline policy can be added to the notebook execution role. 

The policy description is:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:GetRole",
                "iam:CreateRole",
                "iam:AttachRolePolicy"
            ],
            "Resource": "*"
        }
    ]
}
```


In [2]:
import sys

In [3]:
! pip install -U sagemaker

Collecting sagemaker
  Downloading sagemaker-2.105.0.tar.gz (567 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m567.9/567.9 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: sagemaker
  Building wheel for sagemaker (setup.py) ... [?25ldone
[?25h  Created wheel for sagemaker: filename=sagemaker-2.105.0-py2.py3-none-any.whl size=783586 sha256=4090825bbc3996c948b11918f25bbb71660f131578c2f0d4297cdfddedd71cd0
  Stored in directory: /root/.cache/pip/wheels/52/fc/34/94a8645919501020cf6fc1b5a0704c8681cc707db68ee61754
Successfully built sagemaker
Installing collected packages: sagemaker
  Attempting uninstall: sagemaker
    Found existing installation: sagemaker 2.104.0
    Uninstalling sagemaker-2.104.0:
      Successfully uninstalled sagemaker-2.104.0
Successfully installed sagemaker-2.105.0
[0m

In [4]:
import os
import time
import boto3
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import sagemaker
from sagemaker import get_execution_role

In [38]:
bucket = "<YOUR BUCKKET NAME HERE>"

train_instance_type='ml.m5.12xlarge'      # The type of EC2 instance which will be used for training
deploy_instance_type='ml.m5.4xlarge'     # The type of EC2 instance which will be used for deployment

'''
we can use the train and validation path as stated above 
or you can 
just rearrange data and use a single path like below
'''
training_data_uri="s3://{}".format(bucket)
test_data_uri=f"{training_data_uri}/test"

sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name
#model_package_group_name = "TF2-California-Housing"  # Model name in model registry
prefix = "tf2-traffic-pipelines"
#pipeline_name = "TF2CaliforniaHousingPipeline"  # SageMaker Pipeline name
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

## Train model step
In the second step, the train and validation output from the precious processing step are used to train a model. 

In [34]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
import time

# Where to store the trained model
model_path = f"s3://{bucket}/{prefix}/model/"

#hyperparameters = {"epochs": training_epochs}
tensorflow_version = "2.4.1"
python_version = "py37"

tf2_estimator = TensorFlow(
  base_job_name='tensorflow-pssummit-traffic-class',
  entry_point="tfModelCode.py",             # Your entry script
  role=role,
  framework_version=tensorflow_version,               # TensorFlow's version
  py_version=python_version,
  output_path=model_path,
  instance_count=1, 
  instance_type=train_instance_type,
)

#print("Training ...")
#estimator_tf.fit(training_data_uri,logs=False)

# Use the tf2_estimator in a Sagemaker pipelines ProcessingStep.
# NOTE how the input to the training job directly references the output of the previous step.
step_train_model = TrainingStep(
    name="Train-Traffic-Model",
    estimator=tf2_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=training_data_uri,
        ),
    },
)

In [28]:
training_data_uri

's3://agm-dov-temp'

## Evaluate model step
When a model is trained, it's common to evaluate the model on unseen data before registering it with the model registry. This ensures the model registry isn't cluttered with poorly performing model versions. To evaluate the model, create a ScriptProcessor object and use it in a ProcessingStep.

**Note** that a separate preprocessed test dataset is used to evaluate the model, and not the output of the processing step. This is only for demo purposes, to ensure the second run of the pipeline creates a model with better performance. In a real-world scenario, the test output of the processing step would be used.

In [66]:
%%writefile evaluate.py

import os
import json
import subprocess
import sys
import numpy as np
import pathlib
import tarfile

def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

test_dir = '/opt/ml/processing/test'

if __name__ == "__main__":

    install("tensorflow==2.4.1")
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, "r:gz") as tar:
        tar.extractall("./model")
    import tensorflow as tf
    from tensorflow.keras.preprocessing import image

    # Function for generating batches
    def get_batches(dirname, gen=image.ImageDataGenerator(), shuffle=False, batch_size=32, class_mode='categorical',
                    target_size=(250,250)):
        return gen.flow_from_directory(dirname, target_size=target_size,
                class_mode=class_mode, shuffle=shuffle, batch_size=batch_size)

    # Load all data
    def get_data(path, target_size=(250,250)):
        batches = get_batches(path, shuffle=False, batch_size=1, class_mode=None, target_size=target_size)
        return np.concatenate([batches.next() for i in range(batches.samples)])
    
    
    model = tf.keras.models.load_model("./model/tf000000001/1")
    test_path = "/opt/ml/processing/test/"
    
    #LOAD TEST DATA
    test_gen=get_batches(test_dir, shuffle=False, batch_size=32).classes
    test_labels=tf.keras.utils.to_categorical(test_gen)
    test_img=get_data(test_dir)

    scores = model.evaluate(test_img, test_labels, verbose=2)
    print("\nTest Eval :", scores)

    # Available metrics to add to model: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    report_dict = {
        "classification_metrics": {
            "acc": {"value": scores},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))
        

Overwriting evaluate.py


In [67]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"

# Create SKLearnProcessor object.
# The object contains information about what container to use, what instance type etc.
evaluate_model_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="tf2-traffic-evaluate",
    role=role,
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# Use the evaluate_model_processor in a Sagemaker pipelines ProcessingStep.
step_evaluate_model = ProcessingStep(
    name="Evaluate-Traffic-Model",
    processor=evaluate_model_processor,
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=test_data_uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="evaluate.py",
    property_files=[evaluation_report],
)

## Send E-Mail Lambda Step

When defining the `LambdaStep`, the SageMaker Lambda helper class provides helper functions for creating the Lambda function. Users can either use the `lambda_func` argument to provide the function ARN to an already deployed Lambda function OR use the `Lambda` class to create a Lambda function by providing a script, function name and role for the Lambda function.

When passing inputs to the Lambda, the `inputs` argument can be used and within the Lambda function's handler, the `event` argument can be used to retrieve the inputs.

The dictionary response from the Lambda function is parsed through the `LambdaOutput` objects provided to the `outputs` argument. The `output_name` in `LambdaOutput` corresponds to the dictionary key in the Lambda's return dictionary.

#### Define the Lambda function

Users can choose the leverage the Lambda helper class to create a Lambda function and provide that function object to the `LambdaStep`. Alternatively, users can use a pre-deployed Lambda function and provide the function ARN to the `Lambda` helper class in the lambda step.

Here, If the MSE is lower than threshold, an E-Mail will be sent to Data Science team.

Note that the E-Mail sending part is left for you to implement by the framework you choose.

In [17]:
%%writefile send_email_lambda.py

"""
This Lambda function sends an E-Mail to the Data Science team with the MSE from model evaluation step. 
The evaluation.json location in S3 is provided via the `event` argument
"""

import json
import boto3


s3_client = client = boto3.client("s3")


def lambda_handler(event, context):

    print(f"Received Event: {event}")

    evaluation_s3_uri = event["evaluation_s3_uri"]
    path_parts = evaluation_s3_uri.replace("s3://", "").split("/")
    bucket = path_parts.pop(0)
    key = "/".join(path_parts)

    content = s3_client.get_object(Bucket=bucket, Key=key)
    text = content["Body"].read().decode()
    evaluation_json = json.loads(text)
    mse = evaluation_json["regression_metrics"]["mse"]["value"]

    subject_line = "Please check high MSE ({}) detected on model evaluation".format(mse)
    print(f"Sending E-Mail to Data Science Team with subject line: {subject_line}")

    # TODO - ADD YOUR CODE TO SEND EMAIL...

    return {"statusCode": 200, "body": json.dumps("E-Mail Sent Successfully")}

Writing send_email_lambda.py


#### IAM Role

The Lambda function needs an IAM role that will allow it to read the `evaluation.json` from S3. The role ARN must be provided in the `LambdaStep`.

A helper function in `iam_helper.py` is available to create the Lambda function role. Please note that the role uses the Amazon managed policy - `AmazonS3ReadOnlyAccess`. This should be replaced with an IAM policy with the least privileges as per AWS IAM best practices.

In [18]:
from iam_helper import create_s3_lambda_role

lambda_role = create_s3_lambda_role("send-email-to-ds-team-lambda-role")

Waiting 30 seconds for the IAM role to propagate


#### Create the Lambda Function step

In [19]:
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

evaluation_s3_uri = "{}/evaluation.json".format(
    step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

send_email_lambda_function_name = "sagemaker-send-email-to-ds-team-lambda-" + current_time

send_email_lambda_function = Lambda(
    function_name=send_email_lambda_function_name,
    execution_role_arn=lambda_role,
    script="send_email_lambda.py",
    handler="send_email_lambda.lambda_handler",
)

step_higher_mse_send_email_lambda = LambdaStep(
    name="Send-Email-To-DS-Team",
    lambda_func=send_email_lambda_function,
    inputs={"evaluation_s3_uri": evaluation_s3_uri},
)

## Register model step
If the trained model meets the model performance requirements a new model version is registered with the model registry for further analysis. To attach model metrics to the model version, create a [`ModelMetrics`](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html) object using the evaluation report created in the evaluation step. Then, create the RegisterModel step.

In [75]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

evaluation_s3_uri = "{}/evaluation.json".format(
    step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

model_package_group_name = "TF2-Traffic"  # Model name in model registry

# Create ModelMetrics object using the evaluation report from the evaluation step
# A ModelMetrics object contains metrics captured from a model.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

# Create a RegisterModel step, which registers the model with Sagemaker Model Registry.
step_register_model = RegisterModel(
    name="Register--Traffic-Model",
    estimator=tf2_estimator,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
)

## Create the model

The model is created and the name of the model is provided to the Lambda function for deployment. The `CreateModelStep` dynamically assigns a name to the model.

In [21]:
from sagemaker.workflow.step_collections import CreateModelStep
from sagemaker.tensorflow.model import TensorFlowModel

model = TensorFlowModel(
    role=role,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    framework_version=tensorflow_version,
    sagemaker_session=sagemaker_session,
)

step_create_model = CreateModelStep(
    name="Create-California-Housing-Model",
    model=model,
    inputs=sagemaker.inputs.CreateModelInput(instance_type="ml.m5.large"),
)

## Deploy model to SageMaker Endpoint Lambda Step

When defining the `LambdaStep`, the SageMaker Lambda helper class provides helper functions for creating the Lambda function. Users can either use the `lambda_func` argument to provide the function ARN to an already deployed Lambda function OR use the `Lambda` class to create a Lambda function by providing a script, function name and role for the Lambda function.

When passing inputs to the Lambda, the `inputs` argument can be used and within the Lambda function's handler, the `event` argument can be used to retrieve the inputs.

The dictionary response from the Lambda function is parsed through the `LambdaOutput` objects provided to the `outputs` argument. The `output_name` in `LambdaOutput` corresponds to the dictionary key in the Lambda's return dictionary.

### Define the Lambda function

Here, the Lambda Function will deploy the model to SageMaker Endpoint.

In [22]:
%%writefile deploy_model_lambda.py


"""
This Lambda function deploys the model to SageMaker Endpoint. 
If Endpoint exists, then Endpoint will be updated with new Endpoint Config.
"""

import json
import boto3
import time


sm_client = boto3.client("sagemaker")


def lambda_handler(event, context):

    print(f"Received Event: {event}")

    current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())
    endpoint_instance_type = event["endpoint_instance_type"]
    model_name = event["model_name"]
    endpoint_config_name = "{}-{}".format(event["endpoint_config_name"], current_time)
    endpoint_name = event["endpoint_name"]

    # Create Endpoint Configuration
    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "InstanceType": endpoint_instance_type,
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": model_name,
                "VariantName": "AllTraffic",
            }
        ],
    )
    print(f"create_endpoint_config_response: {create_endpoint_config_response}")

    # Check if an endpoint exists. If no - Create new endpoint, if yes - Update existing endpoint
    list_endpoints_response = sm_client.list_endpoints(
        SortBy="CreationTime",
        SortOrder="Descending",
        NameContains=endpoint_name,
    )
    print(f"list_endpoints_response: {list_endpoints_response}")

    if len(list_endpoints_response["Endpoints"]) > 0:
        print("Updating Endpoint with new Endpoint Configuration")
        update_endpoint_response = sm_client.update_endpoint(
            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
        )
        print(f"update_endpoint_response: {update_endpoint_response}")
    else:
        print("Creating Endpoint")
        create_endpoint_response = sm_client.create_endpoint(
            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
        )
        print(f"create_endpoint_response: {create_endpoint_response}")

    return {"statusCode": 200, "body": json.dumps("Endpoint Created Successfully")}

Writing deploy_model_lambda.py


#### IAM Role

The Lambda function needs an IAM role that will allow it to deploy a SageMaker Endpoint. The role ARN must be provided in the `LambdaStep`.

A helper function in `iam_helper.py` is available to create the Lambda function role. Please note that the role uses the Amazon managed policy - `AmazonSageMakerFullAccess`. This should be replaced with an IAM policy with the least privileges as per AWS IAM best practices.

In [23]:
from iam_helper import create_sagemaker_lambda_role

lambda_role = create_sagemaker_lambda_role("deploy-model-lambda-role")

Waiting 30 seconds for the IAM role to propagate


In [24]:
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

endpoint_config_name = "tf2-california-housing-endpoint-config"
endpoint_name = "tf2-california-housing-endpoint-" + current_time

deploy_model_lambda_function_name = "sagemaker-deploy-model-lambda-" + current_time

deploy_model_lambda_function = Lambda(
    function_name=deploy_model_lambda_function_name,
    execution_role_arn=lambda_role,
    script="deploy_model_lambda.py",
    handler="deploy_model_lambda.lambda_handler",
)

step_lower_mse_deploy_model_lambda = LambdaStep(
    name="Deploy-California-Housing-Model-To-Endpoint",
    lambda_func=deploy_model_lambda_function,
    inputs={
        "model_name": step_create_model.properties.ModelName,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        "endpoint_instance_type": endpoint_instance_type,
    },
)

## Accuracy condition step
Adding conditions to the pipeline is done with a ConditionStep.
In this case, we only want to register the new model version with the model registry if the new model meets an accuracy condition.

In [25]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Create accuracy condition to ensure the model meets performance requirements.
# Models with a test accuracy lower than the condition will not be registered with the model registry.
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate_model.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=accuracy_mse_threshold,
)

# Create a Sagemaker Pipelines ConditionStep, using the condition above.
# Enter the steps to perform if the condition returns True / False.
step_cond = ConditionStep(
    name="MSE-Lower-Than-Threshold-Condition",
    conditions=[cond_lte],
    if_steps=[step_register_model, step_create_model, step_lower_mse_deploy_model_lambda],
    else_steps=[step_higher_mse_send_email_lambda],
)

# Pipeline Creation: Orchestrate all steps

Now that all pipeline steps are created, a pipeline is created.

In [76]:
from sagemaker.workflow.pipeline import Pipeline

# Create a Sagemaker Pipeline.
# Each parameter for the pipeline must be set as a parameter explicitly when the pipeline is created.
# Also pass in each of the steps created above.
# Note that the order of execution is determined from each step's dependencies on other steps,
# not on the order they are passed in below.
pipeline = Pipeline(
    name="mytestpipeline",
#    parameters=[
#        input_data,
#        training_epochs,
#        accuracy_mse_threshold,
#        endpoint_instance_type,
#    ],
    #steps=[step_preprocess_data, step_train_model, step_evaluate_model, step_cond],
    steps=[step_train_model,step_evaluate_model,step_register_model],
)

## Execute the Pipeline

### List the execution steps to check out the status and artifacts:

In [77]:
import json

definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Train-Traffic-Model',
   'Type': 'Training',
   'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File',
     'TrainingImage': '763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-training:2.4.1-cpu-py37',
     'EnableSageMakerMetricsTimeSeries': True},
    'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-650687152614/tf2-traffic-pipelines/model/'},
    'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
    'ResourceConfig': {'VolumeSizeInGB': 30,
     'InstanceCount': 1,
     'InstanceType': 'ml.m5.12xlarge'},
    'RoleArn': 'arn:aws:iam::650687152614:role/service-role/AmazonSageMaker-ExecutionRole-20220301T162062',
    'InputDataConfig': [{'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix',
        'S3Uri': 's3://agm-scratch'

### Submit pipeline

In [78]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:650687152614:pipeline/mytestpipeline',
 'ResponseMetadata': {'RequestId': 'b70ece93-b7d0-45ac-b833-f9d71b52e5f0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b70ece93-b7d0-45ac-b833-f9d71b52e5f0',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Mon, 22 Aug 2022 18:45:59 GMT'},
  'RetryAttempts': 0}}

### Execute pipeline using the default parameters

In [79]:
execution = pipeline.start()

### Wait for pipeline to complete

In [80]:
execution.wait()

## Visualize SageMaker Pipeline - MSE lower than the threshold
In SageMaker Studio, choose `SageMaker Components and registries` in the left pane and under `Pipelines`, click the pipeline that was created. Then all pipeline executions are shown, and the one just created should have a status of `Succeded`. Selecting that execution, the different pipeline steps can be tracked as they execute.

You can see that the `Register-California-Housing-Model` step was executed.

![](images/pipeline_run_1_mse_lower_than_threshold.png "Pipeline - MSE lower than the threshold")

## Start a pipeline with 2 epochs to trigger the `send-email-to-ds-team-lambda` Lambda Function


Run the pipeline again, but this time, with only 2 epochs and a lower MSE Threshold of 0.2. This will result in a higher MSE value on model evaluation, and will cause the `send-email-to-ds-team-lambda` Lambda Function to be triggered. 

In [None]:
# Execute pipeline with explicit parameters
execution = pipeline.start(parameters=dict(TrainingEpochs=2, AccuracyMseThreshold=0.2))

In [None]:
execution.wait()

## Visualize SageMaker Pipeline - MSE higher than the threshold
In SageMaker Studio, choose `SageMaker Components and registries` in the left pane and under `Pipelines`, click the pipeline that was created. Then all pipeline executions are shown, and the one just created should have a status of `Succeded`. Selecting that execution, the different pipeline steps can be tracked as they execute.

You can see that the `Send-Email-To-DS-Team` step was executed.

![](images/pipeline_run_1_mse_higher_than_threshold.png "Pipeline - MSE higher than the threshold")

## Clean up (optional)

#### Stop / Close the Endpoint
You should delete the endpoint before you close the notebook if you don't need to keep the endpoint running for serving real-time predictions.

In [None]:
import boto3

client = boto3.client("sagemaker")
client.delete_endpoint(EndpointName=endpoint_name)

#### Delete the model registry and the pipeline to keep the studio environment tidy.

In [None]:
def delete_model_package_group(sm_client, package_group_name):
    try:
        model_versions = sm_client.list_model_packages(ModelPackageGroupName=package_group_name)

    except Exception as e:
        print("{} \n".format(e))
        return

    for model_version in model_versions["ModelPackageSummaryList"]:
        try:
            sm_client.delete_model_package(ModelPackageName=model_version["ModelPackageArn"])
        except Exception as e:
            print("{} \n".format(e))
        time.sleep(0.5)  # Ensure requests aren't throttled

    try:
        sm_client.delete_model_package_group(ModelPackageGroupName=package_group_name)
        print("{} model package group deleted".format(package_group_name))
    except Exception as e:
        print("{} \n".format(e))
    return


def delete_sagemaker_pipeline(sm_client, pipeline_name):
    try:
        sm_client.delete_pipeline(
            PipelineName=pipeline_name,
        )
        print("{} pipeline deleted".format(pipeline_name))
    except Exception as e:
        print("{} \n".format(e))
        return

In [None]:
delete_model_package_group(client, model_package_group_name)
delete_sagemaker_pipeline(client, pipeline_name)

#### Delete the Lambda functions.

In [None]:
send_email_lambda_function.delete()
deploy_model_lambda_function.delete()