# The Pipeline Process follows AWS Developer Guide 
https://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html#define-pipeline-create

In [27]:
import boto3
import sagemaker
import sagemaker.session

# Resolve the AWS region, the SageMaker session, and the execution role used by every job below.
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
# Default S3 bucket of the session (created if it does not exist yet).
default_bucket = sagemaker_session.default_bucket()
# Plain string literal: the former f-prefix had no placeholders to format.
model_package_group_name = "AbaloneModelPackageGroupName"

# Step 1 Download Datasets

In [28]:
# Create the local data/ folder for the downloaded datasets; -p makes this a no-op if it already exists.
!mkdir -p data

In [29]:
local_path = "data/abalone-dataset.csv"

# Copy the Abalone dataset from the region's seed-code bucket to local notebook storage.
s3 = boto3.resource("s3")
seed_bucket = s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}")
seed_bucket.download_file("dataset/abalone-dataset.csv", local_path)

In [30]:
# Stage the downloaded file under the abalone/ prefix of the default bucket and keep
# the resulting S3 URI; it becomes the default value of the InputData pipeline parameter.
base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(local_path=local_path, desired_s3_uri=base_uri)
print(input_data_uri)

s3://sagemaker-us-west-1-861237447287/abalone/abalone-dataset.csv


In [31]:
# Download a second dataset for batch transformation after your model is created.
# It gets its own variable so that `local_path` keeps pointing at the training dataset;
# reusing `local_path` here would make a re-run of the training-data upload cell
# silently upload the batch file as InputData.
batch_local_path = "data/abalone-dataset-batch.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch",
    batch_local_path
)

# Upload next to the training data and keep the URI for the BatchData pipeline parameter.
base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=batch_local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sagemaker-us-west-1-861237447287/abalone/abalone-dataset-batch.csv


# Step 2 Define Pipeline Parameters

This code block defines the following parameters for your pipeline:

- processing_instance_count – The instance count of the processing job.

- input_data – The Amazon S3 location of the input data.

- batch_data – The Amazon S3 location of the input data for batch transformation.

- model_approval_status – The approval status to register the trained model with for CI/CD. For more information, see Automate MLOps with SageMaker Projects.

In [32]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Number of instances used by the preprocessing job.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# S3 locations of the training input and of the batch-transform input.
input_data = ParameterString(name="InputData", default_value=input_data_uri)
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)

# Register new model versions as pending, so they are not deployed until someone approves them.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

# Step 3: Define a Processing Step for Feature Engineering

This section shows how to create a processing step to prepare the data from the dataset for training.

### 3.1 create a py file for data processing

In [33]:
# Create the folder that holds the scripts written by the %%writefile cells below.
!mkdir -p abalone

In [34]:
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile
import numpy as np
import pandas as pd


from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# The CSV file has no header row, so the column names are declared here.
label_column = "rings"
feature_columns_names = [
    "sex", "length", "diameter", "height",
    "whole_weight", "shucked_weight", "viscera_weight", "shell_weight",
]

# "sex" is read as a string; every other feature and the label are read as float64.
feature_columns_dtype = {"sex": str}
feature_columns_dtype.update(
    (name, np.float64) for name in feature_columns_names if name != "sex"
)
label_column_dtype = {label_column: np.float64}


def merge_two_dicts(x, y):
    """Return a shallow copy of ``x`` updated with the entries of ``y``.

    Neither argument is modified. When a key appears in both mappings,
    the value from ``y`` is the one kept in the result.
    """
    merged = x.copy()
    merged.update(y)
    return merged


if __name__ == "__main__":
    # Root of the processing container's input/output folders (see the ProcessingStep definition).
    base_dir = "/opt/ml/processing"

    # The dataset has no header row, so names and dtypes are supplied explicitly.
    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
    )

    # Numeric features: fill missing values with the median, then standardize.
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ]
    )

    # Categorical feature: fill missing values with the literal "missing", then one-hot encode.
    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore"))
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features)
        ]
    )

    # Separate the label before transforming the features.
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    # The label becomes column 0, followed by the transformed features.
    X = np.concatenate((y_pre, X_pre), axis=1)

    # Shuffle with a fixed seed so the 70/15/15 split is identical on every run;
    # an unseeded shuffle made the evaluation metric vary between executions.
    np.random.RandomState(42).shuffle(X)
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])

    # One headerless CSV per split, each in the folder of its ProcessingOutput channel.
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting abalone/preprocessing.py


### 3.2 Create an instance of an SKLearnProcessor to pass in to the processing step.

In [35]:
from sagemaker.sklearn.processing import SKLearnProcessor


# Version passed to SKLearnProcessor to select the scikit-learn container for preprocessing.
framework_version = "0.23-1"

# The instance count is a pipeline parameter, so it can be overridden per execution.
sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
)

### 3.3 Create a processing step. 

This step takes in the SKLearnProcessor, the input and output channels, and the preprocessing.py script that you created. This is very similar to a processor instance's run method in the SageMaker Python SDK. The input_data parameter passed into ProcessingStep is the input data of the step itself. This input data is used by the processor instance when it runs.

Note the "train", "validation", and "test" named channels specified in the output configuration for the processing job. Step Properties such as these can be used in subsequent steps and resolve to their runtime values at execution.

In [36]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# `input_data` is the ParameterString defined earlier; its default value is input_data_uri.
# For a ProcessingInput, `source` is where the data lives and `destination` is the folder
# inside the processing container that it is downloaded to before the script runs.
# For both ProcessingInput and ProcessingOutput, the container path must begin with /opt/ml/processing/.
# A ProcessingOutput can also be saved to a specific S3 location by passing `destination`.
# More details: https://docs.aws.amazon.com/sagemaker/latest/dg/build-your-own-processing-container.html
processing_inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
processing_outputs = [
    ProcessingOutput(output_name=channel, source=f"/opt/ml/processing/{channel}")
    for channel in ("train", "validation", "test")
]
step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    code="abalone/preprocessing.py",
)

# Step 4: Define a Training step

Configure an estimator for the XGBoost algorithm and the input dataset. The training instance type is passed into the estimator. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to model_dir so that it can be hosted later. SageMaker uploads the model to Amazon S3 in the form of a model.tar.gz at the end of the training job.

In [37]:
# S3 prefix in the default bucket where the training step saves its results
# (it is passed to the estimator as output_path).
model_path = f"s3://{default_bucket}/AbaloneTrain"

In [38]:
from sagemaker.estimator import Estimator

# One instance type for both the image lookup and the training job, so they cannot drift apart.
training_instance_type = "ml.m5.xlarge"

# Look up the URI of the XGBoost container image for this region.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path, # S3 location for saving the training result (model artifacts and output files)
    role=role,
)
xgb_train.set_hyperparameters(
    # "reg:squarederror" is the current name of the squared-error regression objective;
    # the previous value "reg:linear" is its deprecated alias.
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0
)

### 4.2 Create a TrainingStep using the estimator instance and properties of the ProcessingStep. 

In particular, pass in the S3Uri of the "train" and "validation" output channel to the TrainingStep. 

In [39]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Feed the "train" and "validation" outputs of the processing step to the estimator as CSV channels.
# Note: TrainingStep has no `outputs` parameter.
preprocessed = step_process.properties.ProcessingOutputConfig.Outputs
step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs={
        channel: TrainingInput(
            s3_data=preprocessed[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)

# Step 5: Define a Processing Step for Model Evaluation

This section shows how to create a processing step to evaluate the accuracy of the model. The result of this model evaluation is used in the condition step to determine which execution path to take.

In [40]:
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost


from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    # Model archive produced by the training step and mounted here by the evaluation ProcessingStep.
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # NOTE: unpickling can execute arbitrary code. It is acceptable here only because the
    # archive comes from this pipeline's own training step; never load untrusted files this way.
    # The context manager closes the file handle, which the bare open() call used to leak.
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    # Column 0 is used as the label; the remaining columns are the features.
    y_test = df.iloc[:, 0].to_numpy()
    X_test = xgboost.DMatrix(df.iloc[:, 1:].values)

    predictions = model.predict(X_test)

    # Mean squared error, plus the standard deviation of the residuals.
    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {
                "value": mse,
                "standard_deviation": std
            },
        },
    }

    # Write the report where the step's "evaluation" ProcessingOutput picks it up.
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        json.dump(report_dict, f)

Writing abalone/evaluation.py


### 5.2 Create an instance of a ScriptProcessor that is used to create a ProcessingStep.

In [41]:
from sagemaker.processing import ScriptProcessor

# Run the evaluation script with python3 in the same XGBoost image that is used for training.
script_eval = ScriptProcessor(
    role=role,
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-abalone-eval",
)

Create a ProcessingStep using the processor instance, the input and output channels, and the  evaluation.py script. In particular, pass in the S3ModelArtifacts property from the step_train training step, as well as the S3Uri of the "test" output channel of the step_process processing step. This is very similar to a processor instance's run method in the SageMaker Python SDK. 

In [43]:
from sagemaker.workflow.properties import PropertyFile

# Declare evaluation.json (inside the "evaluation" output) as a property file,
# so that later steps can read values out of it with JsonGet.
evaluation_report = PropertyFile(name="EvaluationReport", output_name="evaluation", path="evaluation.json")

# Inputs of the evaluation job: the trained model artifacts and the held-out test split.
trained_model_artifacts = step_train.properties.ModelArtifacts.S3ModelArtifacts
test_split_uri = step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri

step_eval = ProcessingStep(
    name="AbaloneEval",
    processor=script_eval,
    inputs=[
        ProcessingInput(source=trained_model_artifacts, destination="/opt/ml/processing/model"),
        ProcessingInput(source=test_split_uri, destination="/opt/ml/processing/test"),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="abalone/evaluation.py",
    property_files=[evaluation_report],
)

# Step 6: Define a CreateModelStep for Batch Transformation

The TrainingStep creates the model artifacts and saves them under model_path, but it does not create a SageMaker Model object.

A CreateModelStep is needed to create that Model object.

Estimators: Encapsulate training on SageMaker. Models: Encapsulate built ML models

In [44]:
# Create a SageMaker model. Pass in the S3ModelArtifacts property from the step_train training step.
from sagemaker.model import Model

# The model uses the same container image as training; model_data points at the
# artifacts produced by step_train.
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

In [45]:
# Define the model input for your SageMaker model.
from sagemaker.inputs import CreateModelInput

# Instance and accelerator types handed to the CreateModelStep below.
# NOTE(review): accelerator_type looks like an Elastic Inference accelerator — confirm it is
# still needed and available in this account/region before reusing this cell.
inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)

In [46]:
# Create your CreateModelStep using the CreateModelInput and SageMaker model instance you defined.
from sagemaker.workflow.steps import CreateModelStep

# Pipeline step that creates the SageMaker Model; its ModelName property is consumed by the Transformer.
step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=model,
    inputs=inputs,
)

# Step 7: Define a TransformStep to Perform Batch Transformation

This section shows how to create a TransformStep to perform batch transformation on a dataset after the model is trained. 

This step is passed into the condition step and only executes if the condition step evaluates to true.

In [47]:
# Create a transformer instance with the appropriate compute instance type, instance count, and desired output Amazon S3 bucket URI. 
# Pass in the ModelName property from the step_create_model CreateModel step.

from sagemaker.transformer import Transformer

# Batch-transform results are written under the AbaloneTransform prefix of the default bucket.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform"
)

In [48]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


# Run the transformer over the dataset referenced by the BatchData pipeline parameter.
step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data)
)

# Step 8: Define a RegisterModel Step to Create a Model Package

This section shows how to construct an instance of RegisterModel. The result of executing RegisterModel in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. It consists of an inference specification that defines the inference image to use along with an optional model weights location. A model package group is a collection of model packages. You can use a ModelPackageGroup for SageMaker Pipelines to add a new version and model package to the group for every pipeline execution. 

In [49]:
# Construct a RegisterModel step using the estimator instance you used for the training step . 
# Pass in the S3ModelArtifacts property from the step_train training step and specify a ModelPackageGroup. 
# SageMaker Pipelines creates this ModelPackageGroup for you.

from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel

# Attach the evaluation report, found in the first output of the evaluation step,
# to the registered model package as its model statistics.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"{evaluation_output_uri}/evaluation.json",
        content_type="application/json",
    )
)

# Register the trained model in the model package group with the configured approval status.
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_metrics=model_metrics,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


# Step 9: Define a Condition Step to Verify Model Accuracy

A ConditionStep allows SageMaker Pipelines to support conditional execution in your pipeline DAG based on the condition of step properties. In this case, you only want to register a model package if the mean squared error (MSE) of that model, as determined by the model evaluation step, is less than or equal to the required threshold. If it is, the pipeline also creates a SageMaker Model and runs batch transformation on a dataset. This section shows how to define the Condition step.

In [50]:
# Define a ConditionLessThanOrEqualTo condition using the accuracy value found in the output of the model evaluation processing step, step_eval. 
# Get this output using the property file you indexed in the processing step and the respective JSONPath of the mean squared error value, "mse".

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


# The condition holds when the "mse" value stored in the evaluation report is at most 6.0.
evaluated_mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mse.value",
)
cond_lte = ConditionLessThanOrEqualTo(left=evaluated_mse, right=6.0)

In [51]:
# Construct a ConditionStep from the ConditionLessThanOrEqualTo condition (cond_lte).
# If the condition passes, run model registration, model creation, and batch transformation;
# if it fails, nothing else runs because else_steps is empty.
step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[], 
)

# Step 10: Create a pipeline

Now that you’ve created all of the steps, combine them into a pipeline.

In [52]:
# Define the following for your pipeline: name, parameters, and steps. Names must be unique within an (account, region) pair.
from sagemaker.workflow.pipeline import Pipeline


# Plain string literal: the former f-prefix had no placeholders to format.
pipeline_name = "AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
        batch_data,
    ],
    # The register, create-model and transform steps are attached to step_cond as its
    # if_steps, so they are not listed again here.
    steps=[step_process, step_train, step_eval, step_cond],
)

# Step 11: Start the Pipeline

In [53]:
import json

# Parse the generated pipeline definition (a JSON string) so the notebook displays it as a dict.
json.loads(pipeline.definition())

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
Popping out 'ModelPackageName' from the pipel

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-1-861237447287/abalone/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-west-1-861237447287/abalone/abalone-dataset-batch.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemak

In [54]:
# Submit the pipeline definition to the SageMaker Pipelines service:
# this creates the pipeline if it doesn't exist, or updates it if it does.
# The role passed in is used by SageMaker Pipelines to create all of the jobs defined in the steps.
pipeline.upsert(role_arn=role)

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
Popping out 'ModelPackageName' from the pipel

{'PipelineArn': 'arn:aws:sagemaker:us-west-1:861237447287:pipeline/AbalonePipeline',
 'ResponseMetadata': {'RequestId': 'dc616dcc-e370-419f-99b1-59835a0c2554',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'dc616dcc-e370-419f-99b1-59835a0c2554',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Fri, 18 Aug 2023 05:29:28 GMT'},
  'RetryAttempts': 0}}

In [55]:
# Start a pipeline execution using the default parameter values.
execution = pipeline.start()

# Step 12: Examine a Pipeline Execution

In [56]:
# Describe the pipeline execution to confirm that it was created and has started
# (check PipelineExecutionStatus in the output).
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-1:861237447287:pipeline/AbalonePipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-1:861237447287:pipeline/AbalonePipeline/execution/jc77at2bp5wx',
 'PipelineExecutionDisplayName': 'execution-1692336577673',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline',
  'TrialName': 'jc77at2bp5wx'},
 'CreationTime': datetime.datetime(2023, 8, 18, 5, 29, 37, 592000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 8, 18, 5, 29, 37, 592000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-1:861237447287:user-profile/d-taxlrovo7pbh/default-1691609480239',
  'UserProfileName': 'default-1691609480239',
  'DomainId': 'd-taxlrovo7pbh'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-1:861237447287:user-profile/d-taxlrovo7pbh/default-1691609480239',
  'UserProfileName': 'default-1691609480239',
  'DomainId': 'd-taxlrovo7pbh'},
 'Res

In [57]:
# Wait for the execution to finish before inspecting its steps and outputs.
execution.wait()

In [58]:
# List the execution steps with their status (see StepStatus in each entry).
execution.list_steps()

[{'StepName': 'AbaloneTransform',
  'StartTime': datetime.datetime(2023, 8, 18, 5, 40, 26, 780000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 8, 18, 5, 44, 50, 508000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-west-1:861237447287:transform-job/pipelines-jc77at2bp5wx-AbaloneTransform-N6X7qziX7K'}}},
 {'StepName': 'AbaloneCreateModel',
  'StartTime': datetime.datetime(2023, 8, 18, 5, 40, 24, 968000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 8, 18, 5, 40, 26, 43000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-west-1:861237447287:model/pipelines-jc77at2bp5wx-abalonecreatemodel-lkl8nacqkm'}}},
 {'StepName': 'AbaloneRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2023, 8, 18, 5, 40, 24, 968000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 8, 18, 5, 40, 25, 788000, tzinfo=tzlo

In [59]:
# After the pipeline execution is complete, read evaluation.json from the S3 output
# location of the evaluation step and display the parsed report.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
evaluation_json = sagemaker.s3.S3Downloader.read_file(f"{evaluation_output_uri}/evaluation.json")
json.loads(evaluation_json)

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


{'regression_metrics': {'mse': {'value': 5.560277186237306,
   'standard_deviation': 2.3577605579624095}}}

# Step 13: Override Default Parameters for a Pipeline Execution

You can run additional executions of the pipeline by specifying different pipeline parameters to override the defaults.

In [None]:
# Start another pipeline execution, overriding ModelApprovalStatus with "Approved".
# The model package version generated by the RegisterModel step is then automatically
# ready for deployment through CI/CD pipelines, such as with SageMaker Projects.
execution = pipeline.start(parameters={"ModelApprovalStatus": "Approved"})

In [None]:
# Wait for the second execution (started with the "Approved" override) to finish.
execution.wait()

In [None]:
# List the steps of the second execution and their status.
execution.list_steps()

In [None]:
# Read evaluation.json from the S3 output location of the evaluation step and display the parsed report.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
evaluation_json = sagemaker.s3.S3Downloader.read_file(f"{evaluation_output_uri}/evaluation.json")
json.loads(evaluation_json)

# Step 14: Stop and Delete a Pipeline Execution

In [None]:
# execution.stop()

In [None]:
# pipeline.delete()

### Use EventBridge to trigger sagemaker pipeline 