# Setup

In [1]:
import os
from time import gmtime, strftime

import boto3
import sagemaker

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterFloat,
)
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import (
    ConditionLessThanOrEqualTo,
    ConditionGreaterThanOrEqualTo,
)
# from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.workflow.functions import Join


from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost import XGBoost
from sagemaker.model_metrics import ModelMetrics, MetricsSource

from sagemaker.debugger import ProfilerConfig, ProfilerRule, rule_configs
from sagemaker import image_uris





# --------------------------------------------------------------------
# 0. Basic setup
# --------------------------------------------------------------------
region = boto3.Session().region_name
pipeline_session = PipelineSession()
role = sagemaker.get_execution_role()

default_bucket = sagemaker.Session().default_bucket()
print("Region:", region)
print("Default bucket:", default_bucket)
print("Role:", role)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
Region: us-east-1
Default bucket: sagemaker-us-east-1-423623839320
Role: arn:aws:iam::423623839320:role/service-role/SageMaker-ExecutionRole-20250705T232334


# 1. Pipeline parameters (override when starting the pipeline)

In [2]:
# --------------------------------------------------------------------
# 1. Pipeline parameters (override when starting the pipeline)
# --------------------------------------------------------------------
athena_database = ParameterString(
    name="AthenaDatabase",
    default_value="retail_demand",         # <- change to your DB
)

athena_table = ParameterString(
    name="AthenaTable",
    default_value="demand_product",      # <- change to your table
)

athena_query = ParameterString(
    name="AthenaQuery",
    default_value="""SELECT * FROM "retail_demand"."demand_product" """,  # empty -> script will default to SELECT * FROM <table>
)

athena_workgroup = ParameterString(
    name="AthenaWorkgroup",
    default_value="primary",
)

feature_group_name = ParameterString(
    name="FeatureGroupName",
    default_value="retail-demand-feature-group-3",
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value=f"s3://{default_bucket}/feature-store/retail-demand/offline-store",
)

train_instance_type = ParameterString(
    name="TrainInstanceType",
    default_value="ml.m5.xlarge",
)

rmse_threshold = ParameterFloat(
    name="RmseThreshold",
    default_value=40.0,      # adjust!
)

r2_threshold = ParameterFloat(
    name="R2Threshold",
    default_value=0.5,       # adjust!
)

mlflow_tracking_server_arn = ParameterString(
    name="MLflowTrackingServerArn",
    default_value="arn:aws:sagemaker:us-east-1:423623839320:mlflow-tracking-server/tracking-server-demo",
)

mlflow_experiment_name = ParameterString(
    name="MLflowExperimentName",
    default_value="forcasting_demand_product",
)

model_package_group_name = ParameterString(
    name="ModelPackageGroupName",
    default_value="retail-demand-model-group",
)

monitor_base_s3 = ParameterString(
    name="MonitorBaseS3",
    default_value=f"s3://{default_bucket}/monitor_retail_demand",
)

monitor_athena_db = ParameterString(
    name="MonitorAthenaDBName",
    default_value="monitor_retail_demand",
)

# 2. Step: Preprocess ‚Äì SKLearnProcessor + preprocess-scikit-retail-feature-store.py
    - Reads from Athena (via script args)
    - Writes train/validation/test to S3
    - Ingests features to Feature Store (inside script)


In [3]:
# --------------------------------------------------------------------
# 2. Step: Preprocess ‚Äì SKLearnProcessor + preprocess-scikit-retail-feature-store.py
#    - Reads from Athena (via script args)
#    - Writes train/validation/test to S3
#    - Ingests features to Feature Store (inside script)
# --------------------------------------------------------------------
sk_proc = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    env={"AWS_DEFAULT_REGION": region},
    sagemaker_session=pipeline_session,
)

preprocess_step = ProcessingStep(
    name="PreprocessRetailDemand",
    processor=sk_proc,
    code="preprocess-scikit-retail-feature-store.py",
    inputs=[],  # script will pull from Athena itself
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/output/retail_product/train",
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/output/retail_product/validation",
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/output/retail_product/test",
        ),
    ],
    job_arguments=[
        "--input-data", "/opt/ml/processing/input/data",
        "--output-data", "/opt/ml/processing/output/retail_product",
        "--train-split-percentage", "0.9",
        "--validation-split-percentage", "0.05",
        "--test-split-percentage", "0.05",
        "--balance-dataset", "True",
        "--feature-store-offline-prefix", feature_store_offline_prefix,
        "--feature-group-name", feature_group_name,
        "--athena-database", athena_database,
        "--athena-table", athena_table,
        "--athena-query", athena_query,
        "--athena-workgroup", athena_workgroup,
    ],
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


# 3. Step: Train ‚Äì XGBoost Estimator + training.py


In [4]:
# --------------------------------------------------------------------
# 3. Step: Train ‚Äì XGBoost Estimator + training.py
# --------------------------------------------------------------------

"""
#====================================================
# ‡∏ñ‡πâ‡∏≤‡πÄ‡∏õ‡πá‡∏ô dev / tuning phase ‚Üí ‡πÄ‡∏õ‡∏¥‡∏î Profiler ‡πÑ‡∏ß‡πâ‡∏î‡∏µ‡∏°‡∏≤‡∏Å ‡∏à‡∏∞‡πÄ‡∏´‡πá‡∏ô CPU/GPU/IO/thread ‡πÉ‡∏ä‡πâ‡∏¢‡∏±‡∏á‡πÑ‡∏á
# ‡∏ñ‡πâ‡∏≤‡πÄ‡∏õ‡πá‡∏ô prod pipeline ‡∏£‡∏±‡∏ô‡∏ó‡∏∏‡∏Å‡∏ß‡∏±‡∏ô ‚Üí Profiler ‡∏ó‡∏∏‡∏Å run ‡∏à‡∏∞‡∏Å‡∏¥‡∏ô log & cost ‡πÄ‡∏û‡∏¥‡πà‡∏°‡∏Ç‡∏∂‡πâ‡∏ô‡∏û‡∏≠‡∏™‡∏°‡∏Ñ‡∏ß‡∏£

profiler_config = ProfilerConfig(
    system_monitor_interval_millis=500
)

rules = [
    ProfilerRule.sagemaker(rule_configs.ProfilerReport())
]
#====================================================
PipelineSession ‡∏à‡∏∞‡∏à‡∏±‡∏î‡∏Å‡∏≤‡∏£ output_path ‡πÉ‡∏´‡πâ‡∏°‡∏µ‡πÇ‡∏Ñ‡∏£‡∏á‡∏õ‡∏£‡∏∞‡∏°‡∏≤‡∏ì:
s3://<default-bucket>/<pipeline-name>/<step-name>/<execution-id>/output/model.tar.gz
‡∏Ñ‡∏∑‡∏≠‡∏°‡∏±‡∏ô unique ‡∏≠‡∏¢‡∏π‡πà‡πÅ‡∏•‡πâ‡∏ß‡∏ï‡∏≤‡∏° ‡∏ä‡∏∑‡πà‡∏≠ pipeline + ‡∏ä‡∏∑‡πà‡∏≠ step + execution id

‡∏î‡∏±‡∏á‡∏ô‡∏±‡πâ‡∏ô‡πÄ‡∏£‡∏≤ ‡πÑ‡∏°‡πà‡∏à‡∏≥‡πÄ‡∏õ‡πá‡∏ô‡∏ï‡πâ‡∏≠‡∏á‡πÉ‡∏™‡πà timestamp ‡πÄ‡∏≠‡∏á‡∏≠‡∏µ‡∏Å ‡πÄ‡∏•‡∏¢
- lineage ‡πÉ‡∏ô Studio / Model Registry ‡∏à‡∏∞‡∏£‡∏π‡πâ‡∏ß‡πà‡∏≤ model ‡∏ô‡∏µ‡πâ‡∏°‡∏≤‡∏à‡∏≤‡∏Å pipeline execution ‡πÑ‡∏´‡∏ô

- ‡πÑ‡∏°‡πà‡πÄ‡∏™‡∏µ‡πà‡∏¢‡∏á‡∏û‡∏•‡∏≤‡∏î‡∏û‡∏¥‡∏°‡∏û‡πå timestamp ‡∏ã‡πâ‡∏≥‡∏´‡∏£‡∏∑‡∏≠‡∏ä‡∏ô‡∏Å‡∏±‡∏ô‡πÄ‡∏≠‡∏á
#====================================================

"""


xgb_estimator = XGBoost(
    entry_point="training.py",
    framework_version="1.7-1",
    py_version="py3",
    role=role,
    instance_count=1,
    instance_type=train_instance_type,
    sagemaker_session=pipeline_session,
    base_job_name="retail-demand-xgb",
    # profiler_config=profiler_config,
    # rules=rules,
    # disable_profiler=False,
    # enable_sagemaker_metrics=True,
)

# Hyperparameters for both XGBoost algo and our script
xgb_estimator.set_hyperparameters(
    # XGBoost hyperparams
    max_depth=6,
    learning_rate=0.1,       # "eta" under the hood
    n_estimators=250,
    subsample=0.8,
    colsample_bytree=0.8,
    min_child_weight=1.0,
    reg_lambda=1.0,
)

train_step = TrainingStep(
    name="TrainXGBRetailDemand",
    estimator=xgb_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig
                .Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig
                .Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(   # üëà ‡πÄ‡∏û‡∏¥‡πà‡∏° channel test ‡πÄ‡∏Ç‡πâ‡∏≤‡πÑ‡∏õ
            s3_data=preprocess_step.properties.ProcessingOutputConfig
                .Outputs["test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)



INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


# 4. Step: Evaluate ‚Äì SKLearnProcessor + evaluate.py
Writes evaluation_summary.json, shap_feature_importance.csv, etc.


In [5]:

# ‡πÉ‡∏ä‡πâ XGBoost container version ‡πÄ‡∏î‡∏µ‡∏¢‡∏ß‡∏Å‡∏±‡∏ö‡∏ó‡∏µ‡πà train
xgb_image_uri = image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.7-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

eval_processor = ScriptProcessor(
    image_uri=xgb_image_uri,
    command=["python3"],
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=pipeline_session,
    base_job_name="evaluate-retail-demand",
)



eval_report = PropertyFile(
    name="EvaluationReport",           # arbitrary name used inside the pipeline
    output_name="evaluation",  # matches the ProcessingOutput.output_name
    path="evaluation_summary.json",    # the file inside that output
)


evaluate_step = ProcessingStep(
    name="EvaluateModel",
    processor=eval_processor,
    code="evaluate.py",
    inputs=[
        # test data from preprocess step
        ProcessingInput(
            input_name="test_data",
            source=preprocess_step.properties.ProcessingOutputConfig
                .Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
        # model.tar.gz from training step
        ProcessingInput(
            input_name="model",
            source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/output/evaluation",
        )
    ],
    job_arguments=[
        "--test_data", "/opt/ml/processing/test",
        "--model_dir", "/opt/ml/processing/model",
        "--output_dir", "/opt/ml/processing/output/evaluation",
    ],
    property_files=[eval_report],
)


INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


# 5. Step: IngestEval ‚Äì SKLearnProcessor + ingest_to_athena.py
    (writes evaluation_summary.csv, shap_importance, test_predictions into Athena)
    This runs for *every* model, good or bad.


In [6]:

monitor_processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
    env={"AWS_DEFAULT_REGION": region},
)

ingest_step = ProcessingStep(
    name="IngestEvalToAthena",
    processor=monitor_processor,
    code="ingest_to_athena.py",
    inputs=[
        ProcessingInput(
            input_name="eval_local",
            source=evaluate_step.properties.ProcessingOutputConfig
               .Outputs["evaluation"].S3Output.S3Uri,
            destination="/opt/ml/processing/eval",  # if your script ever wants local access
        ),
    ],
    job_arguments=[
        "--eval-output-s3",
        evaluate_step.properties.ProcessingOutputConfig
            .Outputs["evaluation"].S3Output.S3Uri,
        "--monitor-base-s3",
        monitor_base_s3,
        "--athena-db-name",
        monitor_athena_db,
    ],
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


# 6. Step: RegisterModel ‚Äì only if metrics pass thresholds


In [7]:
# --------------------------------------------------------------------
# 6. Step: RegisterModel ‚Äì only if metrics pass thresholds
# --------------------------------------------------------------------
# Build ModelMetrics from evaluation_regression_metrics.json
eval_metrics_s3 = Join(
    on="",
    values=[
        evaluate_step.properties.ProcessingOutputConfig
            .Outputs["evaluation"].S3Output.S3Uri,
        "/evaluation_regression_metrics.json",
    ],
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=eval_metrics_s3,
        content_type="application/json",
    )
)

register_step = RegisterModel(
    name="RegisterRetailDemandModel",
    estimator=xgb_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["application/json"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status="PendingManualApproval",  # manual approval in Registry
)

# 7. Condition ‚Äì if RMSE <= threshold AND R2 >= threshold ‚Üí Register


In [8]:
# --------------------------------------------------------------------
# 7. Condition ‚Äì if RMSE <= threshold AND R2 >= threshold ‚Üí Register
# --------------------------------------------------------------------


rmse_value = JsonGet(
    step=evaluate_step,       # üëà pass the Step object
    property_file=eval_report,
    json_path="metrics.rmse",
)

r2_value = JsonGet(
    step=evaluate_step,
    property_file=eval_report,
    json_path="metrics.r2",
)

cond_step = ConditionStep(
    name="CheckMetricsAndRegister",
    conditions=[
        ConditionLessThanOrEqualTo(
            left=rmse_value,
            right=rmse_threshold,
        ),
        ConditionGreaterThanOrEqualTo(
            left=r2_value,
            right=r2_threshold,
        ),
    ],
    if_steps=[register_step],  # register only if metrics OK
    else_steps=[],             # still always run ingest_step (outside condition)
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# 8. Build Pipeline object


In [9]:



# --------------------------------------------------------------------
# 8. Build Pipeline object
# --------------------------------------------------------------------
def get_pipeline(
    pipeline_name: str = "RetailDemandForecastingPipeline",
) -> Pipeline:
    return Pipeline(
        name=pipeline_name,
        parameters=[
            athena_database,
            athena_table,
            athena_query,
            athena_workgroup,
            feature_group_name,
            feature_store_offline_prefix,
            train_instance_type,
            rmse_threshold,
            r2_threshold,
            mlflow_tracking_server_arn,
            mlflow_experiment_name,
            model_package_group_name,
            monitor_base_s3,   # üëà add these
            monitor_athena_db, # üëà add these

        ],
        steps=[
            preprocess_step,
            train_step,
            evaluate_step,
            ingest_step,   # runs always after evaluate
            cond_step,     # conditionally runs RegisterModel
        ],
        sagemaker_session=pipeline_session,
    )


pipeline = get_pipeline()
pipeline.upsert(
    role_arn=role,   # üëà name must be exactly role_arn
    description="Retail demand forecasting pipeline",
)


INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:423623839320:pipeline/RetailDemandForecastingPipeline',
 'PipelineVersionId': 13,
 'ResponseMetadata': {'RequestId': '43b0ad2e-8f69-4b63-a025-ff1c6662bd2d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '43b0ad2e-8f69-4b63-a025-ff1c6662bd2d',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '122',
   'date': 'Sat, 13 Dec 2025 13:48:37 GMT'},
  'RetryAttempts': 0}}

In [10]:
execution = pipeline.start()
print("Started pipeline execution:", execution.arn)


INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


Started pipeline execution: arn:aws:sagemaker:us-east-1:423623839320:pipeline/RetailDemandForecastingPipeline/execution/ncaw7w2h64w9
