# Cursus: Automatic SageMaker (MODS) Pipeline Compiler

The main contribution of this work is **Cursus**, a **compiler** that automatically generates a **[MODS (Model Training Workflow Operation and Development System) Pipeline](https://w.amazon.com/bin/view/CMLS/Overview/MODS/)** based on two sets of user inputs:
* The **Pipeline DAG (Directed Acyclic Graph)**, which describes the pipeline as a graph
* The **Unified Config JSON**, which provides a central hub from which all user inputs and their associated step information are extracted
    * Run [template_config_xgb_eval_v2](./template_config_xgb_eval_v2.ipynb) first to generate the Unified Config JSON
    * The config JSON will be saved in the `./pipeline_config/xxx/` folder
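
To make the DAG input concrete, here is a minimal sketch of how a pipeline can be described as nodes and edges and checked for cycles with Kahn's algorithm. It is plain Python and does not use the Cursus `PipelineDAG` class; the step names and the `topological_order` helper are assumptions made for illustration.

```python
from collections import defaultdict, deque


def topological_order(nodes, edges):
    """Return nodes in dependency order, or raise ValueError on a cycle."""
    successors = defaultdict(list)
    in_degree = {node: 0 for node in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        in_degree[dst] += 1

    # Kahn's algorithm: repeatedly emit nodes with no unmet dependencies.
    ready = deque(node for node in nodes if in_degree[node] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle, so it is not a DAG")
    return order


# Hypothetical step names, loosely following the train/eval/calibrate figure.
nodes = ["data_load", "preprocess", "train", "calibrate", "evaluate"]
edges = [
    ("data_load", "preprocess"),
    ("preprocess", "train"),
    ("train", "calibrate"),
    ("train", "evaluate"),
]
order = topological_order(nodes, edges)
print(order)
```

Changing the pipeline topology in this representation only means editing the `edges` list; the ordering is recomputed rather than hand-written.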

![mods_pipeline_train_eval_calib](../demo/mods_pipeline_train_eval_calib.png)


In [1]:
#!pip install amzn-secure-ai-sandbox-workflow-python-sdk --ignore-installed

In [2]:
#!pip install amzn-mods-workflow-helper amzn-mods-python-sdk --upgrade

In [3]:
#!pip install --upgrade sagemaker

In [4]:
import os
import json
import pandas as pd
import pickle
import sys
import subprocess
from datetime import datetime

from pathlib import Path

In [5]:
from pydantic import BaseModel, Field, model_validator, field_validator
from typing import List, Optional, Dict, Any, Type, Union, Tuple


In [6]:
from collections import (
    defaultdict,
    deque
)

In [7]:
import logging

In [8]:
logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
logger = logging.getLogger(__name__)


## Environment Setup

In [9]:
from sagemaker import Session
from secure_ai_sandbox_python_lib.session import Session as SaisSession


2025-08-20 06:29:36,372 - INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


2025-08-20 06:29:36,666 - INFO - CA certs are provided via the AmazonCACerts installation at /home/ec2-user/.local/lib/python3.10/site-packages/amazoncerts


In [10]:
from mods_workflow_helper.utils.secure_session import create_secure_session_config
from mods_workflow_helper.sagemaker_pipeline_helper import SecurityConfig

from sagemaker.workflow.pipeline_context import PipelineSession

In [11]:
# Initialize session with team bucket
sais_session = SaisSession(".")

security_config = SecurityConfig(
    kms_key=sais_session.get_team_owned_bucket_kms_key(),
    security_group=sais_session.sandbox_vpc_security_group(),
    vpc_subnets=sais_session.sandbox_vpc_subnets()
)

2025-08-20 06:29:37,114 - INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
2025-08-20 06:29:37,916 - INFO - successfully patched module botocore


In [12]:
sagemaker_config = create_secure_session_config(
    role_arn=PipelineSession().get_caller_identity_arn(),
    # If you are uploading to andes, use cradle_read_s3_bucket_name() and get_cradle_read_bucket_kms_key() respectively
    bucket_name=sais_session.team_owned_s3_bucket_name(),
    kms_key=sais_session.get_team_owned_bucket_kms_key(),
    vpc_subnet_ids=sais_session.sandbox_vpc_subnets(),
    vpc_security_groups=[sais_session.sandbox_vpc_security_group()]
)

2025-08-20 06:29:37,937 - INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
2025-08-20 06:29:38,120 - INFO - There is no MODS workflow execution id provided, this is probably because you are running your pipeline outside of MODS.


In [13]:

# IMPORTANT: the session now uses the generated sagemaker_config
pipeline_session = PipelineSession(
    default_bucket=sais_session.team_owned_s3_bucket_name(),
    sagemaker_config=sagemaker_config,
)

2025-08-20 06:29:38,139 - INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [14]:
pipeline_session.config = sagemaker_config

In [15]:
bucket=sais_session.team_owned_s3_bucket_name()
bucket

'sandboxdependency-abuse-secureaisandboxteamshare-1l77v9am252um'

In [16]:
role=PipelineSession().get_caller_identity_arn()
role

2025-08-20 06:29:38,674 - INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


'arn:aws:iam::601857636239:role/SandboxRole-lukexie-us-east-1'

In [17]:
from pathlib import Path
import sys

# Get parent directory of current notebook
project_root = str(Path().absolute().parent)
if project_root not in sys.path:
    sys.path.insert(0, project_root)  
    print(f"add project root {project_root} into system")

add project root /home/ec2-user/SageMaker/Cursus into system


## Basic Information

In [18]:
region_list = [
    'NA',
    'EU',
    'FE'
]

In [19]:
region_selection = 0

In [20]:
region = region_list[region_selection]
region

'NA'

In [21]:
MODEL_CLASS='xgboost'

In [22]:
service_name = "AtoZ"  # alternative: "BuyerAbuseCAPPDA"

#### Config and Hyperparameter Information

In [23]:
current_dir = Path.cwd()
#config_dir = Path(current_dir) / 'pipeline_config' / f'config_{region}_{MODEL_CLASS}_v2'
config_dir = Path(current_dir).parent / 'pipeline_config' / f'config_{region}_{MODEL_CLASS}_{service_name}_v2'
print(config_dir)

/home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2


In [24]:
hyparam_filename = f'hyperparameters_{region}_{MODEL_CLASS}.json' #'hyperparameters.json'

In [25]:
pipeline_config_name = f'config_{region}_{MODEL_CLASS}_{service_name}.json'  #f'config_{region}.json'
pipeline_config_name

'config_NA_xgboost_AtoZ.json'

In [26]:
config_path = config_dir / pipeline_config_name

In [27]:
config_path

PosixPath('/home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json')

## Pipeline Imports

In [28]:
from enum import Enum
from pydantic import BaseModel

## [Optional]: Test Config Load Functionality

You can skip this section if you do not need to inspect the loaded config information.

### Hyperparameters

In [29]:
from src.cursus.steps.hyperparams.hyperparameters_xgboost import XGBoostModelHyperparameters

2025-08-20 06:29:39,033 - pipeline_registry.builder_registry - INFO - Registered builder: BatchTransform -> BatchTransformStepBuilder
2025-08-20 06:29:39,033 - INFO - Registered builder: BatchTransform -> BatchTransformStepBuilder
2025-08-20 06:29:39,034 - pipeline_registry.builder_registry - INFO - Registered builder: CurrencyConversion -> CurrencyConversionStepBuilder
2025-08-20 06:29:39,034 - INFO - Registered builder: CurrencyConversion -> CurrencyConversionStepBuilder
2025-08-20 06:29:39,036 - pipeline_registry.builder_registry - INFO - Registered builder: DummyTraining -> DummyTrainingStepBuilder
2025-08-20 06:29:39,036 - INFO - Registered builder: DummyTraining -> DummyTrainingStepBuilder
2025-08-20 06:29:39,037 - pipeline_registry.builder_registry - INFO - Registered builder: ModelCalibration -> ModelCalibrationStepBuilder
2025-08-20 06:29:39,037 - INFO - Registered builder: ModelCalibration -> ModelCalibrationStepBuilder
2025-08-20 06:29:39,040 - pipeline_registry.builder_regi

In [30]:
hyparam_path = config_dir / hyparam_filename
with open(hyparam_path, 'r') as file:
    hyperparam_dict = json.load(file)

In [31]:
hyperparams = XGBoostModelHyperparameters(**hyperparam_dict)

In [32]:
hyperparams.num_classes

2

In [33]:
hyperparams.is_binary

True

### Import Configs

In [34]:
from src.cursus.core.base.config_base import BasePipelineConfig

In [35]:
from src.cursus.steps.configs.config_cradle_data_loading_step import (CradleDataLoadConfig,
                                                    MdsDataSourceConfig,
                                                    EdxDataSourceConfig,
                                                    DataSourceConfig,
                                                    DataSourcesSpecificationConfig,
                                                    JobSplitOptionsConfig,
                                                    TransformSpecificationConfig,
                                                    OutputSpecificationConfig,
                                                    CradleJobSpecificationConfig
                                                   )

In [36]:
from secure_ai_sandbox_workflow_python_sdk.utils.constants import (
    OUTPUT_TYPE_DATA,
    OUTPUT_TYPE_METADATA,
    OUTPUT_TYPE_SIGNATURE,
)

In [37]:
from src.cursus.steps.configs.config_processing_step_base import ProcessingStepConfigBase

In [38]:
from src.cursus.steps.configs.config_tabular_preprocessing_step import TabularPreprocessingConfig

In [39]:
from src.cursus.steps.configs.config_xgboost_training_step import XGBoostTrainingConfig

In [40]:
from src.cursus.steps.configs.config_model_calibration_step import ModelCalibrationConfig

In [41]:
from src.cursus.steps.configs.config_xgboost_model_eval_step import XGBoostModelEvalConfig

In [42]:
from src.cursus.steps.configs.config_package_step import PackageConfig

In [43]:
from src.cursus.steps.configs.config_registration_step import RegistrationConfig

In [44]:
from secure_ai_sandbox_workflow_python_sdk.mims_model_registration.mims_model_registration_processing_step import (
    MimsModelRegistrationProcessingStep,
)

In [45]:
from secure_ai_sandbox_workflow_python_sdk.mims_model_registration.mims_model_registration_processor import (
    MimsModelRegistrationProcessor,
)

In [46]:
from src.cursus.steps.configs.config_payload_step import PayloadConfig

### Load Config

In [47]:
from src.cursus.steps.configs.utils import serialize_config, merge_and_save_configs, load_configs, verify_configs

In [48]:
CONFIG_CLASSES = {
        'BasePipelineConfig':          BasePipelineConfig,
        'XGBoostTrainingConfig':       XGBoostTrainingConfig,
        'ModelCalibrationConfig':      ModelCalibrationConfig,
        'ProcessingStepConfigBase':    ProcessingStepConfigBase,
        'PackageConfig':               PackageConfig,
        'RegistrationConfig':          RegistrationConfig,
        'PayloadConfig':               PayloadConfig,
        'CradleDataLoadConfig':        CradleDataLoadConfig,
        'TabularPreprocessingConfig':  TabularPreprocessingConfig,
        'XGBoostModelEvalConfig':      XGBoostModelEvalConfig,
    }

In [49]:
config_path

PosixPath('/home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json')

In [50]:
# Load configs
loaded_configs = load_configs(config_path, CONFIG_CLASSES)

2025-08-20 06:29:39,204 - INFO - Loading configs from /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
2025-08-20 06:29:39,204 - INFO - Loading configuration from /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
Object: CradleJobSpecificationConfig in src.cursus.steps.configs.config_cradle_data_loading_step
Field: unknown
Original definition path: CradleJobSpecificationConfig()
Reference path: 
This creates a cycle in the object graph.
Object: CradleJobSpecificationConfig in src.cursus.steps.configs.config_cradle_data_loading_step
Field: unknown
Original definition path: CradleJobSpecificationConfig()
Reference path: 
This creates a cycle in the object graph.
Object: DataSourcesSpecificationConfig in src.cursus.steps.configs.config_cradle_data_loading_step
Field: unknown
Original definition path: DataSourcesSpecificationConfig()
Reference path: 
This creates a cycle in the object g

In [51]:
base_config = loaded_configs['Base']
base_config

BasePipelineConfig(author='lukexie', bucket='sandboxdependency-abuse-secureaisandboxteamshare-1l77v9am252um', role='arn:aws:iam::601857636239:role/SandboxRole-lukexie-us-east-1', region='NA', service_name='AtoZ', pipeline_version='2.0.0', model_class='xgboost', current_date='2025-08-20', framework_version='1.7-1', py_version='py3', source_dir='/home/ec2-user/SageMaker/Cursus/dockers/xgboost_atoz')
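
The pattern above, a class-name-to-class map used to rebuild typed config objects from a single JSON file, can be sketched roughly as follows. This is not the Cursus `load_configs` implementation: the JSON layout (`shared` / `steps`), the dataclasses, and `load_typed_configs` are hypothetical, and standard-library dataclasses stand in for the pydantic models.

```python
import json
from dataclasses import dataclass


@dataclass
class BaseConfig:
    author: str
    region: str


@dataclass
class TrainingConfig(BaseConfig):
    instance_type: str = "ml.m5.xlarge"


# Class-name -> class map, playing the role of CONFIG_CLASSES above.
CLASS_MAP = {"BaseConfig": BaseConfig, "TrainingConfig": TrainingConfig}


def load_typed_configs(raw_json, class_map):
    """Build one config object per entry, merging shared fields into each."""
    doc = json.loads(raw_json)
    shared = doc.get("shared", {})
    configs = {}
    for name, entry in doc["steps"].items():
        cls = class_map[entry["class"]]  # KeyError means an unregistered class
        configs[name] = cls(**{**shared, **entry.get("fields", {})})
    return configs


# Hypothetical layout; the real Unified Config JSON may be organised differently.
raw = json.dumps({
    "shared": {"author": "someone", "region": "NA"},
    "steps": {
        "Base": {"class": "BaseConfig"},
        "Training": {
            "class": "TrainingConfig",
            "fields": {"instance_type": "ml.m5.4xlarge"},
        },
    },
})
configs = load_typed_configs(raw, CLASS_MAP)
print(configs["Training"])
```

Keeping shared fields in one place and step-specific fields per entry is one way a single file can serve every step without repeating common values.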

## Import Packages

In [52]:
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Type
from pathlib import Path
import logging
import os
import importlib

In [53]:
import sagemaker
from sagemaker import Session, TrainingInput
from sagemaker import image_uris, model_uris, script_uris
from sagemaker.processing import ProcessingOutput, ProcessingInput, FrameworkProcessor
from sagemaker.sklearn import SKLearnProcessor, SKLearn
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString
from sagemaker.image_uris import retrieve
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
    TuningStep,
    TransformStep,
    Step
)
from sagemaker.workflow.parameters import Parameter
from sagemaker.workflow.properties import Properties
from sagemaker.workflow.pipeline_context import PipelineSession # Crucial import

### MODS and SAIS Python SDK

In [54]:
from mods.mods_template import MODSTemplate


In [55]:
from secure_ai_sandbox_workflow_python_sdk.mims_model_registration.mims_model_registration_processing_step import (
    MimsModelRegistrationProcessingStep,
)
from secure_ai_sandbox_workflow_python_sdk.cradle_data_loading.cradle_data_loading_step import (
    CradleDataLoadingStep,
)
from mods_workflow_core.utils.constants import (
    PIPELINE_EXECUTION_TEMP_DIR,
    KMS_ENCRYPTION_KEY_PARAM,
    PROCESSING_JOB_SHARED_NETWORK_CONFIG,
    SECURITY_GROUP_ID,
    VPC_SUBNET,
)
from secure_ai_sandbox_workflow_python_sdk.model_performance_evaluation.model_performance_evaluation_step import (
    ModelPerformanceEvaluationStep,
)

from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput, LambdaOutputTypeEnum

from secure_ai_sandbox_workflow_python_sdk.model_performance_evaluation.model_performance_evaluation_processor import (
    ModelPerformanceEvaluationProcessor,
)
from secure_ai_sandbox_workflow_python_sdk.utils.constants import PROCESSOR_DIRECTORY_ROOT

## [Optional]: Import Internal Packages

### Step Builder

In [56]:
from src.cursus.core.base.builder_base import StepBuilderBase
from src.cursus.steps.builders.builder_cradle_data_loading_step import CradleDataLoadingStepBuilder
from src.cursus.steps.builders.builder_tabular_preprocessing_step import TabularPreprocessingStepBuilder
from src.cursus.steps.builders.builder_xgboost_training_step import XGBoostTrainingStepBuilder
from src.cursus.steps.builders.builder_model_calibration_step import ModelCalibrationStepBuilder
from src.cursus.steps.builders.builder_xgboost_model_eval_step import XGBoostModelEvalStepBuilder
from src.cursus.steps.builders.builder_package_step import PackageStepBuilder
from src.cursus.steps.builders.builder_payload_step import PayloadStepBuilder
from src.cursus.steps.builders.builder_registration_step import RegistrationStepBuilder

2025-08-20 06:29:39,265 - pipeline_registry.builder_registry - INFO - Registered builder: CradleDataLoading -> CradleDataLoadingStepBuilder
2025-08-20 06:29:39,265 - INFO - Registered builder: CradleDataLoading -> CradleDataLoadingStepBuilder
2025-08-20 06:29:39,267 - pipeline_registry.builder_registry - INFO - Registered builder: Registration -> RegistrationStepBuilder
2025-08-20 06:29:39,267 - INFO - Registered builder: Registration -> RegistrationStepBuilder


### Dependency Resolver and Registry Manager

In [57]:
from src.cursus.core.deps.registry_manager import RegistryManager
from src.cursus.core.deps.dependency_resolver import UnifiedDependencyResolver

### Pipeline DAG

In [58]:
from src.cursus.api.dag.base_dag import PipelineDAG

### Pipeline Assembler and PipelineTemplateBase

In [59]:
from src.cursus.core.assembler import PipelineAssembler
from src.cursus.core.assembler import PipelineTemplateBase

### Pipeline Examples

The following are pipeline template examples, listed for demo purposes (the imports are currently commented out).

In [60]:
#from src.pipeline_builder.template_pipeline_xgboost_train_calibrate_evaluate_e2e import XGBoostTrainCalibrateEvaluateE2ETemplate

In [61]:
#from src.pipeline_builder.template_pipeline_xgboost_train_calibrate import XGBoostTrainCalibrateTemplate

In [62]:
#from src.pipeline_builder.template_pipeline_xgboost_train_evaluate_e2e import XGBoostTrainEvaluateE2ETemplate

In [63]:
#from src.pipeline_builder.template_pipeline_xgboost_train_evaluate_no_registration import XGBoostTrainEvaluateNoRegistrationTemplate

In [64]:
#from src.pipeline_builder.template_pipeline_xgboost_simple import XGBoostSimpleTemplate

In [65]:
#from src.pipeline_builder.template_pipeline_xgboost_dataload_preprocess import XGBoostDataloadPreprocessTemplate

In [66]:
#from src.pipeline_builder.template_pipeline_cradle_only import CradleOnlyTemplate

## Demo: An End-to-End Pipeline based on PipelineDAG Compiler
Let us use the following simpler DAG (without registration) as an example.

![mods_pipeline_train_eval_calib](./mods_pipeline_train_eval_calib.png)

This demo takes several user inputs:
* the **Unified JSON file** at `config_path`
* the **Registry Manager**: an object that maintains the mapping from each step's logical name to its `step.properties`
* the **Dependency Resolver**: an object that handles the *automatic dependency resolution* between steps
* the other fields
    * `sagemaker_session`: the pipeline session
    * `role`: the IAM role
    * `notebook_root`: the notebook root path
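
As a rough illustration of what the registry and the resolver do together, the sketch below registers one specification per step and then matches each declared dependency to the step that outputs it. `StepSpec`, `ToyRegistry`, and `resolve` are hypothetical names, and exact-name matching is an assumption; the real `UnifiedDependencyResolver` may match outputs to inputs differently.

```python
from dataclasses import dataclass, field


@dataclass
class StepSpec:
    """Hypothetical declaration of what a step produces and what it needs."""
    name: str
    outputs: dict = field(default_factory=dict)       # logical name -> property path
    dependencies: list = field(default_factory=list)  # logical input names


class ToyRegistry:
    """Maps each step's logical name to its specification."""

    def __init__(self):
        self._specs = {}

    def register(self, spec):
        self._specs[spec.name] = spec

    def specs(self):
        return list(self._specs.values())


def resolve(consumer, registry):
    """For each dependency of `consumer`, find the step that outputs it."""
    resolved = {}
    for dep in consumer.dependencies:
        for producer in registry.specs():
            if producer.name != consumer.name and dep in producer.outputs:
                resolved[dep] = (producer.name, producer.outputs[dep])
                break
        else:
            raise LookupError(f"no step produces {dep!r} for {consumer.name!r}")
    return resolved


registry = ToyRegistry()
registry.register(StepSpec(
    "preprocess",
    outputs={
        "processed_data":
            "properties.ProcessingOutputConfig.Outputs['processed_data'].S3Output.S3Uri"
    },
))
train = StepSpec(
    "train",
    outputs={"model_artifacts": "properties.ModelArtifacts.S3ModelArtifacts"},
    dependencies=["processed_data"],
)
registry.register(train)

links = resolve(train, registry)
print(links)
```

With this kind of lookup, a step only declares the logical names it needs, and the wiring to upstream `step.properties` paths is derived instead of hard-coded.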


In this pipeline template, we inherit from the base class `PipelineTemplateBase`.

The **major tasks** are:
* *`Config` Classes Import*
* *Configuration Validation*
* *Step Builder Retrieval and Step Builder Map Creation*
* *Configuration Map Creation*
* **Pipeline DAG Generation**: ideally, the user creates this DAG and passes it in as input
* **Automatic Pipeline Assembly**: call `pipeline_assembler`
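
The task list above can be sketched as a small assembly loop: check that each DAG node has a config, pick a builder by config type, and build the steps in dependency order. This is only an illustration of the idea, using the standard-library `graphlib`; the dict-based configs, the `build_*` functions, and `assemble` are assumptions and not the `PipelineAssembler` API.

```python
from graphlib import TopologicalSorter


# Hypothetical stand-ins: a config is a dict, and a "builder" is a function
# that turns (step name, settings, upstream step names) into a step record.
def build_processing(name, settings, upstream):
    return {"name": name, "kind": "Processing", "depends_on": upstream, **settings}


def build_training(name, settings, upstream):
    return {"name": name, "kind": "Training", "depends_on": upstream, **settings}


def assemble(dag, config_map, builder_map):
    """Instantiate every step in dependency order.

    `dag` maps each step name to the set of step names it depends on.
    """
    missing = [name for name in dag if name not in config_map]
    if missing:
        raise ValueError(f"steps without a config: {missing}")

    steps = []
    for name in TopologicalSorter(dag).static_order():
        config = config_map[name]
        builder = builder_map[config["type"]]
        settings = {k: v for k, v in config.items() if k != "type"}
        steps.append(builder(name, settings, sorted(dag.get(name, ()))))
    return steps


dag = {
    "preprocess": set(),
    "train": {"preprocess"},
    "evaluate": {"train", "preprocess"},
}
config_map = {
    "preprocess": {"type": "Processing", "instance_type": "ml.m5.xlarge"},
    "train": {"type": "Training", "instance_type": "ml.m5.4xlarge"},
    "evaluate": {"type": "Processing", "instance_type": "ml.m5.xlarge"},
}
builder_map = {"Processing": build_processing, "Training": build_training}

pipeline_steps = assemble(dag, config_map, builder_map)
print([step["name"] for step in pipeline_steps])
```

The validation happens before any step is built, so a DAG node without a matching config fails early with a readable message.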


In [67]:
current_dir = Path.cwd()

In [68]:
#config_path = Path(current_dir) / 'pipeline_config' / f'config_{region}.json'
config_path

PosixPath('/home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json')

In [69]:
from sagemaker.image_uris import retrieve

### Create Registry Manager and Dependency Resolver via Factory

In [70]:
from src.cursus.core.deps.factory import create_pipeline_components

In [71]:
base_config.pipeline_name

'lukexie-AtoZ-xgboost-NA'

In [72]:
pipeline_components = create_pipeline_components(context_name=base_config.pipeline_name)

2025-08-20 06:29:39,355 - INFO - Initialized registry manager
2025-08-20 06:29:39,355 - INFO - Created specification registry for context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:39,355 - INFO - Created new registry for context 'lukexie-AtoZ-xgboost-NA'


In [73]:
pipeline_components

{'semantic_matcher': <src.cursus.core.deps.semantic_matcher.SemanticMatcher at 0x7fb9c95ed300>,
 'registry_manager': RegistryManager(contexts=1),
 'registry': SpecificationRegistry(context='lukexie-AtoZ-xgboost-NA', steps=0),
 'resolver': <src.cursus.core.deps.dependency_resolver.UnifiedDependencyResolver at 0x7fb9c95531f0>}

In [74]:
semantic_matcher = pipeline_components['semantic_matcher']

In [75]:
registry_manager = pipeline_components['registry_manager']

In [76]:
dependency_resolver = pipeline_components['resolver']

In [77]:
spec_registry = pipeline_components['registry']

#### Check user input

In [78]:
config_path

PosixPath('/home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json')

In [79]:
pipeline_session

<sagemaker.workflow.pipeline_context.PipelineSession at 0x7fb9d0862b90>

In [80]:
role

'arn:aws:iam::601857636239:role/SandboxRole-lukexie-us-east-1'

In [81]:
registry_manager

RegistryManager(contexts=1)

In [82]:
dependency_resolver

<src.cursus.core.deps.dependency_resolver.UnifiedDependencyResolver at 0x7fb9c95531f0>

### Legacy Code Example
The legacy pipeline has the following limitations:
- **Hard-wired connections** between steps
- **Fixed pipeline topology**: the pipeline cannot be changed without significantly rewriting the code
- **Code maintenance**: 1000+ lines of code

```python
from sagemaker.workflow.functions import Join

class AtoZRegionalXGBoostModel:
    ARTIFACT_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "Artifacts"])
    TRAINING_DATA_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "data/training"])
    VALIDATION_DATA_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "data/validation"])
    TESTING_DATA_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "data/testing"])
    CALIBRATION_DATA_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "data/calibration"]
    )
    TRAINING_FULL_DATA_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "full_data/training"]
    )
    VALIDATION_FULL_DATA_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "full_data/validation"]
    )
    TESTING_FULL_DATA_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "full_data/testing"]
    )
    CALIBRATION_FULL_DATA_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "full_data/calibration"]
    )
    TRAINING_TAG_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "tag/training"])
    VALIDATION_TAG_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "tag/validation"])
    TESTING_TAG_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "tag/testing"])
    TRAINING_STEP_OUTPUT_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "training_step_output"]
    )
    TUNING_STEP_OUTPUT_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "tuning_step_output"]
    )
    TESTING_SCORES_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "scores/testing"])
    CALIBRATION_SCORES_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "scores/calibration"]
    )
    TESTING_EVALUATION_LOCATION = Join(
        on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "evaluation/testing"]
    )
    PROD_LOCATION = Join(on="/", values=[PIPELINE_EXECUTION_TEMP_DIR, "prod"])

    def __init__(
        self,
        regional_alias,
        sagemaker_session=None,
        execution_role=None,
        #         config="config.json",
    ):

        config_path = str(Path(__file__).parent / "config" / f"config_{regional_alias}.json")
        with open(config_path, "r") as file:
            self.config = json.load(file)
        region = self.config["region"]
        self.pipeline_name = self.config["pipeline_name"]
        self.pipeline_version = self.config["pipeline_version"]
        self.author = self.config["author"]
        self.pipeline_description = self.config["pipeline_description"]
        self.model_class = self.config["model_class"]
        self.model_version = self.config["model_version"]
        self.processing_sagemaker_instance_type_small = self.config[
            "processing_sagemaker_instance_type_small"
        ]
        self.processing_sagemaker_instance_type_large = self.config[
            "processing_sagemaker_instance_type_large"
        ]
        self.training_sagemaker_instance_type = self.config["training_sagemaker_instance_type"]
        self.transform_sagemaker_instance_type = self.config["transform_sagemaker_instance_type"]

        self.processing_instance_count = self.config["processing_instance_count"]
        self.training_instance_count = self.config["training_instance_count"]
        self.transform_instance_count = self.config["transform_instance_count"]

        self.processing_volume_size = self.config["processing_volume_size"]
        self.training_volume_size = self.config["training_volume_size"]

        data_config_path = str(
            Path(__file__).parent / "config" / f"data_config_{regional_alias}.json"
        )

        with open(data_config_path, "r") as file:
            data_config = json.load(file)
        self.data_loading_info = data_config["data_loading_info"]

        # DATA_TYPE_LIST is the list of data_type:
        # training, validation, testing, calibration

        DATA_TYPE_LIST = [
            "training",
            "validation",
            "testing",
            "calibration",
        ]
        # self.loading_data_type_list is the list of data_type requiring a data loading job
        # some data_type already have data downloaded in the s3_output_override location
        self.loading_data_type_list = [
            data_type
            for data_type in DATA_TYPE_LIST
            if (data_type in self.data_loading_info)
            and (not self.data_loading_info[data_type].get("s3_output_override"))
        ]

        model_registration_info = data_config["model_registration_info"]
        self.registration_step_name_list = list(set(model_registration_info["step_name"].values()))

        model_training_config_path = str(
            Path(__file__).parent / "config" / f"model_training_config_{regional_alias}.json"
        )

        with open(model_training_config_path, "r") as file:
            self.model_training_config = json.load(file)

        self.sagemaker_session = sagemaker_session or Session()
        self.execution_role = execution_role or self.sagemaker_session.get_caller_identity_arn()
        self.s3_loc = self.config["s3_loc"]
        self.s3_bucket = self.config["s3_bucket"]
        print("Sagemaker_version:", sagemaker.__version__)

        self.data_loading_steps = {
            data_type: CradleDataLoadingStep(
                step_name=f"{data_type}_data_download",
                role=self.execution_role,
                sagemaker_session=self.sagemaker_session,
            )
            for data_type in self.loading_data_type_list
        }

        self.binning_step = self.create_binning_step()
        self.preprocessing_step_train = self.create_preprocessing_step_train()
        self.preprocessing_step_test = self.create_preprocessing_step_test()
        self.preprocessing_step_calib = self.create_preprocessing_step_calib()

        self.is_hyperparameter_tuning = self.model_training_config.get(
            "is_hyperparameter_tuning", False
        )
        if self.is_hyperparameter_tuning:
            self.tuning_step = self.create_tuning_step()
        else:
            self.training_step = self.create_training_step()

        self.model_step = self.create_model_step()
        self.test_scoring_step = self.create_test_scoring_step()
        self.test_evaluation_step = self.create_test_evaluation_step()
        self.calib_scoring_step = self.create_calib_scoring_step()
        self.pctlscore_step = self.create_pctlscore_step()
        self.mims_packaging_step = self.create_mims_packaging_step()

        self.eval_step = self.create_model_evaluation_step()

        self.generate_payload_step = self.create_generate_payload_step()

        self.registration_steps = {
            step_name: MimsModelRegistrationProcessingStep(
                step_name=step_name,
                role=self.execution_role,
                sagemaker_session=self.sagemaker_session,
                processing_input=[
                    ProcessingInput(
                        source=self.mims_packaging_step.properties.ProcessingOutputConfig.Outputs[
                            "output"
                        ].S3Output.S3Uri,
                        destination="/opt/ml/processing/input/model",
                    ),
                    ProcessingInput(
                        source=self.generate_payload_step.properties.ProcessingOutputConfig.Outputs[
                            "output"
                        ].S3Output.S3Uri,
                        destination="/opt/ml/processing/mims_payload",
                    ),
                ],
                depends_on=[
                    self.mims_packaging_step,
                    self.eval_step,
                    self.generate_payload_step,
                ],
                performance_metadata_location=self.eval_step.get_output_locations(True),
            )
            for step_name in self.registration_step_name_list
        }

    def generate_pipeline(self):
        steps = list(self.data_loading_steps.values())
        steps.extend(
            [
                self.binning_step,
                self.preprocessing_step_train,
                self.preprocessing_step_test,
                self.preprocessing_step_calib,
            ]
        )

        if self.is_hyperparameter_tuning:
            steps.append(self.tuning_step)
        else:
            steps.append(self.training_step)

        steps.extend(
            [
                self.model_step,
                self.test_scoring_step,
                self.test_evaluation_step,
                self.calib_scoring_step,
                self.pctlscore_step,
                self.mims_packaging_step,
                self.eval_step,
                self.generate_payload_step,
            ]
        )

        # comment the line below if you don't want register your models using MIMS
        steps.extend(list(self.registration_steps.values()))

        return Pipeline(
            name=self.pipeline_name,
            parameters=[
                PIPELINE_EXECUTION_TEMP_DIR,
                KMS_ENCRYPTION_KEY_PARAM,
                VPC_SUBNET,
                SECURITY_GROUP_ID,
            ],
            sagemaker_session=self.sagemaker_session,
            steps=steps,
        )

    ########################################
    ### Binning step

    def create_binning_step(self):
        depends_on = []
        if "training" not in self.loading_data_type_list:
            train_data_location = self.data_loading_info["training"]["s3_output_override"]
        else:
            training_data_loading_step = self.data_loading_steps["training"]
            depends_on.append(training_data_loading_step)
            train_data_location = training_data_loading_step.get_output_locations("DATA")
        config_location = str(Path(__file__).parent / "config")
        artifacts_location = AtoZRegionalXGBoostModel.ARTIFACT_LOCATION

        sklearn_processor = SKLearnProcessor(
            framework_version="1.2-1",
            instance_type=self.processing_sagemaker_instance_type_small,
            instance_count=self.processing_instance_count,
            volume_size_in_gb=self.processing_volume_size,
            role=self.execution_role,
            sagemaker_session=self.sagemaker_session,
        )

        return ProcessingStep(
            name="Binning",
            processor=sklearn_processor,
            code=str(Path(__file__).parent / "scripts" / "bin.py"),
            inputs=[
                ProcessingInput(
                    source=train_data_location,
                    destination="/opt/ml/processing/input/training",
                ),
                ProcessingInput(
                    source=config_location,
                    destination="/opt/ml/processing/input/config",
                ),
            ],
            outputs=[
                ProcessingOutput(
                    output_name="output",
                    source="/opt/ml/processing/output",
                    destination=artifacts_location,
                ),
            ],
            job_arguments=[
                "--data_type",
                "training",
            ],
            depends_on=depends_on,
        )

    ########################################
    ### Preprocessing steps

    def create_preprocessing_step_train(self):
        depends_on = []
        if "training" not in self.loading_data_type_list:
            train_data_location = self.data_loading_info["training"]["s3_output_override"]
        else:
            training_data_loading_step = self.data_loading_steps["training"]
            depends_on.append(training_data_loading_step)
            train_data_location = training_data_loading_step.get_output_locations("DATA")
        depends_on.append(self.binning_step)
        artifacts_location = AtoZRegionalXGBoostModel.ARTIFACT_LOCATION
        output_train_data_location = AtoZRegionalXGBoostModel.TRAINING_DATA_LOCATION
        output_valid_data_location = AtoZRegionalXGBoostModel.VALIDATION_DATA_LOCATION
        output_train_full_data_location = AtoZRegionalXGBoostModel.TRAINING_FULL_DATA_LOCATION
        output_valid_full_data_location = AtoZRegionalXGBoostModel.VALIDATION_FULL_DATA_LOCATION
        output_train_tag_location = AtoZRegionalXGBoostModel.TRAINING_TAG_LOCATION
        output_valid_tag_location = AtoZRegionalXGBoostModel.VALIDATION_TAG_LOCATION

        sklearn_processor = SKLearnProcessor(
            framework_version="1.2-1",
            instance_type=self.processing_sagemaker_instance_type_small,
            instance_count=self.processing_instance_count,
            volume_size_in_gb=self.processing_volume_size,
            role=self.execution_role,
            sagemaker_session=self.sagemaker_session,
        )

        return ProcessingStep(
            name="Preprocess-training-set",
            processor=sklearn_processor,
            code=str(Path(__file__).parent / "scripts" / "preprocess.py"),
            inputs=[
                ProcessingInput(
                    source=train_data_location,
                    destination="/opt/ml/processing/input/data",
                ),
                ProcessingInput(
                    source=artifacts_location,
                    destination="/opt/ml/processing/input/artifacts",
                ),
            ],
            outputs=[
                ProcessingOutput(
                    source="/opt/ml/processing/output/artifacts",
                    destination=artifacts_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/training",
                    destination=output_train_data_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/validation",
                    destination=output_valid_data_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/full_training",
                    destination=output_train_full_data_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/full_validation",
                    destination=output_valid_full_data_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/training_tag",
                    destination=output_train_tag_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/validation_tag",
                    destination=output_valid_tag_location,
                ),
            ],
            depends_on=depends_on,
            job_arguments=[
                "--n_workers",
                "16",
                "--data_type",
                "training",
            ],
        )

    def create_preprocessing_step_test(self):
        depends_on = []
        if "testing" not in self.loading_data_type_list:
            testing_data_location = self.data_loading_info["testing"]["s3_output_override"]
        else:
            testing_data_loading_step = self.data_loading_steps["testing"]
            depends_on.append(testing_data_loading_step)
            testing_data_location = testing_data_loading_step.get_output_locations("DATA")
        depends_on.append(self.binning_step)
        artifacts_location = AtoZRegionalXGBoostModel.ARTIFACT_LOCATION
        output_testing_data_location = AtoZRegionalXGBoostModel.TESTING_DATA_LOCATION
        output_test_full_data_location = AtoZRegionalXGBoostModel.TESTING_FULL_DATA_LOCATION
        output_testing_tag_location = AtoZRegionalXGBoostModel.TESTING_TAG_LOCATION

        sklearn_processor = SKLearnProcessor(
            framework_version="1.2-1",
            instance_type=self.processing_sagemaker_instance_type_small,
            instance_count=self.processing_instance_count,
            volume_size_in_gb=self.processing_volume_size,
            role=self.execution_role,
            sagemaker_session=self.sagemaker_session,
        )

        return ProcessingStep(
            name="Preprocess-test-set",
            processor=sklearn_processor,
            code=str(Path(__file__).parent / "scripts" / "preprocess.py"),
            inputs=[
                ProcessingInput(
                    source=testing_data_location,
                    destination="/opt/ml/processing/input/data",
                ),
                ProcessingInput(
                    source=artifacts_location,
                    destination="/opt/ml/processing/input/artifacts",
                ),
            ],
            outputs=[
                ProcessingOutput(
                    source="/opt/ml/processing/output/artifacts",
                    destination=artifacts_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/testing",
                    destination=output_testing_data_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/full_testing",
                    destination=output_test_full_data_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/testing_tag",
                    destination=output_testing_tag_location,
                ),
            ],
            depends_on=depends_on,
            job_arguments=[
                "--n_workers",
                "16",
                "--data_type",
                "testing",
            ],
        )

    def create_preprocessing_step_calib(self):
        depends_on = []
        if "calibration" not in self.loading_data_type_list:
            calib_data_location = self.data_loading_info["calibration"]["s3_output_override"]
        else:
            calibration_data_loading_step = self.data_loading_steps["calibration"]
            depends_on.append(calibration_data_loading_step)
            calib_data_location = calibration_data_loading_step.get_output_locations("DATA")
        depends_on.append(self.binning_step)

        artifacts_location = AtoZRegionalXGBoostModel.ARTIFACT_LOCATION
        output_calib_data_location = AtoZRegionalXGBoostModel.CALIBRATION_DATA_LOCATION
        output_calib_full_data_location = AtoZRegionalXGBoostModel.CALIBRATION_FULL_DATA_LOCATION

        sklearn_processor = SKLearnProcessor(
            framework_version="1.2-1",
            instance_type=self.processing_sagemaker_instance_type_large,
            instance_count=self.processing_instance_count,
            volume_size_in_gb=self.processing_volume_size,
            role=self.execution_role,
            sagemaker_session=self.sagemaker_session,
        )

        return ProcessingStep(
            name="Preprocess-calibration-set",
            processor=sklearn_processor,
            code=str(Path(__file__).parent / "scripts" / "preprocess.py"),
            inputs=[
                ProcessingInput(
                    source=calib_data_location,
                    destination="/opt/ml/processing/input/data",
                ),
                ProcessingInput(
                    source=artifacts_location,
                    destination="/opt/ml/processing/input/artifacts",
                ),
            ],
            outputs=[
                ProcessingOutput(
                    source="/opt/ml/processing/output/artifacts",
                    destination=artifacts_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/calibration",
                    destination=output_calib_data_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/data/full_calibration",
                    destination=output_calib_full_data_location,
                ),
            ],
            depends_on=depends_on,
            job_arguments=[
                "--n_workers",
                "16",
                "--data_type",
                "calibration",
            ],
        )

    ########################################
    ### Training step
    def create_training_step(self):
        s3_output_location = AtoZRegionalXGBoostModel.TRAINING_STEP_OUTPUT_LOCATION
        content_type = "text/csv"

        train_input = TrainingInput(
            AtoZRegionalXGBoostModel.TRAINING_DATA_LOCATION, content_type=content_type
        )
        validation_input = TrainingInput(
            AtoZRegionalXGBoostModel.VALIDATION_DATA_LOCATION, content_type=content_type
        )

        training_instance_type = self.training_sagemaker_instance_type

        training_image_uri = image_uris.retrieve(
            self.model_class, TRAINING_MODEL_REGION, self.model_version, image_scope="training"
        )

        hyperparameters = self.model_training_config["model_params"]

        # Create SageMaker Estimator instance
        xgb_estimator = Estimator(
            role=self.execution_role,
            image_uri=training_image_uri,
            instance_count=self.training_instance_count,
            volume_size=self.training_volume_size,
            instance_type=training_instance_type,
            hyperparameters=hyperparameters,
            output_path=s3_output_location,
            enable_sagemaker_metrics=True,
            sagemaker_session=self.sagemaker_session,
            encrypt_inter_container_traffic=True,
            disable_profiler=True,
        )

        return TrainingStep(
            name="Training",
            estimator=xgb_estimator,
            inputs={
                "train": train_input,
                "validation": validation_input,
            },
            depends_on=[
                self.preprocessing_step_train,
            ],
        )

    ########################################
    ### Tuning step

    def create_tuning_step(self):
        tuning_step_s3_output_location = AtoZRegionalXGBoostModel.TUNING_STEP_OUTPUT_LOCATION
        content_type = "text/csv"

        train_input = TrainingInput(
            AtoZRegionalXGBoostModel.TRAINING_DATA_LOCATION, content_type=content_type
        )
        validation_input = TrainingInput(
            AtoZRegionalXGBoostModel.VALIDATION_DATA_LOCATION, content_type=content_type
        )

        training_instance_type = self.training_sagemaker_instance_type

        training_image_uri = image_uris.retrieve(
            self.model_class, TRAINING_MODEL_REGION, self.model_version, image_scope="training"
        )

        hyperparameters = self.model_training_config["model_params"]
        hyperparameter_ranges_raw = self.model_training_config.get("model_params_ranges", dict())
        hyperparameter_ranges = dict()
        for key, value in hyperparameter_ranges_raw.items():
            if value["type"] == "continuous":
                hyperparameter_ranges[key] = ContinuousParameter(
                    float(value["min"]), float(value["max"])
                )
            elif value["type"] == "integer":
                hyperparameter_ranges[key] = IntegerParameter(int(value["min"]), int(value["max"]))

        # Create SageMaker Estimator instance
        xgb_estimator = Estimator(
            role=self.execution_role,
            image_uri=training_image_uri,
            instance_count=self.training_instance_count,
            volume_size=self.training_volume_size,
            instance_type=training_instance_type,
            hyperparameters=hyperparameters,
            output_path=tuning_step_s3_output_location,
            enable_sagemaker_metrics=True,
            sagemaker_session=self.sagemaker_session,
            encrypt_inter_container_traffic=True,
            disable_profiler=True,
        )

        tuner = HyperparameterTuner(
            estimator=xgb_estimator,
            objective_metric_name="validation:auc",  # Specify the metric to optimize
            hyperparameter_ranges=hyperparameter_ranges,
            max_jobs=10,
            max_parallel_jobs=2,
            objective_type="Maximize",
        )

        tuning_step = TuningStep(
            name="XGBoostHyperparameterTuning",
            tuner=tuner,
            inputs={
                "train": train_input,
                "validation": validation_input,
            },
            depends_on=[
                self.preprocessing_step_train,
            ],
        )

        return tuning_step

    def create_model_step(self):
        training_image_uri = image_uris.retrieve(
            self.model_class, TRAINING_MODEL_REGION, self.model_version, image_scope="training"
        )
        if self.is_hyperparameter_tuning:
            self.model_data = Join(
                on="/",
                values=[
                    AtoZRegionalXGBoostModel.TUNING_STEP_OUTPUT_LOCATION,
                    # from DescribeHyperParameterTuningJob
                    self.tuning_step.properties.BestTrainingJob.TrainingJobName,
                    "output/model.tar.gz",
                ],
            )
        else:
            self.model_data = self.training_step.properties.ModelArtifacts.S3ModelArtifacts

        model = Model(
            image_uri=training_image_uri,
            model_data=self.model_data,
            sagemaker_session=self.sagemaker_session,
            role=self.execution_role,
        )

        create_model_step = ModelStep(
            name="CreateInferenceModelStep",
            step_args=model.create(),
        )
        return create_model_step

    def create_test_scoring_step(self):
        # Set up a transformer to join the input and output data
        # See
        # https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html
        # for more info
        model_name = self.model_step.properties.ModelName
        testing_data_location = AtoZRegionalXGBoostModel.TESTING_DATA_LOCATION
        testing_output_location = AtoZRegionalXGBoostModel.TESTING_SCORES_LOCATION

        transformer = Transformer(
            model_name=model_name,
            instance_type=self.transform_sagemaker_instance_type,
            instance_count=self.transform_instance_count,
            output_path=testing_output_location,
            sagemaker_session=self.sagemaker_session,
            # Set accept to text/csv so that the output_fn in the inference code knows to output csv instead of json
            accept="text/csv",
            # assemble_with must equal split_type below
            assemble_with="Line",
        )
        return TransformStep(
            name="TestScoring",
            transformer=transformer,
            inputs=TransformInput(
                data=testing_data_location,
                content_type="text/csv",
                split_type="Line",
                join_source="Input",
                # "$" would send every column; "$[1:]" drops the first column (the tag)
                input_filter="$[1:]",
                # After input and output are joined, keep only the last column (the predicted score)
                output_filter="$[-1]",
            ),
            depends_on=[
                self.model_step,
                self.preprocessing_step_test,
            ],
        )

    def create_test_evaluation_step(self):
        data_type = "testing"
        depends_on = [self.test_scoring_step]
        artifacts_location = AtoZRegionalXGBoostModel.ARTIFACT_LOCATION
        full_data_location = AtoZRegionalXGBoostModel.TESTING_FULL_DATA_LOCATION
        tag_location = AtoZRegionalXGBoostModel.TESTING_TAG_LOCATION
        score_location = AtoZRegionalXGBoostModel.TESTING_SCORES_LOCATION
        evaluation_output_location = AtoZRegionalXGBoostModel.TESTING_EVALUATION_LOCATION

        sklearn_processor = SKLearnProcessor(
            framework_version="1.2-1",
            instance_type=self.processing_sagemaker_instance_type_small,
            instance_count=self.processing_instance_count,
            volume_size_in_gb=self.processing_volume_size,
            role=self.execution_role,
            sagemaker_session=self.sagemaker_session,
        )

        return ProcessingStep(
            name=f"Evaluation-{data_type}",
            processor=sklearn_processor,
            code=str(Path(__file__).parent / "scripts" / "evaluation.py"),
            inputs=[
                ProcessingInput(
                    source=full_data_location,
                    destination="/opt/ml/processing/input/full_data",
                ),
                ProcessingInput(
                    source=tag_location,
                    destination="/opt/ml/processing/input/tag",
                ),
                ProcessingInput(
                    source=score_location,
                    destination="/opt/ml/processing/input/scores",
                ),
                ProcessingInput(
                    source=artifacts_location,
                    destination="/opt/ml/processing/input/artifacts",
                ),
            ],
            outputs=[
                ProcessingOutput(
                    source="/opt/ml/processing/output/artifacts",
                    destination=artifacts_location,
                ),
                ProcessingOutput(
                    source="/opt/ml/processing/output/evaluation/",
                    destination=evaluation_output_location,
                ),
            ],
            depends_on=depends_on,
            job_arguments=[
                "--n_workers",
                "16",
                "--data_type",
                data_type,
            ],
        )

    def create_calib_scoring_step(self):
        # Set up a transformer to join the input and output data
        # See
        # https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html
        # for more info
        model_name = self.model_step.properties.ModelName
        calibration_data_location = AtoZRegionalXGBoostModel.CALIBRATION_DATA_LOCATION
        calibration_output_location = AtoZRegionalXGBoostModel.CALIBRATION_SCORES_LOCATION

        transformer = Transformer(
            model_name=model_name,
            instance_type=self.transform_sagemaker_instance_type,
            instance_count=self.transform_instance_count,
            output_path=calibration_output_location,
            sagemaker_session=self.sagemaker_session,
            #             volume_kms_key=KMS_ENCRYPTION_KEY_PARAM,
            #             output_kms_key=KMS_ENCRYPTION_KEY_PARAM,
            # Set accept to text/csv so that the output_fn in the inference code knows to output csv instead of json
            accept="text/csv",
            # assemble_with must equal split_type below
            assemble_with="Line",
        )
        return TransformStep(
            name="CalibScoring",
            transformer=transformer,
            inputs=TransformInput(
                data=calibration_data_location,
                content_type="text/csv",
                split_type="Line",
                join_source="Input",
                # "$" sends all columns to the transformer
                # (a filter such as "$[0,1]" would send only columns 0 and 1)
                input_filter="$",
                # After input and output are joined, keep only the last column (the predicted score)
                output_filter="$[-1]",
            ),
            depends_on=[
                self.model_step,
                self.preprocessing_step_calib,
            ],
        )

    ## 1 ===== the old architecture using SKLearnProcessor =====
    def create_pctlscore_step(self):
        calibration_output_location = AtoZRegionalXGBoostModel.CALIBRATION_SCORES_LOCATION
        artifacts_location = AtoZRegionalXGBoostModel.ARTIFACT_LOCATION

        sklearn_processor = SKLearnProcessor(
            framework_version="1.2-1",
            instance_type=self.processing_sagemaker_instance_type_small,
            instance_count=self.processing_instance_count,
            volume_size_in_gb=self.processing_volume_size,
            role=self.execution_role,
            sagemaker_session=self.sagemaker_session,
        )

        return ProcessingStep(
            name="Percentile_score_calculation",
            processor=sklearn_processor,
            code=str(Path(__file__).parent / "scripts" / "percentile_score_mapping.py"),
            inputs=[
                ProcessingInput(
                    source=artifacts_location,
                    destination="/opt/ml/processing/input/artifacts",
                ),
                ProcessingInput(
                    source=calibration_output_location,
                    destination="/opt/ml/processing/input/data",
                ),
            ],
            outputs=[
                ProcessingOutput(
                    output_name="output",
                    source="/opt/ml/processing/output",
                    destination=artifacts_location,
                ),
            ],
            depends_on=[
                self.calib_scoring_step,
            ],
            job_arguments=[
                "--n_workers",
                "16",
                "--score_column",
                "0",
            ],
        )

    ########################################
    ### Packaging step

    def create_mims_packaging_step(self):
        artifacts_location = AtoZRegionalXGBoostModel.ARTIFACT_LOCATION
        inference_script_location = str(Path(__file__).parent / "online_inference_src")
        output_location = AtoZRegionalXGBoostModel.PROD_LOCATION

        data_output_path = "/opt/ml/processing/output"
        processor = SKLearnProcessor(
            framework_version="1.2-1",
            role=self.execution_role,
            volume_size_in_gb=self.processing_volume_size,
            instance_type=self.processing_sagemaker_instance_type_small,
            instance_count=self.processing_instance_count,
            sagemaker_session=self.sagemaker_session,
        )

        return ProcessingStep(
            name="Package",
            processor=processor,
            code=str(Path(__file__).parent / "scripts" / "mims_package.py"),
            inputs=[
                ProcessingInput(
                    source=self.model_data,
                    destination="/opt/ml/processing/input/model",
                ),
                ProcessingInput(
                    source=artifacts_location,
                    destination="/opt/ml/processing/input/artifacts",
                ),
                ProcessingInput(
                    source=inference_script_location,
                    destination="/opt/ml/processing/input/script",
                ),
            ],
            outputs=[
                ProcessingOutput(
                    output_name="output",
                    source=data_output_path,
                    destination=output_location,
                )
            ],
            depends_on=[
                self.pctlscore_step,
            ],
        )

    def create_model_evaluation_step(self):
        # Set up the predefined model evaluation step, which takes the output of
        # the model monitoring step and generates the model performance testing
        # info that is then used when registering the model to MIMS
        model_evaluation_processor = ModelPerformanceEvaluationProcessor(
            role=self.execution_role,
            sagemaker_session=self.sagemaker_session,
        )
        constraints_input_dest = PROCESSOR_DIRECTORY_ROOT + "/input/constraints"
        statistics_input_dest = PROCESSOR_DIRECTORY_ROOT + "/input/statistics"
        violations_input_dest = PROCESSOR_DIRECTORY_ROOT + "/input/violations"
        model_evaluation_processor.set_processing_job_inputs(
            inputs=[
                # https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html
                ProcessingInput(
                    source=Join(
                        on="/",
                        values=[
                            AtoZRegionalXGBoostModel.TESTING_EVALUATION_LOCATION,
                            "constraints.json",
                        ],
                    ),
                    destination=constraints_input_dest,
                ),
                # https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-statistics.html
                ProcessingInput(
                    source=Join(
                        on="/",
                        values=[
                            AtoZRegionalXGBoostModel.TESTING_EVALUATION_LOCATION,
                            "statistics.json",
                        ],
                    ),
                    destination=statistics_input_dest,
                ),
                # https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-violations.html
                ProcessingInput(
                    source=Join(
                        on="/",
                        values=[
                            AtoZRegionalXGBoostModel.TESTING_EVALUATION_LOCATION,
                            "constraint_violations.json",
                        ],
                    ),
                    destination=violations_input_dest,
                ),
            ],
            is_pipeline_execution=True,
        )
        model_evaluation_step = ModelPerformanceEvaluationStep(
            step_name="ModelPerformanceEvaluation",
            role=self.execution_role,
            sagemaker_session=self.sagemaker_session,
            processor=model_evaluation_processor,
            depends_on=[self.test_evaluation_step],
        )
        return model_evaluation_step

    def create_generate_payload_step(self):
        artifacts_location = AtoZRegionalXGBoostModel.ARTIFACT_LOCATION

        sklearn_processor = SKLearnProcessor(
            framework_version="1.2-1",
            volume_size_in_gb=self.processing_volume_size,
            instance_type=self.processing_sagemaker_instance_type_small,
            instance_count=self.processing_instance_count,
            role=self.sagemaker_session.get_caller_identity_arn(),
            sagemaker_session=self.sagemaker_session,
            #             output_kms_key=KMS_ENCRYPTION_KEY_PARAM,
            #             volume_kms_key=KMS_ENCRYPTION_KEY_PARAM,
            #             network_config=NetworkConfig(
            #                 enable_network_isolation=True,
            #                 security_group_ids=[SECURITY_GROUP_ID],
            #                 subnets=[VPC_SUBNET],
            #                 encrypt_inter_container_traffic=True,
            #             ),
        )

        return ProcessingStep(
            name="create_payload",
            processor=sklearn_processor,
            code=str(Path(__file__).parent / "scripts" / "create_payload.py"),
            inputs=[
                ProcessingInput(
                    source=artifacts_location,
                    destination="/opt/ml/processing/input/artifacts",
                ),
            ],
            outputs=[
                ProcessingOutput(output_name="output", source="/opt/ml/processing/mims_payload"),
            ],
            depends_on=[
                self.mims_packaging_step,
            ],
        )
``` 
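
The two scoring steps in the template above combine `input_filter`, `join_source="Input"` and `output_filter`. As a rough illustration of how these settings interact on a single CSV row, the sketch below emulates them locally in plain Python. `apply_filter`, `emulate_transform` and `fake_model` are hypothetical helpers written for this example, not part of the SageMaker SDK, and only the filter forms used above (`$`, `$[i]`, `$[a:b]`) are handled.

```python
def apply_filter(columns, spec):
    """Apply a tiny subset of the JSONPath-style filters used in the template.

    Supports only "$" (all columns), "$[i]" (one index) and "$[a:b]" (a slice).
    """
    if spec == "$":
        return list(columns)
    inner = spec[2:-1]  # strip the leading "$[" and the trailing "]"
    if ":" in inner:
        start, stop = inner.split(":")
        lo = int(start) if start else None
        hi = int(stop) if stop else None
        return list(columns[lo:hi])
    return [columns[int(inner)]]


def emulate_transform(row, predict, input_filter, output_filter):
    """Emulate input_filter -> model -> join with input -> output_filter."""
    features = apply_filter(row, input_filter)   # what the model receives
    prediction = predict(features)               # stand-in for the real model
    joined = list(row) + [prediction]            # join_source="Input"
    return apply_filter(joined, output_filter)   # what is written out


# Hypothetical row: a tag column followed by two numeric features.
row = ["tag-001", "0.5", "1.5"]
fake_model = lambda feats: str(sum(float(x) for x in feats))

# Same filters as the TestScoring step: drop the tag, keep only the score.
result = emulate_transform(row, fake_model, "$[1:]", "$[-1]")
print(result)
```

With these filters the tag never reaches the model, and only the score column survives in the output, which is why the evaluation step reads the tags from a separate location.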

### DAG to Template Compiler

In [83]:
from src.cursus.steps.registry.step_names import STEP_NAMES
from src.cursus.api.dag.base_dag import PipelineDAG
from src.cursus.core.compiler.dag_compiler import compile_dag_to_pipeline, PipelineDAGCompiler
from src.cursus.core.compiler.validation import ConversionReport
from src.cursus.steps.configs.utils import load_configs

In [84]:
def create_xgboost_pipeline_dag() -> PipelineDAG:
    """
    Create the DAG structure for the XGBoost train-calibrate-evaluate pipeline.
    
    This is extracted from the _create_pipeline_dag method in the
    XGBoostTrainCalibrateEvaluateE2ETemplate class.
    
    Returns:
        PipelineDAG: The directed acyclic graph representing the pipeline
    """
    dag = PipelineDAG()
    
    # Add all nodes - renamed to match configuration names exactly
    dag.add_node("CradleDataLoading_training")    # Data load for training
    dag.add_node("TabularPreprocessing_training") # Tabular preprocessing for training
    dag.add_node("XGBoostTraining")              # XGBoost training step
    dag.add_node("ModelCalibration")             # Model calibration step
    dag.add_node("Package")                      # Package step
    dag.add_node("Registration")                 # MIMS registration step
    dag.add_node("Payload")                      # Payload step
    dag.add_node("CradleDataLoading_calibration") # Data load for calibration
    dag.add_node("TabularPreprocessing_calibration") # Tabular preprocessing for calibration
    dag.add_node("XGBoostModelEval_calibration")     # Model evaluation step
    
    # Training flow
    dag.add_edge("CradleDataLoading_training", "TabularPreprocessing_training")
    dag.add_edge("TabularPreprocessing_training", "XGBoostTraining")
    dag.add_edge("XGBoostTraining", "ModelCalibration")
    
    # Output flow
    dag.add_edge("ModelCalibration", "Package")
    dag.add_edge("XGBoostTraining", "Package")  # Raw model is also input to packaging
    dag.add_edge("XGBoostTraining", "Payload")  # Payload test uses the raw model
    dag.add_edge("Package", "Registration")
    dag.add_edge("Payload", "Registration")
    
    # Calibration flow
    dag.add_edge("CradleDataLoading_calibration", "TabularPreprocessing_calibration")
    
    # Evaluation flow
    dag.add_edge("XGBoostTraining", "XGBoostModelEval_calibration")
    dag.add_edge("TabularPreprocessing_calibration", "XGBoostModelEval_calibration")
    
    logger.info(f"Created DAG with {len(dag.nodes)} nodes and {len(dag.edges)} edges")
    return dag
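
To make the ordering implied by these edges concrete, here is a minimal sketch that derives one valid execution order with Kahn's algorithm. It works on plain lists of node names and edge tuples copied from the function above. `topological_order` is a hypothetical helper for illustration; it is not the `PipelineDAG` API and makes no claim about how Cursus orders steps internally.

```python
from collections import defaultdict, deque


def topological_order(nodes, edges):
    """Return one valid execution order, or raise if the graph has a cycle."""
    successors = defaultdict(list)
    in_degree = {node: 0 for node in nodes}
    for src, dst in edges:
        successors[src].append(dst)
        in_degree[dst] += 1

    # Start from the nodes that have no upstream dependency.
    ready = deque(node for node in nodes if in_degree[node] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


nodes = [
    "CradleDataLoading_training", "TabularPreprocessing_training",
    "XGBoostTraining", "ModelCalibration", "Package", "Registration",
    "Payload", "CradleDataLoading_calibration",
    "TabularPreprocessing_calibration", "XGBoostModelEval_calibration",
]
edges = [
    ("CradleDataLoading_training", "TabularPreprocessing_training"),
    ("TabularPreprocessing_training", "XGBoostTraining"),
    ("XGBoostTraining", "ModelCalibration"),
    ("ModelCalibration", "Package"),
    ("XGBoostTraining", "Package"),
    ("XGBoostTraining", "Payload"),
    ("Package", "Registration"),
    ("Payload", "Registration"),
    ("CradleDataLoading_calibration", "TabularPreprocessing_calibration"),
    ("XGBoostTraining", "XGBoostModelEval_calibration"),
    ("TabularPreprocessing_calibration", "XGBoostModelEval_calibration"),
]

order = topological_order(nodes, edges)
for position, name in enumerate(order, start=1):
    print(position, name)
```

Any order produced this way places both data-loading nodes before their preprocessing nodes and `Registration` after both `Package` and `Payload`.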

In [85]:
dag = create_xgboost_pipeline_dag()

2025-08-20 06:29:39,429 - INFO - Added node: CradleDataLoading_training
2025-08-20 06:29:39,429 - INFO - Added node: TabularPreprocessing_training
2025-08-20 06:29:39,429 - INFO - Added node: XGBoostTraining
2025-08-20 06:29:39,430 - INFO - Added node: ModelCalibration
2025-08-20 06:29:39,430 - INFO - Added node: Package
2025-08-20 06:29:39,431 - INFO - Added node: Registration
2025-08-20 06:29:39,431 - INFO - Added node: Payload
2025-08-20 06:29:39,431 - INFO - Added node: CradleDataLoading_calibration
2025-08-20 06:29:39,432 - INFO - Added node: TabularPreprocessing_calibration
2025-08-20 06:29:39,432 - INFO - Added node: XGBoostModelEval_calibration
2025-08-20 06:29:39,433 - INFO - Added edge: CradleDataLoading_training -> TabularPreprocessing_training
2025-08-20 06:29:39,433 - INFO - Added edge: TabularPreprocessing_training -> XGBoostTraining
2025-08-20 06:29:39,433 - INFO - Added edge: XGBoostTraining -> ModelCalibration
2025-08-20 06:29:39,434 - INFO - Added edge: ModelCalibrati

In [86]:
dag_compiler = PipelineDAGCompiler(
    config_path=config_path,
    sagemaker_session=pipeline_session,
    role=role,
)

#### DAG Validation and Preview of Config Resolution

In [87]:
preview_only = True

In [88]:
if preview_only:
    preview = dag_compiler.preview_resolution(dag)
    logger.info("DAG node resolution preview:")
    for node, config_type in preview.node_config_map.items():
        confidence = preview.resolution_confidence.get(node, 0.0)
        logger.info(f"  {node} → {config_type} (confidence: {confidence:.2f})")
        
    if preview.recommendations:
        logger.info("Recommendations:")
        for recommendation in preview.recommendations:
            logger.info(f"  - {recommendation}")
        
    validation = dag_compiler.validate_dag_compatibility(dag)
    logger.info(f"DAG validation: {'VALID' if validation.is_valid else 'INVALID'}")
    if not validation.is_valid:
        if validation.missing_configs:
            logger.warning(f"Missing configs: {validation.missing_configs}")
        if validation.unresolvable_builders:
            logger.warning(f"Unresolvable builders: {validation.unresolvable_builders}")
        if validation.config_errors:
            logger.warning(f"Config errors: {validation.config_errors}")

2025-08-20 06:29:39,453 - INFO - Previewing resolution for 10 DAG nodes
2025-08-20 06:29:39,453 - INFO - Creating template for DAG with 10 nodes
2025-08-20 06:29:39,454 - INFO - Detected 10 required config classes in configuration file
2025-08-20 06:29:39,455 - INFO - Successfully loaded 10 of 10 required classes
2025-08-20 06:29:39,455 - INFO - Loading configs from: /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
2025-08-20 06:29:39,457 - INFO - Loading configs from /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
2025-08-20 06:29:39,458 - INFO - Loading configuration from /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
Object: CradleJobSpecificationConfig in src.cursus.steps.configs.config_cradle_data_loading_step
Field: unknown
Original definition path: CradleJobSpecificationConfig()
Reference path: 
This creates a cycle in 
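
The validation cell above only logs problems. Where a notebook run should stop on an invalid DAG, the same fields can be turned into an exception. This is a minimal sketch, assuming the validation result exposes `is_valid`, `missing_configs`, `unresolvable_builders`, and `config_errors` as used in that cell. `raise_if_invalid` is a hypothetical helper, and `SimpleNamespace` stands in for the real result object.

```python
from types import SimpleNamespace


def raise_if_invalid(validation):
    """Raise ValueError listing every non-empty problem field."""
    if validation.is_valid:
        return
    problems = []
    for field in ("missing_configs", "unresolvable_builders", "config_errors"):
        value = getattr(validation, field, None)
        if value:
            problems.append(f"{field}: {value}")
    raise ValueError("DAG validation failed; " + "; ".join(problems))


# Stand-in object for illustration only, not real compiler output
fake_result = SimpleNamespace(
    is_valid=False,
    missing_configs=["Payload"],
    unresolvable_builders=[],
    config_errors={},
)
try:
    raise_if_invalid(fake_result)
except ValueError as err:
    message = str(err)
```

In the notebook, this would be called as `raise_if_invalid(dag_compiler.validate_dag_compatibility(dag))` before compiling.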

### Put it Together: Pipeline Generation from DAG

In [89]:
# Convert DAG to pipeline and get report
try:
    logger.info("Converting DAG to pipeline")
    template_pipeline, report = dag_compiler.compile_with_report(
        dag=dag
    )
        
    # Log report summary
    logger.info(f"Conversion complete: {report.summary()}")
    for node, details in report.resolution_details.items():
        logger.info(f"  {node} → {details['config_type']} ({details['builder_type']})")
        
    # Log pipeline creation details
    logger.info(f"Pipeline '{template_pipeline.name}' created successfully")
    logger.info(f"Pipeline ARN: {getattr(template_pipeline, 'arn', 'Not available until upserted')}")
    logger.info("To upsert the pipeline, call template_pipeline.upsert()")
except Exception as e:
    logger.error(f"Failed to convert DAG to pipeline: {e}")
    raise

2025-08-20 06:29:39,565 - INFO - Converting DAG to pipeline
2025-08-20 06:29:39,565 - INFO - Compiling DAG with detailed reporting
2025-08-20 06:29:39,566 - INFO - Compiling DAG with 10 nodes to pipeline
2025-08-20 06:29:39,566 - INFO - Creating template for DAG with 10 nodes
2025-08-20 06:29:39,567 - INFO - Loading configs from: /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
2025-08-20 06:29:39,569 - INFO - Loading configs from /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
2025-08-20 06:29:39,569 - INFO - Loading configuration from /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
Object: CradleJobSpecificationConfig in src.cursus.steps.configs.config_cradle_data_loading_step
Field: unknown
Original definition path: CradleJobSpecificationConfig()
Reference path: 
This creates a cycle in the object graph.
Object: CradleJobSpe

sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds


2025-08-20 06:29:40,670 - INFO - Created CradleDataLoadingStep with name: CradleDataLoading-Training
2025-08-20 06:29:40,671 - INFO - CradleDataLoadingStep output locations: {...}
2025-08-20 06:29:40,671 - INFO - Built step CradleDataLoading_training
2025-08-20 06:29:40,674 - INFO - Stored Cradle data loading request for step: CradleDataLoading-Training
2025-08-20 06:29:40,674 - INFO - Creating CradleDataLoadingStep...


sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds


2025-08-20 06:29:41,690 - INFO - Created CradleDataLoadingStep with name: CradleDataLoading-Calibration
2025-08-20 06:29:41,690 - INFO - CradleDataLoadingStep output locations: {...}
2025-08-20 06:29:41,691 - INFO - Built step CradleDataLoading_calibration
2025-08-20 06:29:41,693 - INFO - Stored Cradle data loading request for step: CradleDataLoading-Calibration
2025-08-20 06:29:41,694 - INFO - Registered specification for step 'TabularPreprocessingStepStep' of type 'TabularPreprocessing_Training' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:41,694 - INFO - Registered specification for step 'CradleDataLoading-Training' of type 'CradleDataLoading_Training' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:41,695 - INFO - Best match for DATA: CradleDataLoading-Training.DATA (confidence: 1.000)
2025-08-20 06:29:41,695 - INFO - Resolved TabularPreprocessingStepStep.DATA -> CradleDataLoading-Training.DATA
2025-08-20 06:29:41,697 - INFO - Defaulting to only available Python vers

sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds


2025-08-20 06:29:42,725 - INFO - Setting job_type argument to: training
2025-08-20 06:29:42,726 - INFO - Built step TabularPreprocessing_training
2025-08-20 06:29:42,726 - INFO - Registered specification for step 'TabularPreprocessingStepStep' of type 'TabularPreprocessing_Calibration' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:42,727 - INFO - Registered specification for step 'CradleDataLoading-Calibration' of type 'CradleDataLoading_Calibration' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:42,727 - INFO - Best match for DATA: CradleDataLoading-Calibration.DATA (confidence: 1.000)
2025-08-20 06:29:42,728 - INFO - Resolved TabularPreprocessingStepStep.DATA -> CradleDataLoading-Calibration.DATA
2025-08-20 06:29:42,729 - INFO - Defaulting to only available Python version: py3


sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds


2025-08-20 06:29:43,759 - INFO - Setting job_type argument to: calibration
2025-08-20 06:29:43,760 - INFO - Built step TabularPreprocessing_calibration
2025-08-20 06:29:43,761 - INFO - Creating XGBoost TrainingStep...
2025-08-20 06:29:43,761 - INFO - Registered specification for step 'XGBoostTrainingStepStep' of type 'XGBoostTraining' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:43,762 - INFO - Registered specification for step 'TabularPreprocessing-Training' of type 'TabularPreprocessing_Training' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:43,762 - INFO - Best match for input_path: TabularPreprocessing-Training.processed_data (confidence: 0.814)
2025-08-20 06:29:43,763 - INFO - Resolved XGBoostTrainingStepStep.input_path -> TabularPreprocessing-Training.processed_data
2025-08-20 06:29:43,763 - INFO - Optional dependency not resolved: XGBoostTrainingStepStep.hyperparameters_s3_uri
2025-08-20 06:29:43,764 - INFO - Created hyperparameters JSON file at /tmp/tmp22d3b40w

sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.OutputDataConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.ResourceConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.SecurityGroupIds


2025-08-20 06:29:45,248 - INFO - Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
2025-08-20 06:29:45,315 - INFO - Ignoring unnecessary Python version: py3.
2025-08-20 06:29:45,333 - INFO - Ignoring unnecessary instance type: ml.m5.12xlarge.
2025-08-20 06:29:45,337 - INFO - Created TrainingStep with name: XGBoostTraining
2025-08-20 06:29:45,337 - INFO - Built step XGBoostTraining
2025-08-20 06:29:45,338 - INFO - Creating ModelCalibration ProcessingStep...
2025-08-20 06:29:45,338 - INFO - Registered specification for step 'ModelCalibrationStepStep' of type 'ModelCalibration' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:45,338 - INFO - Registered specification for step 'XGBoostTraining' of type 'XGBoostTraining' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:45,339 - INFO - Best match for evaluation_data: XGBoostTraining.evaluation_output (confidence: 1.000)
2025-08-20 06:29:45,340 - INFO - Resolved ModelCalibrationStepStep.evaluation_data -> XGBoostTr

sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds


2025-08-20 06:29:46,373 - INFO - No command-line arguments needed for calibration script
2025-08-20 06:29:46,374 - INFO - Created ProcessingStep with name: ModelCalibration
2025-08-20 06:29:46,374 - INFO - Built step ModelCalibration
2025-08-20 06:29:46,374 - INFO - Creating MIMS Payload ProcessingStep...
2025-08-20 06:29:46,375 - INFO - Registered specification for step 'PayloadStepStep' of type 'Payload' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:46,375 - INFO - Registered specification for step 'XGBoostTraining' of type 'XGBoostTraining' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:46,376 - INFO - Best match for model_input: XGBoostTraining.model_output (confidence: 1.000)
2025-08-20 06:29:46,377 - INFO - Resolved PayloadStepStep.model_input -> XGBoostTraining.model_output
2025-08-20 06:29:46,377 - INFO - Payload environment variables: {...}
2025-08-20 06:29:46,378 - INFO - Defaulting to only available Python version: py3


sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds


2025-08-20 06:29:47,415 - INFO - No command-line arguments needed for payload script
2025-08-20 06:29:47,416 - INFO - Created ProcessingStep with name: Payload
2025-08-20 06:29:47,416 - INFO - Built step Payload
2025-08-20 06:29:47,417 - INFO - Creating XGBoostModelEval ProcessingStep...
2025-08-20 06:29:47,417 - INFO - Registered specification for step 'XGBoostModelEvalStepStep' of type 'XGBoostModelEval' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:47,418 - INFO - Registered specification for step 'XGBoostTraining' of type 'XGBoostTraining' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:47,418 - INFO - Registered specification for step 'TabularPreprocessing-Calibration' of type 'TabularPreprocessing_Calibration' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:47,419 - INFO - Best match for model_input: XGBoostTraining.model_output (confidence: 1.000)
2025-08-20 06:29:47,420 - INFO - Resolved XGBoostModelEvalStepStep.model_input -> XGBoostTraining.model_output
20

sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.ResourceConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.SecurityGroupIds
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.Environment


2025-08-20 06:29:49,719 - INFO - Ignoring unnecessary Python version: py3.
2025-08-20 06:29:49,736 - INFO - Ignoring unnecessary instance type: ml.m5.12xlarge.
2025-08-20 06:29:49,737 - INFO - Setting job_type argument to: calibration
2025-08-20 06:29:49,739 - INFO - Created ProcessingStep with name: XGBoostModelEval-Calibration
2025-08-20 06:29:49,739 - INFO - Built step XGBoostModelEval_calibration
2025-08-20 06:29:49,740 - INFO - Creating Packaging ProcessingStep...
2025-08-20 06:29:49,740 - INFO - Registered specification for step 'PackageStepStep' of type 'Package' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:49,741 - INFO - Registered specification for step 'ModelCalibration' of type 'ModelCalibration' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:49,741 - INFO - Registered specification for step 'XGBoostTraining' of type 'XGBoostTraining' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:49,742 - INFO - Best match for model_input: XGBoostTraining.model_outpu

sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds


2025-08-20 06:29:50,786 - INFO - [PACKAGING INPUT OVERRIDE] Using local inference scripts path from configuration: /home/ec2-user/SageMaker/Cursus/dockers/xgboost_atoz
2025-08-20 06:29:50,786 - INFO - [PACKAGING INPUT OVERRIDE] This local path will be used regardless of any dependency-resolved values
2025-08-20 06:29:50,787 - INFO - Added inference scripts input with local path: /home/ec2-user/SageMaker/Cursus/dockers/xgboost_atoz -> /opt/ml/processing/input/script
2025-08-20 06:29:50,787 - INFO - No command-line arguments needed for packaging script
2025-08-20 06:29:50,788 - INFO - Created ProcessingStep with name: Package
2025-08-20 06:29:50,788 - INFO - Built step Package
2025-08-20 06:29:50,789 - INFO - Creating MimsModelRegistrationProcessingStep...
2025-08-20 06:29:50,789 - INFO - Registered specification for step 'RegistrationStepStep' of type 'Registration' in context 'lukexie-AtoZ-xgboost-NA'
2025-08-20 06:29:50,790 - INFO - Registered specification for step 'Package' of type 

sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingOutputConfig.KmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.ProcessingResources.ClusterConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.NetworkConfig.VpcConfig.SecurityGroupIds
sagemaker.config INFO - Applied value from config key = SageMaker.ProcessingJob.Environment


2025-08-20 06:29:51,815 - INFO - Created MimsModelRegistrationProcessingStep: Registration-NA
2025-08-20 06:29:51,816 - INFO - Built step Registration
2025-08-20 06:29:51,821 - INFO - Generated pipeline lukexie-AtoZ-xgboost-NA-2-0-0-pipeline with 10 steps in 12.18 seconds
2025-08-20 06:29:51,821 - INFO - Stored 2 Cradle loading requests
2025-08-20 06:29:51,821 - INFO - Found registration step: Registration
2025-08-20 06:29:51,822 - INFO - Ignoring unnecessary Python version: py3.
2025-08-20 06:29:51,838 - INFO - Ignoring unnecessary instance type: ml.m5.large.
2025-08-20 06:29:51,838 - INFO - Retrieved image URI: 683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.7-1
2025-08-20 06:29:51,839 - INFO - Stored execution doc config for registration step: Registration-NA
2025-08-20 06:29:51,839 - INFO - Pipeline name 'lukexie-AtoZ-xgboost-NA-2.0.0-pipeline' sanitized to 'lukexie-AtoZ-xgboost-NA-2-0-0-pipeline' to conform to SageMaker constraints
2025-08-20 06:29:51,839 - INFO -
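
The log above shows the pipeline name being rewritten from `lukexie-AtoZ-xgboost-NA-2.0.0-pipeline` to `lukexie-AtoZ-xgboost-NA-2-0-0-pipeline`. Below is a minimal sketch of such a sanitizer, assuming the constraint is that names consist of alphanumerics with interior hyphens and are at most 256 characters long. `sanitize_pipeline_name` is a hypothetical helper, not the function Cursus uses.

```python
import re


def sanitize_pipeline_name(name: str, max_length: int = 256) -> str:
    """Replace disallowed characters with hyphens and trim the result."""
    # Anything outside [A-Za-z0-9-] (for example the dots in "2.0.0") becomes "-"
    cleaned = re.sub(r"[^A-Za-z0-9-]", "-", name)
    # Names may not start or end with a hyphen (assumed constraint)
    cleaned = cleaned.strip("-")
    return cleaned[:max_length].rstrip("-")


sanitized = sanitize_pipeline_name("lukexie-AtoZ-xgboost-NA-2.0.0-pipeline")
```

For the name in this notebook, `sanitized` matches the value reported in the log.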

### Pipeline Template

After the pipeline is generated, we can retrieve the pipeline template from the compiler.

In [90]:
pipeline_template_builder = dag_compiler.get_last_template()

## [SKIP] Convert Pipeline to MODS Pipeline

We can also generate a MODS pipeline instead of a general SageMaker pipeline by decorating `DynamicPipelineTemplate` with the MODS template.

### MODS Template Head

In [91]:
AUTHOR           = base_config.author
PIPELINE_DESC    = base_config.pipeline_description
PIPELINE_VERSION = base_config.pipeline_version

In [92]:
AUTHOR

'lukexie'

In [93]:
PIPELINE_DESC

'AtoZ xgboost Model NA'

In [94]:
PIPELINE_VERSION

'2.0.0'

### MODSPipelineDAGCompiler

In [95]:
from src.cursus.mods.compiler.mods_dag_compiler import MODSPipelineDAGCompiler

In [97]:
mods_dag_compiler = MODSPipelineDAGCompiler(
        config_path=config_path,
        sagemaker_session=pipeline_session,
        role=role
    )

`MODSPipelineDAGCompiler` can create the **MODS Template Decorated Class**:

```python
MODSDecoratedTemplate = MODSTemplate(
                author=author,
                version=version,
                description=description
            )(DynamicPipelineTemplate)
```


In [98]:
mods_pipeline_template_class = mods_dag_compiler.create_decorated_class(dag,
                                                               AUTHOR,
                                                               PIPELINE_VERSION,
                                                               PIPELINE_DESC
                                                            )

2025-08-20 06:30:09,867 - INFO - Creating MODS decorated template class with metadata: author=lukexie, version=2.0.0, description=AtoZ xgboost Model NA


In [100]:
mods_pipeline_template = mods_dag_compiler.create_template(dag,
                                                           author=AUTHOR,
                                                           version=PIPELINE_VERSION,
                                                           description=PIPELINE_DESC
                                                          )

2025-08-20 06:31:10,903 - INFO - Creating MODS template for DAG with 10 nodes
2025-08-20 06:31:10,905 - INFO - Creating MODS decorated template class with metadata: author=lukexie, version=2.0.0, description=AtoZ xgboost Model NA
2025-08-20 06:31:10,906 - INFO - Loading configs from: /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
2025-08-20 06:31:10,908 - INFO - Loading configs from /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
2025-08-20 06:31:10,908 - INFO - Loading configuration from /home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/config_NA_xgboost_AtoZ.json
Object: CradleJobSpecificationConfig in src.cursus.steps.configs.config_cradle_data_loading_step
Field: unknown
Original definition path: CradleJobSpecificationConfig()
Reference path: 
This creates a cycle in the object graph.
Object: CradleJobSpecificationConfig in src.cursus.steps.configs.co

In [101]:
type(mods_pipeline_template)

mods.mods_template.MODSTemplate.__call__.<locals>.Wrapped
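
The type name above ends in `MODSTemplate.__call__.<locals>.Wrapped`, which is how Python names a class defined inside a `__call__` method. The sketch below reproduces that pattern with a parameterized class decorator in plain Python. It is an illustration under assumptions, not the MODS implementation: `MetadataTemplate`, `BaseTemplate`, and the attribute names `AUTHOR`, `VERSION`, and `DESCRIPTION` are hypothetical.

```python
class MetadataTemplate:
    """Hypothetical stand-in for a parameterized class decorator."""

    def __init__(self, author, version, description):
        self.author = author
        self.version = version
        self.description = description

    def __call__(self, cls):
        author, version, description = self.author, self.version, self.description

        class Wrapped(cls):
            # Metadata attached as class attributes (names are illustrative)
            AUTHOR = author
            VERSION = version
            DESCRIPTION = description

        return Wrapped


class BaseTemplate:
    """Hypothetical stand-in for the template class being decorated."""

    def __init__(self, dag):
        self.dag = dag


Decorated = MetadataTemplate(
    author="lukexie", version="2.0.0", description="AtoZ xgboost Model NA"
)(BaseTemplate)
template = Decorated(dag=None)
qualified_name = type(template).__qualname__
```

Here `qualified_name` is `MetadataTemplate.__call__.<locals>.Wrapped`, and `template` is still an instance of `BaseTemplate`, so code written against the undecorated class keeps working.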

```python
# Convert DAG to pipeline and get report
try:
    logger.info("Converting DAG to pipeline")
    template_pipeline, report = mods_dag_compiler.compile_with_report(
        dag=dag
    )
        
    # Log report summary
    logger.info(f"Conversion complete: {report.summary()}")
    for node, details in report.resolution_details.items():
        logger.info(f"  {node} → {details['config_type']} ({details['builder_type']})")
        
    # Log pipeline creation details
    logger.info(f"Pipeline '{template_pipeline.name}' created successfully")
    logger.info(f"Pipeline ARN: {getattr(template_pipeline, 'arn', 'Not available until upserted')}")
    logger.info("To upsert the pipeline, call template_pipeline.upsert()")
except Exception as e:
    logger.error(f"Failed to convert DAG to pipeline: {e}")
    raise
```

In [96]:
#MODSPipelineBuilder = MODSTemplate(author=base_config.author, description=base_config.pipeline_description, version=base_config.pipeline_version)(pipeline_template_builder)

In [97]:
#MODSPipelineBuilder = MODSTemplate(author=base_config.author, description=base_config.pipeline_description, version=base_config.pipeline_version)(XGBoostTrainCalibrateEvaluateE2ETemplate)

In [98]:
#MODSPipelineBuilder = MODSTemplate(author=base_config.author, description=base_config.pipeline_description, version=base_config.pipeline_version)(XGBoostTrainCalibrateTemplate)

In [99]:
#MODSPipelineBuilder = MODSTemplate(author=base_config.author, description=base_config.pipeline_description, version=base_config.pipeline_version)(XGBoostTrainEvaluateE2ETemplate)

In [100]:
#MODSPipelineBuilder = MODSTemplate(author=base_config.author, description=base_config.pipeline_description, version=base_config.pipeline_version)(XGBoostTrainEvaluateNoRegistrationTemplate)

In [101]:
#MODSPipelineBuilder = MODSTemplate(author=base_config.author, description=base_config.pipeline_description, version=base_config.pipeline_version)(XGBoostSimpleTemplate)

In [102]:
#MODSPipelineBuilder = MODSTemplate(author=base_config.author, description=base_config.pipeline_description, version=base_config.pipeline_version)(XGBoostDataloadPreprocessTemplate)

In [103]:
#MODSPipelineBuilder = MODSTemplate(author=base_config.author, description=base_config.pipeline_description, version=base_config.pipeline_version)(CradleOnlyTemplate)

## Prepare for Execution Document

In [104]:
from mods_workflow_helper.sagemaker_pipeline_helper import SagemakerPipelineHelper, SecurityConfig

In [105]:
default_execution_doc = SagemakerPipelineHelper.get_pipeline_default_execution_document(template_pipeline)
test_execution_doc = default_execution_doc

In [106]:
print(json.dumps(test_execution_doc, indent=2))

{
  "PIPELINE_STEP_CONFIGS": {
    "CradleDataLoading-Training": {
      "STEP_CONFIG": {
        "dataSources": {
          "dataSources": [
            {
              "dataSourceName": "RAW_MDS",
              "dataSourceType": "MDS",
              "mdsDataSourceProperties": {
                "serviceName": "MDS_DATA_SET_NAME_IN_STRING",
                "orgId": "MDS_ORG_UNIT_IN_STRING",
                "region": "MDS_REGION/NA/EU/FE",
                "useHourlyEdxDataSet": false,
                "outputSchema": [
                  {
                    "fieldName": "OUTPUT_FIELD_NAME",
                    "fieldType": "STRING"
                  },
                  {
                    "fieldName": "orderId",
                    "fieldType": "STRING"
                  }
                ]
              }
            },
            {
              "dataSourceName": "TAGS",
              "dataSourceType": "ANDES",
              "andesDataSourceProperties": {
                "provider

### Fill in Execution Doc

In [107]:
#execution_doc_fill = pipeline_builder.fill_execution_document(test_execution_doc)

In [108]:
# Fill in the execution document using the stored requests
#execution_doc_fill_2 = xgboost_train_eval_pipeline_template_builder.fill_execution_document(test_execution_doc)

In [109]:
execution_doc_fill = pipeline_template_builder.fill_execution_document(test_execution_doc)

2025-08-15 23:30:18,848 - INFO - Updated execution config for Cradle step: CradleDataLoading-Training
2025-08-15 23:30:18,848 - INFO - Updated execution config for Cradle step: CradleDataLoading-Calibration
2025-08-15 23:30:18,848 - INFO - Found registration step by config type: Registration
2025-08-15 23:30:18,849 - INFO - Found registration configuration: RegistrationConfig
2025-08-15 23:30:18,849 - INFO - Updated execution config for registration step: Registration-NA


In [110]:
print(json.dumps(execution_doc_fill, indent=2))

{
  "PIPELINE_STEP_CONFIGS": {
    "CradleDataLoading-Training": {
      "STEP_CONFIG": {
        "dataSources": {
          "dataSources": [
            {
              "dataSourceName": "RAW_MDS_NA",
              "dataSourceType": "MDS",
              "mdsDataSourceProperties": {
                "serviceName": "AtoZ",
                "orgId": "0",
                "region": "NA",
                "useHourlyEdxDataSet": false,
                "outputSchema": [
                  {
                    "fieldName": "Abuse.mfn_categorized_refunds_si_by_customer_marketplace.n_mfn_diff_refunds_si_365_days",
                    "fieldType": "STRING"
                  },
                  {
                    "fieldName": "Abuse.mfn_refunds_si_by_customer_marketplace.n_mfn_refund_amount_si_last_365_days",
                    "fieldType": "STRING"
                  },
                  {
                    "fieldName": "Abuse.completed_afn_orders_by_customer_marketplace.n_afn_unit_amount_last

In [111]:
import copy
test_execution_doc = copy.deepcopy(execution_doc_fill)  # deep copy: the document is nested
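
To see which placeholder fields were replaced, the default and filled execution documents printed earlier can be compared path by path. This is a minimal sketch using toy dictionaries that copy a few keys from those printouts; `changed_paths` is a hypothetical helper and not part of Cursus or the MODS helper library.

```python
def changed_paths(before, after, prefix=""):
    """Yield dotted paths whose values differ between two JSON-like objects."""
    if isinstance(before, dict) and isinstance(after, dict):
        for key in sorted(set(before) | set(after)):
            path = f"{prefix}.{key}" if prefix else str(key)
            yield from changed_paths(before.get(key), after.get(key), path)
    elif (
        isinstance(before, list)
        and isinstance(after, list)
        and len(before) == len(after)
    ):
        for i, (b, a) in enumerate(zip(before, after)):
            yield from changed_paths(b, a, f"{prefix}[{i}]")
    elif before != after:
        # Leaf values differ, or the structures are not comparable
        yield prefix


# Toy excerpts based on the printed documents
default_doc = {
    "mdsDataSourceProperties": {
        "serviceName": "MDS_DATA_SET_NAME_IN_STRING",
        "orgId": "MDS_ORG_UNIT_IN_STRING",
        "useHourlyEdxDataSet": False,
    }
}
filled_doc = {
    "mdsDataSourceProperties": {
        "serviceName": "AtoZ",
        "orgId": "0",
        "useHourlyEdxDataSet": False,
    }
}
diff = list(changed_paths(default_doc, filled_doc))
```

In the notebook, `test_execution_doc` and `default_execution_doc` are bound to the same object. If `fill_execution_document` modifies its argument in place (not confirmed here), compare against a `copy.deepcopy` of the default taken before the call.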

### Save Execution Doc locally

In [112]:
exe_doc_json_filename = f"execute_doc_{base_config.pipeline_name}_{base_config.pipeline_version}.json"
exe_doc_file_path =  config_dir / exe_doc_json_filename
exe_doc_file_path

PosixPath('/home/ec2-user/SageMaker/Cursus/pipeline_config/config_NA_xgboost_AtoZ_v2/execute_doc_lukexie-AtoZ-xgboost-NA_2.0.0.json')

In [113]:
with open(exe_doc_file_path, 'w') as f:
    json.dump(test_execution_doc, f, indent=2)

## Execute Pipeline

### Start Execution

In [114]:
from mods_workflow_helper.sagemaker_pipeline_helper import SagemakerPipelineHelper

In [115]:
security_config

<mods_workflow_helper.sagemaker_pipeline_helper.SecurityConfig at 0x7fd9bec72020>

In [116]:
template_pipeline

<sagemaker.workflow.pipeline.Pipeline at 0x7fd9b5543160>

In [117]:
SagemakerPipelineHelper.start_pipeline_execution(
    pipeline=template_pipeline,
    secure_config=security_config,
    sagemaker_session=pipeline_session,
    preparation_space_local_root="/tmp",
    pipeline_execution_document=test_execution_doc
)

2025-08-15 23:30:48,931 - INFO - Apply execution document provided config {'dataSources': {'dataSources': [{'dataSourceName': 'RAW_MDS_NA', 'dataSourceType': 'MDS', 'mdsDataSourceProperties': {'serviceName': 'AtoZ', 'orgId': '0', 'region': 'NA', 'useHourlyEdxDataSet': False, 'outputSchema': [{'fieldName': 'Abuse.mfn_categorized_refunds_si_by_customer_marketplace.n_mfn_diff_refunds_si_365_days', 'fieldType': 'STRING'}, {'fieldName': 'Abuse.mfn_refunds_si_by_customer_marketplace.n_mfn_refund_amount_si_last_365_days', 'fieldType': 'STRING'}, {'fieldName': 'Abuse.completed_afn_orders_by_customer_marketplace.n_afn_unit_amount_last_365_days', 'fieldType': 'STRING'}, {'fieldName': 'claimantInfo_pendingClaimCount', 'fieldType': 'STRING'}, {'fieldName': 'transactionDate', 'fieldType': 'STRING'}, {'fieldName': 'claimantInfo_lifetimeClaimCount', 'fieldType': 'STRING'}, {'fieldName': 'Abuse.mfn_a2z_claims_by_customer_na.n_mfn_diff_claims_amount_last_365_days', 'fieldType': 'STRING'}, {'fieldName':

sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.ResourceConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.SecurityGroupIds
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.Environment


2025-08-15 23:30:51,937 - INFO - Uploaded /home/ec2-user/SageMaker/Cursus/dockers/xgboost_atoz to s3://sandboxdependency-abuse-secureaisandboxteamshare-1l77v9am252um/lukexie-AtoZ-xgboost-NA-2-0-0-pipeline/code/f928bb8e661265ce6d78490db9e6d8721008d778859e49c6af0fd4a7504b570c/sourcedir.tar.gz
2025-08-15 23:30:51,992 - INFO - runproc.sh uploaded to s3://sandboxdependency-abuse-secureaisandboxteamshare-1l77v9am252um/lukexie-AtoZ-xgboost-NA-2-0-0-pipeline/code/790e693585d4b9d76a16083f19c002a6eb4f97b3633a7cf730a990518d606c80/runproc.sh
2025-08-15 23:30:54,822 - INFO - Add currentOwnerAlias tag to the request for operation: CreatePipeline.
2025-08-15 23:30:54,823 - INFO - A creation operation CreatePipeline is detected. Apply owner tag to the request.


sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.ResourceConfig.VolumeKmsKeyId
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.SecurityGroupIds
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.Environment


2025-08-15 23:30:57,193 - INFO - Uploaded /home/ec2-user/SageMaker/Cursus/dockers/xgboost_atoz to s3://sandboxdependency-abuse-secureaisandboxteamshare-1l77v9am252um/lukexie-AtoZ-xgboost-NA-2-0-0-pipeline/code/f928bb8e661265ce6d78490db9e6d8721008d778859e49c6af0fd4a7504b570c/sourcedir.tar.gz
2025-08-15 23:30:57,262 - INFO - runproc.sh uploaded to s3://sandboxdependency-abuse-secureaisandboxteamshare-1l77v9am252um/lukexie-AtoZ-xgboost-NA-2-0-0-pipeline/code/790e693585d4b9d76a16083f19c002a6eb4f97b3633a7cf730a990518d606c80/runproc.sh


_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:601857636239:pipeline/lukexie-atoz-xgboost-na-2-0-0-pipeline/execution/9u3eol451fy3', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x7fd9bc9c3880>)
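
`start_pipeline_execution` returns a SageMaker `_PipelineExecution`, as the output above shows. Assuming the return value is captured in a variable such as `execution` (the cell above does not do so), its `list_steps()` method returns one dictionary per step, including `StepName` and `StepStatus`. Below is a minimal sketch of summarizing that list; `summarize_steps` is a hypothetical helper, and the sample data is illustrative rather than output from a real run.

```python
from collections import Counter


def summarize_steps(steps):
    """Count steps per status and collect the names of failed steps."""
    counts = Counter(step["StepStatus"] for step in steps)
    failed = [step["StepName"] for step in steps if step["StepStatus"] == "Failed"]
    return dict(counts), failed


# Illustrative data only; in the notebook this would be execution.list_steps()
sample_steps = [
    {"StepName": "CradleDataLoading-Training", "StepStatus": "Succeeded"},
    {"StepName": "TabularPreprocessing-Training", "StepStatus": "Succeeded"},
    {"StepName": "XGBoostTraining", "StepStatus": "Executing"},
]
counts, failed = summarize_steps(sample_steps)
```

A non-empty `failed` list is a natural trigger for inspecting the corresponding step in the SageMaker console.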

### Handle the CTI of MMS

```python
from secure_ai_sandbox_python_lib.session import Session

# Initialize the sandbox session
sandbox_session = Session(session_folder='/tmp/temp_folder', retail_region='NA')

# Create the MIMS resource
mims = sandbox_session.resource('MIMSModelRegistrar')

model_domain = 'AtoZ'  # The registered domain
model_objective = 'ContactRiskPDAModelNA'
cti_category = 'IT Dev'
cti_type = 'Abuse Prevention'
cti_item = 'APSAnalytics'

response = mims.update_objective_cti(model_domain=model_domain,
                                     model_objective=model_objective,
                                     cti_category=cti_category,
                                     cti_type=cti_type,
                                     cti_item=cti_item)
print(response)
```