# Using SageMaker pipelines for MLOps workflows

This notebook contains end-to-end code to construct and execute a secure MLOps pipeline in your data science environment. It contains all necessary all code in one place. You can use and modify this code for your experiments and tests.
  


In [58]:
if False:
    !pip install --disable-pip-version-check -q sagemaker==2.47.1

In [None]:
!python --version

In [60]:
if False:
    !pip install -U sagemaker

In [190]:
import boto3
import sagemaker
import sagemaker.session
import json

print(f"SageMaker version: {sagemaker.__version__}")

SageMaker version: 2.232.2


In [191]:
sm = boto3.client("sagemaker")
ssm = boto3.client("ssm")

def get_environment(project_name, ssm_params):
    r = sm.describe_domain(
            DomainId=sm.describe_project(
                ProjectName=project_name
                )["CreatedBy"]["DomainId"]
        )
    del r["ResponseMetadata"]
    del r["CreationTime"]
    del r["LastModifiedTime"]
    r = {**r, **r["DefaultUserSettings"]}
    del r["DefaultUserSettings"]

    i = {
        **r,
        **{t["Key"]:t["Value"] 
            for t in sm.list_tags(ResourceArn=r["DomainArn"])["Tags"] 
            if t["Key"] in ["EnvironmentName", "EnvironmentType"]}
    }

    for p in ssm_params:
        try:
            i[p["VariableName"]] = ssm.get_parameter(Name=f"{i['EnvironmentName']}-{i['EnvironmentType']}-{p['ParameterName']}")["Parameter"]["Value"]
        except:
            i[p["VariableName"]] = ""

    return i


<div class="alert alert-info"> 💡 <strong> Get environment variables </strong>

Set the <b>`project_name`</b> to the name of the current SageMaker project.
Various environment data is loaded and shown:
</div>

In [192]:
import boto3
import sagemaker
from sagemaker.session import Session

# Initialize SageMaker session
sagemaker_session = sagemaker.Session()

# Use your default S3 bucket for storing data
default_bucket = sagemaker_session.default_bucket()

# Set the data paths
input_data_uri = f"s3://{default_bucket}/your-data-path/traindata_medium.csv"
print(f"Input data path: {input_data_uri}")

# You can now use input_data_uri for your pipelines or other resources


Input data path: s3://sagemaker-us-east-1-361019805433/your-data-path/traindata_medium.csv


In [193]:
import boto3
import sagemaker
from sagemaker.session import Session

# Initialize SageMaker session
sagemaker_session = sagemaker.Session()

# Get the current region
region = boto3.Session().region_name

# Retrieve execution role (update this to the appropriate role if needed)
pipeline_role = sagemaker.get_execution_role()  # Use the current SageMaker execution role
processing_role = pipeline_role  # Assuming the same role for processing and training
training_role = pipeline_role

# Use default S3 bucket for storing data and models
data_bucket = sagemaker_session.default_bucket()
model_bucket = data_bucket  # Using the same bucket for models unless specified otherwise

# If you're using KMS encryption (remove or update these if you're not using KMS)
ebs_kms_id = None  # Set this if using KMS for EBS volume encryption
s3_kms_id = None   # Set this if using KMS for S3 encryption

print(f"SageMaker version: {sagemaker.__version__}")
print(f"Region: {region}")

# Customize project and pipeline names as per your needs
project_name = "HospitalLOS"
model_package_group_name = f"{project_name}"
pipeline_name = f"{project_name}-Pipeline"

print(f"Model Package Group Name: {model_package_group_name}")
print(f"Pipeline Name: {pipeline_name}")


SageMaker version: 2.232.2
Region: us-east-1
Model Package Group Name: HospitalLOS
Pipeline Name: HospitalLOS-Pipeline


In [194]:
import os

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.condition_step import (
    ConditionStep
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.network import NetworkConfig

BASE_DIR="./pipelines/Hlos/"

# Upload the data to S3

In [195]:
from sagemaker.s3 import S3Uploader

# Define the S3 base URI for the dataset
dataset_s3_uri = f"s3://{sagemaker_session.default_bucket()}/HospitalLOS/traindata_medium.csv"

# Upload the CSV to the defined URI
input_data_uri = S3Uploader.upload(
    local_path="traindata_medium.csv",  # Local path to your CSV
    desired_s3_uri=dataset_s3_uri
)
print(f"Data uploaded to {input_data_uri}")


Data uploaded to s3://sagemaker-us-east-1-361019805433/HospitalLOS/traindata_medium.csv/traindata_medium.csv


In [196]:
# You can proceed without specifying NetworkConfig if you don't have security group or subnet IDs

# Define instance parameters for processing and training
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value=dataset_s3_uri,
)

# Continue with the rest of the pipeline setup without network configuration


In [197]:
# network_config = NetworkConfig(
#         enable_network_isolation=False, 
#         security_group_ids=env_data["SecurityGroups"],
#         subnets=env_data["SubnetIds"],
#         encrypt_inter_container_traffic=True)

In [198]:
base_job_prefix="Hlos"

# processing step for feature engineering
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-HLOS-preprocess",
    sagemaker_session=sagemaker_session,
    role=processing_role,
  #  network_config=network_config,
    volume_kms_key=ebs_kms_id,
    output_kms_key=s3_kms_id
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [199]:
 step_process = ProcessingStep(
        name="PreprocessHLOSData",
        processor=sklearn_processor,
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ],
        code=os.path.join(BASE_DIR, "preprocess.py"),
        job_arguments=["--input-data", input_data],
    )

In [200]:
# Training step for generating model artifacts
model_path = f"s3://{model_bucket}/{base_job_prefix}/HlosTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)

# Set up the XGBoost Estimator with the updated classification parameters
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{base_job_prefix}/Hlos-train",
    sagemaker_session=sagemaker_session,
    role=training_role,
    #subnets=network_config.subnets,
    #security_group_ids=network_config.security_group_ids,
    #encrypt_inter_container_traffic=True,
    #enable_network_isolation=False,
    volume_kms_key=ebs_kms_id,
    output_kms_key=s3_kms_id
)

# Set hyperparameters for multi-class classification
xgb_train.set_hyperparameters(
    objective="multi:softmax",  # Change to "multi:softprob" for class probabilities
    num_class=12,  # Update with the actual number of unique classes in 'Stay'
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)




In [201]:
step_train = TrainingStep(
    name="TrainHlosModel",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

In [202]:
# processing step for evaluation
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-Hlos-eval",
    sagemaker_session=sagemaker_session,
    role=processing_role,
    #network_config=network_config,
    volume_kms_key=ebs_kms_id,
    output_kms_key=s3_kms_id
)

In [203]:
evaluation_report = PropertyFile(
        name="HlosEvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )

In [204]:
step_eval = ProcessingStep(
        name="EvaluateHlosModel",
        processor=script_eval,
        inputs=[
            ProcessingInput(
                source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=step_process.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
        ],
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        code=os.path.join(BASE_DIR, "evaluate.py"),
        property_files=[evaluation_report],
    )

In [205]:
# register model step that will be conditionally executed
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)



In [206]:
# vpc_config = {
#     "Subnets":network_config.subnets,
#     "SecurityGroupIds":network_config.security_group_ids
# }

In [207]:
"""
There is a bug in RegisterModel implementation
The RegisterModel step is implemented in the SDK as two steps, a _RepackModelStep and a _RegisterModelStep. 
The _RepackModelStep runs a SKLearn training step in order to repack the model.tar.gz to include any custom inference code in the archive. 
The _RegisterModelStep then registers the repacked model.

The problem is that the _RepackModelStep does not propagate VPC configuration from the Estimator object:
https://github.com/aws/sagemaker-python-sdk/blob/cdb633b3ab02398c3b77f5ecd2c03cdf41049c78/src/sagemaker/workflow/_utils.py#L88

This cause the AccessDenied exception because repacker cannot access S3 bucket (all access which is not via VPC endpoint is blocked by the bucket policy)

The issue is opened against SageMaker module: https://github.com/aws/sagemaker-python-sdk/issues/2302
"""

step_register = RegisterModel(
    name="RegisterHlosModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
   # vpc_config_override=vpc_config
)

In [208]:
#xgb_train.get_vpc_config()

In [209]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo

# Condition step for evaluating model quality using accuracy
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.accuracy.value"  # Update to match the classification metric
    ),
    right=0.6,  # Desired minimum accuracy threshold
)

# Update the ConditionStep with the accuracy condition
step_cond = ConditionStep(
    name="CheckAccuracyHospitalLOSEvaluation",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[],
)


In [210]:
# pipeline instance
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=sagemaker_session,
)

In [211]:
pipeline.upsert(role_arn=pipeline_role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:361019805433:pipeline/HospitalLOS-Pipeline',
 'ResponseMetadata': {'RequestId': '6b1f487a-43f6-472f-b8e5-aec214997b90',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6b1f487a-43f6-472f-b8e5-aec214997b90',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Fri, 25 Oct 2024 21:26:05 GMT'},
  'RetryAttempts': 0}}

In [212]:
parsed = json.loads(pipeline.definition())
print(json.dumps(parsed, indent=2, sort_keys=True))



{
  "Metadata": {},
  "Parameters": [
    {
      "DefaultValue": "ml.m5.xlarge",
      "Name": "ProcessingInstanceType",
      "Type": "String"
    },
    {
      "DefaultValue": 1,
      "Name": "ProcessingInstanceCount",
      "Type": "Integer"
    },
    {
      "DefaultValue": "ml.m5.xlarge",
      "Name": "TrainingInstanceType",
      "Type": "String"
    },
    {
      "DefaultValue": "PendingManualApproval",
      "Name": "ModelApprovalStatus",
      "Type": "String"
    },
    {
      "DefaultValue": "s3://sagemaker-us-east-1-361019805433/HospitalLOS/traindata_medium.csv",
      "Name": "InputDataUrl",
      "Type": "String"
    }
  ],
  "PipelineExperimentConfig": {
    "ExperimentName": {
      "Get": "Execution.PipelineName"
    },
    "TrialName": {
      "Get": "Execution.PipelineExecutionId"
    }
  },
  "Steps": [
    {
      "Arguments": {
        "AppSpecification": {
          "ContainerArguments": [
            "--input-data",
            {
              "Get": "Par

The following line starts the pipeline execution. In this specific example it runs for about 13 minutes.

In [213]:
execution = pipeline.start()

In [216]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:361019805433:pipeline/HospitalLOS-Pipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:361019805433:pipeline/HospitalLOS-Pipeline/execution/dpd4jxouj64g',
 'PipelineExecutionDisplayName': 'execution-1729891569344',
 'PipelineExecutionStatus': 'Succeeded',
 'PipelineExperimentConfig': {'ExperimentName': 'hospitallos-pipeline',
  'TrialName': 'dpd4jxouj64g'},
 'CreationTime': datetime.datetime(2024, 10, 25, 21, 26, 9, 286000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 10, 25, 21, 34, 24, 455000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:361019805433:user-profile/d-ttv0us1l2sdg/dshalaby',
  'UserProfileName': 'dshalaby',
  'DomainId': 'd-ttv0us1l2sdg',
  'IamIdentity': {'Arn': 'arn:aws:sts::361019805433:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROAVIDTVB342CB4YCBW4:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:361019805433:user-

In [215]:
execution.wait()

In [217]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))



{'classification_metrics': {'accuracy': {'value': 0.9595528199974878}}}


## Clean up

### Delete SageMaker project
This will delete the associated CloudFormation stack and CodeCommit repository

In [None]:
print(f"Deleting project {project_name}:{sm.delete_project(ProjectName=project_name)}")

### Delete project S3 bucket 
This will remove all files and S3 bucket

In [None]:
!aws s3 rb s3://sm-mlops-cp-{project_name}-{project_id} --force

## Release resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>