In [203]:
import os
import json
import pandas as pd
import boto3
import sagemaker
import sagemaker.session
from sagemaker.model import Model
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TransformStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.model_step import ModelStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput, TransformInput
from sagemaker.transformer import Transformer
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# from sklearn.pipeline import Pipeline

# Retrieves the current AWS region name.
region = boto3.Session().region_name
# Creates a new SageMaker session.
sagemaker_session = sagemaker.session.Session()
# Gets the IAM execution role for SageMaker.
role = sagemaker.get_execution_role()
# Retrieves the default S3 bucket for the SageMaker session.
default_bucket = sagemaker_session.default_bucket()
# Creates a new PipelineSession for working with SageMaker Pipelines.
pipeline_session = PipelineSession()
# Defines a name for a SageMaker Model Package Group, which can be used to group related model versions.
model_package_group_name = "AbaloneModelPackageGroupName"

basic_instance_type = "ml.t2.medium"
# large_instance_type = "ml.m4.xlarge"
large_instance_type = "ml.m5.xlarge"

In [204]:
default_bucket

'sagemaker-us-east-1-992382634893'

## Steps ##
Create a pipeline that includes steps for:
- Preprocessing
- Training
- Evaluation
- Condition Evaluation
- Model Registration

## 1. Download the dataset - Predicting the age of abalone from physical measurements ##

In [205]:
# Dowload the dataset - Predicting the age of abalone from physical measurements
# https://archive.ics.uci.edu/dataset/1/abalone/
# # https://archive.ics.uci.edu/static/public/1/abalone.zip

# Prepare the location to store the data
# Ensure the data directory exists
os.makedirs('data', exist_ok=True)
dataset_local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset.csv",
    dataset_local_path
)

In [206]:
base_uri = f"s3://{default_bucket}/aws-mlops-live"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path = dataset_local_path,
    desired_s3_uri = base_uri)
print(input_data_uri)

s3://sagemaker-us-east-1-992382634893/aws-mlops-live/abalone-dataset.csv


In [207]:
# Download second dataset for batch transform
batch_local_path = "data/abalone-dataset-batch.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch",
    batch_local_path
)

batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path = batch_local_path,
    desired_s3_uri = base_uri)
print(batch_data_uri)


s3://sagemaker-us-east-1-992382634893/aws-mlops-live/abalone-dataset-batch.csv


## 2. Definition of Pipeline Parameters ##
Default variables for your pipeline, using SageMaker Parameters.
- processing_instance_count - The instance count of the processing job
- input_data
- batch_data
- model_approval_status - what is the model approval status during processing

In [208]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
input_data = ParameterString(name="InputData", default_value=input_data_uri)
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)

## 3. Define a processing step for feature engineering ##
This section shows how to create a processing step to prepare the data from the dataset for training.
Uses the built-in SageMaker Pipeline and Processor SDK.


In [209]:
os.makedirs('abalone', exist_ok=True)
df = pd.read_csv(dataset_local_path)
df.head()

Unnamed: 0,M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
0,M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
1,F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
2,M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
3,I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7
4,I,0.425,0.3,0.095,0.3515,0.141,0.0775,0.12,8


In [210]:
df.columns = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
     "shucked_weight",
    "viscera_weight",
    "shell_weight",
    "rings"
]

df.head()

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
0,M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
1,F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
2,M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
3,I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7
4,I,0.425,0.3,0.095,0.3515,0.141,0.0775,0.12,8


In [211]:
df[df.duplicated()]

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings


In [212]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4176 entries, 0 to 4175
Data columns (total 9 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   sex             4176 non-null   object 
 1   length          4176 non-null   float64
 2   diameter        4176 non-null   float64
 3   height          4176 non-null   float64
 4   whole_weight    4176 non-null   float64
 5   shucked_weight  4176 non-null   float64
 6   viscera_weight  4176 non-null   float64
 7   shell_weight    4176 non-null   float64
 8   rings           4176 non-null   int64  
dtypes: float64(7), int64(1), object(1)
memory usage: 293.8+ KB


In [213]:
df.isna().sum()

sex               0
length            0
diameter          0
height            0
whole_weight      0
shucked_weight    0
viscera_weight    0
shell_weight      0
rings             0
dtype: int64

In [214]:
df.nunique()

sex                  3
length             134
diameter           111
height              51
whole_weight      2429
shucked_weight    1515
viscera_weight     880
shell_weight       926
rings               28
dtype: int64

In [215]:
df.sex.unique()

array(['M', 'F', 'I'], dtype=object)

In [216]:
# Create an instance of an SKLearnProcessor to pass in to the processing step.
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    sagemaker_session=pipeline_session,
    role=role,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### Create a processing step. ###
This step takes in the SKLearnProcessor, the input and output channels, and the preprocessing.py script. This is very similar to a processor instance's run method in the SageMaker Python SDK. The input_data parameter passed into ProcessingStep is the input data of the step itself. This input data is used by the processor instance when it runs.

Note the  "train, "validation, and "test" named channels specified in the output configuration for the processing job. 
Step Properties such as these can be used in subsequent steps and resolve to their runtime values at runtime.

In [217]:
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="abalone/preprocessing.py",
) 

step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args=processor_args
)



In [218]:
processor_args

<sagemaker.workflow.pipeline_context._StepArguments at 0x7fe934853820>

## 4. Define a training step ##
This section shows how to use the SageMaker XGBoost Algorithm to train a model on the training data output from the processing steps.

1. Specify the model path where you want to save the models from training.

In [219]:
model_path = f"s3://{default_bucket}/AbaloneTrain"

2. Configure an estimator for the XGBoost algorithm and the input dataset. The training instance type is passed into the estimator. 
A typical training script:
- loads data from the input channels
- configures training with hyperparameters
- trains a model
- saves a model to `model_dir` so that it can be hosted later

SageMaker uploads the model to Amazon S3 in the form of a `model.tar.gz` at the end of the training job.

In [220]:
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=large_instance_type
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=large_instance_type,
    instance_count=1,
    output_path=model_path,
    sagemaker_session=pipeline_session,
    role=role,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0
)

3. Create a `TrainingStep` using the estimator instance and properties of the `ProcessingStep`. 
Pass in the `S3Uri` of the `"train"` and `"validation"` output channel to the `TrainingStep`. 

In [221]:
train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
)

step_train = TrainingStep(
    name="AbaloneTrain",
    step_args = train_args
)

## 5: Define a processing step for model evaluation ##
This section shows how to create a processing step to evaluate the accuracy of the model.
The result of this model evaluation is used in the condition step to determine which run path to take.

Create an instance of a `ScriptProcessor` that is used to create a `ProcessingStep`.

In [222]:
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=large_instance_type,
    instance_count=1,
    base_job_name="script-abalone-eval",
    sagemaker_session=pipeline_session,
    role=role,
)

Create a `ProcessingStep` using the processor instance, the input and output channels, and the `evaluation.py` script. Pass in:
- the S3ModelArtifacts property from the step_train training step
- the S3Uri of the "test" output channel of the step_process processing step

This is very similar to a processor instance's run method in the SageMaker Python SDK.

In [223]:
eval_args = script_eval.run(
        inputs=[
            ProcessingInput(
                source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model"
            ),
            ProcessingInput(
                source=step_process.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test"
            )
        ],
        outputs=[
            ProcessingOutput(output_name="evaluation",
                             source="/opt/ml/processing/evaluation"),
        ],
        code="abalone/evaluation.py",
    )

evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

step_eval = ProcessingStep(
    name="AbaloneEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## 6: Define a ModelStep for batch transformation ## 
This section shows how to create a SageMaker model from the output of the training step. This model is used for batch transformation on a new dataset. This step is passed into the condition step and only runs if the condition step evaluates to `true`.

Create a SageMaker model. Pass in the `S3ModelArtifacts` property from the `step_train` training step.

In [224]:
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

In [225]:
step_create_model = ModelStep(
   name="AbaloneCreateModel",
   step_args=model.create(instance_type=large_instance_type),
)

## 7: Define a TransformStep to perform batch transformation ##
This section shows how to create a `TransformStep` to perform batch transformation on a dataset after the model is trained. This step is passed into the condition step and only runs if the condition step evaluates to true.

Create a transformer instance with the appropriate compute instance type, instance count, and desired output Amazon S3 bucket URI. Pass in the `ModelName` property from the `step_create_model` `CreateModel` step.

In [226]:
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type=large_instance_type,
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform"
)

Create a `TransformStep` using the transformer instance you defined and the `batch_data` pipeline parameter.

In [227]:
step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data)
)

In [228]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[basic_instance_type, large_instance_type],
    transform_instances=[large_instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)



## 8: Define a RegisterModel step to create a model package ##
This section shows how to create an instance of RegisterModel. The result of running RegisterModel in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. It consists of an inference specification that defines the inference image to use along with an optional model weights location. A model package group is a collection of model packages. You can use a ModelPackageGroup for Pipelines to add a new version and model package to the group for every pipeline run. For more information about model registry, see Register and Deploy Models with Model Registry.

This step is passed into the condition step and only runs if the condition step evaluates to true.

## 9: Define a condition step to verify model accuracy ##
A `ConditionStep` allows Pipelines to support conditional running in your pipeline DAG based on the condition of step properties. In this case, you only want to register a model package if the accuracy of that model exceeds the required value. The accuracy of the model is determined by the model evaluation step. If the accuracy exceeds the required value, the pipeline also creates a SageMaker Model and runs batch transformation on a dataset. This section shows how to define the Condition step.

Define a `ConditionLessThanOrEqualTo` condition using the accuracy value found in the output of the model evaluation processing step, `step_eval`. Get this output using the property file you indexed in the processing step and the respective JSONPath of the mean squared error value, `"mse"`.

In [229]:
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0
)

Construct a `ConditionStep`. Pass the `ConditionEquals` condition in, then set the model package registration and batch transformation steps as the next steps if the condition passes.

In [230]:
step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[], 
)

## 10: Create a pipeline ##
Now that you’ve created all of the steps, combine them into a pipeline!

In [231]:
pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [232]:
# Examine the JSON pipeline definition to ensure that it's well-formed.

json.loads(pipeline.definition())



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-992382634893/aws-mlops-live/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-992382634893/aws-mlops-live/abalone-dataset-batch.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazona

# Run a pipeline #
After you’ve created a pipeline definition using the SageMaker Python SDK, you can submit it to SageMaker to start your execution. 

1. Submit the pipeline definition to the Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does. The role passed in is used by Pipelines to create all of the jobs defined in the steps

In [233]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:992382634893:pipeline/AbalonePipeline',
 'ResponseMetadata': {'RequestId': '9504b158-2ba6-48a0-a75b-fae17c64f16d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '9504b158-2ba6-48a0-a75b-fae17c64f16d',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Tue, 24 Sep 2024 21:03:40 GMT'},
  'RetryAttempts': 0}}

2. Start a pipeline execution.

In [234]:
execution = pipeline.start()

## Examine a Pipeline Execution ##
Describe the pipeline execution status to ensure that it has been created and started successfully.

In [235]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:992382634893:pipeline/AbalonePipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:992382634893:pipeline/AbalonePipeline/execution/wvofrolpmolz',
 'PipelineExecutionDisplayName': 'execution-1727211820766',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 9, 24, 21, 3, 40, 708000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 9, 24, 21, 3, 40, 708000, tzinfo=tzlocal()),
 'CreatedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::992382634893:assumed-role/AmazonSageMaker-ExecutionRole-20240920T153378/SageMaker',
   'PrincipalId': 'AROA6ODU5JOG3CUHDUWHD:SageMaker'}},
 'LastModifiedBy': {'IamIdentity': {'Arn': 'arn:aws:sts::992382634893:assumed-role/AmazonSageMaker-ExecutionRole-20240920T153378/SageMaker',
   'PrincipalId': 'AROA6ODU5JOG3CUHDUWHD:SageMaker'}},
 'ResponseMetadata': {'RequestId': '89df1085-1a4c-4f87-932d-fda727b939cf',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requesti

Wait for the execution to finish.

In [236]:
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

List the execution steps and their status.

In [237]:
execution.list_steps()

[{'StepName': 'AbaloneProcess',
  'StartTime': datetime.datetime(2024, 9, 24, 21, 3, 42, 382000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 9, 24, 21, 3, 44, 27000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': "ClientError: Failed to invoke sagemaker:CreateProcessingJob. Error Details: The account-level service limit 'ml.m5.xlarge for processing job usage' is 0 Instances, with current utilization of 0 Instances and a request delta of 1 Instances. Please use AWS Service Quotas to request an increase for this quota. If AWS Service Quotas is not available, contact AWS support to request an increase for this quota.\nRetry not appropriate on execution of step with PipelineExecutionArn arn:aws:sagemaker:us-east-1:992382634893:pipeline/abalonepipeline/execution/wvofrolpmolz and StepId AbaloneProcess. No retry policy configured for the exception type SAGEMAKER_RESOURCE_LIMIT.",
  'Metadata': {},
  'AttemptCount': 1}]

After pipeline execution is complete, download the resulting  evaluation.json file from Amazon S3 to examine the report.

In [None]:
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
))
json.loads(evaluation_json)

## Stop and Delete a Pipeline Execution ##

In [None]:
execution.stop()

In [None]:
pipeline.delete()