# Build a SageMaker Pipeline to train and deploy an XGBoost-based turnover classifier

In [1]:
import sagemaker
import boto3
import json
import time
from pprint import pprint



# Low-level boto3 clients for the 'sagemaker' and 'sagemaker-runtime' services;
# both are handed to the SageMaker SDK session created below.
sm = boto3.client('sagemaker')
sm_runtime = boto3.client(service_name='sagemaker-runtime')
sess = sagemaker.Session(
    sagemaker_client=sm,
    sagemaker_runtime_client=sm_runtime,
)

# Session-derived values defined once here for the rest of the notebook.
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
# Name under which the pipeline is created/updated later in the notebook.
pipeline_name = "xgb-turnover-pipeline"

# Configure the dataset and processing step

###  Configure S3 path for raw input data

In [4]:
# S3 prefix holding the raw input data; used as the default value of the
# "InputData" pipeline parameter.
raw_input_data_s3_uri = "s3://turnover-data-124578/input/"
print(raw_input_data_s3_uri)

s3://turnover-data-124578/input/


In [5]:
# List the objects under the raw-input S3 prefix with the AWS CLI
# (IPython substitutes the Python variable into the shell command).
!aws s3 ls $raw_input_data_s3_uri

2024-08-18 12:40:20          0 
2024-08-18 14:49:31   12431134 data.csv


### Configure processing step


In [6]:
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterFloat,
)

In [7]:
# Pipeline parameter: S3 location of the raw dataset (defaults to the prefix
# configured above).
input_data = ParameterString(name="InputData", default_value=raw_input_data_s3_uri)

# Pipeline parameter: endpoint name passed to the deployment Lambda step.
endpoint_name = ParameterString(
    default_value='xgb-turnover-prediction-endpoint',
    name="EndpointName",
)

In [8]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Scikit-learn processor that runs the data-preparation script on a single
# instance; the region is exported into the job's environment.
processor = SKLearnProcessor(
    role=role,
    framework_version='1.2-1',
    instance_type='ml.t3.medium',
    instance_count=1,
    base_job_name="TurnoverProcessingJob",
    env={'AWS_DEFAULT_REGION': region},
)

In [9]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Single input channel: the raw dataset location (a pipeline parameter) is
# mapped to /opt/ml/processing/input/data/ inside the processing container.
processing_inputs = [
    ProcessingInput(
        source=input_data,
        input_name='raw-input-data',
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key',
    ),
]

# One output channel per artefact written by the processing script. Each
# channel is named 'turnover-<channel>' and is uploaded from the matching
# directory under /opt/ml/processing/output/turnover/ when the job ends.
processing_outputs = [
    ProcessingOutput(
        output_name=f'turnover-{channel}',
        source=f'/opt/ml/processing/output/turnover/{channel}',
        s3_upload_mode='EndOfJob',
    )
    for channel in ('train', 'validation', 'test', 'encoders')
]

# Pipeline step that runs src/prepare_data.py with the processor, inputs and
# outputs defined above. Job arguments are passed as strings: a 0.9 / 0.05 /
# 0.05 train/validation/test split with dataset balancing set to 'True'.
processing_step = ProcessingStep(
    name='ProcessingStep',
    processor=processor,
    code='src/prepare_data.py',
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        '--train-split-percentage', '0.9',
        '--validation-split-percentage', '0.05',
        '--test-split-percentage', '0.05',
        '--balance-dataset', 'True',
    ],
)

print(processing_step)

<sagemaker.workflow.steps.ProcessingStep object at 0x7f8d32ccbd00>


The processing step is now configured; its inputs can be pulled from the step's arguments (`processing_step.arguments`) and reviewed before the pipeline is assembled.

#  Configure training step

### Define parameters

Set up the parameters for the workflow.

In [10]:
# Pipeline parameter: instance type used by the training job.
train_instance_type = ParameterString(
    default_value="ml.m5.large",
    name="TrainInstanceType",
)

### Configure the `XGBoostEstimator`

In [11]:
from sagemaker.xgboost.estimator import XGBoost
# XGBoost framework estimator: runs train.py from the src/ directory on one
# instance whose type is supplied by the TrainInstanceType pipeline parameter.
xgb_estimator = XGBoost(
    entry_point='train.py',
    source_dir='src',
    framework_version='1.7-1',
    role=role,
    instance_type=train_instance_type,
    instance_count=1,
    input_mode='File',
    base_job_name="TurnoverTrainingJob",
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


### Set up pipeline step caching

In [12]:
from sagemaker.workflow.steps import CacheConfig

# Step caching is enabled with an expiry of "PT1H", i.e. one hour.
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="PT1H",
)

### Configure the `TrainingStep`

In [13]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Training step: each training channel is fed from the processing-step output
# of the corresponding name. The train/validation channels are declared as
# CSV, the encoders channel as an opaque binary payload.
training_step = TrainingStep(
    name='TrainingStep',
    estimator=xgb_estimator,
    inputs={
        channel: TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                output_name
            ].S3Output.S3Uri,
            content_type=content_type,
        )
        for channel, output_name, content_type in (
            ('train', 'turnover-train', 'text/csv'),
            ('validation', 'turnover-validation', 'text/csv'),
            ('encoders', 'turnover-encoders', 'application/octet-stream'),
        )
    },
    cache_config=cache_config,
)

print(training_step)

<sagemaker.workflow.steps.TrainingStep object at 0x7f8d33019f30>




# Configure model-evaluation step


In [14]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Scikit-learn processor for the model-evaluation script; same instance setup
# as the data-preparation processor, with a 7200-second runtime cap.
evaluation_processor = SKLearnProcessor(
    role=role,
    framework_version='1.2-1',
    instance_type='ml.t3.medium',
    instance_count=1,
    base_job_name='TurnoverEvaluationProcessingJob',
    max_runtime_in_seconds=7200,
    env={'AWS_DEFAULT_REGION': region},
)

In [15]:
from sagemaker.workflow.properties import PropertyFile

# Property file pointing at evaluation.json in the 'metrics' output channel of
# the evaluation step; the accuracy condition reads its value from here.
evaluation_report = PropertyFile(
    name='EvaluationReport',
    path='evaluation.json',
    output_name='metrics',
)

Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline runs this step. This is very similar to calling the processor instance's `run` method.

In [16]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Evaluation step: runs src/evaluate_model_metrics.py against the trained
# model artefact and the held-out test split, and publishes the result through
# the 'metrics' output channel (exposed to the pipeline via evaluation_report).
evaluation_step = ProcessingStep(
    name='EvaluateModelStep',
    code='src/evaluate_model_metrics.py',
    processor=evaluation_processor,
    inputs=[
        # Model artefact produced by the training step.
        ProcessingInput(
            destination='/opt/ml/processing/input/model',
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        ),
        # Test split produced by the data-preparation step.
        ProcessingInput(
            destination='/opt/ml/processing/input/data',
            source=processing_step.properties.ProcessingOutputConfig.Outputs[
                'turnover-test'
            ].S3Output.S3Uri,
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='metrics',
            source='/opt/ml/processing/output/metrics/',
            s3_upload_mode='EndOfJob',
        ),
    ],
    property_files=[evaluation_report],
)

# Configure and register model step

### Configure the model for deployment

In [17]:
# Pipeline parameter: approval status assigned to the registered model package.
model_approval_status = ParameterString(
    default_value="PendingManualApproval",
    name="ModelApprovalStatus",
)

# Pipeline parameter: instance type used for model creation and deployment.
deploy_instance_type = ParameterString(
    default_value="ml.m5.large",
    name="DeployInstanceType",
)

In [18]:
# Whole-second Unix timestamp, used as a suffix for names built in this
# notebook run.
timestamp = f"{int(time.time())}"
model_package_group_name = f"xgb-turnover-mpgn-{timestamp}"

print(model_package_group_name)

xgb-turnover-mpgn-1724866187


In [19]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 

# Attach the evaluation report to the registered model: the statistics source
# is evaluation.json under the S3 URI of the evaluation step's first output.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri=f"{evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']}/evaluation.json",
    )
)

print(model_metrics)

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


<sagemaker.model_metrics.ModelMetrics object at 0x7f8d32e97070>


In [20]:
# Look up the XGBoost 1.7-1 inference container image for the current region.
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.7-1",
    py_version="py38",
    image_scope="inference",
    instance_type=deploy_instance_type,
    region=region,
)
print(inference_image_uri)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.7-1


### Register the model for deployment

In [21]:
from sagemaker.workflow.step_collections import RegisterModel

# Register the trained model artefact, together with its evaluation metrics,
# in the model package group created for this notebook run.
register_step = RegisterModel(
    name="RegisterModelStep",
    estimator=xgb_estimator,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    model_metrics=model_metrics,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    content_types=["text/csv", "application/json"],
    response_types=["text/csv", "application/json"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type],
)

# Create model for deployment step

In [22]:
from sagemaker.model import Model

# Model name carries the same timestamp suffix as the model package group.
model_name = f'xgb-turnover-model-{timestamp}'

# Deployable model: inference image plus the artefact from the training step,
# with src/inference.py as the entry point.
model = Model(
    name=model_name,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point='src/inference.py',
    role=role,
    sagemaker_session=sess,
)

In [23]:
from sagemaker.inputs import CreateModelInput

# Inputs for the create-model step: only the instance type, taken from the
# DeployInstanceType pipeline parameter.
create_inputs = CreateModelInput(instance_type=deploy_instance_type)

In [24]:
from sagemaker.workflow.steps import CreateModelStep

# Pipeline step that creates the SageMaker model defined above.
create_step = CreateModelStep(
    name="CreateModelStep",
    inputs=create_inputs,
    model=model,
)

## Create Lambda step

In [25]:
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda
from src.iam_helper import create_sagemaker_lambda_role

# Execution role for the deployment Lambda, obtained from the project helper.
lambda_role = create_sagemaker_lambda_role("deploy-model-lambda-role")
deploy_model_lambda_function_name = "sagemaker-deploy-model-lambda"

# Lambda function packaged from src/deploy_model_lambda.py.
deploy_model_lambda_function = Lambda(
    function_name=deploy_model_lambda_function_name,
    execution_role_arn=lambda_role,
    handler="deploy_model_lambda.lambda_handler",
    script="src/deploy_model_lambda.py",
)

# Pipeline step that invokes the Lambda with the created model's name, the
# target endpoint name and the endpoint instance type.
deploy_model_step = LambdaStep(
    name="Deploy-Turnover-Model-To-Endpoint",
    lambda_func=deploy_model_lambda_function,
    inputs=dict(
        model_name=create_step.properties.ModelName,
        endpoint_name=endpoint_name,
        endpoint_instance_type=deploy_instance_type,
    ),
)

Using ARN from existing role: deploy-model-lambda-role


# Check accuracy condition step

In [26]:
# Pipeline parameter: accuracy threshold checked by the condition step.
min_accuracy_value = ParameterFloat(
    default_value=0.5,
    name="MinAccuracyValue",
)

In [27]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
# JsonGet comes from sagemaker.workflow.functions: the class of the same name
# in sagemaker.workflow.condition_step is deprecated and emits a
# "has been renamed in sagemaker>=2" warning when instantiated.
from sagemaker.workflow.functions import JsonGet

# Compare the accuracy stored at metrics.accuracy.value in the evaluation
# report (the property file of the evaluation step) with the MinAccuracyValue
# pipeline parameter.
minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value,  # minimum accuracy threshold
)

# When the condition holds, the model is registered, created and deployed;
# otherwise no further steps run.
minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyConditionStep",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step, deploy_model_step],
    else_steps=[],
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# Create pipeline

### Define a pipeline of parameters, steps, and conditions

In [28]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the pipeline. The register/create/deploy steps are not listed here
# directly: they are attached through the condition step's if_steps.
pipeline = Pipeline(
    name=pipeline_name,
    sagemaker_session=sess,
    parameters=[
        input_data,
        endpoint_name,
        train_instance_type,
        min_accuracy_value,
        model_approval_status,
        deploy_instance_type,
    ],
    steps=[
        processing_step,
        training_step,
        evaluation_step,
        minimum_accuracy_condition_step,
    ],
)

In [None]:
# Parse the pipeline definition (JSON text) into Python objects and
# pretty-print it for inspection.
definition = json.loads(pipeline.definition())
pprint(definition)

In [None]:
# Create or update the pipeline, using the notebook's execution role.
pipeline.upsert(role_arn=role)