# Cursus: Automatic SageMaker (MODS) Pipeline Compiler

The main contribution of this work is **Cursus**, a **compiler** that automatically generates a **[MODS (Model Training Workflow Operation and Development System) Pipeline](https://w.amazon.com/bin/view/CMLS/Overview/MODS/)** based on two sets of user inputs:
* The **Pipeline DAG (Directed Acyclic Graph)**, which describes the pipeline as a graph
* The **Unified Config JSON**, which provides a central hub to extract all user inputs and their associated step information
    * Run [demo_config](./demo_config.ipynb) first to generate the Unified Config JSON
    * The config JSON will be saved in the `./pipeline_config/xxx/` folder

![mods_pipeline_train_eval_calib](./demo/mods_pipeline_train_eval_calib.png)


In [None]:
#!pip install amzn-secure-ai-sandbox-workflow-python-sdk --ignore-installed

In [None]:
#!pip install amzn-mods-workflow-helper amzn-mods-python-sdk --upgrade

In [None]:
#!pip install --upgrade sagemaker

In [None]:
import os
import json
import pandas as pd
import pickle
import sys
import subprocess
from datetime import datetime

from pathlib import Path

In [None]:
from pydantic import BaseModel, Field, model_validator, field_validator
from typing import List, Optional, Dict, Any, Type, Union, Tuple


In [None]:
from collections import (
    defaultdict,
    deque
)

In [None]:
import logging

In [None]:
# Emit INFO-and-above records prefixed with a timestamp and the level name.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)


## Environment Setup

In [None]:
from sagemaker import Session

In [None]:
from sagemaker.workflow.pipeline_context import PipelineSession

In [None]:
bucket_name = "buyer-seller-messaging-reversal"  # default S3 bucket for the pipeline session

In [None]:
# SageMaker pipeline session whose default S3 bucket is `bucket_name`.
# NOTE(review): the original note said this session "uses the generated sagemaker_config" — confirm.
pipeline_session = PipelineSession(
    default_bucket=bucket_name,
)

In [None]:
# IAM role ARN of the current caller. Reuse the already-configured pipeline session
# rather than constructing a second, throwaway PipelineSession (which would also
# ignore the `default_bucket` configured above).
role = pipeline_session.get_caller_identity_arn()
role

In [None]:
from pathlib import Path
import sys

# Make the notebook's parent directory importable (presumably where the `cursus`
# package lives), prepending it to sys.path only if it is not already there.
project_root = str(Path().absolute().parent)
already_on_path = project_root in sys.path
if not already_on_path:
    sys.path.insert(0, project_root)
    print(f"add project root {project_root} into system")

## Basic Information

In [None]:
# Selectable region codes (presumably marketplace regions: North America, Europe, Far East).
region_list = ['NA', 'EU', 'FE']

In [None]:
region_selection = 0  # index into region_list (0 -> 'NA')

In [None]:
# Region code picked by `region_selection`; the bare name below displays it.
region = region_list[region_selection]
region

In [None]:
MODEL_CLASS = 'pytorch'  # only referenced by the commented-out hyperparameter filename in this notebook

In [None]:
service_name = "BuyerAbuseRnR"  # not referenced elsewhere in this notebook

#### Config and Hyperparameter Information

In [None]:
# Unified config JSONs live under ./pipeline_config relative to the working directory.
current_dir = Path.cwd()
config_dir = current_dir / 'pipeline_config'
print(config_dir)

In [None]:
#hyparam_filename = f'hyperparameters_{region}_{MODEL_CLASS}.json' #'hyperparameters.json'

In [None]:
# Unified config file name (a per-region variant would be f'config_{region}.json').
pipeline_config_name = 'config.json'
pipeline_config_name

In [None]:
config_path = config_dir.joinpath(pipeline_config_name)  # full path of the unified config JSON

In [None]:
# Display the resolved path of the unified config JSON.
config_path

## Pipeline Imports

In [None]:
from enum import Enum
from pydantic import BaseModel

## [Optional]: Test Config Load Functionality

Please skip this section if you are not concerned about the loaded config information.

### Hyperparameters

In [None]:
#from cursus.steps.hyperparams.hyperparameters_xgboost import XGBoostModelHyperparameters

In [None]:
#hyparam_path = config_dir / hyparam_filename
#with open(hyparam_path, 'r') as file:
#    hyperparam_dict = json.load(file)

In [None]:
#hyperparams = XGBoostModelHyperparameters(**hyperparam_dict)

In [None]:
#hyperparams.num_classes

In [None]:
#hyperparams.is_binary

### Import Configs

In [None]:
from cursus.core.base.config_base import BasePipelineConfig

In [None]:
#from cursus.steps.configs.config_cradle_data_loading_step import (CradleDataLoadingConfig,
#                                                    MdsDataSourceConfig,
#                                                    EdxDataSourceConfig,
#                                                    DataSourceConfig,
#                                                    DataSourcesSpecificationConfig,
#                                                    JobSplitOptionsConfig,
#                                                    TransformSpecificationConfig,
#                                                    OutputSpecificationConfig,
#                                                    CradleJobSpecificationConfig
#                                                   )

In [None]:
from cursus.steps.configs.config_dummy_data_loading_step import DummyDataLoadingConfig
from cursus.steps.configs.config_tabular_preprocessing_step import TabularPreprocessingConfig
from cursus.steps.configs.config_bedrock_prompt_template_generation_step import BedrockPromptTemplateGenerationConfig
from cursus.steps.configs.config_bedrock_batch_processing_step import BedrockBatchProcessingConfig

### Load Config

In [None]:
from cursus.steps.configs.utils import serialize_config, merge_and_save_configs, load_configs, verify_configs

In [None]:
# Config class name -> config class, handed to load_configs() to rebuild config objects.
CONFIG_CLASSES = dict(
    DummyDataLoadingConfig=DummyDataLoadingConfig,
    BedrockPromptTemplateGenerationConfig=BedrockPromptTemplateGenerationConfig,
    BedrockBatchProcessingConfig=BedrockBatchProcessingConfig,
    TabularPreprocessingConfig=TabularPreprocessingConfig,
)

In [None]:
# Display the config path that is about to be loaded.
config_path

In [None]:
# Load the unified config JSON into config objects; CONFIG_CLASSES presumably maps
# the class names stored in the JSON back to their classes.
loaded_configs = load_configs(
    config_path,
    CONFIG_CLASSES,
)

In [None]:
# Display the config objects returned by load_configs.
loaded_configs

In [None]:
# Number of configs loaded from the unified config JSON.
len(loaded_configs)

In [None]:
list(map(str, loaded_configs.keys()))  # display the loaded config keys as strings

In [None]:
# Pipeline-level metadata (name/version/description) is read from the first loaded
# config. Fail with a clear message instead of a bare StopIteration when nothing loaded.
if not loaded_configs:
    raise ValueError(f"No configs were loaded from {config_path}")
first_config = next(iter(loaded_configs.values()))

In [None]:
# Pipeline version as recorded on the first loaded config.
PIPELINE_VERSION = first_config.pipeline_version

In [None]:
# Pipeline description as recorded on the first loaded config.
PIPELINE_DESCRIPTION = first_config.pipeline_description

In [None]:
# Pipeline name as recorded on the first loaded config (also used in the S3 prefix).
PIPELINE_NAME = first_config.pipeline_name

## Parameter Setup

In [None]:
import boto3
from sagemaker.workflow.pipeline_context import PipelineSession

# boto3 clients for the network (EC2), encryption (KMS) and identity (STS) lookups below.
ec2_client, kms_client, sts_client = (boto3.client(service) for service in ('ec2', 'kms', 'sts'))

# Account id of the caller and AWS region of the default boto3 session.
# NOTE: this overwrites the earlier `region` ('NA'/'EU'/'FE') with an AWS region
# name, which is what the KMS key ARN built below needs.
account_id = sts_client.get_caller_identity()['Account']
region = boto3.Session().region_name

### Find VPC Subnet - Get default VPC subnets or list all

In [None]:
# Pick the first default-for-AZ subnet (default subnets belong to the account's default VPC).
response = ec2_client.describe_subnets(
    Filters=[{'Name': 'default-for-az', 'Values': ['true']}]
)
vpc_subnet_id = response['Subnets'][0]['SubnetId'] if response['Subnets'] else None
if vpc_subnet_id is None:
    # Surface this now; otherwise None flows silently into the VPC_SUBNET pipeline
    # parameter and the execution parameters.
    logger.warning("No default-for-az subnet found; set vpc_subnet_id manually.")

# OR list all subnets and choose one
#all_subnets = ec2_client.describe_subnets()
#for subnet in all_subnets['Subnets']:
#    print(f"Subnet ID: {subnet['SubnetId']}, VPC: {subnet['VpcId']}, AZ: {subnet['AvailabilityZone']}")


### Find Security Group - Get default or list all

In [None]:
# Use a "default" security group, restricted to the VPC of the subnet chosen above.
# Every VPC has its own "default" group, and the security group given to a job must
# belong to the same VPC as its subnet. An unfiltered lookup could return the
# default group of an unrelated VPC.
sg_filters = [{'Name': 'group-name', 'Values': ['default']}]
if vpc_subnet_id:
    subnet_vpc_id = ec2_client.describe_subnets(SubnetIds=[vpc_subnet_id])['Subnets'][0]['VpcId']
    sg_filters.append({'Name': 'vpc-id', 'Values': [subnet_vpc_id]})
response = ec2_client.describe_security_groups(Filters=sg_filters)
security_group_id = response['SecurityGroups'][0]['GroupId'] if response['SecurityGroups'] else None
if security_group_id is None:
    logger.warning("No matching 'default' security group found; set security_group_id manually.")

# OR list all security groups
#all_sgs = ec2_client.describe_security_groups()
#for sg in all_sgs['SecurityGroups']:
#    print(f"SG ID: {sg['GroupId']}, Name: {sg['GroupName']}, VPC: {sg.get('VpcId')}")

### Find KMS Key - List KMS keys for SageMaker

In [None]:
# Print SageMaker-related KMS aliases for reference. ListAliases is paginated, so
# walk every page rather than only the first response.
for page in kms_client.get_paginator('list_aliases').paginate():
    for alias in page['Aliases']:
        if 'sagemaker' in alias['AliasName'].lower():
            print(f"KMS Alias: {alias['AliasName']}, Key ID: {alias.get('TargetKeyId')}")

# Default: alias ARN of the AWS-managed `aws/sagemaker` key in this account/region.
kms_key_id = f"arn:aws:kms:{region}:{account_id}:alias/aws/sagemaker"


In [None]:
# Summarize the discovered network and encryption settings.
print("\nFound values:")
for label, value in (
    ("VPC Subnet", vpc_subnet_id),
    ("Security Group", security_group_id),
    ("KMS Key", kms_key_id),
):
    print(f"{label}: {value}")

### Execution Id

In [None]:
# Local-time YYYYMMDDHHMMSS stamp; it becomes part of this run's S3 prefix.
execution_id = f"{datetime.now():%Y%m%d%H%M%S}"

### Define Parameter String

In [None]:
from sagemaker.network import NetworkConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterString


In [None]:
# Predefined pipeline parameters; their defaults can be overridden when an execution starts.
PIPELINE_EXECUTION_TEMP_DIR = ParameterString(
    name="EXECUTION_S3_PREFIX",
    default_value=f"s3://{bucket_name}/pipeline/{PIPELINE_NAME}/{execution_id}",
)
KMS_ENCRYPTION_KEY_PARAM = ParameterString(name="KMS_ENCRYPTION_KEY_PARAM", default_value=kms_key_id)
# TODO: test if we can replace it with multiple subnets
VPC_SUBNET = ParameterString(name="VPC_SUBNET", default_value=vpc_subnet_id)
SECURITY_GROUP_ID = ParameterString(name="SECURITY_GROUP_ID", default_value=security_group_id)

# Network settings for processing jobs (not referenced elsewhere in this notebook):
# no network isolation, encrypted inter-container traffic, subnet/SG from the parameters above.
PROCESSING_JOB_SHARED_NETWORK_CONFIG = NetworkConfig(
    subnets=[VPC_SUBNET],
    security_group_ids=[SECURITY_GROUP_ID],
    encrypt_inter_container_traffic=True,
    enable_network_isolation=False,
)

## Import Packages

In [None]:
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Type
from pathlib import Path
import logging
import os
import importlib

In [None]:
import sagemaker
from sagemaker import Session
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.parameters import Parameter
from sagemaker.workflow.properties import Properties
from sagemaker.workflow.pipeline_context import PipelineSession # Crucial import

## Demo: An End-to-End Pipeline based on PipelineDAG Compiler
Let us use the following simple DAG (without registration) as an example.


In this demo there are several user inputs:
* the **Unified JSON file** in `config_path`
* the **Registry Manager**: an object that maps each step's logical name to its `step.properties`
* the **Dependency Resolver**: an object that handles the *automatic dependency resolution* between steps
* the other fields
    * `sagemaker_session`: pipeline session
    * `role`: IAM Role
    * `notebook_root`: tracks the root path 


In this pipeline template, we inherit from the base class `PipelineTemplateBase`. 

The **major tasks** are:
* *`Config` Classes Import*
* *Configuration Validation*
* *Step Builder Retrieval and Step Builder Map Creation*
* *Configuration Map Creation*
* **Pipeline DAG Generation**: ideally, the user should create this DAG and use it as input
* **Automatic Pipeline Assembly**: Call `pipeline_assembler`


### DAG to Template Compiler

In [None]:
from cursus.api.dag.base_dag import PipelineDAG
from cursus.core.compiler.dag_compiler import compile_dag_to_pipeline, PipelineDAGCompiler
from cursus.core.compiler.validation import ConversionReport
from cursus.steps.configs.utils import load_configs

In [None]:
def create_bedrock_batch_data_processing_dag(include_batch_processing: bool = False) -> PipelineDAG:
    """
    Create a DAG for the Bedrock batch data processing pipeline.

    By default the DAG contains a single node, ``BedrockPromptTemplateGeneration``.
    With ``include_batch_processing=True`` it becomes the full data-processing flow::

        DummyDataLoading_training -> TabularPreprocessing_training -> BedrockBatchProcessing_training
        BedrockPromptTemplateGeneration ---------------------------> BedrockBatchProcessing_training

    In the full flow, BedrockBatchProcessing_training receives two inputs: the
    preprocessed data and the generated prompt template. Neither variant contains
    training, calibration, packaging, registration or evaluation steps.

    Args:
        include_batch_processing: When True, add the data loading, preprocessing and
            Bedrock batch processing nodes together with their edges. Defaults to
            False, which yields the template-generation-only DAG.

    Returns:
        PipelineDAG: The directed acyclic graph for the pipeline.
    """
    dag = PipelineDAG()

    # Prompt template generation is present in both variants.
    dag.add_node("BedrockPromptTemplateGeneration")

    if include_batch_processing:
        dag.add_node("DummyDataLoading_training")  # Dummy data load
        dag.add_node("TabularPreprocessing_training")  # Tabular preprocessing
        dag.add_node("BedrockBatchProcessing_training")  # Bedrock batch processing step

        dag.add_edge("DummyDataLoading_training", "TabularPreprocessing_training")
        # Two inputs to BedrockBatchProcessing: data and template
        dag.add_edge("TabularPreprocessing_training", "BedrockBatchProcessing_training")
        dag.add_edge("BedrockPromptTemplateGeneration", "BedrockBatchProcessing_training")

    logger.info(
        "Created Bedrock Batch data processing DAG with %d nodes and %d edges",
        len(dag.nodes),
        len(dag.edges),
    )
    return dag

In [None]:
# DAG that the compiler below turns into a SageMaker pipeline.
dag = create_bedrock_batch_data_processing_dag()

In [None]:
# Parameters registered on the compiled pipeline; the execution parameters passed to start() use the same names.
pipeline_parameters = [PIPELINE_EXECUTION_TEMP_DIR, KMS_ENCRYPTION_KEY_PARAM, SECURITY_GROUP_ID, VPC_SUBNET]

In [None]:
# Compiler that combines a PipelineDAG with the unified config JSON into a SageMaker pipeline.
dag_compiler = PipelineDAGCompiler(
    config_path=config_path,
    role=role,
    sagemaker_session=pipeline_session,
    pipeline_parameters=pipeline_parameters,
)

### Create a Pipeline

#### DAG Validation and Preview of Config Resolution

In [None]:
preview_only = True  # when True, run the resolution preview and DAG validation in the next cell

In [None]:
if preview_only:
    # Show which config type each DAG node resolves to, with the resolver's confidence.
    preview = dag_compiler.preview_resolution(dag)
    logger.info("DAG node resolution preview:")
    for node, config_type in preview.node_config_map.items():
        confidence = preview.resolution_confidence.get(node, 0.0)
        logger.info(f"  {node} → {config_type} (confidence: {confidence:.2f})")

    if preview.recommendations:
        logger.info("Recommendations:")
        for recommendation in preview.recommendations:
            logger.info(f"  - {recommendation}")

    # Check DAG/config compatibility and report each non-empty problem category.
    validation = dag_compiler.validate_dag_compatibility(dag)
    status = 'VALID' if validation.is_valid else 'INVALID'
    logger.info(f"DAG validation: {status}")
    if not validation.is_valid:
        problem_groups = (
            ("Missing configs", validation.missing_configs),
            ("Unresolvable builders", validation.unresolvable_builders),
            ("Config errors", validation.config_errors),
        )
        for label, problems in problem_groups:
            if problems:
                logger.warning(f"{label}: {problems}")

### Put it Together: Pipeline Generation from DAG

In [None]:
# Convert the DAG to a pipeline and get the conversion report.
try:
    logger.info("Converting DAG to pipeline")
    template_pipeline, report = dag_compiler.compile_with_report(
        dag=dag
    )

    # Log report summary and how each node was resolved
    logger.info(f"Conversion complete: {report.summary()}")
    for node, details in report.resolution_details.items():
        logger.info(f"  {node} → {details['config_type']} ({details['builder_type']})")

    # Log pipeline creation details
    logger.info(f"Pipeline '{template_pipeline.name}' created successfully")
    logger.info(f"Pipeline ARN: {template_pipeline.arn if hasattr(template_pipeline, 'arn') else 'Not available until upserted'}")
    logger.info("To upsert the pipeline, call template_pipeline.upsert()")
except Exception as e:
    # logger.exception also writes the traceback to the log output (not only to the
    # notebook cell error); the exception is still re-raised unchanged.
    logger.exception(f"Failed to convert DAG to pipeline: {e}")
    raise

### Pipeline Template

After the pipeline is generated, we can retrieve the pipeline template.

In [None]:
# Template object from the compiler's most recent compilation.
pipeline_template_builder = dag_compiler.get_last_template()

## Start Execution

In [None]:
# Caller's IAM role ARN, passed to upsert() below; the bare name displays it.
role_arn = pipeline_session.get_caller_identity_arn()
role_arn

In [None]:
pipeline_description = PIPELINE_DESCRIPTION  # description sent with upsert()

In [None]:
# Display the pipeline description read from the first loaded config.
PIPELINE_DESCRIPTION

### Upsert

In [None]:
# Create the pipeline definition in SageMaker, or update it if it already exists.
template_pipeline.upsert(role_arn=role_arn, description=pipeline_description)

### Start

In [None]:
# Per-execution parameter values. Keys come from the ParameterString declarations,
# so they cannot drift from the declared names. The S3 prefix is rebuilt from the
# current `execution_id`, so re-running the execution_id cell yields a fresh prefix
# without re-upserting.
pipeline_execution_parameters = {
    PIPELINE_EXECUTION_TEMP_DIR.name: f"s3://{bucket_name}/pipeline/{PIPELINE_NAME}/{execution_id}",
    KMS_ENCRYPTION_KEY_PARAM.name: kms_key_id,
    VPC_SUBNET.name: vpc_subnet_id,
    SECURITY_GROUP_ID.name: security_group_id,
}

In [None]:
# Launch an execution using the parameter values defined above.
pipeline_execution = template_pipeline.start(parameters=pipeline_execution_parameters)