# Sagemaker Pipelines SKLearn-->PyTorch Serial Batch Inference Demo End to End

This notebook demonstrates the end to end process of creating a Sagemaker Pipeline for data preprocessing with scikit-learn and model training with PyTorch, registering the pipeline model, and running serial batch inference.


## Table of Contents
1. [Configure AWS](#configure)
2. [Load Data](#data)
3. [Define Pipeline Parameters](#params)
4. [Define a `features.py` script for training and inference](#features)
5. [Define a `model.py` script for training and inference](#model)
6. [Define model creation and registration steps](#createregister)
7. [Configure Sagemaker Pipeline](#pipeline)
8. [Execute Sagemaker Pipeline to build features, train model, register artifacts, and create Sagemaker Model](#submit)
9. [Pass pipeline to Batch Transform for serial inference](#inference)
10. [Retrieve output batch inference data](#batch_inference)


In [2]:
#  !pip install -U sagemaker

### 1. Configure AWS <a name="configure"></a><a name="configure"></a>

Set up your Sagemaker Session, Sagemaker Pipeline session, roles, etc. 

In [1]:
import os
import time
import boto3
import json
import numpy as np
import pandas as pd
from sagemaker.workflow.pipeline_context import PipelineSession
import sagemaker
from sagemaker import get_execution_role
from botocore.exceptions import ClientError

# Load configs
with open('config.json', 'r') as file:
    config_data = json.load(file)
print("Configs:")    
print(config_data)

# Configure boto3, bucket info, and Sagemaker Sessions
sess = boto3.Session()
sm = sess.client("sagemaker")
sagemaker_session = sagemaker.Session(default_bucket = config_data['DEFAULT_BUCKET'], boto_session=sess)
pipeline_session = PipelineSession(default_bucket = config_data['DEFAULT_BUCKET'], boto_session=sess) 

# Configure region
region = sagemaker_session.boto_region_name
print(f"Region: {region}")

# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()
print(f"Role: {role}")

# S3 bucket
bucket = sagemaker_session.default_bucket()
print(f"Bucket: {bucket}")

Configs:
{'DEFAULT_BUCKET': 'pytorch-serial-inference-demo', 'MODEL_PACKAGE_GROUP_NAME': 'PipelinePyTorchPackageGroup', 'PREFIX': 'pipeline-pytorch-example', 'PIPELINE_NAME': 'serial-pytorch-pipeline', 'INPUT_DATA': 's3://pytorch-serial-inference-demo/pipeline-pytorch-example/housing_data/raw', 'MODEL_APPROVAL_STATUS': 'Approved', 'PROCESSING_INSTANCE_TYPE': 'ml.m5.xlarge', 'PROCESSING_INSTANCE_COUNT': 1, 'TRAINING_INSTANCE_TYPE': 'ml.m5.xlarge', 'CREATE_MODEL_INSTANCE_TYPE': 'ml.m5.large', 'BATCH_TRANSFORM_INSTANCE_COUNT': 1, 'BATCH_TRANSFORM_INSTANCE_TYPE': 'ml.m4.xlarge'}
Region: ca-central-1
Role: arn:aws:iam::817463428454:role/service-role/AmazonSageMaker-ExecutionRole-20230919T125063
Bucket: pytorch-serial-inference-demo


In [3]:
# define prefixes and names
model_package_group_name = config_data['MODEL_PACKAGE_GROUP_NAME']
prefix = config_data['PREFIX']
pipeline_name = config_data['PIPELINE_NAME']

### 2. Load Dataset to Studio & Upload to S3 for training <a name="data"></a><a name="data"></a>

We use the California housing dataset.

More info on the dataset:
* This dataset was obtained from the StatLib repository. http://lib.stat.cmu.edu/datasets/
* The target variable is the median house value for California districts.
* This dataset was derived from the 1990 U.S. census, using one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people).

In [4]:
data_dir = os.path.join(os.getcwd(), "housing_data")
os.makedirs(data_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), "housing_data/raw")
os.makedirs(raw_dir, exist_ok=True)

In [5]:
s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/tabular/california_housing/cal_housing.tgz",
    "cal_housing.tgz",
)

In [6]:
!tar -zxf cal_housing.tgz

tar: CaliforniaHousing/cal_housing.data: Cannot change ownership to uid 10017, gid 166: Operation not permitted
tar: CaliforniaHousing/cal_housing.domain: Cannot change ownership to uid 10017, gid 166: Operation not permitted
tar: Exiting with failure status due to previous errors


In [7]:
columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue",
]
cal_housing_df = pd.read_csv("CaliforniaHousing/cal_housing.data", names=columns, header=None)
cal_housing_df[
    "medianHouseValue"
] /= 500000  # Scaling target down to avoid overcomplicating the example
cal_housing_df.to_csv(f"./housing_data/raw/raw_data_all.csv", header=True, index=False)
rawdata_s3_prefix = "{}/housing_data/raw".format(prefix)
raw_s3 = sagemaker_session.upload_data(path="./housing_data/raw/", key_prefix=rawdata_s3_prefix)
print(raw_s3)

s3://pytorch-serial-inference-demo/pipeline-pytorch-example/housing_data/raw


### 3. Define Parameters to Parametrize Pipeline Execution <a name="params"></a><a name="params"></a>

Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

- ParameterString - represents a str Python type
- ParameterInteger - represents an int Python type
- ParameterFloat - represents a float Python type

In [8]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# raw input data
input_data = ParameterString(name='InputData', default_value=config_data['INPUT_DATA'])

# status of newly trained model in registry
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value=config_data['MODEL_APPROVAL_STATUS'])

# processing step parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value=config_data['PROCESSING_INSTANCE_TYPE']
)
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=config_data['PROCESSING_INSTANCE_COUNT'])

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value=config_data['TRAINING_INSTANCE_TYPE'])

# create model step parameters 
create_model_instance_type = ParameterString(name="CreateModelInstanceType", default_value=config_data["CREATE_MODEL_INSTANCE_TYPE"])

### 4. Feature Build and Inference Script<a name="features"></a><a name="features"></a>

Define a Sagemaker processing job for feature engineering, utilizing a scikit-learn StandardScaler(). Save the scaler as a feature artifact during training, and deserialize it for custom inference transformations.

#### Script Structure

Inside the main guard (`if name == __main__`), provide training code with arguments aligned to Sagemaker Processing Job documentation.

Outside the main guard, define four inference functions as expected by Sagemaker:
* `input_fn`: reads input data from the relative directory passed into the feature container
* `model_fn`: deserializes the tar.gz artifact from the model registry, containing pretrained feature artifact(s)
* `predict_fn`: computes the data transformation step for inference data
* `output_fn`: sends transformed data to the model step container as JSON

Refer to the Sagemaker Python SDK documentation for details. If no custom inference functions are provided, the default Sagemaker inference handler will run.

`features.py` is the entry point for data preprocessing functions.


In [9]:
!mkdir -p code

In [10]:
 %%writefile code/features.py

import glob
import numpy as np
import pandas as pd
import os
import json
import joblib
from io import StringIO
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tarfile

try:
    from sagemaker_containers.beta.framework import (
        content_types,
        encoders,
        env,
        modules,
        transformer,
        worker,
        server,
    )
except ImportError:
    pass

feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"

base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"

# feature build logic 
if __name__ == "__main__":
    df = pd.read_csv(f"{base_dir}/input/raw_data_all.csv")
    x_train, x_test, y_train, y_test = train_test_split(df[feature_columns], df[label_column], test_size=0.33)

    scaler = StandardScaler()
    scaler.fit(x_train.values) # fit scaler without the feature names
    x_train[feature_columns] = scaler.transform(x_train) 

    train_dataset = pd.concat([x_train, y_train], axis=1) 
    test_dataset = pd.concat([x_test, y_test], axis=1)
    
    train_dataset.to_csv(f"{base_dir}/train/train.csv", header=None, index=None) 
    test_dataset.to_csv(f"{base_dir}/test/test.csv", header=None, index=None)
    
    # save feature artifact for inference
    joblib.dump(scaler, "model.joblib")
    with tarfile.open(f"{base_dir}/scaler_model/model.tar.gz", "w:gz") as tar_handle:
        tar_handle.add(f"model.joblib")

# inference functions
def input_fn(input_data, content_type):
    """Parse input data payload
    """
    print("Entering preprocessing input fn.")
    if content_type == "text/csv":
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), header=None) 
        
        # If labelled, drop before inference
        if len(df.columns) == len(feature_columns) + 1:
            df.columns = feature_columns + [label_column]
            df=df.drop(columns = label_column)
            
        # If unlabelled, continue
        elif len(df.columns) == len(feature_columns):
            df.columns = feature_columns
        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))
        
def model_fn(model_dir):
    """Deserialize fitted model"""
    print("Entering preprocessing model fn.")
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor

def predict_fn(input_data, model):
    """Apply feature transform to data
    """
    print("Entering preprocessing predict fn.")
    features = model.transform(input_data.values) 
    return features

def output_fn(prediction, accept):
    """Format prediction output
    The default accept/content-type between containers for serial inference is JSON.
    """
    print("Entering preprocessing output fn.")
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append(row)
        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))

Overwriting code/features.py


Defining preprocessing pipeline step parameters:

In [11]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn

sklearn_framework_version = "1.2-1"

sklearn_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version=sklearn_framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="pytorch-housing-data-process",
    role=role,
    sagemaker_session=pipeline_session, 
    code_location=f"s3://{bucket}/{prefix}/processing"
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaler_model", source="/opt/ml/processing/scaler_model", destination = f"s3://{bucket}/{prefix}/processing"), 
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination = f"s3://{bucket}/{prefix}/train"), 
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination = f"s3://{bucket}/{prefix}/test"), 
    ],
    code="code/features.py",
)

instance_type is a PipelineVariable (<class 'sagemaker.workflow.parameters.ParameterString'>). Its interpreted value in execution time should not be of GPU types since GPU training is not supported for Scikit-Learn.
The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


Wrap feature script in a Sagemaker Pipelines ProcessingStep

In [12]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="30d")

step_process = ProcessingStep(
    name="PreprocessData",
    step_args=processor_args,
    cache_config=cache_config,
)

### 5. Model Training and Inference Script <a name="model"></a><a name="model"></a>

Demonstrates PyTorch model training and artifact registration. In custom inference functions, deserialize the model for prediction computation.

#### Script Structure

Inside the main guard (`if name == __main__`), provide training code with arguments for Sagemaker Training Job.

Outside the main guard, define Sagemaker's expected inference functions:
* `input_fn`: reads preprocessed input data from the feature step via the relative directory passed into the model step container
* `model_fn`: deserializes the tar.gz artifact from the model registry containing any pretrained model artifacts
* `predict_fn`: computes model inference predictions
* (optional) `output_fn`: can configure additional custom handling of output predictions
  
Refer to the Sagemaker Python SDK documentation for model-specific inferencing details. If no custom inferencing functions are provided, the default Sagemaker inference handler will run.

In [13]:
%%writefile code/model.py

import argparse
import os
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import json
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler


class HousingDataset:
    def __init__(self, train_dir):
        self.train_dir = train_dir

    def load_data(self):
        train_dataset = pd.read_csv(os.path.join(self.train_dir, "train.csv"))

        np_inputs = train_dataset.iloc[:, :-1].to_numpy()
        inputs = torch.tensor(np_inputs).float()

        np_targets = train_dataset.iloc[:, -1].to_numpy()
        targets = torch.tensor(np_targets).float()

        print("x train shape:", np_inputs.shape, "y train shape:", np_targets.shape)

        return TensorDataset(inputs, targets), np_inputs.shape, np_targets.shape


class LinearRegressionModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(LinearRegressionModel, self).__init__()
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x):
        return self.linear(x)


class Trainer:
    def __init__(self, model, criterion, optimizer, device):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device

    def train(self, train_loader, epochs):
        for epoch in range(epochs):
            self.model.train()
            for xb, yb in train_loader:
                xb, yb = xb.to(self.device), yb.to(self.device)
                self.optimizer.zero_grad()
                pred = self.model(xb)
                loss = self.criterion(pred, yb)
                loss.backward()
                self.optimizer.step()

                if (epoch + 1) % 10 == 0:
                    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, epochs, loss.item()))

    def save_model(self, model_dir):
        path = os.path.join(model_dir, "model.pth")
        torch.save(self.model.cpu().state_dict(), path)


class Predictor:
    def __init__(self, model, device):
        self.model = model
        self.device = device

    def predict(self, input_object):
        predictions = []
        with torch.no_grad():
            for xb in input_object:
                outputs = self.model(xb[0].to(self.device))
                predictions.extend(outputs.tolist())
        return predictions


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    return parser.parse_known_args()


def get_model(input_size, output_size, device):
    model = LinearRegressionModel(input_size, output_size).to(device)
    return model

# Model Training
if __name__ == "__main__":
    
    # Passing in environment variables and hyperparameters for our training script
    args, _ = parse_args()
    print("Training data location: {}".format(args.train))
    
    # Hyperparamaters
    batch_size = args.batch_size
    epochs = args.epochs
    learning_rate = args.learning_rate

    print(
        "batch_size = {}, epochs = {}, learning rate = {}".format(batch_size, epochs, learning_rate)
    )
    
    # Reading in data
    housing_dataset = HousingDataset(args.train)
    train_dataset, input_shape, targets_shape = housing_dataset.load_data()
    print("train dataset: ", train_dataset)
    print("input shape: ", input_shape)
    print("targets_shape: ", targets_shape)
    
    train_loader = DataLoader(train_dataset, batch_size, shuffle=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Model Building
    model = get_model(input_size=input_shape[1], output_size=1, device=device)
    criterion = nn.MSELoss().to(device)
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)

    trainer = Trainer(model, criterion, optimizer, device)
    trainer.train(train_loader, epochs)
    
    # save model artifact for inference
    trainer.save_model(args.sm_model_dir)
    
# inference functions
def reformat_inference_data(test_dataset):
    """Apply tensor structure and DataLoaders
    """
    inputs = torch.tensor(test_dataset).float()
    reformatted_inf_dataset = TensorDataset(inputs)
    reformatted_inf_dataloader = DataLoader(reformatted_inf_dataset, batch_size=64)
    return reformatted_inf_dataloader


def input_fn(request_body, request_content_type):
    """Parse input data payload
    """
    print("Entering model input_fn.")
    if request_content_type == "application/json":
        request_body = json.loads(request_body)
        inpVar = request_body["instances"]
        inference_data = reformat_inference_data(inpVar)
        return inference_data
    else:
        raise ValueError("This model only supports application/json input")

def model_fn(model_dir):
    """Deserialize fitted model
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = get_model(8, 1, device)
    with open(os.path.join(model_dir, "model.pth"), "rb") as f:
        model.load_state_dict(torch.load(f))
    return model.to(device)


def predict_fn(input_object, model):
    """Apply feature transform to data
    """
    print("Entering predict_fn")
    predictor = Predictor(model, torch.device("cuda" if torch.cuda.is_available() else "cpu"))
    return predictor.predict(input_object)


Overwriting code/model.py


In [14]:
from sagemaker.pytorch import PyTorch
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput

pytorch_framework_version = "1.8.0"
model_path = f"s3://{bucket}/{prefix}/pytorchmodel/"

training_pytorch_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version=pytorch_framework_version,
    py_version="py3",
    image_scope='training',
    instance_type=config_data["TRAINING_INSTANCE_TYPE"],
    
)

estimator = PyTorch(
    sagemaker_session=pipeline_session,
    entry_point="code/model.py",
    role=role,
    output_path=model_path,
    py_version="py3",
    image_uri=training_pytorch_image_uri,
    instance_count=1,
    instance_type=config_data["TRAINING_INSTANCE_TYPE"],
    hyperparameters={"epochs": 5},
)


train_args = estimator.fit(
    {
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        )
    }
)

step_train_model = TrainingStep(name="TrainPyTorchModel", step_args=train_args, cache_config=cache_config)

### 6. Define a model creation step <a name="createregister"></a><a name="createregister"></a>

In [15]:
from sagemaker.model import Model
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import PipelineModel
from sagemaker import image_uris
from sagemaker.pytorch import PyTorchModel


scaler_model_s3 = "{}/model.tar.gz".format(
    step_process.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

scaler_model = SKLearnModel(
    model_data=scaler_model_s3,
    role=role,
    sagemaker_session=pipeline_session,
    entry_point="code/features.py",
    framework_version=sklearn_framework_version,
)

inference_pytorch_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version=pytorch_framework_version,
    py_version="py3",
    image_scope='inference',
    instance_type=config_data["BATCH_TRANSFORM_INSTANCE_TYPE"],
)

pytorch_model = PyTorchModel(
    image_uri = inference_pytorch_image_uri,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    entry_point="model.py",
    source_dir="code",
    role=role,
)

pipeline_model = PipelineModel(
    models=[scaler_model, pytorch_model], role=role, sagemaker_session=pipeline_session
)

instance_type is a PipelineVariable (<class 'sagemaker.workflow.parameters.ParameterString'>). Its interpreted value in execution time should not be of GPU types since GPU training is not supported for Scikit-Learn.


Using provided s3_resource


In [16]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="PyTorchPipelineModelCreation",
    step_args=pipeline_model.create(instance_type=create_model_instance_type),
)



### Define a Model Registration Step

In [17]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

register_args = pipeline_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[processing_instance_type, training_instance_type],
    transform_instances=[config_data["BATCH_TRANSFORM_INSTANCE_TYPE"]],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
)

step_register_pipeline_model = ModelStep(
    name="PyTorchPipelineModelRegistration",
    step_args=register_args,
)



### 7. Define a Sagemaker Pipeline<a name="pipeline"></a><a name="pipeline"></a>

Wrap the feature building and model building for training and inference in a Sagemaker Pipeline.

In [18]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    sagemaker_session=sagemaker_session,
    parameters=[
        training_instance_type,
        processing_instance_type,
        processing_instance_count,
        input_data,
        model_approval_status,
    ],
    
    steps = [step_process, step_train_model, step_create_model, step_register_pipeline_model]
)

Display Pipeline definition:

In [19]:
import json

definition = json.loads(pipeline.definition())
definition


INFO:sagemaker.processing:Uploaded None to s3://pytorch-serial-inference-demo/pipeline-pytorch-example/processing/serial-pytorch-pipeline/code/bf5f270b1960b4061b98c1625e4aa582/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://pytorch-serial-inference-demo/serial-pytorch-pipeline/code/ece5752bed68960c30544c25e83cd9ee/runproc.sh


Using provided s3_resource
Using provided s3_resource
Using provided s3_resource




Using provided s3_resource


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://pytorch-serial-inference-demo/pipeline-pytorch-example/housing_data/raw'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessData',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification'

### 8. Submit the pipeline and start execution <a name="submit"></a><a name="submit"></a>

Running steps to upsert the `role_arn` and start the [pipeline execution](https://docs.aws.amazon.com/sagemaker/latest/dg/run-pipeline.html) will kick-off training.

In [20]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()
print("------- done -------")



Using provided s3_resource


INFO:sagemaker.processing:Uploaded None to s3://pytorch-serial-inference-demo/pipeline-pytorch-example/processing/serial-pytorch-pipeline/code/bf5f270b1960b4061b98c1625e4aa582/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://pytorch-serial-inference-demo/serial-pytorch-pipeline/code/ece5752bed68960c30544c25e83cd9ee/runproc.sh


Using provided s3_resource
Using provided s3_resource
Using provided s3_resource


INFO:sagemaker.processing:Uploaded None to s3://pytorch-serial-inference-demo/pipeline-pytorch-example/processing/serial-pytorch-pipeline/code/bf5f270b1960b4061b98c1625e4aa582/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://pytorch-serial-inference-demo/serial-pytorch-pipeline/code/ece5752bed68960c30544c25e83cd9ee/runproc.sh


Using provided s3_resource
Using provided s3_resource
Using provided s3_resource
Using provided s3_resource
------- done -------


Get name of the latest model.

In [23]:
sm_model_name = sm.list_models(NameContains='PyTorch')['Models'][0]['ModelName']

print(sm_model_name)

pipelines-efdkwygjfuvn-PyTorchPipelineModel-kLvjP2vy25


List model registry information:

In [24]:
from utils import get_approved_package

pck = get_approved_package(model_package_group_name, sm) 
model_description = sm.describe_model_package(ModelPackageName=pck["ModelPackageArn"])

print(model_description)

INFO:utils:Identified the latest approved model package: arn:aws:sagemaker:ca-central-1:817463428454:model-package/PipelinePyTorchPackageGroup/26


{'ModelPackageGroupName': 'PipelinePyTorchPackageGroup', 'ModelPackageVersion': 26, 'ModelPackageArn': 'arn:aws:sagemaker:ca-central-1:817463428454:model-package/PipelinePyTorchPackageGroup/26', 'CreationTime': datetime.datetime(2023, 12, 15, 21, 31, 58, 964000, tzinfo=tzlocal()), 'InferenceSpecification': {'Containers': [{'Image': '341280168497.dkr.ecr.ca-central-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3', 'ImageDigest': 'sha256:9b43ef4706faae38d10bdff012a0d1b35ed9c5b3aac9e60c960170f10d29fa51', 'ModelDataUrl': 's3://pytorch-serial-inference-demo/pipeline-pytorch-example/processing/model.tar.gz', 'Environment': {'SAGEMAKER_CONTAINER_LOG_LEVEL': '20', 'SAGEMAKER_PROGRAM': 'features.py', 'SAGEMAKER_REGION': 'ca-central-1', 'SAGEMAKER_SUBMIT_DIRECTORY': 's3://pytorch-serial-inference-demo/sagemaker-scikit-learn-2023-12-15-21-29-09-220/sourcedir.tar.gz'}}, {'Image': '763104351884.dkr.ecr.ca-central-1.amazonaws.com/pytorch-inference:1.8.0-cpu-py3', 'ImageDigest': 'sha256:7c4c7ea4

### 9. Run serial batch inference job <a name="inference"></a>
After the pipeline has finished executing, lookup the Sagemaker Model Name and pass it to a Sagemaker Batch Transformation job for inference, along with a raw test dataset.


In [25]:
import sagemaker
input_data_path = "s3://{}/{}".format(sagemaker_session.default_bucket(), f"{prefix}/test/test.csv") 
output_data_path = "s3://{}/{}".format(sagemaker_session.default_bucket(), f"{prefix}/batch-transform/output")

transform_job = sagemaker.transformer.Transformer(
    model_name = sm_model_name,
    instance_count = config_data["BATCH_TRANSFORM_INSTANCE_COUNT"],
    instance_type = config_data["BATCH_TRANSFORM_INSTANCE_TYPE"],
    strategy = 'MultiRecord',
    assemble_with = 'Line',
    output_path = output_data_path,
    base_transform_job_name='inference-pipelines-batch',
    sagemaker_session=sagemaker_session,
    accept = 'text/csv') 

transform_job.transform(data = input_data_path, 
                        content_type = 'text/csv', 
                        split_type = 'Line',
                        join_source = 'Input')

INFO:sagemaker:Creating transform job with name: inference-pipelines-batch-2023-12-15-21-33-34-489


...........................................[34m2023-12-15 21:40:45,105 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2023-12-15 21:40:45,108 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2023-12-15 21:40:45,109 INFO - sagemaker-containers - nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[35m2023-12-15 21:40:45,105 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[35m2023-12-15 21:40:45,108 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[35m2023-12-15 21:40:45,109 INFO - sagemaker-containers - nginx config: [0m
[35mworker_processes auto;[0m
[35mdaemon off;[0m
[35mpid /tmp/nginx.pid;[0m
[35merror_log  /dev/stderr;[0m
[35mworker_rlimit_nofile 4096;[0m
[3

### 10. Retrieve output batch inference data<a name="batch_inference"></a>

After the batch transform job has completed, you can download and read inference predictions for future use.

In [26]:
s3 = boto3.client('s3')

FILE_NAME = "inference_results.csv"
BUCKET_NAME = bucket
OBJECT_NAME = f"{prefix}/batch-transform/output/test.csv.out"

s3.download_file(BUCKET_NAME, OBJECT_NAME, FILE_NAME)

In [27]:
columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
    "medianHouseValue",
    "predictedTarget"
]

inf_results = pd.read_csv("inference_results.csv", names=columns)

display(inf_results)

Unnamed: 0,longitude,latitude,housingMedianAge,totalRooms,totalBedrooms,population,households,medianIncome,medianHouseValue,predictedTarget
0,-122.27,37.84,52.0,2436.0,541.0,1015.0,478.0,1.7250,0.227800,0.418929
1,-118.36,34.19,11.0,2921.0,685.0,1512.0,664.0,4.1445,0.352800,0.383919
2,-122.39,37.60,44.0,2304.0,384.0,986.0,379.0,4.6520,0.774200,0.423604
3,-121.30,38.60,32.0,9534.0,1819.0,4951.0,1710.0,3.3926,0.206800,0.406280
4,-122.23,37.83,52.0,2990.0,379.0,947.0,361.0,7.8772,1.000002,0.428225
...,...,...,...,...,...,...,...,...,...,...
6807,-118.48,34.02,11.0,72.0,16.0,150.0,20.0,2.6250,0.500000,0.385648
6808,-117.39,34.10,12.0,7184.0,1516.0,4862.0,1235.0,2.4492,0.207600,0.367412
6809,-121.38,38.59,38.0,1839.0,287.0,685.0,276.0,4.5313,0.378800,0.421584
6810,-118.43,34.22,36.0,1372.0,295.0,774.0,306.0,3.6618,0.374600,0.385341
