# SageMaker Pipelines with SDK v3

This notebook demonstrates how to build **MLOps pipelines** using the new **SageMaker Python SDK v3** unified APIs.

## What You'll Learn

1. **SDK v3 Pipeline Imports** - New module structure for pipelines
2. **Pipeline Components** - Parameters, steps, conditions, and properties
3. **Step Types** - Processing, Training, Evaluation, Model Registration
4. **Quality Gates** - Conditional model registration based on metrics
5. **Model Registry** - Version control for production models

## Why SageMaker Pipelines?

| Challenge | Solution |
|-----------|----------|
| Manual ML workflows | Automated, reproducible pipelines |
| No quality control | Metric-based gates before production |
| Model versioning chaos | Integrated Model Registry |
| Expensive reruns | Step caching (skip unchanged steps) |

---

## 1. Setup and SDK v3 Imports

### SDK v3 Module Structure

SDK v3 reorganized imports into logical modules:

| Module | Purpose |
|--------|--------|
| `sagemaker.mlops.workflow` | Pipeline, steps, conditions |
| `sagemaker.core.workflow` | Parameters, properties, functions |
| `sagemaker.train` | ModelTrainer, training configs |
| `sagemaker.core.processing` | ScriptProcessor, Processing I/O |
| `sagemaker.serve` | ModelBuilder for deployment/registration |

In [None]:
# Install required packages
!pip install "sagemaker>=3.0.0" boto3 pandas scikit-learn xgboost joblib --quiet

In [None]:
# =============================================================================
# SDK v3.4.1 BUG WORKAROUND
# =============================================================================
# There's a bug in SDK v3.4.1 where get_training_code_hash() passes
# requirements (str) as dependencies (expects List[str]).
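# This patch must be applied BEFORE other sagemaker imports.
# Bug tracked at: https://github.com/aws/sagemaker-python-sdk/issues/5518
# The pip install above accepts any version >= 3.0.0; if you end up with a
# release other than 3.4.1, check the issue status before relying on this patch.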

import sagemaker.core.workflow.utilities as _workflow_utils

_original_get_training_code_hash = _workflow_utils.get_training_code_hash

def _patched_get_training_code_hash(entry_point, source_dir, dependencies):
    """Patched version that handles string/None dependencies."""
    if dependencies is None:
        dependencies = []
    elif isinstance(dependencies, str):
        dependencies = [dependencies] if dependencies else []
    return _original_get_training_code_hash(entry_point, source_dir, dependencies)

_workflow_utils.get_training_code_hash = _patched_get_training_code_hash
print("SDK v3.4.1 bug workaround applied")

In [None]:
# =============================================================================
# SDK v3 PIPELINE IMPORTS
# =============================================================================

# Pipeline and steps (mlops module)
from sagemaker.mlops.workflow.pipeline import Pipeline
from sagemaker.mlops.workflow.steps import ProcessingStep, TrainingStep, CacheConfig
from sagemaker.mlops.workflow.condition_step import ConditionStep
from sagemaker.mlops.workflow.fail_step import FailStep
from sagemaker.mlops.workflow.model_step import ModelStep

# Parameters, conditions, and functions (core.workflow module)
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.core.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
from sagemaker.core.workflow.properties import PropertyFile
from sagemaker.core.workflow.functions import JsonGet

# Training (train module)
from sagemaker.train.model_trainer import ModelTrainer
from sagemaker.train.configs import InputData, Compute, SourceCode

# Processing (core.processing module)
from sagemaker.core.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.core.shapes.shapes import ProcessingS3Input, ProcessingS3Output

# Model registration (serve module)
from sagemaker.serve.model_builder import ModelBuilder

# Session and utilities
from sagemaker.core.helper.session_helper import Session, get_execution_role
from sagemaker.core.image_uris import retrieve

print("SDK v3 pipeline imports loaded successfully")

In [None]:
import boto3
import json
from datetime import datetime

# Initialize sessions
sagemaker_session = Session()          # For regular operations
pipeline_session = PipelineSession()   # For pipeline definition (deferred execution)

region = sagemaker_session.boto_region_name
bucket = sagemaker_session.default_bucket()

# Get execution role
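def get_sagemaker_role():
    """Return a SageMaker execution role ARN.

    Searches IAM for a role whose name contains both 'SageMaker' and
    'Execution'; otherwise falls back to get_execution_role(), which works
    when running inside SageMaker (Studio or a notebook instance).
    """
    iam = boto3.client('iam')
    try:
        # list_roles is paginated, so walk every page instead of only the first
        for page in iam.get_paginator('list_roles').paginate():
            for r in page['Roles']:
                if 'SageMaker' in r['RoleName'] and 'Execution' in r['RoleName']:
                    return r['Arn']
    except Exception:
        pass  # e.g. no iam:ListRoles permission; fall through to the default role
    return get_execution_role()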

role = get_sagemaker_role()

print(f"Region: {region}")
print(f"Bucket: {bucket}")
print(f"Role: {role.split('/')[-1]}")

## 2. Pipeline Concepts

### Key Components

```
Pipeline
├── Parameters          # Runtime inputs (instance types, thresholds, S3 paths)
├── Steps               # Processing, Training, Evaluation, Model Registration
│   ├── ProcessingStep  # Data preprocessing, evaluation scripts
│   ├── TrainingStep    # Model training with ModelTrainer
│   ├── ConditionStep   # Quality gates (if/else branching)
│   ├── ModelStep       # Model registration to Model Registry
│   └── FailStep        # Explicit failure with error message
└── Properties          # Step outputs (S3 URIs, metrics) for chaining
```

### PipelineSession vs Session

| Session Type | When Used | Behavior |
|--------------|-----------|----------|
| `Session()` | Regular operations | Executes immediately |
| `PipelineSession()` | Pipeline definition | **Deferred** - captures args, doesn't execute |
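
To make the difference concrete, here is a toy model of the two behaviors. `DeferredSession`, `ImmediateSession`, `submit`, and `run_processing` are hypothetical names invented for this sketch and do not mirror the SDK's internals. The idea it illustrates is that under a pipeline session a call such as `script_processor.run(...)` hands back captured request arguments (which is why the notebook passes its result to `step_args=`) instead of starting a job.

```python
class DeferredSession:
    """Hypothetical stand-in for PipelineSession: records requests instead of sending them."""

    def __init__(self):
        self.captured = []

    def submit(self, api_name, **request):
        # A regular session would call the AWS API here; this one only records the request.
        self.captured.append((api_name, request))
        return {"api": api_name, "request": request}


class ImmediateSession:
    """Hypothetical stand-in for Session: pretends the job starts right away."""

    def __init__(self):
        self.started = []

    def submit(self, api_name, **request):
        self.started.append(api_name)
        return {"status": "InProgress", "api": api_name}


def run_processing(session, instance_type):
    """The same user-facing call works with either session; only the effect differs."""
    return session.submit("CreateProcessingJob", InstanceType=instance_type)


deferred = DeferredSession()
step_args = run_processing(deferred, "ml.m5.xlarge")
print(step_args["request"])  # {'InstanceType': 'ml.m5.xlarge'}
```

Nothing ran in the deferred case: the captured arguments become part of the pipeline definition, and the real job is started later, during a pipeline execution.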

## 3. Define Pipeline Parameters

Parameters make pipelines **reusable** - change inputs without modifying code.
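
Conceptually, each parameter is a name, a type, and a default, and each execution supplies overrides for some of them. The sketch below models that resolution in plain Python. `Param` and `resolve_parameters` are hypothetical, and rejecting unknown names or mismatched types is a choice made for this sketch, not a description of what the SageMaker service does.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Param:
    """Hypothetical, simplified pipeline parameter."""
    name: str
    type_: type
    default: Any


def resolve_parameters(params: List[Param], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Apply execution-time overrides on top of defaults (a sketch, not the service logic)."""
    known = {p.name for p in params}
    unknown = set(overrides) - known
    if unknown:
        raise KeyError(f"Unknown parameter(s): {sorted(unknown)}")

    resolved = {}
    for p in params:
        value = overrides.get(p.name, p.default)
        # Accept an int where a float is expected, e.g. R2Threshold=1.
        if p.type_ is float and isinstance(value, int) and not isinstance(value, bool):
            value = float(value)
        if not isinstance(value, p.type_):
            raise TypeError(f"{p.name} expects {p.type_.__name__}, got {type(value).__name__}")
        resolved[p.name] = value
    return resolved


params = [
    Param("MaxDepth", int, 6),
    Param("LearningRate", float, 0.1),
    Param("R2Threshold", float, 0.85),
]

# Same definition, different execution: only the overridden values change.
print(resolve_parameters(params, {"MaxDepth": 8, "R2Threshold": 0.7}))
# {'MaxDepth': 8, 'LearningRate': 0.1, 'R2Threshold': 0.7}
```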

In [None]:
# =============================================================================
# PIPELINE PARAMETERS
# =============================================================================
# Parameters are resolved at pipeline execution time, not definition time.
# This allows reusing the same pipeline with different configurations.

# Infrastructure parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge",
)

training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge",
)

# Data parameters
input_data_uri = ParameterString(
    name="InputDataUri",
    default_value=f"s3://{bucket}/pipeline-demo/data/",
)

# Hyperparameters
max_depth = ParameterInteger(name="MaxDepth", default_value=6)
n_estimators = ParameterInteger(name="NEstimators", default_value=100)
learning_rate = ParameterFloat(name="LearningRate", default_value=0.1)

# Quality gate threshold
r2_threshold = ParameterFloat(
    name="R2Threshold",
    default_value=0.85,  # Model must achieve R² >= 0.85 to be registered
)

# Model approval status
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",  # Options: Approved, Rejected, PendingManualApproval
)

print("Pipeline parameters defined:")
print(f"  - ProcessingInstanceType (default: ml.m5.xlarge)")
print(f"  - TrainingInstanceType (default: ml.m5.xlarge)")
print(f"  - InputDataUri (default: s3://{bucket}/pipeline-demo/data/)")
print(f"  - MaxDepth (default: 6)")
print(f"  - NEstimators (default: 100)")
print(f"  - LearningRate (default: 0.1)")
print(f"  - R2Threshold (default: 0.85)")
print(f"  - ModelApprovalStatus (default: PendingManualApproval)")

## 4. Configure Container Image and Caching

In [None]:
# Get SKLearn container (used for all steps - XGBoost installed via requirements.txt)
sklearn_image = retrieve(
    framework="sklearn",
    version="1.2-1",
    region=region,
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
print(f"Container: {sklearn_image.split('/')[-1]}")

# =============================================================================
# STEP CACHING
# =============================================================================
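# CacheConfig enables step caching: if a previous successful run of a step used
# the same arguments and finished within expire_after, its results are reused
# instead of re-running the job. This can save substantial time on reruns
# during development.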

cache_config = CacheConfig(
    enable_caching=True,
    expire_after="P30D",  # ISO 8601 duration: 30 days
)
print(f"Caching enabled: 30 days")
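
A rough mental model of what `enable_caching` and `expire_after` control: a step is reused when an earlier successful run with the same resolved arguments finished within the expiry window. The hashing scheme, the `cache` dictionary, and the helper names below are assumptions for illustration; the service's actual matching rules are not reproduced here, and the duration parser only handles the simple `PnD`/`PnW` forms of ISO 8601.

```python
import hashlib
import json
import re
from datetime import datetime, timedelta, timezone


def parse_day_duration(iso: str) -> timedelta:
    """Parse simple ISO 8601 durations such as 'P30D' or 'P2W' (not a full parser)."""
    match = re.fullmatch(r"P(\d+)([DW])", iso)
    if not match:
        raise ValueError(f"Unsupported duration for this sketch: {iso}")
    count, unit = int(match.group(1)), match.group(2)
    return timedelta(days=count * (7 if unit == "W" else 1))


def cache_key(step_name: str, resolved_args: dict) -> str:
    """Hash a step's resolved arguments; identical arguments give an identical key."""
    payload = json.dumps({"step": step_name, "args": resolved_args}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def is_cache_hit(cache: dict, key: str, now: datetime, expire_after: str) -> bool:
    """Hit only if a successful run with this key finished within the expiry window."""
    finished_at = cache.get(key)
    return finished_at is not None and now - finished_at <= parse_day_duration(expire_after)


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
args = {"InstanceType": "ml.m5.xlarge", "InputDataUri": "s3://example-bucket/pipeline-demo/data/"}
key = cache_key("Preprocess", args)
cache = {key: now - timedelta(days=3)}  # pretend Preprocess succeeded 3 days ago

print(is_cache_hit(cache, key, now, "P30D"))      # True: same arguments, within 30 days
changed = cache_key("Preprocess", {**args, "InstanceType": "ml.m5.2xlarge"})
print(is_cache_hit(cache, changed, now, "P30D"))  # False: an argument changed
```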

## 5. Step 1: Preprocessing

The preprocessing step:
1. Loads raw data from S3
2. Cleans and transforms features
3. Creates train/test split
4. Outputs to S3 for downstream steps

In [None]:
%%writefile ../pipeline/preprocessing.py
"""Preprocessing Script for SageMaker Pipeline

Loads CSV/Parquet data, keeps numeric columns, fills NaN with 0,
and creates 80/20 train/test split.

Input: /opt/ml/processing/input/
Output: /opt/ml/processing/train/, /opt/ml/processing/test/
"""

import os
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.model_selection import train_test_split


def load_data(input_dir: str) -> pd.DataFrame:
    """Load all CSV/Parquet files from input directory."""
    input_path = Path(input_dir)

    csv_files = list(input_path.glob("*.csv"))
    parquet_files = list(input_path.glob("*.parquet"))

    if csv_files:
        df = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)
        print(f"Loaded {len(csv_files)} CSV files")
    elif parquet_files:
        df = pd.concat([pd.read_parquet(f) for f in parquet_files], ignore_index=True)
        print(f"Loaded {len(parquet_files)} Parquet files")
    else:
        raise ValueError(f"No CSV or Parquet files found in {input_dir}")

    return df


def preprocess(df: pd.DataFrame, target_column: str = "production") -> pd.DataFrame:
    """Preprocess data: keep numeric columns, fill NaN with 0."""
    print(f"Raw data shape: {df.shape}")

    if target_column not in df.columns:
        raise ValueError(f"Target column '{target_column}' not found. Columns: {list(df.columns)}")

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if target_column not in numeric_cols:
        numeric_cols.append(target_column)

    df_processed = df[numeric_cols].fillna(0)
    print(f"Processed data shape: {df_processed.shape}")
    return df_processed


def main():
    print("=" * 50)
    print("PREPROCESSING")
    print("=" * 50)

    input_dir = "/opt/ml/processing/input"
    train_dir = "/opt/ml/processing/train"
    test_dir = "/opt/ml/processing/test"

    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    df = load_data(input_dir)
    df_processed = preprocess(df, target_column="production")

    train_df, test_df = train_test_split(df_processed, test_size=0.2, random_state=42)
    print(f"Train: {len(train_df)}, Test: {len(test_df)}")

    train_df.to_csv(os.path.join(train_dir, "train.csv"), index=False)
    test_df.to_csv(os.path.join(test_dir, "test.csv"), index=False)

    print("PREPROCESSING COMPLETE!")


if __name__ == "__main__":
    main()

In [None]:
# =============================================================================
# PROCESSING STEP
# =============================================================================
# Note: We use PipelineSession, not regular Session.
# This captures the step configuration without executing it.

base_job_prefix = "pipeline-demo"

script_processor = ScriptProcessor(
    image_uri=sklearn_image,
    instance_type=processing_instance_type,  # Pipeline parameter!
    instance_count=1,
    base_job_name=f"{base_job_prefix}-preprocess",
    role=role,
    sagemaker_session=pipeline_session,  # PipelineSession for deferred execution
)

# Define the preprocessing step
preprocessing_step = ProcessingStep(
    name="Preprocess",
    step_args=script_processor.run(
        code="../pipeline/preprocessing.py",
        inputs=[
            ProcessingInput(
                input_name="input",
                s3_input=ProcessingS3Input(
                    s3_uri=input_data_uri,  # Pipeline parameter!
                    s3_data_type="S3Prefix",
                    local_path="/opt/ml/processing/input",
                ),
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                s3_output=ProcessingS3Output(
                    s3_uri=f"s3://{bucket}/{base_job_prefix}/processed/train",
                    s3_upload_mode="EndOfJob",
                    local_path="/opt/ml/processing/train",
                ),
            ),
            ProcessingOutput(
                output_name="test",
                s3_output=ProcessingS3Output(
                    s3_uri=f"s3://{bucket}/{base_job_prefix}/processed/test",
                    s3_upload_mode="EndOfJob",
                    local_path="/opt/ml/processing/test",
                ),
            ),
        ],
    ),
    cache_config=cache_config,
)

print("Preprocessing step configured")
print(f"  Input: {{InputDataUri parameter}}")
print(f"  Output: s3://{bucket}/{base_job_prefix}/processed/{{train,test}}")

## 6. Step 2: Training

The training step:
1. Uses `ModelTrainer` (SDK v3 unified API)
2. Reads preprocessed data from Step 1 output
3. Trains XGBoost model
4. Outputs model artifacts to S3

### Cross-Step Data Flow

```python
# Reference output from previous step using .properties
preprocessing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
```
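
At definition time such an expression has no value yet. It is a placeholder that records a path, and the service resolves it once the upstream step finishes; the same reference is what makes `Train` depend on `Preprocess`. The hypothetical `PropertyRef` class below imitates this by building up the path as you chain attributes and keys. The `{"Get": "Steps.<StepName>..."}` shape mirrors how references typically appear in the generated definition JSON, but check `pipeline.definition()` for your SDK version rather than relying on this sketch.

```python
class PropertyRef:
    """Hypothetical placeholder that records an attribute/key path instead of holding a value."""

    def __init__(self, path: str):
        self._path = path

    def __getattr__(self, name: str) -> "PropertyRef":
        # Leave dunder lookups (copy, pickle, ...) alone; extend the path for everything else.
        if name.startswith("__"):
            raise AttributeError(name)
        return PropertyRef(f"{self._path}.{name}")

    def __getitem__(self, key: str) -> "PropertyRef":
        return PropertyRef(f"{self._path}['{key}']")

    def to_request(self) -> dict:
        # Serialized as a reference that is resolved at execution time.
        return {"Get": self._path}


preprocess_properties = PropertyRef("Steps.Preprocess")
train_uri = preprocess_properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
print(train_uri.to_request())
# {'Get': "Steps.Preprocess.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri"}
```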

In [None]:
# =============================================================================
# TRAINING STEP
# =============================================================================

# Configure source code (points to existing training script)
source_code = SourceCode(
    source_dir="../training",
    entry_script="train.py",
    # Note: requirements not specified due to SDK v3.4.1 bug
    # The requirements.txt in source_dir is still used at runtime
)

# Configure compute
compute = Compute(
    instance_type=training_instance_type,  # Pipeline parameter!
    instance_count=1,
)

# Create ModelTrainer with hyperparameters from pipeline parameters
model_trainer = ModelTrainer(
    role=role,
    training_image=sklearn_image,
    source_code=source_code,
    compute=compute,
    base_job_name=f"{base_job_prefix}-train",
    hyperparameters={
        "n-estimators": n_estimators,      # Pipeline parameter!
        "max-depth": max_depth,            # Pipeline parameter!
        "learning-rate": learning_rate,    # Pipeline parameter!
        "target-column": "production",
        "test-size": 0.0,  # No internal split - we already have train/test
    },
    sagemaker_session=pipeline_session,
)

# Define training step with cross-step data reference
training_step = TrainingStep(
    name="Train",
    step_args=model_trainer.train(
        input_data_config=[
            InputData(
                channel_name="train",
                # Reference output from preprocessing step!
                data_source=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
            )
        ]
    ),
    cache_config=cache_config,
)

print("Training step configured")
print(f"  Input: {{Preprocess step output}}")
print(f"  Hyperparameters: n_estimators={{NEstimators}}, max_depth={{MaxDepth}}, learning_rate={{LearningRate}}")
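
`train.py` itself is not shown in this notebook. SageMaker passes hyperparameters to training jobs as strings, so a script typically converts them back to numbers. Assuming `train.py` receives them as command-line flags named after the keys above (an assumption; check the script), a minimal `argparse` sketch could look like this. The function name and the default values are hypothetical.

```python
import argparse


def parse_hyperparameters(argv):
    """Hypothetical flag parsing for ../training/train.py (the real script is not shown)."""
    parser = argparse.ArgumentParser()
    # Hyperparameter values arrive as strings, so each flag declares its type.
    parser.add_argument("--n-estimators", type=int, default=100)
    parser.add_argument("--max-depth", type=int, default=6)
    parser.add_argument("--learning-rate", type=float, default=0.1)
    parser.add_argument("--target-column", type=str, default="production")
    parser.add_argument("--test-size", type=float, default=0.2)
    # parse_known_args tolerates unexpected extra flags instead of exiting.
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_hyperparameters(["--n-estimators", "150", "--max-depth", "8", "--test-size", "0.0"])
# argparse maps "--n-estimators" to the attribute "n_estimators".
print(args.n_estimators, args.max_depth, args.learning_rate, args.test_size)  # 150 8 0.1 0.0
```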

## 7. Step 3: Evaluation

The evaluation step:
1. Loads trained model from Step 2
2. Runs predictions on test data from Step 1
3. Calculates metrics (R², RMSE, MAE)
4. Outputs `evaluation.json` for the quality gate
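
For reference, these are the standard definitions the evaluation script gets from scikit-learn: `R² = 1 - SS_res / SS_tot`, `RMSE = sqrt(SS_res / n)`, and `MAE = mean(|y - ŷ|)`. The dependency-free sketch below computes them. `regression_metrics` is a hypothetical helper, and unlike scikit-learn it simply raises on a constant target (where `SS_tot = 0`) instead of applying special handling.

```python
import math
from typing import Sequence


def regression_metrics(y_true: Sequence[float], y_pred: Sequence[float]) -> dict:
    """Unweighted R^2, RMSE and MAE for a single regression target."""
    n = len(y_true)
    if n == 0 or n != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and the same length")
    mean_y = sum(y_true) / n
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))  # residual sum of squares
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)             # total sum of squares
    if ss_tot == 0:
        raise ValueError("R^2 is undefined for a constant target in this sketch")
    return {
        "r2": 1.0 - ss_res / ss_tot,
        "rmse": math.sqrt(ss_res / n),
        "mae": sum(abs(t - p) for t, p in zip(y_true, y_pred)) / n,
    }


print(regression_metrics([3.0, 5.0, 7.0, 9.0], [2.0, 5.0, 8.0, 9.0]))
# r2 = 0.9, rmse ≈ 0.7071, mae = 0.5
```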

### PropertyFile for Cross-Step Metrics

```python
# Define a PropertyFile to extract values from evaluation.json
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Later, use JsonGet to extract specific values
r2_value = JsonGet(
    step_name="Evaluate",
    property_file=evaluation_report,
    json_path="regression_metrics.r2.value",
)
```
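
`JsonGet` walks the JSON document along `json_path`, so the path has to match the file's nesting exactly. Here is a minimal sketch of that lookup, assuming plain dotted keys only (the real `JsonGet` accepts JsonPath expressions, and `json_get` is a hypothetical helper). The metric values in `report` are made up for the example.

```python
import json


def json_get(document: str, json_path: str):
    """Follow a dotted path like 'regression_metrics.r2.value' through a JSON document."""
    node = json.loads(document)
    for key in json_path.split("."):
        if not isinstance(node, dict) or key not in node:
            raise KeyError(f"Path '{json_path}' breaks at '{key}'")
        node = node[key]
    return node


# Illustrative document with the same nesting that evaluation.py writes.
report = json.dumps({"regression_metrics": {"r2": {"value": 0.91}, "rmse": {"value": 4.8}}})
print(json_get(report, "regression_metrics.r2.value"))  # 0.91
```

A path such as `metrics.r2.value` fails immediately, which is the same kind of mismatch that breaks a quality gate when `evaluation.json` and `json_path` drift apart.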

In [None]:
%%writefile ../pipeline/evaluation.py
"""Model Evaluation Script for SageMaker Pipeline

Extracts model from model.tar.gz, runs predictions on test data,
and outputs evaluation.json with R2/RMSE/MAE metrics.

Input:
  - /opt/ml/processing/model/model.tar.gz
  - /opt/ml/processing/test/*.csv

Output:
  - /opt/ml/processing/evaluation/evaluation.json
"""

import os
import json
import tarfile
import pandas as pd
import numpy as np
import joblib
from pathlib import Path
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error


def extract_model(model_dir: str) -> str:
    """Extract model.tar.gz and return path to model.joblib."""
    tarball = os.path.join(model_dir, "model.tar.gz")

    if not os.path.exists(tarball):
        model_path = os.path.join(model_dir, "model.joblib")
        if os.path.exists(model_path):
            return model_path
        raise FileNotFoundError(f"No model.tar.gz or model.joblib in {model_dir}")

    print(f"Extracting {tarball}")
    with tarfile.open(tarball, "r:gz") as tar:
        tar.extractall(path=model_dir)

    return os.path.join(model_dir, "model.joblib")


def load_test_data(test_dir: str, target_column: str = "production"):
    """Load test data and separate features/target."""
    test_path = Path(test_dir)
    csv_files = list(test_path.glob("*.csv"))

    if not csv_files:
        raise ValueError(f"No CSV files found in {test_dir}")

    df = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)
    print(f"Test data shape: {df.shape}")

    y = df[target_column]
    X = df.drop(columns=[target_column])
    return X, y


def save_evaluation(metrics: dict, output_dir: str):
    """Save evaluation.json in format expected by pipeline condition step.
    
    Format: {"regression_metrics": {"r2": {"value": 0.95}, ...}}
    This allows JsonGet to extract values like: regression_metrics.r2.value
    """
    os.makedirs(output_dir, exist_ok=True)

    evaluation = {
        "regression_metrics": {
            "r2": {"value": metrics["r2"]},
            "rmse": {"value": metrics["rmse"]},
            "mae": {"value": metrics["mae"]},
        }
    }

    output_path = os.path.join(output_dir, "evaluation.json")
    with open(output_path, "w") as f:
        json.dump(evaluation, f, indent=2)

    print(f"Saved evaluation to {output_path}")


def main():
    print("=" * 50)
    print("MODEL EVALUATION")
    print("=" * 50)

    model_dir = "/opt/ml/processing/model"
    test_dir = "/opt/ml/processing/test"
    output_dir = "/opt/ml/processing/evaluation"

    model_path = extract_model(model_dir)
    model = joblib.load(model_path)

    X, y = load_test_data(test_dir, target_column="production")
    y_pred = model.predict(X)

    metrics = {
        "r2": r2_score(y, y_pred),
        "rmse": np.sqrt(mean_squared_error(y, y_pred)),
        "mae": mean_absolute_error(y, y_pred),
    }

    print(f"R2: {metrics['r2']:.4f}")
    print(f"RMSE: {metrics['rmse']:.4f}")
    print(f"MAE: {metrics['mae']:.4f}")

    save_evaluation(metrics, output_dir)
    print("EVALUATION COMPLETE!")


if __name__ == "__main__":
    main()

In [None]:
# =============================================================================
# EVALUATION STEP
# =============================================================================

evaluation_processor = ScriptProcessor(
    image_uri=sklearn_image,
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}-evaluate",
    role=role,
    sagemaker_session=pipeline_session,
)

# Define PropertyFile to capture evaluation metrics
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

evaluation_step = ProcessingStep(
    name="Evaluate",
    step_args=evaluation_processor.run(
        code="../pipeline/evaluation.py",
        inputs=[
            # Model artifacts from training step
            ProcessingInput(
                input_name="model",
                s3_input=ProcessingS3Input(
                    s3_uri=training_step.properties.ModelArtifacts.S3ModelArtifacts,
                    s3_data_type="S3Prefix",
                    local_path="/opt/ml/processing/model",
                ),
            ),
            # Test data from preprocessing step
            ProcessingInput(
                input_name="test",
                s3_input=ProcessingS3Input(
                    s3_uri=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                        "test"
                    ].S3Output.S3Uri,
                    s3_data_type="S3Prefix",
                    local_path="/opt/ml/processing/test",
                ),
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="evaluation",
                s3_output=ProcessingS3Output(
                    s3_uri=f"s3://{bucket}/{base_job_prefix}/evaluation",
                    s3_upload_mode="EndOfJob",
                    local_path="/opt/ml/processing/evaluation",
                ),
            ),
        ],
    ),
    property_files=[evaluation_report],  # Capture metrics for quality gate
    cache_config=cache_config,
)

print("Evaluation step configured")
print(f"  Model input: {{Training step model artifacts}}")
print(f"  Test input: {{Preprocess step test output}}")
print(f"  Metrics output: s3://{bucket}/{base_job_prefix}/evaluation/evaluation.json")

## 8. Step 4: Quality Gate (Conditional Registration)

The quality gate:
1. Extracts R² from evaluation.json using `JsonGet`
2. Compares against `R2Threshold` parameter
3. **If R² >= threshold**: Register model to Model Registry
4. **If R² < threshold**: Fail pipeline with error message
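
The branching logic boils down to a single comparison. The sketch below is a hypothetical plain-Python rendering (`quality_gate` and `PipelineFailed` are invented names) of `ConditionGreaterThanOrEqualTo` feeding a `ConditionStep`; note that an R² exactly equal to the threshold passes. The R² values used are illustrative, not results from this pipeline.

```python
class PipelineFailed(Exception):
    """Stands in for a FailStep ending the execution with its error message."""


def quality_gate(r2: float, threshold: float) -> str:
    """Return the step that would run next, or raise the way a FailStep would end the run."""
    if r2 >= threshold:  # ConditionGreaterThanOrEqualTo(left=r2, right=threshold)
        return "RegisterModel"  # if_steps
    # else_steps
    raise PipelineFailed("Model R2 score is below the threshold. Model not registered.")


print(quality_gate(0.86, 0.70))  # RegisterModel
try:
    quality_gate(0.86, 0.99)
except PipelineFailed as err:
    print(f"Pipeline failed: {err}")
```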

In [None]:
# =============================================================================
# MODEL REGISTRATION STEP (executed only if quality gate passes)
# =============================================================================

model_package_group_name = "PipelineDemoModels"

# Use ModelBuilder for model registration (SDK v3 pattern)
model_builder = ModelBuilder(
    image_uri=sklearn_image,
    s3_model_data_url=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role_arn=role,
    sagemaker_session=pipeline_session,
)

register_step = ModelStep(
    name="RegisterModel",
    step_args=model_builder.register(
        content_types=["text/csv", "application/json"],
        response_types=["application/json"],
        inference_instances=["ml.t3.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,  # Pipeline parameter!
    ),
)

print(f"Model registration step configured")
print(f"  Model Registry group: {model_package_group_name}")
print(f"  Approval status: {{ModelApprovalStatus parameter}}")

In [None]:
# =============================================================================
# FAIL STEP (executed if quality gate fails)
# =============================================================================

fail_step = FailStep(
    name="QualityGateFailed",
    error_message="Model R2 score is below the threshold. Model not registered.",
)

print("Fail step configured")

In [None]:
# =============================================================================
# CONDITION STEP (quality gate)
# =============================================================================

# Extract R² value from evaluation.json
# The JSON path must match the structure in evaluation.py:
# {"regression_metrics": {"r2": {"value": 0.95}}}

condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="regression_metrics.r2.value",  # Must match evaluation.json structure!
    ),
    right=r2_threshold,  # Pipeline parameter!
)

condition_step = ConditionStep(
    name="CheckR2Threshold",
    conditions=[condition],
    if_steps=[register_step],   # R² >= threshold: register model
    else_steps=[fail_step],     # R² < threshold: fail pipeline
)

print("Quality gate configured")
print(f"  Condition: R² >= {{R2Threshold parameter}}")
print(f"  If true: Register model to {model_package_group_name}")
print(f"  If false: Fail with error message")

## 9. Create the Pipeline

Now we assemble all steps into a pipeline.

In [None]:
# =============================================================================
# ASSEMBLE PIPELINE
# =============================================================================

pipeline_name = "SDKv3DemoPipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        training_instance_type,
        input_data_uri,
        model_approval_status,
        max_depth,
        n_estimators,
        learning_rate,
        r2_threshold,
    ],
    steps=[
        preprocessing_step,
        training_step,
        evaluation_step,
        condition_step,  # Contains register_step and fail_step
    ],
    sagemaker_session=pipeline_session,
)

print(f"Pipeline '{pipeline_name}' assembled")
print(f"  Parameters: {len(pipeline.parameters)}")
print(f"  Steps: {len(pipeline.steps)}")
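
The order of the `steps` list is not what sequences the run. As far as we understand SageMaker Pipelines, execution order comes from the data dependencies that property references create (plus any explicit `depends_on`), and steps with no dependency between them may run in parallel. The sketch models this with Python's standard `graphlib` (3.9+); the dependency map is written by hand for illustration and deliberately lists the steps out of order.

```python
from graphlib import TopologicalSorter

# Hand-written map: step -> steps whose outputs it references.
# Listed out of order on purpose; the ordering comes from the edges, not the listing.
dependencies = {
    "CheckR2Threshold": {"Evaluate"},     # JsonGet on the EvaluationReport property file
    "Evaluate": {"Train", "Preprocess"},  # model artifacts + test split
    "Train": {"Preprocess"},              # Outputs["train"].S3Output.S3Uri
    "Preprocess": set(),                  # depends only on the InputDataUri parameter
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['Preprocess', 'Train', 'Evaluate', 'CheckR2Threshold']
```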

In [None]:
# Validate pipeline definition (generates JSON)
import json

definition = json.loads(pipeline.definition())

print("Pipeline definition:")
print(f"  Parameters: {[p['Name'] for p in definition.get('Parameters', [])]}")
print(f"  Steps:")
for step in definition.get('Steps', []):
    print(f"    - {step['Name']} ({step['Type']})")

## 10. Create Sample Data and Upload to S3

Before running the pipeline, we need sample data in S3.

In [None]:
import pandas as pd
import numpy as np

# Create synthetic data for demonstration
np.random.seed(42)
n_samples = 10000

# Features that affect production
gas_injection_rate = np.random.uniform(0.5, 3.0, n_samples)
wellhead_pressure = np.random.uniform(100, 500, n_samples)
temperature = np.random.uniform(50, 150, n_samples)
water_cut = np.random.uniform(0, 0.5, n_samples)

# Target with hand-picked synthetic relationships
# (temperature is deliberately unused, so it acts as a distractor feature)
production = (
    50  # Base production
    + 30 * np.tanh(gas_injection_rate)  # Non-linear gas lift response
    + 0.1 * wellhead_pressure            # Pressure effect
    - 20 * water_cut                      # Water cut penalty
    + np.random.normal(0, 5, n_samples)   # Noise
)
production = np.clip(production, 10, 150)

df = pd.DataFrame({
    'gas_injection_rate': gas_injection_rate,
    'wellhead_pressure': wellhead_pressure,
    'temperature': temperature,
    'water_cut': water_cut,
    'production': production,
})

print(f"Created synthetic data: {df.shape}")
print(f"Production range: {df['production'].min():.1f} - {df['production'].max():.1f}")
df.head()

In [None]:
# Upload to S3
from pathlib import Path

local_data_dir = Path('./data')
local_data_dir.mkdir(exist_ok=True)

data_file = local_data_dir / 'production_data.csv'
df.to_csv(data_file, index=False)

s3_client = boto3.client('s3')
s3_data_key = f"{base_job_prefix}/data/production_data.csv"
s3_data_uri = f"s3://{bucket}/{s3_data_key}"

s3_client.upload_file(str(data_file), bucket, s3_data_key)
print(f"Uploaded to: {s3_data_uri}")

## 11. Deploy and Run the Pipeline

In [None]:
# =============================================================================
# CREATE/UPDATE PIPELINE
# =============================================================================

# upsert() creates the pipeline if it doesn't exist, or updates it if it does
response = pipeline.upsert(role_arn=role)

print(f"Pipeline ARN: {response['PipelineArn']}")

In [None]:
# =============================================================================
# START PIPELINE EXECUTION
# =============================================================================

# Override default parameters for this execution
execution = pipeline.start(
    parameters={
        "InputDataUri": f"s3://{bucket}/{base_job_prefix}/data/",
        "R2Threshold": 0.7,  # Lower threshold for demo (model should pass)
        "MaxDepth": 8,
        "NEstimators": 150,
    }
)

print(f"Execution ARN: {execution.arn}")
print(f"\nMonitor in SageMaker Studio or run:")
print(f"  execution.describe()")
print(f"  execution.list_steps()")

In [None]:
# =============================================================================
# WAIT FOR COMPLETION (optional)
# =============================================================================

print("Waiting for pipeline execution to complete...")
print("(This typically takes 10-15 minutes for all steps)")

execution.wait()

status = execution.describe()["PipelineExecutionStatus"]
print(f"\nFinal status: {status}")

In [None]:
# =============================================================================
# VIEW STEP RESULTS
# =============================================================================

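steps = execution.list_steps()
# Depending on the SDK version, list_steps() may return the raw API response
# (a dict) or only the list of step summaries; handle both.
if isinstance(steps, dict):
    steps = steps["PipelineExecutionSteps"]

print("Pipeline execution steps:")
for step in steps:
    status = step["StepStatus"]
    emoji = "✅" if status == "Succeeded" else "❌" if status == "Failed" else "⏳"
    print(f"  {emoji} {step['StepName']}: {status}")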

## 12. View Registered Model in Model Registry

In [None]:
# =============================================================================
# VIEW MODEL REGISTRY
# =============================================================================

sm_client = boto3.client('sagemaker', region_name=region)  # same region as the pipeline

# List model packages in the group
model_packages = sm_client.list_model_packages(
    ModelPackageGroupName=model_package_group_name,
    SortBy='CreationTime',
    SortOrder='Descending',
    MaxResults=5
)

print(f"Model packages in '{model_package_group_name}':")
for pkg in model_packages.get('ModelPackageSummaryList', []):
    print(f"  Version: {pkg['ModelPackageVersion']}")
    print(f"    Status: {pkg['ModelApprovalStatus']}")
    print(f"    Created: {pkg['CreationTime']}")
    print()

In [None]:
# =============================================================================
# APPROVE MODEL FOR PRODUCTION (optional)
# =============================================================================

# Uncomment to approve the latest model
# if model_packages.get('ModelPackageSummaryList'):
#     latest_arn = model_packages['ModelPackageSummaryList'][0]['ModelPackageArn']
#     sm_client.update_model_package(
#         ModelPackageArn=latest_arn,
#         ModelApprovalStatus="Approved"
#     )
#     print(f"Approved: {latest_arn}")

## 13. Test the Quality Gate

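Run again with an unreachable threshold to see the quality gate fail. Because caching is enabled, steps whose arguments match the earlier run (here, Preprocess) can be reused; Train and Evaluate rerun because this execution falls back to the default hyperparameters.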

In [None]:
# =============================================================================
# TEST QUALITY GATE FAILURE
# =============================================================================

# Run with a threshold of 0.99. The N(0, 5) noise added to the synthetic target
# keeps the achievable R² well below this, so the pipeline should fail at the gate.
fail_execution = pipeline.start(
    parameters={
        "InputDataUri": f"s3://{bucket}/{base_job_prefix}/data/",
        "R2Threshold": 0.99,  # Unrealistically high - should fail
    }
)

print(f"Started execution with R2Threshold=0.99")
print(f"Execution ARN: {fail_execution.arn}")
print(f"\nThis execution should fail at the quality gate.")
print(f"Check status with: fail_execution.describe()")
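
Once `fail_execution` finishes, the useful detail is usually which step failed and why. Below is a small hypothetical helper for summarizing step entries shaped like the `PipelineExecutionSteps` list returned by the `ListPipelineExecutionSteps` API (`StepName`, `StepStatus`, and an optional `FailureReason`). The sample entries are invented to show the shape and are not real output.

```python
from collections import Counter


def summarize_steps(step_entries):
    """Count step statuses and collect failure reasons keyed by step name."""
    counts = Counter(entry["StepStatus"] for entry in step_entries)
    failures = {
        entry["StepName"]: entry.get("FailureReason", "<no reason reported>")
        for entry in step_entries
        if entry["StepStatus"] == "Failed"
    }
    return counts, failures


# Invented sample entries that only illustrate the shape of the data.
sample_entries = [
    {"StepName": "QualityGateFailed", "StepStatus": "Failed",
     "FailureReason": "Model R2 score is below the threshold. Model not registered."},
    {"StepName": "CheckR2Threshold", "StepStatus": "Succeeded"},
    {"StepName": "Evaluate", "StepStatus": "Succeeded"},
]

counts, failures = summarize_steps(sample_entries)
print(dict(counts))  # {'Failed': 1, 'Succeeded': 2}
for name, reason in failures.items():
    print(f"{name}: {reason}")
```

With a live execution, `sm_client.list_pipeline_execution_steps(PipelineExecutionArn=fail_execution.arn)["PipelineExecutionSteps"]` would supply the real entries.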

## 14. Cleanup

In [None]:
# =============================================================================
# CLEANUP (uncomment to run)
# =============================================================================

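# Delete pipeline
# pipeline.delete()
# print(f"Deleted pipeline: {pipeline_name}")

# Delete registered model versions, then the (now empty) model package group
# (first page of results only; enough for a demo with a few versions)
# for pkg in sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)['ModelPackageSummaryList']:
#     sm_client.delete_model_package(ModelPackageName=pkg['ModelPackageArn'])
# sm_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)

# Delete S3 data
# !aws s3 rm s3://{bucket}/{base_job_prefix} --recursive
# print(f"Deleted S3 data: s3://{bucket}/{base_job_prefix}")

print("Cleanup commands (uncomment to run):")
print(f"  pipeline.delete()")
print(f"  delete model packages and group '{model_package_group_name}'")
print(f"  aws s3 rm s3://{bucket}/{base_job_prefix} --recursive")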

## Summary

### SDK v3 Pipeline Imports

```python
# Pipeline and steps
from sagemaker.mlops.workflow.pipeline import Pipeline
from sagemaker.mlops.workflow.steps import ProcessingStep, TrainingStep, CacheConfig
from sagemaker.mlops.workflow.condition_step import ConditionStep
from sagemaker.mlops.workflow.fail_step import FailStep
from sagemaker.mlops.workflow.model_step import ModelStep

# Parameters and conditions
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.core.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
from sagemaker.core.workflow.properties import PropertyFile
from sagemaker.core.workflow.functions import JsonGet

# Training
from sagemaker.train.model_trainer import ModelTrainer
from sagemaker.train.configs import InputData, Compute, SourceCode

# Processing
from sagemaker.core.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.core.shapes.shapes import ProcessingS3Input, ProcessingS3Output

# Model registration
from sagemaker.serve.model_builder import ModelBuilder
```

### Key Patterns

| Pattern | Description |
|---------|-------------|
| `PipelineSession()` | Deferred execution for pipeline definition |
| `step.properties.X` | Reference outputs from previous steps |
| `PropertyFile` + `JsonGet` | Extract values from JSON for conditions |
| `CacheConfig` | Reuse results of steps whose arguments are unchanged (30-day expiry in this notebook) |
| `ModelBuilder.register()` | Register model to Model Registry |

### Pipeline Flow

```
Preprocess → Train → Evaluate → [R² >= threshold?]
                                    ↓ Yes        ↓ No
                              RegisterModel   FailStep
```

### Known Issues (SDK v3.4.1)

- **Bug**: `get_training_code_hash()` expects `List[str]` but receives `str`
- **Workaround**: Monkey-patch applied at the top of the notebook (the "SDK v3.4.1 BUG WORKAROUND" cell in Section 1)
- **Tracked**: https://github.com/aws/sagemaker-python-sdk/issues/5518