# SageMaker Pipeline Blueprint

⚠️ **IMPORTANT: This notebook contains EXAMPLE/PLACEHOLDER credentials only.**

All AWS account IDs (e.g. 497487485332), ARNs, security group and subnet IDs, KMS keys, ECR image URIs, and other sensitive-looking values are fictional examples. Replace them with your own AWS resources before running the notebook.

See `SECURITY.md` for details.

In [None]:
from datetime import datetime

import boto3
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.network import NetworkConfig
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import (
    ParameterFloat,
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TrainingStep

import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor


In [None]:
# Configuration
# ⚠️ REPLACE ALL VALUES BELOW WITH YOUR ACTUAL AWS RESOURCES
# These are EXAMPLE/PLACEHOLDER values only!

region = "us-east-1"  # Your AWS region
role = "arn:aws:iam::497487485332:role/service-role/AmazonSageMaker-ExecutionRole"  # EXAMPLE - Replace with your IAM role
bucket = "my-s3-bucket"  # EXAMPLE - Replace with your S3 bucket
base_prefix = "data"
# Minute-resolution timestamp (local clock) evaluated once when this cell runs.
# It namespaces the S3 prefixes, job names, and the pipeline name used below.
job_id = f"pipe-{datetime.now().strftime('%Y%m%d%H%M')}"

# MLflow configuration (if using SageMaker MLflow tracking server)
mlflow_tracking_uri = "arn:aws:sagemaker:us-east-1:497487485332:mlflow-tracking-server/sagemaker-mlflow-3"  # EXAMPLE - Replace with your tracking server
mlflow_bucket = "sagemaker-us-east-1-497487485332"  # EXAMPLE - Replace with your bucket
mlflow_experiment_path = "mlflow-experiments-3/1"

# Create SageMaker session
sm_session = sagemaker.Session(boto3.Session(region_name=region))

# KMS key for encryption (optional)
kms_key = "arn:aws:kms:us-east-1:497487485332:key/1234abcd-12ab-34cd-56ef-1234567890ab"  # EXAMPLE - Replace with your KMS key

# Security groups (EXAMPLE - Replace with your VPC resources)
network_config = NetworkConfig(
    security_group_ids=["sg-a9e76d60e03a174ff"],  # EXAMPLE security group
    subnets=[
        "subnet-f28751aa8ad2ad51a",  # EXAMPLE subnet
        "subnet-0a26e0df316b74f23",  # EXAMPLE subnet
        "subnet-ccca708d00e55a2bb",  # EXAMPLE subnet
    ],
)

# Step caching. `expire_after` must be an ISO 8601 duration string, so one day
# is spelled "P1D" (a free-form value such as "1 day" is not a valid duration).
# NOTE(review): steps whose arguments embed `job_id` will probably never hit
# the cache across notebook runs, because the timestamp changes the step
# arguments every time — confirm whether caching is expected to be effective.
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="P1D",
)

# Pipeline parameters (overridable per execution; defaults are used otherwise)
model_version = ParameterString(name="ModelVersion", default_value="v1.1d")
date_filter = ParameterString(
    name="DateFilter", default_value="reporting_qdate = DATE '2025-01-01'"
)
test_size = ParameterFloat(name="TestSize", default_value=0.3)
sampling_ratio = ParameterFloat(name="SamplingRatio", default_value=0.1)
iterations = ParameterInteger(name="Iterations", default_value=357)
learning_rate = ParameterFloat(name="LearningRate", default_value=0.13)
depth = ParameterInteger(name="Depth", default_value=4)
early_stop_rounds = ParameterInteger(name="EarlyStopRounds", default_value=43)
instance_type = ParameterString(name="InstanceType", default_value="ml.m6i.8xlarge")
instance_count = ParameterInteger(name="InstanceCount", default_value=4)
batch_size = ParameterInteger(name="BatchSize", default_value=10000)

## Data Preparation Step

In [None]:
# Processor that runs the data-preparation script on a single instance.
data_prep_processor = ScriptProcessor(
    image_uri="497487485332.dkr.ecr.us-east-1.amazonaws.com/catboost-mlflow-3-container:latest",
    command=["python3"],
    role=role,
    instance_type="ml.m6i.16xlarge",
    instance_count=1,
    sagemaker_session=sm_session,
)

# Command-line arguments handed to data_preparation.py, one flag per line.
# Float pipeline parameters are converted with `.to_string()` before being
# placed in the argument list.
data_prep_args = [
    "--bucket", bucket,
    "--base_prefix", base_prefix,
    "--out_prefix", f"{base_prefix}/ml-dataset",
    "--crawler_name", "ML_DATASET",
    "--custom_date_filter", date_filter,
    "--run-glue-job",
    "--test-size", test_size.to_string(),
    "--sampling-ratio", sampling_ratio.to_string(),
    "--output-prefix", f"{base_prefix}/ml-dataset-stratified/{job_id}",
    "--job-id", job_id,
]

# First pipeline step: dataset preparation, with step caching enabled.
data_prep = ProcessingStep(
    name="DataPreparation",
    processor=data_prep_processor,
    code="../data_preparation.py",
    job_arguments=data_prep_args,
    cache_config=cache_config,
)

## Training Step

In [None]:
# Estimator for the custom training container; train.py is the entry point and
# the MLflow tracking URI is exposed to the container as an environment variable.
estimator = Estimator(
    image_uri="497487485332.dkr.ecr.us-east-1.amazonaws.com/catboost-mlflow-3-container:latest",
    role=role,
    instance_type="ml.m6i.32xlarge",
    instance_count=1,
    entry_point="train.py",
    source_dir="../",
    output_path=f"s3://{bucket}/{base_prefix}/ml-artifacts",
    environment={"MLFLOW_TRACKING_URI": mlflow_tracking_uri},
    sagemaker_session=sm_session,
)

# Hyperparameter names must match what train.py expects; the values are
# pipeline parameters, so they can be overridden per execution.
training_hyperparameters = {
    "model_version": model_version,
    "iterations": iterations,
    "learning_rate": learning_rate,
    "depth": depth,
    "early_stopping_rounds": early_stop_rounds,
}
estimator.set_hyperparameters(**training_hyperparameters)

# Both input channels live under the same per-run S3 root.
split_root_uri = f"s3://{bucket}/{base_prefix}/ml-dataset-stratified/{job_id}"
training_inputs = {
    channel: TrainingInput(
        s3_data=f"{split_root_uri}/{channel}",
        content_type="application/x-parquet",
    )
    for channel in ("train", "test")
}

# Training step; runs only after the data-preparation step.
training = TrainingStep(
    name="ModelTraining",
    estimator=estimator,
    inputs=training_inputs,
    depends_on=[data_prep],
    cache_config=cache_config,
)

## Inference Step

In [None]:
# Batch-inference step: runs batch_inference.py against the prepared dataset
# using the artifacts produced by the training step.
inference = ProcessingStep(
    name="BatchInference",
    processor=ScriptProcessor(
        command=["python3"],
        image_uri="497487485332.dkr.ecr.us-east-1.amazonaws.com/ray-inference-container:latest",
        role=role,
        # Driven by the InstanceCount / InstanceType pipeline parameters so
        # that overriding them at execution time actually takes effect. Their
        # defaults (4 x ml.m6i.8xlarge) equal the previously hard-coded values.
        instance_count=instance_count,
        instance_type=instance_type,
        volume_size_in_gb=100,
        sagemaker_session=sm_session,
        env={"MLFLOW_TRACKING_URI": mlflow_tracking_uri},
        output_kms_key=kms_key,  # for SSE encryption
    ),
    code="../batch_inference.py",
    inputs=[
        ProcessingInput(
            source=f"s3://{bucket}/{base_prefix}/ml-dataset",
            destination="/opt/ml/processing/input",
        ),
        ProcessingInput(  # model.tar.gz from training step
            source=training.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model-artifacts",
        ),
        ProcessingInput(  # output.tar.gz (MLflow metadata + model info)
            # The training job name is only known at execution time, so the
            # S3 URI is assembled with Join rather than an f-string.
            source=Join(
                on="",
                values=[
                    f"s3://{bucket}/{base_prefix}/ml-artifacts/",
                    training.properties.TrainingJobName,
                    "/output/",
                ],
            ),
            destination="/opt/ml/processing/mlflow",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="inference_results",
            source="/opt/ml/processing/output",
            destination=f"s3://{bucket}/{base_prefix}/outputs/inference/{job_id}",
        ),
        ProcessingOutput(
            # Looked up by name ("inference_metadata") by the Redshift load step.
            output_name="inference_metadata",
            source="/opt/ml/processing/output_metadata",
            destination=f"s3://{bucket}/{base_prefix}/outputs/inference/{job_id}/metadata",
        ),
    ],
    # NOTE(review): the BatchSize pipeline parameter is declared but is not
    # passed to this step — confirm whether batch_inference.py should get it.
    job_arguments=[
        "--data-path",
        "/opt/ml/processing/input",
        "--mlflow-metadata",
        "/opt/ml/processing/mlflow",
        "--model-version",
        model_version,
        "--output-path",
        "/opt/ml/processing/output",
        "--region",
        region,
        "--bucket",
        bucket,
        "--glue-crawler",
        "BATCH_INFERENCE",
        "--glue-database",
        "scoring_service",
        "--glue-table",
        "model_scores_aud",
    ],
    depends_on=[training],
)

## Load Step

In [None]:
# Processor for the Redshift load script; it is the only step configured with
# the VPC network settings (security groups and subnets).
redshift_load_processor = ScriptProcessor(
    image_uri="497487485332.dkr.ecr.us-east-1.amazonaws.com/ray-inference-container:latest",
    command=["python3"],
    role=role,
    instance_type="ml.m6i.large",
    instance_count=1,
    env={"AWS_DEFAULT_REGION": region},
    network_config=network_config,
    sagemaker_session=sm_session,
)

# S3 location of the "inference_metadata" output of the inference step,
# resolved at execution time from the step's properties.
inference_metadata_uri = inference.properties.ProcessingOutputConfig.Outputs[
    "inference_metadata"
].S3Output.S3Uri

# Final step: runs s3_to_redshift.py with the inference metadata mounted locally.
load = ProcessingStep(
    name="RedshiftLoad",
    processor=redshift_load_processor,
    code="../s3_to_redshift.py",
    inputs=[
        ProcessingInput(
            source=inference_metadata_uri,
            destination="/opt/ml/processing/metadata",
        )
    ],
    job_arguments=[
        "--metadata-path", "/opt/ml/processing/metadata/metadata.json",
        "--redshift-schema", "scoring_service",
        "--redshift-table", "model_scores_aud",
        "--redshift-iam-role", "arn:aws:iam::497487485332:role/AWSGlueServiceRoleDefault",
        "--job-name", f"redshift-load-job-{job_id}",
    ],
    depends_on=[inference],
)

## Pipeline

In [None]:
# Every parameter object referenced by the steps must be registered with the
# pipeline so it can be overridden when an execution is started.
pipeline_parameters = [
    model_version,
    date_filter,
    test_size,
    sampling_ratio,
    iterations,
    learning_rate,
    depth,
    early_stop_rounds,
    instance_type,
    instance_count,
    batch_size,
]

# Steps in the order: prepare data -> train -> batch inference -> Redshift load.
pipeline_steps = [data_prep, training, inference, load]

# NOTE(review): the pipeline name embeds the timestamped job_id, so each
# notebook run registers a separately named pipeline — confirm this is intended.
pipeline = Pipeline(
    name=f"mlflow-training-pipeline-{job_id}",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sm_session,
)

# Register the definition (create, or update if the name already exists)
pipeline.upsert(role_arn=role)

# Kick off an execution with every parameter left at its default value
execution = pipeline.start()

print(f"✅ Pipeline started: {execution.arn}")
print("Monitor execution in SageMaker Studio or via AWS Console")