In [1]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"AbaloneModelPackageGroupName"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [11]:
local_path = "train_user_product_pairs.csv"

s3 = boto3.resource("s3")

base_uri = f"s3://{default_bucket}"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-1-669784505207//train_user_product_pairs.csv


In [3]:
local_path = "batch_commerce.csv"

s3 = boto3.resource("s3")

base_uri = f"s3://{default_bucket}"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sagemaker-us-east-1-669784505207//batch_commerce.csv


## Define Parameters to Parametrize Pipeline Execution

In [4]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

In [7]:
!mkdir -p code

## Define a Processing Step for Feature Engineering

In [5]:
# Get your session and default bucket
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "commerce-recommender"
role = sagemaker.get_execution_role()

# S3 paths to your ORIGINAL, unprocessed data
s3_input_train = f"s3://{bucket}/{prefix}/input/raw/train_user_product_pairs.csv"
s3_input_validation = f"s3://{bucket}/{prefix}/input/raw/validation_user_product_pairs.csv"

# First, upload your original files if you haven't already
sess.upload_data(path='train_user_product_pairs.csv', bucket=bucket, key_prefix=f"{prefix}/input/raw")
sess.upload_data(path='validation_user_product_pairs.csv', bucket=bucket, key_prefix=f"{prefix}/input/raw")

's3://sagemaker-us-east-1-669784505207/commerce-recommender/input/raw/validation_user_product_pairs.csv'

In [6]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="recommender-preprocessing",
    sagemaker_session=pipeline_session
)

print("Starting SageMaker Processing Job.")
processor_args = sklearn_processor.run(
    code="preprocess.py",
    inputs=[
        ProcessingInput(
            source=s3_input_train,
            destination="/opt/ml/processing/input/train",
            s3_data_distribution_type="FullyReplicated"
        ),
        ProcessingInput(
            source=s3_input_validation,
            destination="/opt/ml/processing/input/validation",
            s3_data_distribution_type="FullyReplicated"
        )
    ],

    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/train", destination=f"s3://{bucket}/{prefix}/processed/train"),
        ProcessingOutput(source="/opt/ml/processing/output/validation", destination=f"s3://{bucket}/{prefix}/processed/validation"),
        ProcessingOutput(source="/opt/ml/processing/output/batch", destination=f"s3://{bucket}/{prefix}/processed/batch")
    ],

    arguments=[
        "--train-input", "train_user_product_pairs.csv",
        "--validation-input", "validation_user_product_pairs.csv"
    ]
)

step_process = ProcessingStep(name="CommerceProcess", step_args=processor_args)

Starting SageMaker Processing Job.




## Define a Training Step to Train a Model

In [9]:
%%time
from time import gmtime, strftime

job_name = "xgb-recommender-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
output_location = "s3://{}/{}/output/{}".format(bucket, prefix, job_name)
image = sagemaker.image_uris.retrieve(
    framework="xgboost", region=boto3.Session().region_name, version="1.7-1"
)

sm_estimator = sagemaker.estimator.Estimator(
    image,
    role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size=50,
    input_mode="File",
    output_path=output_location,
    sagemaker_session=pipeline_session
)

sm_estimator.set_hyperparameters(
    objective="binary:logistic",
    eval_metric="auc",
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    verbosity=0,
    num_round=100,
)

# --- Define the data input channels ---
s3_processed_train_path = f"s3://{bucket}/{prefix}/processed/train"
s3_processed_validation_path = f"s3://{bucket}/{prefix}/processed/validation"

train_data = sagemaker.inputs.TrainingInput(
    s3_processed_train_path,
    content_type="text/csv"
)

validation_data = sagemaker.inputs.TrainingInput(
    s3_processed_validation_path,
    content_type="text/csv"
)

data_channels = {"train": train_data, "validation": validation_data}

# --- Start the training job ---
train_args = sm_estimator.fit(inputs=data_channels, job_name=job_name, logs=True)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


CPU times: user 72.6 ms, sys: 0 ns, total: 72.6 ms
Wall time: 118 ms




In [10]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    step_args=train_args,
)

## Define a Model Evaluation Step to Evaluate the Trained Model

In [14]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    # Download Prediction Output
    print("\nDownloading prediction results from S3")
    # Define the name of the file that was submitted to the batch transform job
    batch_input_filename = 'batch_inference_data.csv'
    # SageMaker names the output file based on the input file, adding ".out"
    output_filename_on_s3 = f"{batch_input_filename}.out"
    output_s3_path = f"{sm_transformer.output_path}/{output_filename_on_s3}"
    
    # Use the AWS CLI to download the file
    !aws s3 cp {output_s3_path} .
    
    # Merge Predictions with Product Details and Analyze
    print("\nMerging all data to generate final recommendations")
    # Load the downloaded predictions
    predictions_df = pd.read_csv(output_filename_on_s3, header=None)
    predictions_df.columns = ['purchase_probability']
    
    # Merge predictions with the user/product IDs from the recreated batch data
    results_df = pd.concat([original_batch_data[['user_id', 'product_id']], predictions_df], axis=1)
    
    # Merge the results with the product details from the lookup file we created
    final_recommendations_df = pd.merge(
        results_df,
        products_df,
        on='product_id',
        how='left'
    )
    
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(final_recommendations_df.to_json())

Writing code/evaluation.py


In [15]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-commerce-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)



In [16]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="CommerceEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## Define a Create Model Step to Create a Model

In [25]:
from sagemaker.model import Model

model_name_from_colleague = "xgb-recommender-2025-10-12-01-34-42"

# Create a SageMaker Model object referencing the existing model
model = sagemaker.Model(
     image_uri=image,
     name=model_name_from_colleague,
     sagemaker_session=pipeline_session,
     role = role
)

In [26]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="CommerceCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium")
)



## Define a Transform Step to Perform Batch Transformation


In [27]:
# 1. Create a Transformer from your trained estimator
# sm_estimator is the variable from your training job
sm_transformer = sm_estimator.transformer(
    model_name=step_create_model.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{bucket}/{prefix}/batch-output" # Define where to save predictions
)



In [30]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="CommerceTransform", transformer=sm_transformer, inputs=TransformInput(data=batch_data)
)

## Define a Register Model Step to Create a Model Package

In [31]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="AbaloneRegisterModel", step_args=register_args)



## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

In [32]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="CommmerceMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

## Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry, Or Terminate the Execution in Failed State

In [33]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="CommerceCond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

## Define a Pipeline of Parameters, Steps, and Conditions


In [34]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [35]:
pipeline.upsert(role_arn=role)

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:669784505207:pipeline/AbalonePipeline',
 'ResponseMetadata': {'RequestId': 'c43ff18a-94a5-4691-b626-be017019484b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c43ff18a-94a5-4691-b626-be017019484b',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Thu, 16 Oct 2025 18:56:14 GMT'},
  'RetryAttempts': 0}}

In [36]:
execution = pipeline.start()

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


In [37]:
execution.wait()

In [38]:
execution.list_steps()

[{'StepName': 'AbaloneEval',
  'StartTime': datetime.datetime(2025, 10, 16, 19, 1, 26, 140000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 10, 16, 19, 1, 26, 140000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': "ClientError: Invalid property reference Steps.CommerceProcess.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri. Key ['test'] on property 'ProcessingOutputConfig.Outputs' does not exist.",
  'Metadata': {},
  'AttemptCount': 1},
 {'StepName': 'AbaloneTrain',
  'StartTime': datetime.datetime(2025, 10, 16, 18, 56, 22, 554000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 10, 16, 19, 1, 14, 383000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:669784505207:training-job/pipelines-cp0inkh47gfu-AbaloneTrain-8ulTtNwpdR'}},
  'AttemptCount': 1},
 {'StepName': 'CommerceProcess',
  'StartTime': datetime.datetime(2025, 10, 16, 18, 56, 22, 554000, tzinfo=tzlocal()),
  'EndTime'