In [18]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.pipeline import PipelineModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep

from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.model import Model
from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.properties import PropertyFile

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

from sagemaker.tuner import HyperparameterTuner, CategoricalParameter
from sagemaker.workflow.steps import TuningStep

from sagemaker.workflow.functions import Join

In [19]:
# Initialization: execution role, pipeline session, S3 layout and script locations
# shared by every step defined below.
role = get_execution_role()
sagemaker_session = PipelineSession()

bucket = 'temp129428'
data = 'bank_clean.csv'  # raw dataset file name; also used in the preprocessing job arguments
region = boto3.Session().region_name  # used to resolve container image URIs and passed to the deploy script
input_folder_name = 'input'
output_folder_name = 'output'

input_data_path = f's3://{bucket}/{input_folder_name}/{data}'
#input_mappings_path = f's3://{bucket}/{input_folder_name}/mappings.csv'
output_train_path = f's3://{bucket}/{output_folder_name}/train/'
output_test_path = f's3://{bucket}/{output_folder_name}/test/'
output_txt_path = f's3://{bucket}/{output_folder_name}/txt/'  # Will be used for metadata

# Scripts are read from S3 by the processing/evaluation/deployment jobs.
preprocessing_script_path = f's3://{bucket}/{input_folder_name}/preprocessing.py'
model_path = f"s3://{bucket}/{output_folder_name}/model"
eval_script_path = f's3://{bucket}/{input_folder_name}/evaluate.py'
deployment_script_path = f"s3://{bucket}/{input_folder_name}/deploy_model.py"

In [20]:
# Pipeline parameters. Defaults come from the S3 paths defined in the initialization cell.
# Any new ParameterString defined here must also be added to Pipeline(parameters=[...]).
input_data = ParameterString(name="InputData", default_value=input_data_path)

# Disabled optional input:
# input_mappings = ParameterString(name="InputMappings", default_value=input_mappings_path)

preprocessed_data_train = ParameterString(name="PreprocessedDataTrain", default_value=output_train_path)
preprocessed_data_test = ParameterString(name="PreprocessedDataTest", default_value=output_test_path)

# S3 prefix for the metadata (txt) output of preprocessing.
preprocessed_data_txt = ParameterString(name="PreprocessedDataTXT", default_value=output_txt_path)

In [21]:
# Processor for the preprocessing job, running on the SageMaker scikit-learn 0.23-1 image.
sklearn_image_uri = sagemaker.image_uris.retrieve('sklearn', region=region, version='0.23-1')

script_processor = ScriptProcessor(
    image_uri=sklearn_image_uri,
    command=['python3'],
    role=role,
    instance_type='ml.m5.4xlarge',  # pick the size that suits the data; 'ml.m5.xlarge' is a cheaper option
    instance_count=1,
    sagemaker_session=sagemaker_session,
)

In [22]:
# Preprocessing step: runs preprocessing.py (read from S3) on the raw CSV and publishes three
# outputs to S3: the train split, the test split and a txt metadata folder.
processing_inputs = [
    ProcessingInput(source=input_data, destination='/opt/ml/processing/input'),
    # Add further inputs here as required, e.g. the (currently disabled) mappings file:
    # ProcessingInput(source=input_mappings, destination='/opt/ml/processing/mappings'),
]

processing_outputs = [
    ProcessingOutput(output_name=output_name, source=container_dir, destination=s3_destination)
    for output_name, container_dir, s3_destination in (
        ('trainoutput', '/opt/ml/processing/output/train', preprocessed_data_train),
        ('testoutput', '/opt/ml/processing/output/test', preprocessed_data_test),
        ('columnoutput', '/opt/ml/processing/output/txt', preprocessed_data_txt),
    )
]

processing_step = ProcessingStep(
    name="PreprocessData",
    processor=script_processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    code=preprocessing_script_path,
    job_arguments=[
        '--input', f'/opt/ml/processing/input/{data}',
        # '--input_mappings', '/opt/ml/processing/mappings/mappings.csv',
        '--output_train', '/opt/ml/processing/output/train/processed_train.csv',
        '--output_test', '/opt/ml/processing/output/test/processed_test.csv',
        '--output_txt', '/opt/ml/processing/output/txt/',
    ],
)

Training Step

In [23]:
# Training: built-in XGBoost 1.2-1 container on managed spot capacity, fed the
# 'trainoutput' split produced by the preprocessing step.
xgboost_image_uri = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")

hyperparameters = dict(
    num_round=400,
    objective='binary:logistic',  # binary classification objective
)

estimator = sagemaker.estimator.Estimator(
    image_uri=xgboost_image_uri,
    role=role,
    hyperparameters=hyperparameters,
    instance_type='ml.m5.2xlarge',
    instance_count=1,
    volume_size=5,  # GB
    output_path=model_path,
    # Spot training: max_wait (seconds, including time waiting for capacity) must be >= max_run.
    use_spot_instances=True,
    max_run=300,
    max_wait=600,
    sagemaker_session=sagemaker_session,
)

train_channel = TrainingInput(
    s3_data=processing_step.properties.ProcessingOutputConfig.Outputs['trainoutput'].S3Output.S3Uri,
    content_type="text/csv",
)

training_step = TrainingStep(
    name="TrainModel",
    estimator=estimator,
    inputs={'train': train_channel},
)

Evaluation Step

In [24]:
# Evaluation step: runs evaluate.py (read from S3) against the trained model artifact and the
# preprocessed train/test splits. evaluate.py is expected to write evaluation.json into the
# evaluation output directory; the condition step reads it through `evaluation_property_file`.

# Define the ScriptProcessor for evaluation. Same image as training, presumably so the
# artifact is loaded with the same XGBoost version.
evaluation_processor = ScriptProcessor(
    role=role,
    image_uri=sagemaker.image_uris.retrieve("xgboost", region, "1.2-1"),
    command=['python3'],
    instance_count=1,
    instance_type='ml.m5.xlarge',
    base_job_name="evaluate-model",
    sagemaker_session=sagemaker_session,
)

# Container-side directories. Each one is used both as a mount location and as a script
# argument, so it is defined once here to keep the two identical (all must be absolute).
eval_model_dir = '/opt/ml/processing/model'
eval_train_dir = '/opt/ml/processing/train'
eval_test_dir = '/opt/ml/processing/test'
eval_output_dir = '/opt/ml/processing/evaluation'

# Property file: output_name must match the ProcessingOutput below; path is relative to it.
evaluation_property_file = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination=eval_model_dir
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs['trainoutput'].S3Output.S3Uri,
            destination=eval_train_dir
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs['testoutput'].S3Output.S3Uri,
            destination=eval_test_dir
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source=eval_output_dir,
            destination=f's3://{bucket}/{output_folder_name}/evaluation'
        )
    ],
    code=eval_script_path,
    job_arguments=[
        '--model-path', eval_model_dir,
        '--train-path', eval_train_dir,
        '--test-path', eval_test_dir,
        '--output-path', eval_output_dir
    ],
    property_files=[evaluation_property_file]
)

Inference Pipeline without Lambda

In [25]:
# Two-container inference pipeline: the scikit-learn preprocessing container runs first,
# then the trained XGBoost model. Both receive the same env vars pointing at the txt
# metadata prefix in S3.
shared_model_env = {'BUCKET': bucket, "OUTPUTPATH": output_folder_name + '/txt/'}

preprocessing_model = Model(
    image_uri=sagemaker.image_uris.retrieve("sklearn", region, "0.23-1"),
    model_data=None,  # no trained artifact for the preprocessing container
    role=role,
    entry_point='preprocess_inference.py',  # local script
    env=dict(shared_model_env),  # each model gets its own copy
    sagemaker_session=sagemaker_session,
)

inference_model = Model(
    image_uri=training_step.properties.AlgorithmSpecification.TrainingImage,  # the XGBoost training image
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    entry_point='inference.py',  # local script
    env=dict(shared_model_env),
    sagemaker_session=sagemaker_session,
)

model = PipelineModel(
    name='pipeline-model',
    role=role,
    models=[preprocessing_model, inference_model],  # order = container execution order
    sagemaker_session=sagemaker_session,
)

# Register the combined model with SageMaker inside the pipeline.
model_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=sagemaker.inputs.CreateModelInput(instance_type="ml.t3.medium"),
)

Fail Step


In [26]:
# Terminal failure used as the condition step's else-branch.
fail_message = Join(
    on=" ",
    values=["Execution failed due to Model underperforming the baseline performance"],
)
fail_step = FailStep(name="AccuracyBelowThreshold", error_message=fail_message)

Deployment Step

In [27]:
# Deployment runs as a processing job: deploy_model.py (read from S3) receives the created
# model's name and endpoint settings as arguments. What the script does with them is not
# visible here; presumably it creates or updates the endpoint.
deploy_model_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    volume_size_in_gb=60,
    base_job_name="deployingtest",
    sagemaker_session=sagemaker_session,
)

deploy_arguments = [
    "--model-name", model_step.properties.ModelName,  # resolved at run time from CreateModel
    "--region", region,
    "--endpoint-instance-type", "ml.t2.medium",
    "--endpoint-name", "temp",
]

deploy_step = ProcessingStep(
    name="DeployModel",
    processor=deploy_model_processor,
    code=deployment_script_path,
    job_arguments=deploy_arguments,
)

Condition Step

In [28]:
# Gate: create and deploy the model only when the evaluation report's BSS metric
# (presumably the Brier skill score; confirm against evaluate.py) is >= the threshold.
# NOTE(review): for a skill score, 0 means "no better than the reference" and negative means worse.
# A threshold of -0.5 therefore lets models that underperform the baseline through, which
# contradicts the fail-step message. Kept at -0.5 to preserve current behaviour; TODO confirm.
bss_threshold = -0.5

condition_step = ConditionStep(
    name="CheckEvaluation",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=evaluation_step.name,  # stays correct if the evaluation step is renamed
                property_file=evaluation_property_file,
                json_path="classification_metrics.BSS.value",
            ),
            right=bss_threshold,
        )
    ],
    if_steps=[model_step, deploy_step],
    else_steps=[fail_step],  # TODO: try HPO, retrain, and then notify the developer
)

Pipeline Creation

In [29]:
# Assemble the pipeline. Every ParameterString must be registered here. The model, deploy
# and fail steps are not listed directly; they are reached through condition_step's branches.
pipeline_parameters = [
    input_data,
    preprocessed_data_train,
    preprocessed_data_test,
    preprocessed_data_txt,
]
pipeline_steps = [processing_step, training_step, evaluation_step, condition_step]

pipeline = Pipeline(
    name="demopipeline",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

Pipeline Execution

In [30]:
### Execute the pipeline
# Create or update the pipeline definition under `role`, start one execution using the
# default parameter values (no overrides passed), and block this cell until it finishes.
# NOTE(review): wait() uses the SDK waiter's default polling limits; confirm they are long enough
# for training plus deployment.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()

### Testing Endpoint

In [31]:
#!pip install requests_aws4auth
import json
import requests
import time
import boto3
from requests_aws4auth import AWS4Auth
import pandas as pd
import random
import datetime
import numpy as np

# AWS credentials and region used to SigV4-sign HTTP requests to the SageMaker runtime endpoint.
session = boto3.Session()
credentials = session.get_credentials()
if credentials is None:
    raise RuntimeError("No AWS credentials found; cannot sign requests to the SageMaker endpoint.")
# Take one consistent snapshot of key/secret/token. Role credentials refresh in the background,
# and reading the three attributes separately can mix values from two different refreshes.
frozen_credentials = credentials.get_frozen_credentials()

# Must match the region in the endpoint URL. Named separately from `region` (used when building
# the pipeline) so this cell does not overwrite it.
endpoint_region = 'us-east-2'

# Create an AWS4Auth object
auth = AWS4Auth(
    frozen_credentials.access_key,
    frozen_credentials.secret_key,
    endpoint_region,
    'sagemaker',
    session_token=frozen_credentials.token,
)

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [32]:
# Invocation URL of the deployed endpoint. Replace "..." with the endpoint name
# (the deploy step above passes "--endpoint-name temp").
url = 'https://runtime.sagemaker.us-east-2.amazonaws.com/endpoints/.../invocations'  # Edit this to your newly created endpoint

# Request body is JSON; the header is set explicitly, without a charset suffix.
headers = {"Content-Type": "application/json"}

# Local copy of the cleaned dataset; rows are sampled from it as test payloads.
d = pd.read_csv('bank_clean.csv')

In [None]:
# Smoke-test the endpoint: send randomly chosen rows of the cleaned dataset as JSON and
# print the chosen row index, request latency, status code and response body for each.
n_requests = 20
for _ in range(n_requests):
    # randrange excludes the upper bound. randint(0, len(d)) could return len(d), and .iloc would raise IndexError.
    row = random.randrange(d.shape[0])
    print(row)

    # Named `payload` rather than `data` so the dataset file-name global from the
    # initialization cell is not overwritten (re-running earlier cells depends on it).
    payload = json.dumps(d.iloc[row].to_dict())

    # Send the POST request; perf_counter is monotonic, so it suits measuring durations.
    start_time = time.perf_counter()
    response = requests.post(url, auth=auth, headers=headers, data=payload)
    latency_ms = (time.perf_counter() - start_time) * 1000

    print(f"Request latency: {latency_ms:.0f}ms")
    print("Status Code:", response.status_code)

    # Error responses are not guaranteed to be JSON. Fall back to the raw text so that a single
    # failed request is reported instead of aborting the loop with a decode error.
    try:
        response_body = response.json()
    except ValueError:
        response_body = response.text
    print("Response Body:", response_body)