# SageMaker Pipeline for Disease Classification Model
In this notebook, we set up a SageMaker pipeline that chains the steps developed in the earlier notebooks: preprocessing, training, evaluation, and deployment. The pipeline can then be used for CI/CD by combining it with Amazon EventBridge and AWS Lambda.

In [1]:
import sys
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker.config INFO - Fetched defaults config from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


In [2]:
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

bucket = "disease-classification-12052025"
model_package_group_name = "disease-classification-model-07012026"

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


In [3]:
# Copy data files to S3
s3 = boto3.client("s3")

# s3.upload_file("train.csv", bucket, "pipeline/train.csv")
# s3.upload_file("test.csv", bucket, "pipeline/test.csv")
# s3.upload_file("test_file_no_target.csv", bucket, "pipeline/test_file_no_target.csv")
# s3.upload_file("test_file_no_target_for_monitor.csv", bucket, "pipeline/test_file_no_target_for_monitor.csv")

In [4]:
train_data_before_processing_uri = f"s3://{bucket}/pipeline/Training.csv"
test_data_before_processing_uri = f"s3://{bucket}/pipeline/Testing.csv"

In [5]:
pipeline_uri = f"s3://{bucket}/pipeline"
train_data_uri = f"{pipeline_uri}/train.csv"
val_data_uri = f"{pipeline_uri}/val.csv"
test_data_uri = f"{pipeline_uri}/test.csv"
test_no_target_data_uri = f"{pipeline_uri}/test_no_target.csv"

## Parameters to Parametrize Pipeline Execution
In this section, we define Pipeline parameters that can be used to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The parameters defined in this workflow include:

1. processing_instance_count - The instance count of the processing job.
2. instance_type - The ml.* instance type of the training job.
3. model_approval_status - The approval status to register with the trained model for CI/CD purposes ("PendingManualApproval" is the default).
4. input_data - The S3 URI of the raw training data (Training.csv).
5. test_data - The S3 URI of the raw test data (Testing.csv).
6. batch_data - The S3 URI of the test data without the target column, used as input for batch transformation.
7. acc_threshold - The accuracy threshold a model must reach to be registered.

In [6]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=train_data_before_processing_uri,
)
test_data = ParameterString(
    name="TestData",
    default_value=test_data_before_processing_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=test_no_target_data_uri,
)
acc_threshold = ParameterFloat(name="AccThreshold", default_value=0.97)

## Processing Step for Feature Engineering

In this section, we first develop the preprocessing script that the Processing step runs.

We write the script to code/preprocessing.py. It does the following:
1. Removes the 'Unnamed: 133' column from Training.csv.
2. Label-encodes the target categories (the prognosis column).
3. Cleans up the column names and splits the data into training, validation, and test datasets.

The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model.
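Step 2 above relies on scikit-learn's LabelEncoder, which maps each distinct class name to an integer in sorted order. As a rough illustration only, the pure-Python sketch below mimics that mapping. The helper names `fit_label_mapping` and `encode_labels` and the sample disease names are hypothetical stand-ins, not part of the notebook's script.

```python
def fit_label_mapping(labels):
    """Map each distinct label to an integer, assigned in sorted order."""
    return {label: index for index, label in enumerate(sorted(set(labels)))}


def encode_labels(labels, mapping):
    """Encode labels with a fitted mapping; labels unseen at fit time raise an error."""
    unseen = sorted(set(labels) - set(mapping))
    if unseen:
        raise ValueError(f"labels not seen during fitting: {unseen}")
    return [mapping[label] for label in labels]


# Hypothetical sample values standing in for the 'prognosis' column.
train_labels = ["Malaria", "Acne", "Malaria", "Dengue"]
test_labels = ["Dengue", "Acne"]

mapping = fit_label_mapping(train_labels)            # fitted on training labels only
encoded_train = encode_labels(train_labels, mapping)
encoded_test = encode_labels(test_labels, mapping)   # test data reuses the same mapping
```

Fitting on the training labels and reusing the mapping for the test labels keeps the integer codes consistent across datasets, which is why the script calls fit_transform on the training data and transform on the test data.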

In [7]:
!mkdir -p code

In [8]:
%%writefile code/preprocessing.py
import numpy as np
import pandas as pd

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    train_val_df = pd.read_csv(f"{base_dir}/input/Training.csv")
    test_df = pd.read_csv(f"{base_dir}/test_input/Testing.csv")
    
    #-----------------------------------------------------------------------------------------------------
    # Handle Missing Values : Removing 'Unnamed: 133' column from training dataset
    #-----------------------------------------------------------------------------------------------------
    train_val_df = train_val_df.drop(labels='Unnamed: 133',axis=1)
    
    #-----------------------------------------------------------------------------------------------------
    # Label Encoding Target Values in Training and Testing Datasets
    #-----------------------------------------------------------------------------------------------------
    le = LabelEncoder()
    encoded = le.fit_transform(train_val_df['prognosis'])
    train_val_df['Encoded_Labels'] = encoded

    train_val_df = train_val_df.drop(labels="prognosis", axis=1)

    # Encode labels for test dataset
    encoded_test = le.transform(test_df['prognosis'])
    test_df['Encoded_Labels'] = encoded_test
    
    # Drop prognosis column from test dataset
    test_df = test_df.drop(labels="prognosis", axis=1)
    
    #-----------------------------------------------------------------------------------------------------
    # Replace space in column names with underscore
    train_val_df.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)
    
    # Remove brackets in column names
    train_val_df.rename(columns=lambda x: x.replace('(', ''), inplace=True)
    train_val_df.rename(columns=lambda x: x.replace(')', ''), inplace=True)
    
    # Removing full stop in column names
    train_val_df.rename(columns=lambda x: x.replace('.', ''), inplace=True)
    
    # Removing multiple underscore together at the end of column names
    train_val_df.rename(columns=lambda x: x.replace('__', '_'), inplace=True)
    
    # Removing underscore at the end of column names
    train_val_df.columns = train_val_df.columns.str.rstrip('_')
    
    #-----------------------------------------------------------------------------------------------------
    # Replace space in column names with underscore
    test_df.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)
    
    # Remove brackets in column names
    test_df.rename(columns=lambda x: x.replace('(', ''), inplace=True)
    test_df.rename(columns=lambda x: x.replace(')', ''), inplace=True)
    
    # Removing full stop in column names
    test_df.rename(columns=lambda x: x.replace('.', ''), inplace=True)
    
    # Removing multiple underscore together at the end of column names
    test_df.rename(columns=lambda x: x.replace('__', '_'), inplace=True)
    
    # Removing underscore at the end of column names
    test_df.columns = test_df.columns.str.rstrip('_')

    #-----------------------------------------------------------------------------------------------------
    # Creating Training and Validation datasets using data in Training.csv after eliminating unnecessary columns
    # and reordering columns.
    #-----------------------------------------------------------------------------------------------------
    target_column   = "Encoded_Labels"
    dataset  = train_val_df
    
    # Reorder the columns to make the last column (target) the first column
    dataset = dataset[[target_column] + [col for col in dataset.columns if col != target_column]]
    
    # Splitting to train and validation datasets
    X = dataset.drop(columns=['Encoded_Labels'])
    y = dataset['Encoded_Labels']
    
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    training_dataset = pd.DataFrame(y_train, columns=['Encoded_Labels'])
    training_dataset = pd.concat([training_dataset, X_train], axis=1)
    
    validation_dataset = pd.DataFrame(y_val, columns=['Encoded_Labels'])
    validation_dataset = pd.concat([validation_dataset, X_val], axis=1)
    #-----------------------------------------------------------------------------------------------------
    # Creating Test Dataset after eliminating and reordering certain columns
    #-----------------------------------------------------------------------------------------------------
    test_dataset        = test_df
    test_data_no_target = test_dataset.drop(columns=['Encoded_Labels'])
    
    # Reorder the columns to make the last column (target) the first column
    test_dataset = test_dataset[[target_column] + [col for col in test_dataset.columns if col != target_column]]

    pd.DataFrame(training_dataset).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation_dataset).to_csv(
        f"{base_dir}/val/val.csv", header=False, index=False
    )
    pd.DataFrame(test_dataset).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
    pd.DataFrame(test_data_no_target).to_csv(f"{base_dir}/test_no_target/test_no_target.csv", header=False, index=False)

Overwriting code/preprocessing.py
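The script applies the same chain of column-name replacements twice, once per dataframe. One possible way to condense that, shown here only as a sketch, is a single helper that performs the replacements in the same order. The name `clean_column_name` and the sample column names are hypothetical and chosen purely to exercise each rule.

```python
def clean_column_name(name: str) -> str:
    """Apply the same sequence of string replacements as the preprocessing script."""
    name = name.replace(" ", "_")                   # spaces -> underscores
    name = name.replace("(", "").replace(")", "")   # drop brackets
    name = name.replace(".", "")                    # drop full stops
    name = name.replace("__", "_")                  # collapse double underscores (single pass)
    return name.rstrip("_")                         # drop trailing underscores


# Hypothetical column names used only to exercise the helper.
examples = ["dischromic _patches", "toxic_look_(typhos)", "fluid_overload.1", "prognosis_"]
cleaned = [clean_column_name(name) for name in examples]
```

Because pandas accepts a callable for `DataFrame.rename(columns=...)`, a helper like this could be passed to both dataframes so the two clean-up blocks cannot drift apart.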


Next, we create an instance of a SKLearnProcessor processor and use that in our ProcessingStep. We take the output of the processor's run method and pass that as arguments to the ProcessingStep.

In [9]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-disease_classification-process",
    role=role,
    sagemaker_session=pipeline_session,
)

In [10]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
        ProcessingInput(source=test_data, destination="/opt/ml/processing/test_input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=train_data_uri),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/val", destination=val_data_uri),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=test_data_uri),
        ProcessingOutput(output_name="test_no_target", source="/opt/ml/processing/test_no_target", destination=test_no_target_data_uri),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="DiseaseClassificationProcess", step_args=processor_args)

## Training Step to Train the Model

In [11]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"{pipeline_uri}/models"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="multi:softmax",
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    verbosity=0,
    num_class=41,
    num_round=100,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.Environment


We use the output of the estimator's .fit() method as arguments to the TrainingStep.

In [12]:
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="DiseaseClassificationTrain",
    step_args=train_args,
)

## Model Evaluation Step to Evaluate the Trained Model
In this section, we first code an evaluation script that is specified in a Processing step that performs the model evaluation. After pipeline execution, we can then examine the resulting evaluation.json.

The evaluation script uses xgboost to do the following:
1. Load the model.
2. Read the test data.
3. Issue predictions against the test data.
4. Compute classification metrics: accuracy and weighted-average recall, precision, and F1.
5. Save the evaluation report to the evaluation directory.

In [13]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # Load the pickled model extracted from the archive
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    acc  = accuracy_score(y_test, predictions)
    rec  = recall_score(y_test, predictions, average='weighted')
    prec = precision_score(y_test, predictions, average='weighted')
    f1   = f1_score(y_test, predictions, average='weighted')
    
    report_dict = {
        "classification_metrics": {
            "accuracy": {"value": acc},
            "recall": {"value": rec},
            "precision": {"value": prec},
            "f1": {"value": f1},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluation.py
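The first step of the evaluation script, loading a pickled model out of model.tar.gz, can be tried locally without SageMaker. The sketch below is an illustration under assumptions: it builds a tiny archive whose only member is a pickled dictionary standing in for the trained model, then reads that member straight from the archive. The helper name `load_pickled_member` is hypothetical, and reading via `extractfile` is a swapped-in alternative to the script's `extractall`.

```python
import io
import pickle
import tarfile
import tempfile
from pathlib import Path


def load_pickled_member(archive_path, member_name="xgboost-model"):
    """Unpickle one member of a tar archive without extracting it to disk."""
    with tarfile.open(archive_path) as tar:
        member = tar.extractfile(member_name)   # raises KeyError if the name is absent
        if member is None:                      # returned for non-regular members
            raise FileNotFoundError(member_name)
        with member:
            return pickle.load(member)


with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "model.tar.gz"

    # Stand-in payload; a real archive would hold the trained model instead.
    payload = pickle.dumps({"stand_in": "model"})
    info = tarfile.TarInfo(name="xgboost-model")
    info.size = len(payload)
    with tarfile.open(archive, "w:gz") as tar:
        tar.addfile(info, io.BytesIO(payload))

    restored = load_pickled_member(archive)
```

As with any pickle, this should only be used on archives from a trusted source, since unpickling can execute arbitrary code.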


Next, we create an instance of a ScriptProcessor processor and use it in the ProcessingStep.

In [14]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-disease_classification-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

Next, we use the processor's arguments returned by .run() to construct a ProcessingStep.

In [15]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="DiseaseClassififcationEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## Create Model Step to Create a Model
In order to perform batch transformation using the model, we create a SageMaker model.

In [16]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

Next, we define the ModelStep by providing the return values from model.create() as the step arguments.

In [17]:
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="DiseaseClassificationCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

## Transform Step to Perform Batch Transformation
We create a Transformer instance with the name of the model created in the previous step, the compute instance type, and the desired output S3 URI.

In [18]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"{pipeline_uri}/batch-transform",
)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


We pass in the transformer instance and the TransformInput with the batch_data pipeline parameter defined earlier.

In [19]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="DiseaseClassificationTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data, split_type="Line", content_type="text/csv"),
)

## Register Model Step to Create a Model Package

In [20]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="DiseaseClassificationRegisterModel", step_args=register_args)

## Fail Step to Terminate the Pipeline Execution and Mark it as Failed
In this section, we create a fail step:

1. Define a FailStep with a customized error message that indicates the cause of the execution failure.
2. Build the error message with a Join function, which concatenates a static text string with the dynamic acc_threshold parameter to produce a more informative message.

In [21]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="DiseaseClassificationAccFail",
    error_message=Join(on=" ", values=["Execution failed due to Accuracy <", acc_threshold]),
)
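Join is resolved by the pipeline service at execution time, so the final message is not visible in the notebook. As a local approximation only, the sketch below assumes the resolved values are joined as plain strings and that the threshold keeps its default of 0.97. The helper `join_message` is a hypothetical stand-in, not an SDK function.

```python
def join_message(on, values):
    """Local stand-in for the string that Join(on=..., values=...) is expected to yield."""
    return on.join(str(value) for value in values)


acc_threshold_value = 0.97  # default of the AccThreshold parameter
message = join_message(" ", ["Execution failed due to Accuracy <", acc_threshold_value])
```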

## Condition Step to Check Accuracy and Conditionally Register the Model, Create a Model, and Run Batch Transformation, or Terminate the Execution in a Failed State
In this step, the model is registered only if its accuracy, as determined by the evaluation step step_eval, is greater than or equal to the acc_threshold parameter. Otherwise, the pipeline execution fails and terminates. A ConditionStep lets a pipeline branch its DAG based on conditions over step properties.

In the following section, we:

1. Define a ConditionGreaterThanOrEqualTo on the accuracy value found in the output of the evaluation step, step_eval.
2. Use the condition in the list of conditions in a ConditionStep.
3. Pass the model registration step, the model creation step, and the TransformStep into the if_steps of the ConditionStep; they run only if the condition evaluates to True.
4. Pass the FailStep into the else_steps of the ConditionStep; it runs only if the condition evaluates to False.

In [22]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.accuracy.value",
    ),
    right=acc_threshold,
)

step_cond = ConditionStep(
    name="DiseaseClassificationAccCond",
    conditions=[cond_gte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)
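To make the branching concrete, the sketch below reproduces the check locally: it walks the same dotted path that JsonGet uses and compares the result with the threshold. It is an approximation of the service-side evaluation, not the SDK's implementation. The helper `json_get` is hypothetical, and the accuracy value is the one reported by the run recorded later in this notebook.

```python
import json


def json_get(document, json_path):
    """Follow a dotted path such as 'a.b.c' through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Minimal evaluation.json content, using the accuracy from the recorded run.
evaluation_json = json.dumps(
    {"classification_metrics": {"accuracy": {"value": 0.9761904761904762}}}
)

report = json.loads(evaluation_json)
accuracy = json_get(report, "classification_metrics.accuracy.value")
acc_threshold_value = 0.97  # default of the AccThreshold parameter

# True -> if_steps (register, create model, transform); False -> else_steps (fail).
run_if_branch = accuracy >= acc_threshold_value
```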

## Defining and starting a Pipeline
In this section, we combine the steps into a Pipeline so it can be executed.

In [23]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "DiseaseClassificationPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        test_data,
        batch_data,
        acc_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


### Examining the pipeline definition
The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.

In [24]:
import json


definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://disease-classification-12052025/pipeline/Training.csv'},
  {'Name': 'TestData',
   'Type': 'String',
   'DefaultValue': 's3://disease-classification-12052025/pipeline/Testing.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://disease-classification-12052025/pipeline/test_no_target.csv'},
  {'Name': 'AccThreshold', 'Type': 'Float', 'DefaultValue': 0.97}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'DiseaseClassificationProcess',
   'Type': 'Processing

### Submit the pipeline to SageMaker and start execution
In this section, we submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.

In [25]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:657480822269:pipeline/DiseaseClassificationPipeline',
 'ResponseMetadata': {'RequestId': '81e65085-38d5-4bae-9207-39322cd4c8a1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '81e65085-38d5-4bae-9207-39322cd4c8a1',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '97',
   'date': 'Sat, 10 Jan 2026 21:18:10 GMT'},
  'RetryAttempts': 0}}

We start the pipeline and accept all the default parameters.

In [26]:
execution = pipeline.start()
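Defaults can also be overridden per execution by passing a dictionary to `pipeline.start(parameters=...)`. The sketch below shows one way to validate such a dictionary before submitting it. The helper `build_overrides` and the chosen override values are hypothetical; the parameter names and types are copied from the definitions earlier in this notebook, and the actual call is left commented out because it needs a live pipeline.

```python
# Parameter names and expected Python types, copied from the definitions above.
PIPELINE_PARAMETER_TYPES = {
    "ProcessingInstanceCount": int,
    "TrainingInstanceType": str,
    "ModelApprovalStatus": str,
    "InputData": str,
    "TestData": str,
    "BatchData": str,
    "AccThreshold": float,
}


def build_overrides(**overrides):
    """Check override names and types, then return them as a plain dict."""
    for name, value in overrides.items():
        expected = PIPELINE_PARAMETER_TYPES.get(name)
        if expected is None:
            raise KeyError(f"unknown pipeline parameter: {name}")
        if not isinstance(value, expected):
            raise TypeError(f"{name} expects {expected.__name__}, got {type(value).__name__}")
    return dict(overrides)


# Hypothetical overrides: a lower accuracy bar and automatic approval.
overrides = build_overrides(AccThreshold=0.95, ModelApprovalStatus="Approved")

# With a live pipeline object this would be submitted as:
# execution = pipeline.start(parameters=overrides)
```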

### Pipeline Operations: Examining and Waiting for Pipeline Execution
Let us describe the pipeline execution.

In [27]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:657480822269:pipeline/DiseaseClassificationPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-2:657480822269:pipeline/DiseaseClassificationPipeline/execution/2ot3jlyeen3c',
 'PipelineExecutionDisplayName': 'execution-1768079890739',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2026, 1, 10, 21, 18, 10, 635000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2026, 1, 10, 21, 18, 10, 635000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-2:657480822269:user-profile/d-ojcbzryynz4v/842ed27a-5424-4518-ab2c-889b655afc15',
  'UserProfileName': '842ed27a-5424-4518-ab2c-889b655afc15',
  'DomainId': 'd-ojcbzryynz4v',
  'IamIdentity': {'Arn': 'arn:aws:sts::657480822269:assumed-role/AmazonSageMakerAdminIAMExecutionRole/SageMaker',
   'PrincipalId': 'AROAZSFHJFX6TORGSBBD5:SageMaker',
   'SourceIdentity': '842ed27a-5424-4518-ab2c-889b655afc15'}},
 'LastModifiedBy': {'UserP

In [28]:
execution.wait()

Let us list the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.

In [29]:
execution.list_steps()

[{'StepName': 'DiseaseClassificationTransform',
  'StartTime': datetime.datetime(2026, 1, 10, 21, 25, 45, 628000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2026, 1, 10, 21, 30, 55, 684000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-2:657480822269:transform-job/pipelines-2ot3jlyeen3c-DiseaseClassificatio-fXuMr8Vwjh'}},
  'AttemptCount': 1},
 {'StepName': 'DiseaseClassificationRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2026, 1, 10, 21, 25, 43, 642000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2026, 1, 10, 21, 25, 44, 992000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-2:657480822269:model-package/disease-classification-model-07012026/1'}},
  'AttemptCount': 1},
 {'StepName': 'DiseaseClassificationCreateModel-CreateModel',
  'StartTime': datetime.datetime(2026, 1, 10, 21, 25, 43, 642000, tzinfo=tzlocal()),
  'EndT
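The StartTime and EndTime fields returned by `list_steps()` make it easy to see where an execution spends its time. The sketch below is a small helper for that, assuming each finished step carries both fields as shown above. The name `step_durations` is hypothetical, and the sample entry copies the transform step's timestamps with the timezone simplified to UTC for illustration.

```python
from datetime import datetime, timezone


def step_durations(steps):
    """Return {step name: duration in seconds} for steps that have finished."""
    return {
        step["StepName"]: (step["EndTime"] - step["StartTime"]).total_seconds()
        for step in steps
        if "EndTime" in step
    }


# One entry copied from the output above; the timezone is simplified to UTC.
sample_steps = [
    {
        "StepName": "DiseaseClassificationTransform",
        "StartTime": datetime(2026, 1, 10, 21, 25, 45, 628000, tzinfo=timezone.utc),
        "EndTime": datetime(2026, 1, 10, 21, 30, 55, 684000, tzinfo=timezone.utc),
        "StepStatus": "Succeeded",
    },
]

durations = step_durations(sample_steps)
```

In the notebook, the same helper could be called as `step_durations(execution.list_steps())`.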

### Examining the Evaluation
In this section, we examine the resulting model evaluation after the pipeline completes. We download the resulting evaluation.json file from S3 and print the report.

In [30]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
{'classification_metrics': {'accuracy': {'value': 0.9761904761904762},
                            'f1': {'value': 0.9761904761904762},
                            'precision': {'value': 0.9880952380952381},
                            'recall': {'value': 0.9761904761904762}}}
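Accuracy and weighted recall are identical in the report above. That is expected: with average='weighted', each class's recall is weighted by its share of the true labels, so the sum reduces to the overall fraction of correct predictions. The reported value also equals 41/42, which would be consistent with 41 correct predictions out of 42 test rows, although the notebook does not state the test set size. The pure-Python check below uses hypothetical labels and hypothetical helper names to illustrate the identity.

```python
from collections import Counter


def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def weighted_recall(y_true, y_pred):
    """Per-class recall averaged with weights equal to each class's support."""
    support = Counter(y_true)
    hits = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    total = len(y_true)
    return sum((hits[c] / support[c]) * (support[c] / total) for c in support)


# Hypothetical labels: three classes, two misclassified rows.
y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 1, 1, 1, 2, 0]

acc = accuracy(y_true, y_pred)
rec = weighted_recall(y_true, y_pred)
```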


### Lineage
In this section, we review the lineage of the artifacts generated by the pipeline.

In [31]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
{'StepName': 'DiseaseClassificationProcess', 'StartTime': datetime.datetime(2026, 1, 10, 21, 18, 11, 496000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 1, 10, 21, 20, 44, 486000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:657480822269:processing-job/pipelines-2ot3jlyeen3c-DiseaseClassificatio-O5ytreMNMu'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...0404729e5eb8c7df6ab7407/preprocessing.py,Input,DataSet,ContributedTo,artifact
1,s3://...sification-12052025/pipeline/Testing.csv,Input,DataSet,ContributedTo,artifact
2,s3://...ification-12052025/pipeline/Training.csv,Input,DataSet,ContributedTo,artifact
3,25775...com/sagemaker-scikit-learn:1.2-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...ion-12052025/pipeline/test_no_target.csv,Output,DataSet,Produced,artifact
5,s3://...lassification-12052025/pipeline/test.csv,Output,DataSet,Produced,artifact
6,s3://...classification-12052025/pipeline/val.csv,Output,DataSet,Produced,artifact
7,s3://...assification-12052025/pipeline/train.csv,Output,DataSet,Produced,artifact


{'StepName': 'DiseaseClassificationTrain', 'StartTime': datetime.datetime(2026, 1, 10, 21, 20, 44, 988000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 1, 10, 21, 23, 8, 690000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:657480822269:training-job/pipelines-2ot3jlyeen3c-DiseaseClassificatio-fpaUAV01Mf'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...classification-12052025/pipeline/val.csv,Input,DataSet,ContributedTo,artifact
1,s3://...assification-12052025/pipeline/train.csv,Input,DataSet,ContributedTo,artifact
2,25775...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...sificatio-fpaUAV01Mf/output/model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'DiseaseClassififcationEval', 'StartTime': datetime.datetime(2026, 1, 10, 21, 23, 9, 414000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 1, 10, 21, 25, 42, 395000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:657480822269:processing-job/pipelines-2ot3jlyeen3c-DiseaseClassififcati-xeV3CRrCXt'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...ad5a9b84991c17b957ad7ccbed/evaluation.py,Input,DataSet,ContributedTo,artifact
1,s3://...lassification-12052025/pipeline/test.csv,Input,DataSet,ContributedTo,artifact
2,s3://...sificatio-fpaUAV01Mf/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,25775...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...026-01-10-21-18-01-027/output/evaluation,Output,DataSet,Produced,artifact


{'StepName': 'DiseaseClassificationAccCond', 'StartTime': datetime.datetime(2026, 1, 10, 21, 25, 42, 695000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 1, 10, 21, 25, 43, 89000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'Condition': {'Outcome': 'True'}}, 'AttemptCount': 1}


None

{'StepName': 'DiseaseClassificationCreateModel-CreateModel', 'StartTime': datetime.datetime(2026, 1, 10, 21, 25, 43, 642000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 1, 10, 21, 25, 45, 172000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-2:657480822269:model/pipelines-2ot3jlyeen3c-DiseaseClassificatio-R5UVRt5ITO'}}, 'AttemptCount': 1}


None

{'StepName': 'DiseaseClassificationRegisterModel-RegisterModel', 'StartTime': datetime.datetime(2026, 1, 10, 21, 25, 43, 642000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 1, 10, 21, 25, 44, 992000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-2:657480822269:model-package/disease-classification-model-07012026/1'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...sificatio-fpaUAV01Mf/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,25775...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,disease-classification-model-07012026-1-176808...,Input,ModelLifeCycle,ContributedTo,action
3,disease-classification-model-07012026-1-Pendin...,Input,Approval,ContributedTo,action
4,disease-classification-model-07012026-17680803...,Output,ModelGroup,AssociatedWith,context


{'StepName': 'DiseaseClassificationTransform', 'StartTime': datetime.datetime(2026, 1, 10, 21, 25, 45, 628000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2026, 1, 10, 21, 30, 55, 684000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-2:657480822269:transform-job/pipelines-2ot3jlyeen3c-DiseaseClassificatio-fXuMr8Vwjh'}}, 'AttemptCount': 1}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...sificatio-fpaUAV01Mf/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,25775...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,s3://...ion-12052025/pipeline/test_no_target.csv,Input,DataSet,ContributedTo,artifact
3,s3://...cation-12052025/pipeline/batch-transform,Output,DataSet,Produced,artifact


## Clean-up

In [30]:
# List and delete all pipelines.
# Caution: this deletes every pipeline that list_pipelines returns for the
# account and region, not only the one created in this notebook.
sm_client = boto3.client('sagemaker')
pipelines = sm_client.list_pipelines()


for p in pipelines['PipelineSummaries']:
    print(f"Name: {p['PipelineName']}")
    sm_client.delete_pipeline(PipelineName=p['PipelineName'])


Name: DiseaseClassificationPipeline


In [35]:
definition = pipeline.definition()

# Print the full pipeline definition (a JSON string)
print(definition)


{"Version": "2020-12-01", "Metadata": {}, "Parameters": [{"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}, {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "ModelApprovalStatus", "Type": "String", "DefaultValue": "PendingManualApproval"}, {"Name": "InputData", "Type": "String", "DefaultValue": "s3://disease-classification-12052025/pipeline/Training.csv"}, {"Name": "TestData", "Type": "String", "DefaultValue": "s3://disease-classification-12052025/pipeline/Testing.csv"}, {"Name": "BatchData", "Type": "String", "DefaultValue": "s3://disease-classification-12052025/pipeline/test_no_target.csv"}, {"Name": "AccThreshold", "Type": "Float", "DefaultValue": 0.97}], "PipelineExperimentConfig": {"ExperimentName": {"Get": "Execution.PipelineName"}, "TrialName": {"Get": "Execution.PipelineExecutionId"}}, "Steps": [{"Name": "DiseaseClassificationProcess", "Type": "Processing", "Arguments": {"ProcessingResources": {"ClusterConfig":