In [1]:
import os
import boto3
import sagemaker
import tempfile
import shutil
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.estimator import Estimator
from sagemaker.workflow.parameters import ParameterString, ParameterFloat
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.functions import JsonGet, Join
from sagemaker.workflow.properties import PropertyFile
from datetime import datetime
import time


# Configuration Parameters
bucket_name = "arxiv-project-bucket"
role = "arn:aws:iam::221082214706:role/MYLabRole"
region = "us-east-1"
sm_session = sagemaker.Session(boto_session=boto3.Session(region_name=region))

# Pipeline parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
quality_threshold = ParameterFloat(name="QualityThreshold", default_value=0.05)



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [2]:
'''
Below is a helper function to copy all files and directories from parent directory
into a temporary directory to avoid a lost+found directory error implicit to AWS system. 
In the end, we will delete the temporary directory. 
'''

def prepare_source_dir(src_dir, exclude_dirs=None):
    if exclude_dirs is None:
        exclude_dirs = []
    temp_dir = tempfile.mkdtemp()
    for item in os.listdir(src_dir):
        if item in exclude_dirs:
            continue
        src_path = os.path.join(src_dir, item)
        dest_path = os.path.join(temp_dir, item)
        if os.path.isdir(src_path):
            shutil.copytree(src_path, dest_path, symlinks=False)
        else:
            shutil.copy2(src_path, dest_path)
    return temp_dir

In [3]:
# Preprocessing portion of pipeline beginning with initial .json file
preprocess_processor = ScriptProcessor(
    role=role,
    image_uri=sagemaker.image_uris.retrieve("sklearn", region=region, version="1.2-1", py_version="py3"),
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="preprocess-job",
    sagemaker_session=sm_session,
)
preprocess_step = ProcessingStep(
    name="PreprocessData",
    processor=preprocess_processor,
    inputs=[
        ProcessingInput(
            source=f"s3://{bucket_name}/arxiv-metadata-oai-snapshot.json",
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=f"s3://{bucket_name}/processed_csv/"
        )
    ],
    code="preprocess.py",   # Script in current (flat) directory
)

In [4]:
# Create a temporary source directory for training to avoid aforementioned error
temp_source_dir = prepare_source_dir(".", exclude_dirs=["lost+found"])
print("Temporary source directory for training created at:", temp_source_dir)

sklearn_image_uri = sagemaker.image_uris.retrieve("sklearn", region=region, version="1.2-1", py_version="py3")
estimator = Estimator(
    image_uri=sklearn_image_uri,
    role=role,
    instance_count=1,
    instance_type=training_instance_type,
    output_path=f"s3://{bucket_name}/models/",
    sagemaker_session=sm_session,
    entry_point="train.py",  # our training script
    source_dir=temp_source_dir  # Temporary directory
)
estimator.set_hyperparameters(num_clusters=3, random_state=39)
training_step = TrainingStep(
    name="TrainModel",
    estimator=estimator,
    inputs={
        "train": sagemaker.inputs.TrainingInput(
            s3_data=Join(on="", values=[f"s3://{bucket_name}/processed_csv/train/"]),
            content_type="text/csv"
        )
    }
)

Temporary source directory for training created at: /tmp/tmpiq5d6coc


In [5]:

# Model Evaluation
'''
Here we run the custom_baseline.py once more to compute the silhouette score
'''
evaluation_processor = ScriptProcessor(
    role=role,
    image_uri=sklearn_image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="evaluation-job",
    sagemaker_session=sm_session,
)
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=f"s3://{bucket_name}/model_monitoring/baseline/",
            destination="/opt/ml/processing/baseline"
        )
    ],
    outputs=[
    ProcessingOutput(
        source="/opt/ml/processing/evaluation",
        destination=f"s3://{bucket_name}/evaluation/",
        output_name="evaluation"
        )
    ],
    code="custom_baseline.py",
    property_files=[
         PropertyFile(
              name="EvaluationReport",
              output_name="evaluation",  # Must match the output name in ProcessingOutput if applicable
              path="evaluation.json"
         )
    ]
)

In [6]:
# Conditional Quality Check
# Compare the silhouette score from evaluation.json against the quality_threshold to see if it passes
condition_step = ConditionStep(
    name="CheckQuality",
    conditions=[
        ConditionLessThanOrEqualTo(
            left=JsonGet(
                step_name=evaluation_step.name,
                property_file="EvaluationReport",
                json_path="regression_metrics.silhouette_score.value"
            ),
            right=quality_threshold
        )
    ],
    if_steps=[],
    else_steps=[FailStep(name="QualityFail", error_message="Silhouette score did not meet threshold.")]
)

In [7]:
# Data Drift Monitoring
# Run custom_drift.py to compute drift metrics from captured data and see if in violation
drift_processor = ScriptProcessor(
    role=role,
    image_uri=sagemaker.image_uris.retrieve("sklearn", region=region, version="1.2-1", py_version="py3"),
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="drift-job",
    sagemaker_session=sm_session,
)
drift_monitor_step = ProcessingStep(
    name="DataDriftMonitor",
    processor=drift_processor,
    inputs=[
        ProcessingInput(
            source=f"s3://{bucket_name}/captured_data/",  # Endpoint data capture location
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=f"s3://{bucket_name}/drift_evaluation/"
        )
    ],
    code="custom_drift.py"
)

In [8]:
# Building and executing the pipeline, with model registration later
# Choice behind model registration afterwards is due to unusual problems requiring manual registration
pipeline_steps = [
    preprocess_step,
    training_step,
    evaluation_step,
    condition_step,
    drift_monitor_step
]

pipeline = Pipeline(
    name="ArxivClusteringPipeline",
    parameters=[processing_instance_type, training_instance_type, quality_threshold],
    steps=pipeline_steps,
    sagemaker_session=sm_session
)

# pipeline definition for review
pipeline_definition = pipeline.definition()
print("Pipeline Definition:\n", pipeline_definition)

try:
    # create/update the pipeline
    pipeline.upsert(role_arn=role)

    # Start pipeline execution and wait
    execution = pipeline.start()
    execution.wait()

    print("Pipeline execution complete.")
except Exception as e:
    print("")

print("Pipeline execution complete.")

Pipeline Definition:
 {"Version": "2020-12-01", "Metadata": {}, "Parameters": [{"Name": "ProcessingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "QualityThreshold", "Type": "Float", "DefaultValue": 0.05}], "PipelineExperimentConfig": {"ExperimentName": {"Get": "Execution.PipelineName"}, "TrialName": {"Get": "Execution.PipelineExecutionId"}}, "Steps": [{"Name": "PreprocessData", "Type": "Processing", "Arguments": {"ProcessingResources": {"ClusterConfig": {"InstanceType": {"Get": "Parameters.ProcessingInstanceType"}, "InstanceCount": 1, "VolumeSizeInGB": 30}}, "AppSpecification": {"ImageUri": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3", "ContainerEntrypoint": ["python3", "/opt/ml/processing/input/code/preprocess.py"]}, "RoleArn": "arn:aws:iam::221082214706:role/MYLabRole", "ProcessingInputs": [{"InputName": "input-1", "AppManaged": false


Pipeline execution complete.


In [9]:
# Help debug/review
steps = execution.list_steps()
for step in steps:
    print(step)

{'StepName': 'TrainModel', 'StartTime': datetime.datetime(2025, 3, 4, 1, 47, 22, 587000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2025, 3, 4, 1, 50, 16, 324000, tzinfo=tzlocal()), 'StepStatus': 'Failed', 'FailureReason': 'ClientError: AlgorithmError: framework error: \nTraceback (most recent call last):\n  File "/miniconda3/lib/python3.9/site-packages/sagemaker_containers/_trainer.py", line 84, in train\n    entrypoint()\n  File "/miniconda3/lib/python3.9/site-packages/sagemaker_sklearn_container/training.py", line 39, in main\n    train(environment.Environment())\n  File "/miniconda3/lib/python3.9/site-packages/sagemaker_sklearn_container/training.py", line 31, in train\n    entry_point.run(uri=training_environment.module_dir,\n  File "/miniconda3/lib/python3.9/site-packages/sagemaker_training/entry_point.py", line 108, in run\n    return runner.get(runner_type, user_entry_point, args, env_vars, extra_opts).run(\n  File "/miniconda3/lib/python3.9/site-packages/sagemaker_trainin

In [10]:
'''
Since everything is running, we will clean up the temporary directory after the job is packaged 
and delete it after pipeline.upsert(), since pipeline steps package the code at that time.
'''
shutil.rmtree(temp_source_dir)
print("Temporary source directory removed.")

Temporary source directory removed.


In [13]:
# Manual Model Registration (Post pipeline)
# Retrieve the model artifact S3 URI from the training step.
# We use the .expr attribute to get its JSON expression.
from sagemaker.workflow.functions import Join
model_artifact_uri = training_step.properties.ModelArtifacts.S3ModelArtifacts.expr

sm_client = boto3.client("sagemaker", region_name=region)
model_name = "arxiv-model-" + datetime.utcnow().strftime("%Y%m%d%H%M%S")
print("Registering model with name:", model_name)

try:
    response = sm_client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=role,
        PrimaryContainer={
            "Image": sklearn_image_uri,
            "ModelDataUrl": model_artifact_uri
        }
    )
    print("Model registered. ARN:", response["ModelArn"])
except Exception as e:
    print(" ")

Registering model with name: arxiv-model-20250304023251
 
