# Notebook 07: Pipeline Orchestration with SageMaker Pipelines


## Setup

In [2]:
import os
import sys
import json
import time
import warnings
warnings.filterwarnings('ignore')

import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker.workflow.pipeline_context import PipelineSession

# Add project root to path
sys.path.append("..")

from config.config import (
    BUCKET_NAME,
    S3_PREFIX,
    AWS_REGION,
    TARGET_COLUMN
)

print(f"SageMaker SDK version: {sagemaker.__version__}")

sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/fatimatatanda/Library/Application Support/sagemaker/config.yaml
SageMaker SDK version: 2.256.0


In [3]:

boto_session = boto3.Session(region_name=AWS_REGION)

sagemaker_session = Session(boto_session=boto_session)
pipeline_session = PipelineSession(boto_session=boto_session)

sm_client = boto3.client('sagemaker', region_name=AWS_REGION)

# Get execution role
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam_client = boto3.client('iam')
    role = iam_client.get_role(RoleName='sagemaker-execution-role')['Role']['Arn']

print(f"Region: {AWS_REGION}")
print(f"Role:   {role}")

Couldn't call 'get_role' to get Role ARN from role name fatimat-admin to get Role path.


Region: us-east-1
Role:   arn:aws:iam::306617143793:role/sagemaker-execution-role



## Define Pipeline Parameters

In [4]:
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterFloat,
    ParameterInteger,
)

# ── Tunable parameters ──
param_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.large"
)

param_prediction_length = ParameterInteger(
    name="PredictionLength",
    default_value=12
)

param_rmse_threshold = ParameterFloat(
    name="RMSEThreshold",
    default_value=0.30  # Register model only if RMSE ≤ this
)

print("Pipeline parameters defined:")
print(f"  InstanceType:      {param_instance_type.default_value}")
print(f"  PredictionLength:  {param_prediction_length.default_value}")
print(f"  RMSEThreshold:     {param_rmse_threshold.default_value}")

Pipeline parameters defined:
  InstanceType:      ml.m5.large
  PredictionLength:  12
  RMSEThreshold:     0.3


## Define the Preprocess Step

This step runs `src/preprocess.py` inside an SKLearn container.
It reads raw data from S3 and outputs DeepAR JSON files.

**How data flows:**

```
S3 raw parquet  ──→  /opt/ml/processing/input/
                         │
                    preprocess.py
                         │
                    /opt/ml/processing/output/
                         ├── train/train.json      ──→  S3 (auto)
                         ├── test/test.json        ──→  S3 (auto)
                         ├── inference/inference.json ──→ S3 (auto)
                         └── actuals/actuals.csv   ──→  S3 (auto)
```

## Create the processor

In [5]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Create the processor (defines the container + resources)
sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role=role,
    instance_type='ml.m5.large',
    instance_count=1,
    sagemaker_session=pipeline_session,  # Pipeline session — captures, doesn't run
)


## Define the Processing step

In [6]:

# Define the Processing step
raw_data_uri = f"s3://{BUCKET_NAME}/data/raw_parquet/state_month_full.parquet"

processor_args = sklearn_processor.run(
        inputs=[
            ProcessingInput(
                source=raw_data_uri,
                destination="/opt/ml/processing/input",
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/output/train", 
                destination=f"s3://{BUCKET_NAME}/pipeline/preprocess/train"
            ),
            ProcessingOutput(
                output_name="test",
                source="/opt/ml/processing/output/test",
                destination=f"s3://{BUCKET_NAME}/pipeline/preprocess/test"
            ),
            ProcessingOutput(
                output_name="inference",
                source="/opt/ml/processing/output/inference",
                destination=f"s3://{BUCKET_NAME}/pipeline/preprocess/inference"
            ),
            ProcessingOutput(
                output_name="actuals",   
                source="/opt/ml/processing/output/actuals",
                destination=f"s3://{BUCKET_NAME}/pipeline/preprocess/actuals"
            ),
        ],
        code="../src/preprocess.py",
        arguments=["--prediction-length", "12"],
)


step_preprocess = ProcessingStep(
    name="PreprocessData", step_args=processor_args
)

print("PreprocessData step defined")

PreprocessData step defined


## Define the Training Step

This step trains DeepAR using the JSON files from the Preprocess step.


In [7]:
from sagemaker.image_uris import retrieve
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Get DeepAR container image
deepar_container = retrieve(
    framework='forecasting-deepar',
    region=AWS_REGION
)
print(f"DeepAR container: {deepar_container}")

# Hyperparameters
hyperparameters = {
    'time_freq': 'M',
    'prediction_length': '12',
    'context_length': '24',
    'epochs': '100',
    'early_stopping_patience': '10',
    'num_cells': '40',
    'num_layers': '2',
    'likelihood': 'gaussian',
    'mini_batch_size': '32',
    'learning_rate': '0.001',
    'num_dynamic_feat': '9',   # 5 national + 4 state features
}

# Create estimator 
deepar_estimator = sagemaker.estimator.Estimator(
    image_uri=deepar_container,
    role=role,
    instance_count=1,
    instance_type=param_instance_type,
    output_path=f"s3://{BUCKET_NAME}/{S3_PREFIX['models']}",
    sagemaker_session=pipeline_session,
    hyperparameters=hyperparameters,
)

# Define training step — references preprocess outputs
# train_s3_uri = f"s3://{BUCKET_NAME}/data/training/deepar/train/train.json"
# test_s3_uri = f"s3://{BUCKET_NAME}/data/training/deepar/test/test.json"

step_train = TrainingStep(
    name="TrainDeepAR",
    step_args=deepar_estimator.fit(
        inputs={
            "train": TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                content_type="json",
            ),
            "test": TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
                content_type="json",
            ),
        }
    )
)

print("TrainDeepAR step defined")

DeepAR container: 522234722520.dkr.ecr.us-east-1.amazonaws.com/forecasting-deepar:1
TrainDeepAR step defined


## Define Model Creation and Batch Transform Steps


In [8]:
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep
from sagemaker.inputs import TransformInput

# ── 7a. Create Model Step ──
# Wraps the trained model artifact into a deployable SageMaker Model

model = Model(
    image_uri=deepar_container,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session,
)

step_create_model = ModelStep(
    name="CreateDeepARModel",
    step_args=model.create(instance_type="ml.m5.large"),
)

print("CreateDeepARModel step defined")

CreateDeepARModel step defined


##  Batch Transform Step

In [9]:

# Runs inference on the test data and saves predictions to S3

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.large",
    instance_count=1,
    output_path=f"s3://{BUCKET_NAME}/pipeline/transform-output",
    accept="application/jsonlines",
    sagemaker_session=pipeline_session,
)

step_transform = TransformStep(
    name="BatchTransform",
    step_args=transformer.transform(
        data=step_preprocess.properties.ProcessingOutputConfig.Outputs["inference"].S3Output.S3Uri,
        content_type="application/jsonlines",
        split_type="Line",         # Process one JSON line at a time
        join_source="None",        # Don't join input with output
    ),
)

print("BatchTransform step defined")

BatchTransform step defined



## Define the Evaluation Step

This step runs `src/evaluate.py` which:
1. Reads the Batch Transform predictions
2. Reads the actual NFCI values (from preprocess step)
3. Computes RMSE, MAE, R²
4. Writes `evaluation.json`

The **PropertyFile** tells the pipeline where to find the metrics
so the ConditionStep can read them.

In [10]:
from sagemaker.workflow.properties import PropertyFile

# PropertyFile: tells the pipeline where to find evaluation metrics
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",       
    path="evaluation.json",       
)

# Define the evaluation Processing step
step_evaluate = ProcessingStep(
    name="EvaluateModel",
    step_args=sklearn_processor.run(
        inputs=[
            ProcessingInput(
                source=step_transform.properties.TransformOutput.S3OutputPath,
                destination="/opt/ml/processing/input/predictions",
            ),
            ProcessingInput(
                source=step_preprocess.properties.ProcessingOutputConfig.Outputs["actuals"].S3Output.S3Uri,
                destination="/opt/ml/processing/input/actuals",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="evaluation",
                source="/opt/ml/processing/output/evaluation",
            ),
        ],
        code="../src/evaluate.py",
    ),
    property_files=[evaluation_report],  # Makes evaluation.json readable by pipeline
)

print("EvaluateModel step defined")

EvaluateModel step defined



## Define the Quality Gate and Model Registration

The **ConditionStep** reads RMSE from `evaluation.json` and checks
whether it meets our threshold. If yes, register the model. If no → stop.


In [None]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model_metrics import ModelMetrics, MetricsSource

#  Condition: RMSE ≤ threshold
rmse_condition = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate.name,
        property_file=evaluation_report,
        json_path="metrics.rmse",
    ),
    right=param_rmse_threshold,
)

# Model Metrics 
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=step_evaluate.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
        content_type="application/json",
    )
)

#  Register Model (runs only if condition passes) ──
step_register = RegisterModel(
    name="RegisterModel",
    estimator=deepar_estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name="nfci-forecasting-models",
    approval_status="PendingManualApproval",
    model_metrics=model_metrics,
)

# Condition Step ──
step_condition = ConditionStep(
    name="CheckModelQuality",
    conditions=[rmse_condition],
    if_steps=[step_register],
    else_steps=[],
)

print("CheckModelQuality + RegisterModel steps defined")

✓ CheckModelQuality + RegisterModel steps defined



## Step 10: Build and Run the Pipeline

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Build the pipeline
pipeline = Pipeline(
    name="nfci-forecasting-pipeline",
    parameters=[
        param_instance_type,
        param_prediction_length,
        param_rmse_threshold,
    ],
    steps=[
        step_preprocess,
        step_train,
        step_create_model,
        step_transform,
        step_evaluate,
        step_condition,
    ],
    sagemaker_session=sagemaker_session,  # Regular sagemaker session for submission
)

print("Pipeline assembled")

✓ Pipeline assembled


In [None]:
# Create or update the pipeline definition in SageMaker
pipeline.upsert(role_arn=role)
print("Pipeline created/updated in SageMaker")

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


✓ Pipeline created/updated in SageMaker


In [None]:
# Start the pipeline
execution = pipeline.start()

print(f" Pipeline execution started")
print(f"  Execution ARN: {execution.arn}")
print(f"  Status: {execution.describe()['PipelineExecutionStatus']}")

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


✓ Pipeline execution started
  Execution ARN: arn:aws:sagemaker:us-east-1:306617143793:pipeline/nfci-forecasting-pipeline/execution/87qaih00spbr
  Status: Executing


---
## Step 11: Monitor Pipeline Execution

The pipeline runs asynchronously. Use these methods to check progress.

In [None]:
# Wait for the pipeline to complete (this blocks until done)
# Remove this cell if you want to monitor manually instead
execution.wait()
print(f"Pipeline finished: {execution.describe()['PipelineExecutionStatus']}")

✓ Pipeline finished: Succeeded


In [190]:
# View each step's status
steps = execution.list_steps()

print(f"{'Step':<25} {'Status':<15} {'Start Time'}")
print("-" * 65)
for step in steps:
    name = step['StepName']
    status = step['StepStatus']
    start = str(step.get('StartTime', ''))[:19]
    print(f"{name:<25} {status:<15} {start}")

Step                      Status          Start Time
-----------------------------------------------------------------
RegisterModel-RegisterModel Succeeded       2026-02-06 20:42:00
CheckModelQuality         Succeeded       2026-02-06 20:41:59
EvaluateModel             Succeeded       2026-02-06 20:36:55
BatchTransform            Succeeded       2026-02-06 20:30:16
CreateDeepARModel-CreateModel Succeeded       2026-02-06 20:30:14
TrainDeepAR               Succeeded       2026-02-06 20:25:21
PreprocessData            Succeeded       2026-02-06 20:22:47


In [191]:
# Check if the model was registered (condition step passed)
for step in steps:
    if step['StepName'] == 'CheckModelQuality':
        print(f"Quality gate: {step['StepStatus']}")
        
        # Check the condition outcome from metadata
        metadata = step.get('Metadata', {})
        if 'Condition' in metadata:
            outcome = metadata['Condition'].get('Outcome', 'Unknown')
            print(f"Condition outcome: {outcome}")
            
    if step['StepName'] == 'RegisterModel':
        print(f"Model registration: {step['StepStatus']}")

Quality gate: Succeeded
Condition outcome: True
