# SageMaker Pipelines California Housing - Taking different steps based on model performance

This notebook illustrates how to preprocess data, train a model, and run batch inference on the results in a SageMaker Pipeline.

The steps in this pipeline include:
* Preprocessing the California Housing dataset.
* Training a TensorFlow 2 artificial neural network (ANN) model.
* Creating a SageMaker model from the training artifacts.
* Creating a transform job to run batch inference on the dataset.

In [1]:
import sys

!{sys.executable} -m pip install "sagemaker>=2.51.0"


In [2]:
import os
import time
import boto3
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import sagemaker
from sagemaker import get_execution_role

In [3]:
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name
model_package_group_name = "TF2-California-Housing"  # Model name in model registry
prefix = "tf2-california-housing-pipelines"
pipeline_name = "TF2CaliforniaHousingPipeline"  # SageMaker Pipeline name
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

## Download California Housing dataset and upload to Amazon S3

We use the California housing dataset.

More info on the dataset:

This dataset was obtained from the `StatLib` repository. http://lib.stat.cmu.edu/datasets/

The target variable is the median house value for California districts.

This dataset was derived from the 1990 U.S. census, using one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people).

In [4]:
data_dir = os.path.join(os.getcwd(), "data")
os.makedirs(data_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), "data/raw")
os.makedirs(raw_dir, exist_ok=True)

In [5]:
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/california_housing/cal_housing.tgz .

download: s3://sagemaker-sample-files/datasets/tabular/california_housing/cal_housing.tgz to ./cal_housing.tgz


In [6]:
!tar -zxf cal_housing.tgz

tar: CaliforniaHousing/cal_housing.data: Cannot change ownership to uid 10017, gid 166: Operation not permitted
tar: CaliforniaHousing/cal_housing.domain: Cannot change ownership to uid 10017, gid 166: Operation not permitted
tar: Exiting with failure status due to previous errors


In [7]:
columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue",
]
cal_housing_df = pd.read_csv("CaliforniaHousing/cal_housing.data", names=columns, header=None)

In [8]:
cal_housing_df.head()

| | longitude | latitude | housingMedianAge | totalRooms | totalBedrooms | population | households | medianIncome | medianHouseValue |
|---|---|---|---|---|---|---|---|---|---|
| 0 | -122.23 | 37.88 | 41.0 | 880.0 | 129.0 | 322.0 | 126.0 | 8.3252 | 452600.0 |
| 1 | -122.22 | 37.86 | 21.0 | 7099.0 | 1106.0 | 2401.0 | 1138.0 | 8.3014 | 358500.0 |
| 2 | -122.24 | 37.85 | 52.0 | 1467.0 | 190.0 | 496.0 | 177.0 | 7.2574 | 352100.0 |
| 3 | -122.25 | 37.85 | 52.0 | 1274.0 | 235.0 | 558.0 | 219.0 | 5.6431 | 341300.0 |
| 4 | -122.25 | 37.85 | 52.0 | 1627.0 | 280.0 | 565.0 | 259.0 | 3.8462 | 342200.0 |


In [9]:
X = cal_housing_df[
    [
        "longitude",
        "latitude",
        "housingMedianAge",
        "totalRooms",
        "totalBedrooms",
        "population",
        "households",
        "medianIncome",
    ]
]
Y = cal_housing_df[["medianHouseValue"]] / 100000

x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.33)

np.save(os.path.join(raw_dir, "x_train.npy"), x_train)
np.save(os.path.join(raw_dir, "x_test.npy"), x_test)
np.save(os.path.join(raw_dir, "y_train.npy"), y_train)
np.save(os.path.join(raw_dir, "y_test.npy"), y_test)
rawdata_s3_prefix = "{}/data/raw".format(prefix)
raw_s3 = sagemaker_session.upload_data(path="./data/raw/", key_prefix=rawdata_s3_prefix)
print(raw_s3)

s3://sagemaker-us-west-1-648739860567/tf2-california-housing-pipelines/data/raw


In [10]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)

# processing step parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.large")
training_epochs = ParameterString(name="TrainingEpochs", default_value="1")


# transformer step parameters
transformer_instance_type = ParameterString(
    name="TransformInstanceType", default_value="ml.m5.large"
)
transformer_instance_count = ParameterInteger(name="TransformInstanceCount", default_value=2)
max_payload_in_mb = ParameterInteger(name="MaxPayloadMB", default_value=2)
output_data_path = ParameterString(
    name="OutputDataS3Path",
    default_value="s3://{}/ca-housing-batch-infer".format(bucket),
)
concurrency = ParameterInteger(name="MaxConcurrentRequests", default_value=4)


## Processing Step 

The first step in the pipeline preprocesses the data to prepare it for training. We create a `SKLearnProcessor` whose instance type is a pipeline parameter, so the job configuration can be tracked and changed separately, for example to move to a larger instance type to accommodate a growing dataset.

In [11]:
%%writefile preprocess.py

import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__ == "__main__":

    input_files = glob.glob("{}/*.npy".format("/opt/ml/processing/input"))
    print("\nINPUT FILE LIST: \n{}\n".format(input_files))
    scaler = StandardScaler()
    x_train = np.load(os.path.join("/opt/ml/processing/input", "x_train.npy"))
    scaler.fit(x_train)
    for file in input_files:
        raw = np.load(file)
        # only transform feature columns
        if "y_" not in file:
            transformed = scaler.transform(raw)
        if "train" in file:
            if "y_" in file:
                output_path = os.path.join("/opt/ml/processing/train", "y_train.npy")
                np.save(output_path, raw)
                print("SAVED LABEL TRAINING DATA FILE\n")
            else:
                output_path = os.path.join("/opt/ml/processing/train", "x_train.npy")
                np.save(output_path, transformed)
                print("SAVED TRANSFORMED TRAINING DATA FILE\n")
        else:
            if "y_" in file:
                output_path = os.path.join("/opt/ml/processing/test", "y_test.npy")
#                 output_path_csv = os.path.join("/opt/ml/processing/testcsv", "y_test.csv")
#                 np.savetxt(output_path_csv, raw, delimiter=",")
                np.save(output_path, raw)
                print("SAVED LABEL TEST DATA FILE\n")
            else:
                output_path = os.path.join("/opt/ml/processing/test", "x_test.npy")
                output_path_csv = os.path.join("/opt/ml/processing/testcsv", "x_test.csv")
                np.savetxt(output_path_csv, transformed, delimiter=",")
                np.save(output_path, transformed)
                print("SAVED TRANSFORMED TEST DATA FILE\n")

Writing preprocess.py
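
The key detail in `preprocess.py` is that the scaler is fit on `x_train` only, and the same statistics are then applied to the test features, so nothing from the test set leaks into preprocessing. The dependency-free sketch below mimics that behaviour with a per-column mean and population standard deviation, which is what `StandardScaler` computes by default. The helper names `fit_scaler` and `transform` and the toy values are made up for illustration; they are not part of scikit-learn or of the script above.

```python
from statistics import fmean, pstdev


def fit_scaler(rows):
    """Return per-column means and standard deviations from the training rows."""
    columns = list(zip(*rows))
    means = [fmean(col) for col in columns]
    # Population standard deviation (ddof=0); constant columns get scale 1.0
    # so that the division in transform() is always safe.
    stds = [pstdev(col) or 1.0 for col in columns]
    return means, stds


def transform(rows, means, stds):
    """Standardize rows with statistics that were fit elsewhere."""
    return [[(v - m) / s for v, m, s in zip(row, means, stds)] for row in rows]


# Toy data standing in for x_train / x_test (hypothetical values).
x_train = [[1.0, 10.0], [3.0, 30.0]]
x_test = [[2.0, 40.0]]

means, stds = fit_scaler(x_train)  # statistics come from the training set only
train_scaled = transform(x_train, means, stds)
test_scaled = transform(x_test, means, stds)
print(train_scaled)
print(test_scaled)
```

Because the test rows are scaled with the training statistics, their standardized values are not guaranteed to have zero mean or unit variance, which is the expected behaviour.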


In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "0.23-1"

# Create the SKLearn processor object.
# It specifies the instance type, the IAM role, and other job settings.
# A managed processor comes with a preconfigured container, so only the framework version needs to be specified.
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    role=role,
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="tf2-california-housing-processing-job",
)

# Use the sklearn_processor in a SageMaker Pipelines ProcessingStep
step_preprocess_data = ProcessingStep(
    name="Preprocess-California-Housing-Data",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ProcessingOutput(output_name="testcsv", source="/opt/ml/processing/testcsv"),
    ],
    code="preprocess.py",
)

## Train model step
In the second step, the train and test outputs from the previous processing step are used to train a model.

In [13]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Where to store the trained model
model_path = f"s3://{bucket}/{prefix}/model/"

hyperparameters = {"epochs": training_epochs}
tensorflow_version = "2.4.1"
python_version = "py37"

tf2_estimator = TensorFlow(
    source_dir="code",
    entry_point="train.py",
    instance_type=training_instance_type,
    instance_count=1,
    framework_version=tensorflow_version,
    role=role,
    base_job_name="tf2-california-housing-train",
    output_path=model_path,
    hyperparameters=hyperparameters,
    py_version=python_version,
)

# Use the tf2_estimator in a SageMaker Pipelines TrainingStep.
# NOTE how the input to the training job directly references the output of the previous step.
step_train_model = TrainingStep(
    name="Train-California-Housing-Model",
    estimator=tf2_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
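
The entry point `code/train.py` is not shown in this notebook. In SageMaker script mode, hyperparameters are passed to the entry point as command-line arguments, and the channel and model directories are exposed through environment variables such as `SM_CHANNEL_TRAIN`, `SM_CHANNEL_TEST`, and `SM_MODEL_DIR`. The following is a minimal sketch of how a training script might read them, assuming the channel names `train` and `test` used above. The function name `parse_args`, the argument names other than `--epochs`, and the fallback paths are illustrative assumptions, not the author's actual script.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse hyperparameters and SageMaker-provided paths (illustrative only)."""
    parser = argparse.ArgumentParser()
    # The TrainingEpochs pipeline parameter arrives here as `--epochs <value>`.
    parser.add_argument("--epochs", type=int, default=1)
    # Channel directories; the fallbacks follow the usual container layout.
    parser.add_argument(
        "--train", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train")
    )
    parser.add_argument(
        "--test", default=os.environ.get("SM_CHANNEL_TEST", "/opt/ml/input/data/test")
    )
    # Local directory whose contents are packaged as the model artifact.
    parser.add_argument(
        "--sm-model-dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model")
    )
    # parse_known_args ignores any extra arguments the platform may add.
    args, _ = parser.parse_known_args(argv)
    return args


args = parse_args(["--epochs", "3"])
print(args.epochs, args.train, args.test, args.sm_model_dir)
```

Declaring `--epochs` with `type=int` matters here because the pipeline parameter is a `ParameterString`, so the value reaches the script as text.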

## Create the model

The model is created and its name is passed to the batch transform step below. The `CreateModelStep` dynamically assigns a name to the model.

In [14]:
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.tensorflow.model import TensorFlowModel

model = TensorFlowModel(
    role=role,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    framework_version=tensorflow_version,
    sagemaker_session=sagemaker_session,
)

step_create_model = CreateModelStep(
    name="Create-California-Housing-Model",
    model=model,
    inputs=sagemaker.inputs.CreateModelInput(instance_type=transformer_instance_type),
)

## Batch Transformer Step

The model can either be deployed for real-time inference or run on batches of data with a transform job. Creating a `Transformer` from a SageMaker model gives an object that can be used to perform batch inference.

When creating the transformer, the output defaults to the SageMaker default bucket. Set `output_path` to save the results to a different location. The other relevant parameters are `instance_count` and `instance_type`, which set the number and size of the instances that run the transform job; `max_concurrent_transforms`, which sets how many HTTP requests can be made to each transform container at a time; and `max_payload`, which sets the maximum size in megabytes of a single payload sent to the container (the SageMaker API caps `max_concurrent_transforms` × `max_payload` at 100 MB).

The transformer can then be passed to the TransformStep, which enables the pipeline to create it.

In [15]:
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_count=transformer_instance_count,
    instance_type=transformer_instance_type,
    max_concurrent_transforms=concurrency,
    max_payload=max_payload_in_mb,
    output_path=output_data_path,
)

step_batch_transform = TransformStep(
    name="Create-Housing-Transformer",
    transformer=transformer,
    inputs=sagemaker.inputs.TransformInput(
        # Use the same data from S3 as before
        data=step_preprocess_data.properties.ProcessingOutputConfig.Outputs[
            "testcsv"
        ].S3Output.S3Uri,
        data_type="S3Prefix",
        content_type="text/csv",
    ),
)
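
With the default settings, batch transform writes one output object per input object and appends `.out` to the input file name. Assuming the single input file `x_test.csv` sits directly under the input prefix, the location of the prediction file can be derived as in the sketch below. `expected_output_uri` is a hypothetical helper, and the bucket and prefix names are placeholders rather than values from this run.

```python
import posixpath


def expected_output_uri(output_path, input_uri):
    """Build the S3 URI where batch transform is expected to write predictions."""
    file_name = posixpath.basename(input_uri.rstrip("/"))
    return "{}/{}.out".format(output_path.rstrip("/"), file_name)


uri = expected_output_uri(
    "s3://example-bucket/ca-housing-batch-infer",  # placeholder for OutputDataS3Path
    "s3://example-bucket/some-prefix/testcsv/x_test.csv",  # placeholder input object
)
print(uri)
```

Once the pipeline has run, the resulting URI could be passed to `aws s3 cp` to download the predictions for inspection.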

## Pipeline Creation: Orchestrate all steps

Now that all the steps are defined, they can be combined into a pipeline.

In [16]:
from sagemaker.workflow.pipeline import Pipeline

# Create a SageMaker Pipeline.
# Each parameter for the pipeline must be set as a parameter explicitly when the pipeline is created.
# Also pass in each of the steps created above.
# Note that the order of execution is determined from each step's dependencies on other steps,
# not on the order they are passed in below.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        training_instance_type,
        input_data,
        training_epochs,
        transformer_instance_type,
        transformer_instance_count,
        max_payload_in_mb,
        output_data_path,
        concurrency,
    ],
    steps=[step_preprocess_data, step_train_model, step_create_model, step_batch_transform],
)
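
The point about execution order can be made concrete with a small dependency graph. The sketch below uses the step names from this notebook and Python's standard `graphlib` module (Python 3.9 or later) to derive an order from the data dependencies alone. It only illustrates the idea and is not a description of how SageMaker resolves the graph internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "Preprocess-California-Housing-Data": set(),
    "Train-California-Housing-Model": {"Preprocess-California-Housing-Data"},
    "Create-California-Housing-Model": {"Train-California-Housing-Model"},
    "Create-Housing-Transformer": {
        "Create-California-Housing-Model",
        "Preprocess-California-Housing-Data",
    },
}

# static_order() yields every step after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The transform step depends on both the created model and the `testcsv` output of the preprocessing step, so it can only run last, regardless of where it appears in the `steps` list.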

## Execute the Pipeline

### Inspect the pipeline definition before submitting it:

In [17]:
import json

definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-1-648739860567/tf2-california-housing-pipelines/data/raw'},
  {'Name': 'TrainingEpochs', 'Type': 'String', 'DefaultValue': '1'},
  {'Name': 'TransformInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'TransformInstanceCount', 'Type': 'Integer', 'DefaultValue': 2},
  {'Name': 'MaxPayloadMB', 'Type': 'Integer', 'DefaultValue': 2},
  {'Name': 'OutputDataS3Path',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-1-648739860567/ca-housing-batch-infer'},
  {'Name': 'MaxConcurrentRequests', 'Type': 'Integer', 'DefaultValue': 4}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName'
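
The `Parameters` section of the definition is a list of dictionaries with `Name`, `Type`, and `DefaultValue` keys, as the output above shows. A small helper can turn it into a lookup table, which is handy for checking defaults before starting an execution. The name `parameter_defaults` is hypothetical, and the sample input is an excerpt copied from the output above.

```python
def parameter_defaults(definition):
    """Map each pipeline parameter name to its (type, default value) pair."""
    return {
        p["Name"]: (p["Type"], p.get("DefaultValue"))
        for p in definition.get("Parameters", [])
    }


# Excerpt of the definition printed above.
sample_definition = {
    "Version": "2020-12-01",
    "Parameters": [
        {"Name": "TrainingEpochs", "Type": "String", "DefaultValue": "1"},
        {"Name": "TransformInstanceCount", "Type": "Integer", "DefaultValue": 2},
        {"Name": "MaxPayloadMB", "Type": "Integer", "DefaultValue": 2},
    ],
}

defaults = parameter_defaults(sample_definition)
print(defaults["TransformInstanceCount"])
```

In the notebook, the same helper could be called on the parsed `definition` dictionary directly.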

### Submit pipeline

In [18]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-west-1:648739860567:pipeline/tf2californiahousingpipeline',
 'ResponseMetadata': {'RequestId': '504a15b1-a7e8-403b-a6ca-de6c03cbe1e7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '504a15b1-a7e8-403b-a6ca-de6c03cbe1e7',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '96',
   'date': 'Fri, 03 Jun 2022 17:57:24 GMT'},
  'RetryAttempts': 0}}

### Execute pipeline using the default parameters

In [19]:
execution = pipeline.start()
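
`pipeline.start()` also accepts a `parameters` dictionary that overrides the defaults for a single execution. The sketch below assumes the parameter names defined earlier in this notebook. `build_overrides` is a hypothetical helper that rejects misspelled names before the request is sent, and the call to `pipeline.start` is left commented out because it needs AWS credentials and the `pipeline` object from above.

```python
KNOWN_PARAMETERS = {
    "ProcessingInstanceType",
    "TrainingInstanceType",
    "InputData",
    "TrainingEpochs",
    "TransformInstanceType",
    "TransformInstanceCount",
    "MaxPayloadMB",
    "OutputDataS3Path",
    "MaxConcurrentRequests",
}


def build_overrides(**overrides):
    """Return a parameters dict, raising if a name is not a pipeline parameter."""
    unknown = set(overrides) - KNOWN_PARAMETERS
    if unknown:
        raise ValueError("Unknown pipeline parameters: {}".format(sorted(unknown)))
    return dict(overrides)


# Train for more epochs and use a single transform instance for this run only.
overrides = build_overrides(TrainingEpochs="5", TransformInstanceCount=1)
print(overrides)
# execution = pipeline.start(parameters=overrides)
```

Note that `TrainingEpochs` stays a string because it was declared as a `ParameterString`.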

### Wait for pipeline to complete

In [None]:
execution.wait()
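
Once the execution finishes, `execution.list_steps()` returns one dictionary per step, including the `StepName` and `StepStatus` fields. The sketch below formats such a list. `summarize_steps` is a hypothetical helper, and the sample records are made up to show the shape of the data; they are not real output from this run.

```python
def summarize_steps(steps):
    """Return 'name: status' lines sorted by step name, with any failure reason."""
    lines = []
    for step in sorted(steps, key=lambda s: s["StepName"]):
        line = "{}: {}".format(step["StepName"], step["StepStatus"])
        # FailureReason is only present when a step did not succeed.
        if "FailureReason" in step:
            line += " ({})".format(step["FailureReason"])
        lines.append(line)
    return lines


# Made-up records in the shape returned by execution.list_steps().
sample_steps = [
    {"StepName": "Train-California-Housing-Model", "StepStatus": "Succeeded"},
    {"StepName": "Preprocess-California-Housing-Data", "StepStatus": "Succeeded"},
]

for line in summarize_steps(sample_steps):
    print(line)
# With a live execution: summarize_steps(execution.list_steps())
```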

## Visualize SageMaker Pipeline
In SageMaker Studio, choose `SageMaker Components and registries` in the left pane and, under `Pipelines`, click the pipeline that was created. All pipeline executions are then listed, and the one just created should have a status of `Succeeded` once it finishes. Select that execution to track the individual pipeline steps as they execute.

![](images/pipeline.png)

## Clean up (optional)

#### Delete the pipeline to keep the Studio environment tidy.

In [None]:
def delete_sagemaker_pipeline(sm_client, pipeline_name):
    try:
        sm_client.delete_pipeline(
            PipelineName=pipeline_name,
        )
        print("{} pipeline deleted".format(pipeline_name))
    except Exception as e:
        print("{} \n".format(e))
        return

In [None]:
delete_sagemaker_pipeline(sm, pipeline_name)