# SageMaker Pipelines: California Median House Value

**This notebook requires SageMaker Python SDK version >= 2.75.**

The following notebook shows how to create an Amazon SageMaker Pipeline that builds and trains a **PipelineModel** consisting of a preprocessing SKLearn script followed by a TensorFlow model. The pipeline model is then registered to the Model Registry and deployed from there into a real-time endpoint. 


<div>
<table>
  <tr>
    <td><img src="./images/pipeline.png" width="950" height="850"/></td>
    <td><img src="./images/chmp.png" width="450" height="450"/></td>
  </tr>
</table>
</div>

Amazon SageMaker Model Building Pipelines lets machine learning (ML) application developers and operations engineers orchestrate SageMaker jobs and author reproducible ML pipelines. It also lets them deploy custom-built models for low-latency real-time inference, run offline inference with Batch Transform, and track the lineage of artifacts. Through a simple interface, they can apply sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage, in line with safety and best-practice paradigms for ML application development.

The SageMaker Pipelines service supports a SageMaker Pipeline domain-specific language (DSL), which is a declarative JSON specification. This DSL defines a directed acyclic graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Development Kit (SDK) streamlines the generation of the pipeline DSL using constructs that engineers and scientists are already familiar with.
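
To make the idea of a declarative JSON definition more concrete, the following is a hand-written and heavily simplified sketch. The top-level keys (`Version`, `Parameters`, `Steps`) and the `{"Get": ...}` reference form mirror the definition printed later in this notebook, but the reduced `Arguments` layout and the `resolve` helper are assumptions made for illustration. In practice the SDK generates the definition and the service resolves the references.

```python
import json

# Simplified, hand-written stand-in for a generated pipeline definition.
definition_text = """
{
  "Version": "2020-12-01",
  "Parameters": [
    {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}
  ],
  "Steps": [
    {"Name": "PreprocessData", "Type": "Processing", "Arguments": {}},
    {
      "Name": "TrainTensorflowModel",
      "Type": "Training",
      "Arguments": {"InstanceType": {"Get": "Parameters.TrainingInstanceType"}}
    }
  ]
}
"""

definition = json.loads(definition_text)
defaults = {p["Name"]: p["DefaultValue"] for p in definition["Parameters"]}


def resolve(node, parameters):
    """Hypothetical helper: replace {"Get": "Parameters.<Name>"} with its value."""
    if isinstance(node, dict):
        if set(node) == {"Get"} and str(node["Get"]).startswith("Parameters."):
            return parameters[node["Get"].split(".", 1)[1]]
        return {key: resolve(value, parameters) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve(item, parameters) for item in node]
    return node


resolved_steps = resolve(definition["Steps"], defaults)
print(resolved_steps[1]["Arguments"])
```

Because the definition is plain data, the same document can be executed many times with different parameter values without being edited.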


In [2]:
import os
import time
import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role

In [3]:
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()

sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

model_package_group_name = "PipelineModelPackageGroup"
prefix = "pipeline-model-example"
pipeline_name = "TrainingPipelineForModel"  # SageMaker Pipeline name

In [4]:
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
logger.info(f'Using SageMaker version: {sagemaker.__version__}')

Using SageMaker version: 2.76.0


## Download California Housing dataset and upload to Amazon S3

We use the California housing dataset.

More info on the dataset:

This dataset was obtained from the StatLib repository (http://lib.stat.cmu.edu/datasets/).

The target variable is the median house value for California districts.

This dataset was derived from the 1990 U.S. census, using one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people).

In [5]:
data_dir = os.path.join(os.getcwd(), "data")
os.makedirs(data_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), "data/raw")
os.makedirs(raw_dir, exist_ok=True)

In [6]:
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/california_housing/cal_housing.tgz .

download: s3://sagemaker-sample-files/datasets/tabular/california_housing/cal_housing.tgz to ./cal_housing.tgz


In [7]:
!tar -zxf cal_housing.tgz

In [8]:
columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue",
]
cal_housing_df = pd.read_csv("CaliforniaHousing/cal_housing.data", names=columns, header=None)
# Scale the target down to avoid overcomplicating the example
cal_housing_df["medianHouseValue"] /= 500000
cal_housing_df.to_csv("./data/raw/raw_data_all.csv", header=True, index=False)
rawdata_s3_prefix = "{}/data/raw".format(prefix)
raw_s3 = sagemaker_session.upload_data(path="./data/raw/", key_prefix=rawdata_s3_prefix)
print(raw_s3)

s3://sagemaker-us-west-2-976939723775/pipeline-model-example/data/raw


## Define Parameters to Parametrize Pipeline Execution

Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

- ParameterString - represents a str Python type
- ParameterInteger - represents an int Python type
- ParameterFloat - represents a float Python type

In [9]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)

# status of newly trained model in registry
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="Approved"
)  # PendingManualApproval | Rejected

# processing step parameters
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
training_epochs = ParameterString(name="TrainingEpochs", default_value="100")

# model performance step parameters
accuracy_mse_threshold = ParameterFloat(name="AccuracyMseThreshold", default_value=0.75)
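
Each parameter carries a default that can be overridden per execution, for example by passing a dictionary of names and values to `pipeline.start(parameters=...)`. The sketch below imitates that merge in plain Python so the behavior can be seen without starting an execution. The `resolve_parameters` helper is hypothetical and is not part of the SDK, and `InputData` is left out because its default depends on the account's bucket.

```python
# Defaults copied from the parameter definitions in this notebook.
DEFAULTS = {
    "ModelApprovalStatus": "Approved",
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m5.large",
    "TrainingInstanceType": "ml.m5.xlarge",
    "TrainingEpochs": "100",
    "AccuracyMseThreshold": 0.75,
}


def resolve_parameters(overrides=None):
    """Hypothetical helper: merge per-execution overrides into the defaults."""
    overrides = overrides or {}
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    for name, value in overrides.items():
        expected = type(DEFAULTS[name])
        if not isinstance(value, expected):
            raise TypeError(f"{name} expects {expected.__name__}, got {type(value).__name__}")
    return {**DEFAULTS, **overrides}


# A shorter, stricter run: fewer epochs and a lower MSE threshold.
run_config = resolve_parameters({"TrainingEpochs": "10", "AccuracyMseThreshold": 0.5})
print(run_config["TrainingEpochs"], run_config["AccuracyMseThreshold"])
```

Note that `TrainingEpochs` is a string parameter in this notebook, so an override must also be a string.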

## Define a Processing Step for Feature Engineering


The preprocessing script below creates a scaler and also contains the functions required to deploy that scaler as part of a pipeline model.

In [10]:
!mkdir -p code

In [11]:
%%writefile code/preprocess.py

import glob
import numpy as np
import pandas as pd
import os
import json
import joblib
from io import StringIO
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tarfile

try:
    from sagemaker_containers.beta.framework import (
        content_types,
        encoders,
        env,
        modules,
        transformer,
        worker,
        server,
    )
except ImportError:
    pass

feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"

base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"

if __name__ == "__main__":
    df = pd.read_csv(f"{base_dir}/input/raw_data_all.csv")
    feature_data = df.drop(label_column, axis=1, inplace=False)
    label_data = df[label_column]
    x_train, x_test, y_train, y_test = train_test_split(feature_data, label_data, test_size=0.33)

    scaler = StandardScaler()

    scaler.fit(x_train)
    x_train = scaler.transform(x_train)
    x_test = scaler.transform(x_test)

    train_dataset = pd.concat([pd.DataFrame(x_train), y_train.reset_index(drop=True)], axis=1)
    test_dataset = pd.concat([pd.DataFrame(x_test), y_test.reset_index(drop=True)], axis=1)

    train_dataset.columns = feature_columns + [label_column]
    test_dataset.columns = feature_columns + [label_column]

    train_dataset.to_csv(f"{base_dir}/train/train.csv", header=True, index=False)
    test_dataset.to_csv(f"{base_dir}/test/test.csv", header=True, index=False)
    joblib.dump(scaler, "model.joblib")
    with tarfile.open(f"{base_dir}/scaler_model/model.tar.gz", "w:gz") as tar_handle:
        tar_handle.add("model.joblib")


def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == "text/csv":
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), header=None)

        if len(df.columns) == len(feature_columns) + 1:
            # This is a labelled example, includes the median house value label
            df.columns = feature_columns + [label_column]
        elif len(df.columns) == len(feature_columns):
            # This is an unlabelled example.
            df.columns = feature_columns

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))


def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append(row)
        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeError("{} accept type is not supported by this script.".format(accept))


def predict_fn(input_data, model):
    """Preprocess input data

    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:

        label (only if present in the input), followed by the standardized features
    """
    # Transform only the feature columns; the scaler was fit without the label.
    features = model.transform(input_data[feature_columns])

    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features


def model_fn(model_dir):
    """Deserialize fitted model"""
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor

Overwriting code/preprocess.py
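
The script fits the scaler on the training split only and then applies the same statistics to the test split and, later, to inference requests. The following minimal sketch shows that idea with the standard library instead of scikit-learn. The numbers are made up, and the use of the population standard deviation is an assumption intended to match the convention `StandardScaler` follows.

```python
from statistics import fmean, pstdev

# Toy single-feature data, invented for illustration.
train = [2.0, 4.0, 6.0, 8.0]
test = [5.0, 10.0]

# "Fit": statistics come from the training split only.
mean = fmean(train)
std = pstdev(train)  # population standard deviation (ddof=0)


def standardize(values):
    """Apply the training statistics to any batch of values."""
    return [(v - mean) / std for v in values]


train_scaled = standardize(train)
test_scaled = standardize(test)

# The training split ends up with zero mean and unit variance;
# the test split is merely expressed in the same units.
print(train_scaled)
print(test_scaled)
```

Reusing the training statistics at inference time is what makes it necessary to ship the fitted scaler as its own model artifact.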


In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor


sklearn_framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-housing-data-process",
    role=role,
)

In [13]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="PreprocessData",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaler_model", source="/opt/ml/processing/scaler_model"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocess.py",
)

## Define a Training Step to Train a Model

In [14]:
%%writefile code/train.py

import argparse
import numpy as np
import os
import tensorflow as tf
import pandas as pd

feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"


def parse_args():

    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.1)

    # data directories
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))

    # model directory
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    return parser.parse_known_args()


def get_train_data(train_dir):
    train_data = pd.read_csv(os.path.join(train_dir, "train.csv"))
    x_train = train_data[feature_columns].to_numpy()
    y_train = train_data[label_column].to_numpy()
    print("x train", x_train.shape, "y train", y_train.shape)

    return x_train, y_train


def get_test_data(test_dir):

    test_data = pd.read_csv(os.path.join(test_dir, "test.csv"))
    x_test = test_data[feature_columns].to_numpy()
    y_test = test_data[label_column].to_numpy()
    print("x test", x_test.shape, "y test", y_test.shape)

    return x_test, y_test


def get_model():

    inputs = tf.keras.Input(shape=(8,))
    hidden_1 = tf.keras.layers.Dense(8, activation="tanh")(inputs)
    hidden_2 = tf.keras.layers.Dense(4, activation="sigmoid")(hidden_1)
    outputs = tf.keras.layers.Dense(1)(hidden_2)
    return tf.keras.Model(inputs=inputs, outputs=outputs)


if __name__ == "__main__":

    args, _ = parse_args()

    print("Training data location: {}".format(args.train))
    print("Test data location: {}".format(args.test))
    x_train, y_train = get_train_data(args.train)
    x_test, y_test = get_test_data(args.test)

    batch_size = args.batch_size
    epochs = args.epochs
    learning_rate = args.learning_rate
    print(
        "batch_size = {}, epochs = {}, learning rate = {}".format(batch_size, epochs, learning_rate)
    )

    model = get_model()
    optimizer = tf.keras.optimizers.SGD(learning_rate)
    model.compile(optimizer=optimizer, loss="mse")
    model.fit(
        x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test)
    )

    # evaluate on test set
    scores = model.evaluate(x_test, y_test, batch_size, verbose=2)
    print("\nTest MSE :", scores)

    # save model
    model.save(args.sm_model_dir + "/1")

Overwriting code/train.py


In [15]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
import time

# Where to store the trained model
model_path = f"s3://{bucket}/{prefix}/model/"

hyperparameters = {"epochs": training_epochs}
tensorflow_version = "2.4.1"
python_version = "py37"

tf2_estimator = TensorFlow(
    source_dir="code",
    entry_point="train.py",
    instance_type=training_instance_type,
    instance_count=1,
    framework_version=tensorflow_version,
    role=role,
    base_job_name="tensorflow-train-model",
    output_path=model_path,
    hyperparameters=hyperparameters,
    py_version=python_version,
)

# Use the tf2_estimator in a SageMaker Pipelines TrainingStep.
# NOTE how the input to the training job directly references the output of the previous step.
step_train_model = TrainingStep(
    name="TrainTensorflowModel",
    estimator=tf2_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
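
The training script receives its hyperparameters as command-line arguments, which is why a string-valued pipeline parameter such as `TrainingEpochs` still ends up as an integer inside `train.py`. The sketch below replays that parsing locally. The `simulated_argv` list is an assumption standing in for whatever the training container passes, and the `--model_dir` entry is included only to show how unrecognized arguments are set aside.

```python
import argparse


def parse_args(argv):
    """Parse the same hyperparameters as train.py, ignoring unknown flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    # parse_known_args returns (namespace, list_of_unrecognized_arguments)
    return parser.parse_known_args(argv)


# Assumed command line; values always arrive as strings.
simulated_argv = ["--epochs", "100", "--model_dir", "s3://example-bucket/model"]
args, unknown = parse_args(simulated_argv)

print(args.epochs, type(args.epochs).__name__)
print(args.batch_size, args.learning_rate)
print(unknown)
```

Using `parse_known_args` rather than `parse_args` keeps the script from failing when the container adds arguments that the script does not declare.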

## Define a Model Evaluation Step to Evaluate the Trained Model


In [16]:
%%writefile code/evaluate.py

import os
import json
import sys
import numpy as np
import pandas as pd
import pathlib
import tarfile


feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"

if __name__ == "__main__":

    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, "r:gz") as tar:
        tar.extractall("./model")
    import tensorflow as tf

    model = tf.keras.models.load_model("./model/1")
    test_path = "/opt/ml/processing/test"
    df = pd.read_csv(os.path.join(test_path, "test.csv"))
    x_test = df[feature_columns].to_numpy()
    y_test = df[label_column].to_numpy()
    scores = model.evaluate(x_test, y_test, verbose=2)
    print("\nTest MSE :", scores)

    # Available metrics to add to model: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    report_dict = {
        "regression_metrics": {
            "mse": {"value": scores, "standard_deviation": "NaN"},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluate.py


In [17]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ScriptProcessor

tf_eval_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version,
    image_scope="training",
    py_version="py37",
    instance_type=training_instance_type,
)

evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=tf_eval_image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=training_instance_type,
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# Use the evaluate_model_processor in a Sagemaker pipelines ProcessingStep.
step_evaluate_model = ProcessingStep(
    name="EvaluateModelPerformance",
    processor=evaluate_model_processor,
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluate.py",
    property_files=[evaluation_report],
)
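
The evaluation script reports a single number, the mean squared error (MSE) on the test split, wrapped in a small JSON report. The sketch below computes an MSE by hand and builds a report with the same keys as `evaluate.py`. The target and prediction values are invented for illustration, and `mean_squared_error` is a local helper rather than a library call.

```python
import json


def mean_squared_error(y_true, y_pred):
    """Average of squared differences between targets and predictions."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and of equal length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


# Invented values on the scaled target range used in this notebook.
y_true = [0.5, 1.0, 0.25]
y_pred = [0.5, 0.5, 0.75]

mse = mean_squared_error(y_true, y_pred)

# Same structure as the report written by evaluate.py.
report = {
    "regression_metrics": {
        "mse": {"value": mse, "standard_deviation": "NaN"},
    },
}
report_text = json.dumps(report)
print(report_text)
```

Keeping the metric under a fixed path in the report is what later allows a condition to look it up by name.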

## Define a Register Model Step to Create a Model Package for the PipelineModel


In [18]:
from sagemaker.model import Model
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import PipelineModel


scaler_model_s3 = "{}/model.tar.gz".format(
    step_process.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

scaler_model = SKLearnModel(
    model_data=scaler_model_s3,
    role=role,
    sagemaker_session=sagemaker_session,
    entry_point="code/preprocess.py",
    framework_version=sklearn_framework_version,
)


tf_model_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version,
    image_scope="inference",
    py_version="py37",
    instance_type=training_instance_type,
)

tf_model = Model(
    image_uri=tf_model_image_uri,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

pipeline_model = PipelineModel(
    models=[scaler_model, tf_model], role=role, sagemaker_session=sagemaker_session
)
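
In a pipeline model the containers run in sequence: the first receives the client's CSV payload, and its response becomes the request body of the second. The sketch below imitates the first hop in plain Python, ending with the `{"instances": ...}` JSON body that `output_fn` in `preprocess.py` produces. The `means` and `scales` values are hypothetical stand-ins for a fitted scaler, and the two-column rows are invented to keep the example short.

```python
import csv
import io
import json

# Hypothetical per-column statistics standing in for the fitted scaler.
means = [1.0, 10.0]
scales = [2.0, 5.0]


def parse_csv(payload):
    """Turn a CSV request body into rows of floats."""
    reader = csv.reader(io.StringIO(payload))
    return [[float(cell) for cell in row] for row in reader if row]


def first_container(payload):
    """Scale each row, then emit the JSON body expected by the next container."""
    rows = parse_csv(payload)
    scaled = [[(v - m) / s for v, m, s in zip(row, means, scales)] for row in rows]
    return json.dumps({"instances": scaled})


body = first_container("3.0,20.0\n1.0,5.0\n")
print(body)
```

The client therefore only ever sends raw, unscaled features; the scaling step travels with the model.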

In [21]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


evaluation_s3_uri = "{}/evaluation.json".format(
    step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

step_register_pipeline_model = RegisterModel(
    name="PipelineModel",
    model=pipeline_model,
    #estimator=tf_model,
    #model_data=tf_model_image_uri,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
)

## Define a Condition Step to Check Accuracy and Conditionally Register a Model in the Model Registry

In [22]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
#from sagemaker.workflow.properties import PropertyFile

# Create an MSE condition to ensure the model meets performance requirements.
# Models whose test MSE exceeds the threshold will not be registered with the model registry.
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate_model.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=accuracy_mse_threshold,
)

# Create a Sagemaker Pipelines ConditionStep, using the condition above.
# Enter the steps to perform if the condition returns True / False.
step_cond = ConditionStep(
    name="MSE-Lower-Than-Threshold-Condition",
    conditions=[cond_lte],
    if_steps=[step_register_pipeline_model],
    else_steps=[],
)
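
The condition reads one value out of the evaluation report by its path and compares it with the threshold. A rough local equivalent is sketched below. The `json_get` helper is hypothetical and only handles dot-separated keys, and the MSE value of 0.42 is invented for illustration.

```python
import json


def json_get(document, json_path):
    """Hypothetical helper: follow a dot-separated path through nested dicts."""
    node = document
    for key in json_path.split("."):
        node = node[key]
    return node


# Invented report with the same structure as evaluation.json.
evaluation = json.loads(
    '{"regression_metrics": {"mse": {"value": 0.42, "standard_deviation": "NaN"}}}'
)
threshold = 0.75  # default of the AccuracyMseThreshold parameter

mse = json_get(evaluation, "regression_metrics.mse.value")
should_register = mse <= threshold
print(mse, should_register)
```

Since lower MSE is better, the model is registered when the metric is less than or equal to the threshold, and nothing happens otherwise because `else_steps` is empty.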

## Define a Pipeline of Parameters, Steps, and Conditions


In [23]:
from sagemaker.workflow.pipeline import Pipeline

# Create a Sagemaker Pipeline.
# Each parameter for the pipeline must be set as a parameter explicitly when the pipeline is created.
# Also pass in each of the steps created above.
# Note that the order of execution is determined from each step's dependencies on other steps,
# not on the order they are passed in below.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        input_data,
        model_approval_status,
        training_epochs,
        accuracy_mse_threshold,
    ],
    steps=[step_process, step_train_model, step_evaluate_model, step_cond],
)
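
The execution order follows from the data dependencies between steps, not from the order of the `steps` list. The sketch below illustrates this with the standard library's `graphlib` module (Python 3.9 or later). The dependency map is written by hand from the cells above and is an approximation; the service derives the real graph from the property references in the definition.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps whose outputs it consumes.
# The entries are deliberately listed in a scrambled order.
dependencies = {
    "PipelineModel": {"MSE-Lower-Than-Threshold-Condition"},
    "EvaluateModelPerformance": {"PreprocessData", "TrainTensorflowModel"},
    "MSE-Lower-Than-Threshold-Condition": {"EvaluateModelPerformance"},
    "TrainTensorflowModel": {"PreprocessData"},
    "PreprocessData": set(),
}

# A topological sort yields an order in which every step runs
# only after all of its dependencies have finished.
order = list(TopologicalSorter(dependencies).static_order())
for position, step in enumerate(order, start=1):
    print(position, step)
```

For this pipeline every step depends on the one before it, so only one valid order exists regardless of how the steps are listed.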

In [24]:
import json

definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-2-976939723775/pipeline-model-example/data/raw'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'TrainingEpochs', 'Type': 'String', 'DefaultValue': '100'},
  {'Name': 'AccuracyMseThreshold', 'Type': 'Float', 'DefaultValue': 0.75}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessData',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.Processing

## Submit the pipeline to SageMaker and start execution


In [25]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:976939723775:pipeline/trainingpipelineformodel',
 'ResponseMetadata': {'RequestId': '2508ecca-e02f-4a5a-bdf9-6c32a5367bda',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '2508ecca-e02f-4a5a-bdf9-6c32a5367bda',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '92',
   'date': 'Mon, 21 Feb 2022 21:20:36 GMT'},
  'RetryAttempts': 0}}

In [26]:
execution = pipeline.start()

In [27]:
execution.wait()

In [28]:
execution.list_steps()

[{'StepName': 'PipelineModel',
  'StartTime': datetime.datetime(2022, 2, 21, 21, 37, 47, 116000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 2, 21, 21, 37, 48, 381000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-west-2:976939723775:model-package/pipelinemodelpackagegroup/3'}}},
 {'StepName': 'sklearnRepackModel',
  'StartTime': datetime.datetime(2022, 2, 21, 21, 33, 29, 865000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 2, 21, 21, 37, 46, 90000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:976939723775:training-job/pipelines-fly02m30niwb-sklearnrepackmodel-utohcdhxix'}}},
 {'StepName': 'MSE-Lower-Than-Threshold-Condition',
  'StartTime': datetime.datetime(2022, 2, 21, 21, 33, 28, 688000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 2, 21, 21, 33, 29, 305000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata

## Deploy latest approved model to a real-time endpoint

In [29]:
%%writefile utils.py
import argparse
import boto3
import logging
import os
from botocore.exceptions import ClientError
import tarfile
import zipfile

logger = logging.getLogger(__name__)
sm_client = boto3.client("sagemaker")


def get_approved_package(model_package_group_name):
    """Gets the latest approved model package for a model package group.

    Args:
        model_package_group_name: The model package group name.

    Returns:
        The summary (a dict that includes "ModelPackageArn") of the latest approved model package.
    """
    try:
        # Get the latest approved model package
        response = sm_client.list_model_packages(
            ModelPackageGroupName=model_package_group_name,
            ModelApprovalStatus="Approved",
            SortBy="CreationTime",
            MaxResults=100,
        )
        approved_packages = response["ModelPackageSummaryList"]

        # Fetch more packages if none returned with continuation token
        while len(approved_packages) == 0 and "NextToken" in response:
            logger.debug("Getting more packages for token: {}".format(response["NextToken"]))
            response = sm_client.list_model_packages(
                ModelPackageGroupName=model_package_group_name,
                ModelApprovalStatus="Approved",
                SortBy="CreationTime",
                MaxResults=100,
                NextToken=response["NextToken"],
            )
            approved_packages.extend(response["ModelPackageSummaryList"])

        # Return error if no packages found
        if len(approved_packages) == 0:
            error_message = (
                f"No approved ModelPackage found for ModelPackageGroup: {model_package_group_name}"
            )
            logger.error(error_message)
            raise Exception(error_message)

        # Return the summary of the latest approved model package
        model_package_arn = approved_packages[0]["ModelPackageArn"]
        logger.info(f"Identified the latest approved model package: {model_package_arn}")
        return approved_packages[0]
    except ClientError as e:
        error_message = e.response["Error"]["Message"]
        logger.error(error_message)
        raise Exception(error_message)

Writing utils.py
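
The helper above follows the continuation token only while no approved package has been found. A more general pattern, collecting every page, is sketched below against a fake in-memory listing so that it runs without AWS access. The `list_all` and `fake_fetch` functions, the token `"t1"`, and the example ARNs are all hypothetical; only the `ModelPackageSummaryList` and `NextToken` keys are taken from the code above.

```python
def list_all(fetch_page):
    """Collect items from every page of a token-paginated listing."""
    items = []
    token = None
    while True:
        page = fetch_page(token)
        items.extend(page["ModelPackageSummaryList"])
        token = page.get("NextToken")
        if token is None:
            break
    return items


# Fake two-page response standing in for the real service.
FAKE_PAGES = {
    None: {
        "ModelPackageSummaryList": [{"ModelPackageArn": "arn:example:3"}],
        "NextToken": "t1",
    },
    "t1": {
        "ModelPackageSummaryList": [
            {"ModelPackageArn": "arn:example:2"},
            {"ModelPackageArn": "arn:example:1"},
        ],
    },
}


def fake_fetch(token):
    """Return the fake page that corresponds to the given token."""
    return FAKE_PAGES[token]


packages = list_all(fake_fetch)
print([p["ModelPackageArn"] for p in packages])
```

Passing the page-fetching function in as an argument keeps the loop testable and independent of any particular client.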


In [30]:
from utils import get_approved_package

sm_client = boto3.client("sagemaker")

# Reminder: model_package_group_name was defined as "PipelineModelPackageGroup" at the beginning of the notebook
pck = get_approved_package(model_package_group_name)
model_description = sm_client.describe_model_package(ModelPackageName=pck["ModelPackageArn"])

model_description

{'ModelPackageGroupName': 'PipelineModelPackageGroup',
 'ModelPackageVersion': 3,
 'ModelPackageArn': 'arn:aws:sagemaker:us-west-2:976939723775:model-package/pipelinemodelpackagegroup/3',
 'CreationTime': datetime.datetime(2022, 2, 21, 21, 37, 48, 286000, tzinfo=tzlocal()),
 'InferenceSpecification': {'Containers': [{'Image': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
    'ImageDigest': 'sha256:fc21b1b187c1980fa792c9bd648c34d60e8622ad0963c140a8eb60ceef2fc549',
    'ModelDataUrl': 's3://sagemaker-us-west-2-976939723775/pipelines-fly02m30niwb-sklearnRepackModel-UtoHcdHXiX/output/model.tar.gz'},
   {'Image': '763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:2.4.1-cpu',
    'ImageDigest': 'sha256:82d4db66f767fd5dad7c2d9b44f7dc379a1f9c402dcc2f6dec8bc0d159d9d8c6',
    'ModelDataUrl': 's3://sagemaker-us-west-2-976939723775/pipeline-model-example/model/pipelines-fly02m30niwb-TrainTensorflowModel-eedvRrTdwo/output/model.tar.gz'}],
  'S

In [31]:
from sagemaker import ModelPackage

model_package_arn = model_description["ModelPackageArn"]
model = ModelPackage(
    role=role, model_package_arn=model_package_arn, sagemaker_session=sagemaker_session
)

endpoint_name = "DEMO-endpoint-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
print("EndpointName= {}".format(endpoint_name))
model.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge", endpoint_name=endpoint_name)

EndpointName= DEMO-endpoint-2022-02-21-21-59-22
-------------!

In [32]:
from sagemaker.predictor import Predictor

predictor = Predictor(endpoint_name=endpoint_name)

In [33]:
data = pd.read_csv("data/raw/raw_data_all.csv")
house_values = data["medianHouseValue"]
data = data.drop("medianHouseValue", axis=1)

pred_count = 10
payload = data.iloc[:pred_count].to_csv(header=False, index=False)
p = predictor.predict(payload, initial_args={"ContentType": "text/csv"})
print(p.decode("utf-8"))

{
    "predictions": [[0.909166694], [0.994690061], [0.871307], [0.739631534], [0.543595672], [0.56353569], [0.572581291], [0.653109193], [0.439536244], [0.619721293]
    ]
}


In [34]:
blue, stop = "\033[94m", "\033[0m"
predictions = json.loads(p.decode("utf-8"))["predictions"]
for i in range(pred_count):
    print(
        f"Predicted: {blue}{predictions[i][0]}{stop} and Actual is: {blue}{house_values.iloc[i]}{stop}"
    )

Predicted: 0.909166694 and Actual is: 0.9052
Predicted: 0.994690061 and Actual is: 0.7170000000000001
Predicted: 0.871307 and Actual is: 0.7042
Predicted: 0.739631534 and Actual is: 0.6826
Predicted: 0.543595672 and Actual is: 0.6844
Predicted: 0.56353569 and Actual is: 0.5394
Predicted: 0.572581291 and Actual is: 0.5984
Predicted: 0.653109193 and Actual is: 0.4828
Predicted: 0.439536244 and Actual is: 0.4534
Predicted: 0.619721293 and Actual is: 0.5222
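
Because the target was divided by 500000 before training, both columns above are on a scaled range. The short sketch below summarizes the ten printed pairs as a mean absolute error and converts it back to dollars. It is only a rough sanity check: the rows come from the raw file, part of which was used for training, so this is not a held-out estimate.

```python
SCALE = 500_000  # the notebook divides medianHouseValue by this factor

# Values copied from the output printed above.
predicted = [
    0.909166694, 0.994690061, 0.871307, 0.739631534, 0.543595672,
    0.56353569, 0.572581291, 0.653109193, 0.439536244, 0.619721293,
]
actual = [
    0.9052, 0.717, 0.7042, 0.6826, 0.6844,
    0.5394, 0.5984, 0.4828, 0.4534, 0.5222,
]

# Mean absolute error on the scaled range, then in the original units.
abs_errors = [abs(p - a) for p, a in zip(predicted, actual)]
mae_scaled = sum(abs_errors) / len(abs_errors)
mae_dollars = mae_scaled * SCALE

print(f"MAE: {mae_scaled:.4f} (scaled), about ${mae_dollars:,.0f}")
```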


## Clean-up
Delete the resources created for this example to avoid any unintended charges.

In [35]:
sm_client = boto3.client("sagemaker")

for d in sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)[
    "ModelPackageSummaryList"
]:
    print(d["ModelPackageArn"])
    sm_client.delete_model_package(ModelPackageName=d["ModelPackageArn"])

sm_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)

arn:aws:sagemaker:us-west-2:976939723775:model-package/pipelinemodelpackagegroup/3
arn:aws:sagemaker:us-west-2:976939723775:model-package/pipelinemodelpackagegroup/2
arn:aws:sagemaker:us-west-2:976939723775:model-package/pipelinemodelpackagegroup/1


{'ResponseMetadata': {'RequestId': '031bb05b-406f-473b-8b83-26a8f1b29c02',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '031bb05b-406f-473b-8b83-26a8f1b29c02',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Mon, 21 Feb 2022 22:30:42 GMT'},
  'RetryAttempts': 0}}

In [36]:
predictor.delete_endpoint()

In [37]:
pipeline.delete()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:976939723775:pipeline/trainingpipelineformodel',
 'ResponseMetadata': {'RequestId': 'c96cea09-10cf-41b5-99c8-71d5518c36b4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c96cea09-10cf-41b5-99c8-71d5518c36b4',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '92',
   'date': 'Mon, 21 Feb 2022 22:30:44 GMT'},
  'RetryAttempts': 0}}