In [13]:
# =====================================================================
# STEP 1 — Imports and Configuration
# =====================================================================

import sagemaker
import boto3
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost import XGBoost
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import TrainingStep
from sagemaker.model_metrics import ModelMetrics, MetricsSource
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.execution_variables import ExecutionVariables

# --- Project and dataset identifiers ---
bucket = "ml-pipeline-project-aniolmg"
project_name = "titanic"
train_file, test_file = "titanic_train.csv", "titanic_test.csv"

# --- S3 locations derived from the identifiers above ---
train_s3 = f"s3://{bucket}/data/{train_file}"
output_s3 = f"s3://{bucket}/{project_name}/output/"

# --- AWS handles ---
# Region is read from the default boto3 session configuration.
region = boto3.Session().region_name
# Session object handed to the estimator and the pipeline in later cells;
# its default bucket is the project bucket.
pipeline_session = PipelineSession(default_bucket=bucket)
sm_client = boto3.client("sagemaker")
# IAM role used by the jobs and by the pipeline itself.
role = sagemaker.get_execution_role()

# Echo the resolved settings so the run is easy to audit.
print(f"✅ Using region: {region}")
print(f"✅ Output S3: {output_s3}")

✅ Using region: eu-west-3
✅ Output S3: s3://ml-pipeline-project-aniolmg/titanic/output/


In [14]:
# =====================================================================
# STEP 2 — Define and Configure the XGBoost Estimator
# =====================================================================

# XGBoost framework estimator that runs train_model.py as the training entry
# point on a single ml.m5.large instance (framework version 1.7-1).
xgb_estimator = XGBoost(
    entry_point="train_model.py",
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    framework_version="1.7-1",
    py_version="py3",
    # Reuse the output location defined in the configuration cell rather than
    # rebuilding the same S3 URI here, so the two values cannot drift apart.
    output_path=output_s3,
    base_job_name=f"{project_name}-train",
    hyperparameters={
        # XGBoost training settings.
        "max_depth": 8,
        "eta": 0.3,
        "objective": "binary:logistic",
        "num_round": 200,
        # NOTE(review): presumably consumed by train_model.py to locate the
        # dataset; the "train" channel already points at the same file, so
        # confirm whether the script still needs these two values.
        "bucket": bucket,
        "train_file": train_file,
    },
    # Same PipelineSession that the Pipeline object is created with.
    sagemaker_session=pipeline_session,
)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.large.


In [15]:
# =====================================================================
# STEP 3 — Create Training, Metrics, and Registration Steps
# =====================================================================

# --- Training step ---
# Single "train" channel pointing at the training CSV in S3.
train_channels = {"train": TrainingInput(train_s3, content_type="csv")}

train_step = TrainingStep(
    estimator=xgb_estimator,
    inputs=train_channels,
    name="TrainTitanicModel",
)

# --- Define a ScriptProcessor for computing metrics ---
# The processor uses the XGBoost 1.7-1 image, the same framework version
# requested for the training estimator, and runs the script with python3.
script_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.7-1",
        py_version="py3",
    ),
    command=["python3"],
    instance_type="ml.t3.medium",
    instance_count=1,
    role=role,
    # Use the shared PipelineSession, like the estimator and the pipeline do.
    # Without it the processor falls back to a default Session, whose default
    # bucket differs from the project bucket configured on pipeline_session.
    sagemaker_session=pipeline_session,
)

# --- ProcessingStep to compute metrics ---

# S3 prefix for the metrics output: the project's metrics folder followed by
# the pipeline execution id, joined with "/" when the pipeline runs.
metrics_output_path = Join(
    values=[
        f"s3://{bucket}/{project_name}/metrics",
        ExecutionVariables.PIPELINE_EXECUTION_ID,
    ],
    on="/",
)

# Inputs copied into the processing container: the model artifact produced by
# the training step and the test CSV.
metrics_inputs = [
    ProcessingInput(
        destination="/opt/ml/processing/model",
        source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    ),
    ProcessingInput(
        destination="/opt/ml/processing/data",
        source=f"s3://{bucket}/data/{test_file}",
    ),
]

# Content of the container's metrics directory is uploaded to the
# per-execution S3 prefix.
metrics_outputs = [
    ProcessingOutput(
        destination=metrics_output_path,
        source="/opt/ml/processing/metrics",
        output_name="metrics",
    )
]

# Command-line arguments passed to compute_metrics.py (container-side paths).
metrics_job_arguments = [
    "--input-model", "/opt/ml/processing/model",
    "--input-data", f"/opt/ml/processing/data/{test_file}",
    "--output-metrics", "/opt/ml/processing/metrics"
]

metrics_step = ProcessingStep(
    name="ComputeTitanicMetrics",
    processor=script_processor,
    code="compute_metrics.py",
    inputs=metrics_inputs,
    outputs=metrics_outputs,
    job_arguments=metrics_job_arguments,
    # Explicit ordering: metrics are computed only after training.
    depends_on=[train_step],
)

# --- Define model metrics using the ProcessingStep output ---
# URI of metrics.json inside the "metrics" output of the processing step.
metrics_json_uri = Join(
    values=[
        metrics_step.properties.ProcessingOutputConfig.Outputs["metrics"].S3Output.S3Uri,
        "metrics.json",
    ],
    on="/",
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri=metrics_json_uri,
    )
)

# --- Register the model using the metrics from ProcessingStep ---
model_package_group_name = "TitanicModel"

register_step = RegisterModel(
    name="RegisterTitanicModel",
    # Where the model package goes and its initial approval state.
    model_package_group_name=model_package_group_name,
    approval_status="PendingManualApproval",
    # What is being registered: the training step's artifact plus its metrics.
    estimator=xgb_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    model_metrics=model_metrics,
    # Request/response formats and instance types declared for the package.
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium"],
    transform_instances=["ml.m5.large"],
    # Register only after the metrics step has run.
    depends_on=[metrics_step],
)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


In [16]:
# =====================================================================
# STEP 4 — Define and Execute SageMaker Pipeline
# =====================================================================

# Steps listed in their logical order: train, compute metrics, register.
pipeline_steps = [train_step, metrics_step, register_step]

pipeline = Pipeline(
    name="TitanicPipeline",
    sagemaker_session=pipeline_session,
    steps=pipeline_steps,
)

print("⏳ Starting pipeline...")

# Create or update the pipeline definition under the execution role.
pipeline.upsert(role_arn=role)

# Launch one execution and block until it finishes.
execution = pipeline.start()
execution.wait()

print("✅ Pipeline executed successfully!")



⏳ Starting pipeline...




✅ Pipeline executed successfully!
