# CI-CD Pipeline

In [25]:
# Import libraries
import json
import sys
import time
from pprint import pprint

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.lineage.visualizer import LineageTableVisualizer

# Static configuration shared by the cells below.
base_dir = "/opt/ml/processing"  # root used for processing input/output paths
instance_type = "ml.m5.xlarge"  # plain-string default; the parameters cell rebinds this name
model_package_group_name = "NutriscoreModelPackageGroupName"

# A regular session for immediate calls (region, default bucket) and a
# PipelineSession that is handed to the processors/estimator/model used in steps.
sagemaker_session = sagemaker.session.Session()
pipeline_session = PipelineSession()

# Values resolved from the current environment.
role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()

In [28]:
# Dataset locations in S3. Both objects live in the same bucket, so the bucket
# prefix is written once and the two URIs are derived from it.
# NOTE(review): the bucket name embeds a specific account/region - confirm it is
# reachable from the account that runs this notebook.
dataset_bucket = "s3://sagemaker-us-east-1-654654380268"
input_data_uri = dataset_bucket + "/food_us_subset_100k/"
batch_data_uri = (
    dataset_bucket + "/nutriscore-prediction-xgboost/test/test_features_only.csv"
)

## Define Parameters to Parametrize Pipeline Execution

In [35]:
# Define parameters
# Each object below is a pipeline parameter with a default value; the full set is
# passed to Pipeline(parameters=[...]) in the pipeline-definition cell.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# `instance_type` is bound to a plain string in the setup cell and is rebound here
# to a ParameterString. Without the guard below, running this cell a second time
# would pass the ParameterString back in as its own default value. `getattr`
# returns the string unchanged on the first run and the parameter's existing
# default on any later run, so the cell is idempotent.
default_instance_type = getattr(instance_type, "default_value", instance_type)
instance_type = ParameterString(
    name="TrainingInstanceType", default_value=default_instance_type
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri
)
# Compared against the evaluation report's RMSE in the condition step.
rmse_threshold = ParameterFloat(name="RmseThreshold", default_value=2.0)

In [16]:
# Scikit-learn processor that runs the preprocessing script.
# It is bound to the PipelineSession, and both its instance type and instance
# count are pipeline parameters.
sklearn_processor = SKLearnProcessor(
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name="sklearn-nutriscore-process",
    framework_version="1.2-1",
    instance_count=processing_instance_count,
    instance_type=instance_type,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [122]:
# Preprocessing step: the raw dataset is made available under {base_dir}/input and
# the script's train/validation/test directories are declared as named outputs.
processing_inputs = [
    ProcessingInput(source=input_data, destination=f"{base_dir}/input"),
]
processing_outputs = [
    ProcessingOutput(output_name=split_name, source=f"{base_dir}/{split_name}")
    for split_name in ("train", "validation", "test")
]

processor_args = sklearn_processor.run(
    code="utils/preprocess_nutriscore.py",
    inputs=processing_inputs,
    outputs=processing_outputs,
)

step_process = ProcessingStep(step_args=processor_args, name="NutriscoreProcess")

## Define a Training Step to Train a Model

In [123]:
# Training step: an XGBoost estimator fed by the preprocessing step's
# "train" and "validation" outputs.
model_path = "s3://{}/NutriscoreTrain".format(default_bucket)

# Container image for the built-in XGBoost algorithm; reused by the evaluation
# processor and the model defined in later cells.
image_uri = sagemaker.image_uris.retrieve(
    "xgboost",
    region=region,
    version="1.7-1",
    instance_type=instance_type,
)

xgb_train = Estimator(
    role=role,
    sagemaker_session=pipeline_session,
    image_uri=image_uri,
    instance_count=1,
    instance_type=instance_type,
    output_path=model_path,
)

# Hyperparameters for a squared-error regression objective.
xgb_hyperparameters = {
    "objective": "reg:squarederror",
    "max_depth": 5,
    "eta": 0.2,
    "gamma": 4,
    "min_child_weight": 6,
    "subsample": 0.8,
    "verbosity": 0,
    "num_round": 100,
}
xgb_train.set_hyperparameters(**xgb_hyperparameters)

# Each training channel reads the CSV output of the same name from the processing step.
processed_outputs = step_process.properties.ProcessingOutputConfig.Outputs
training_channels = {
    channel: TrainingInput(
        s3_data=processed_outputs[channel].S3Output.S3Uri,
        content_type="text/csv",
    )
    for channel in ("train", "validation")
}

train_args = xgb_train.fit(inputs=training_channels)
step_train = TrainingStep(step_args=train_args, name="NutriscoreTrain")

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


## Define a Model Evaluation Step

In [124]:
# Evaluation processor: runs the evaluation script with python3 inside the same
# image that was retrieved for training.
script_eval = ScriptProcessor(
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name="script-nutriscore-eval",
    image_uri=image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=instance_type,
)

# Inputs: the trained model artifact and the held-out test split.
trained_model_input = ProcessingInput(
    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model",
)
test_split_input = ProcessingInput(
    source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
    destination="/opt/ml/processing/test",
)

# Output: the directory declared as the "evaluation" output.
evaluation_output = ProcessingOutput(
    output_name="evaluation", source="/opt/ml/processing/evaluation"
)

eval_args = script_eval.run(
    code="utils/evaluate_nutriscore.py",
    inputs=[trained_model_input, test_split_input],
    outputs=[evaluation_output],
)

In [125]:
# Evaluation step. The property file registers evaluation.json from the
# "evaluation" output so the condition step can read values from it with JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    step_args=eval_args,
    property_files=[evaluation_report],
    name="NutriscoreEval",
)

## Define a Create Model Step to Create a Model

In [126]:
# Model built from the training step's artifact; the same `model` object is
# reused by the register-model cell.
trained_model_artifacts = step_train.properties.ModelArtifacts.S3ModelArtifacts
model = Model(
    role=role,
    sagemaker_session=pipeline_session,
    image_uri=image_uri,
    model_data=trained_model_artifacts,
)

# NOTE(review): accelerator_type requests an Elastic Inference accelerator -
# confirm it is still needed for this model.
create_model_args = model.create(
    instance_type=instance_type, accelerator_type="ml.eia1.medium"
)
step_create_model = ModelStep(
    step_args=create_model_args,
    name="NutriscoreCreateModel",
)

## Define a Transform Step to Perform Batch Transformation

In [127]:
# Batch transform step: scores the BatchData dataset with the model created by
# the create-model step and writes the results under the default bucket.
transform_output_path = f"s3://{default_bucket}/NutriscoreTransform"
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_count=1,
    instance_type=instance_type,
    output_path=transform_output_path,
)

batch_transform_input = TransformInput(data=batch_data)
step_transform = TransformStep(
    name="NutriscoreTransform",
    transformer=transformer,
    inputs=batch_transform_input,
)

## Define a Register Model Step to Create a Model Package

In [128]:
# Register-model step: attaches the evaluation report as model statistics and
# registers the model in the model package group.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0][
    "S3Output"
]["S3Uri"]
evaluation_statistics = MetricsSource(
    s3_uri=f"{evaluation_output_uri}/evaluation.json",
    content_type="application/json",
)
model_metrics = ModelMetrics(model_statistics=evaluation_statistics)

# The approval status is a pipeline parameter, so it can be overridden per execution.
register_args = model.register(
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", instance_type],
    transform_instances=[instance_type],
)
step_register = ModelStep(step_args=register_args, name="NutriscoreRegisterModel")



## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

In [129]:
# Fail step used as the else-branch of the RMSE condition. The message is
# assembled with Join because the threshold is a pipeline parameter.
rmse_failure_message = Join(
    on=" ",
    values=["Execution failed due to RMSE >", rmse_threshold],
)
step_fail = FailStep(error_message=rmse_failure_message, name="NutriscoreRMSEFail")

## Define a Condition Step to Check RMSE and Conditionally Create a Model, Run a Batch Transformation, and Register a Model in the Model Registry, or Terminate the Execution in a Failed State

In [130]:
# Condition step: read the RMSE from the evaluation report and compare it with
# the RmseThreshold parameter.
evaluated_rmse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.rmse.value",
)
cond_lte = ConditionLessThanOrEqualTo(left=evaluated_rmse, right=rmse_threshold)

# RMSE <= threshold: register the model, create it and run the batch transform.
# Otherwise: run the fail step.
steps_when_rmse_ok = [step_register, step_create_model, step_transform]
steps_when_rmse_too_high = [step_fail]

step_cond = ConditionStep(
    name="NutriscoreRMSECond",
    conditions=[cond_lte],
    if_steps=steps_when_rmse_ok,
    else_steps=steps_when_rmse_too_high,
)

## Define a Pipeline of Parameters, Steps, and Conditions

In [131]:
# Assemble the pipeline from its parameters and top-level steps. The create-model,
# transform, register and fail steps are not listed here because they are attached
# to the condition step's branches.
pipeline_name = "NutriscorePipeline"

pipeline_parameters = [
    processing_instance_count,
    instance_type,
    model_approval_status,
    input_data,
    batch_data,
    rmse_threshold,
]
pipeline_steps = [step_process, step_train, step_eval, step_cond]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

## Submit Pipeline to SageMaker and Start Execution

In [132]:
# Submit pipeline
# Create the pipeline in SageMaker, or update it if it already exists, passing
# `role` as the role ARN. This must run before the execution is started.
pipeline.upsert(role_arn=role)
# Start an execution; no parameter overrides are passed here.
execution = pipeline.start()



In [40]:
# Describe the pipeline execution
# As the cell's last expression, the returned description (status, ARNs,
# timestamps) is rendered as the cell output.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:654654380268:pipeline/NutriscorePipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:654654380268:pipeline/NutriscorePipeline/execution/sx7i97tw9pke',
 'PipelineExecutionDisplayName': 'execution-1760507965168',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'nutriscorepipeline',
  'TrialName': 'sx7i97tw9pke'},
 'CreationTime': datetime.datetime(2025, 10, 15, 5, 59, 25, 90000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2025, 10, 15, 5, 59, 25, 90000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:654654380268:user-profile/d-bzkrqbjrtwzf/amalinsky',
  'UserProfileName': 'amalinsky',
  'DomainId': 'd-bzkrqbjrtwzf',
  'IamIdentity': {'Arn': 'arn:aws:sts::654654380268:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROAZQ3DRDDWK3JDJSTXQ:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:654654380268:user-profile

In [41]:
# Wait for execution to complete
# The cells below read the execution's steps and outputs, so run them only
# after this call has returned.
execution.wait()

In [None]:
# List execution steps
# As the cell's last expression, the returned step list is rendered as the
# cell output.
execution.list_steps()

## Examine the Evaluation

In [None]:
# Download the evaluation report written by the evaluation step and pretty-print it.
# `json` comes from the notebook's import cell; the original cell called
# json.loads without the module ever being imported.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0][
    "S3Output"
]["S3Uri"]
evaluation_json = sagemaker.s3.S3Downloader.read_file(
    f"{evaluation_output_uri}/evaluation.json"
)
pprint(json.loads(evaluation_json))

## Review Artifact Lineage

In [None]:
# Show one lineage table per pipeline step, iterating over the list returned by
# list_steps() in reverse order.
lineage_session = sagemaker.session.Session()
viz = LineageTableVisualizer(lineage_session)

pause_seconds = 5  # delay between consecutive lineage lookups
executed_steps = execution.list_steps()
for execution_step in reversed(executed_steps):
    print(execution_step)
    lineage_table = viz.show(pipeline_execution_step=execution_step)
    display(lineage_table)
    time.sleep(pause_seconds)