# MLOps Pipeline Workflow & Team Notes

## Project Overview
This notebook implements an end-to-end MLOps data pipeline using the **Olist Brazilian E-Commerce dataset**. The goal was to demonstrate a production-style workflow that covers data ingestion, cataloging, exploratory analysis, feature engineering, feature storage, and dataset splitting — all using AWS services in a cost-efficient way.

The pipeline was intentionally built step-by-step to mirror MLOps practices rather than a one-off modeling notebook.

---

## High-Level Workflow

### 1. Raw Data Ingestion (S3 Data Lake)
- Created an S3 bucket to act as a data lake.
- Uploaded all **9 raw CSV files** from the Kaggle Olist dataset.
- Organized raw data under: s3:///raw/olist/ingest_date=YYYY-MM-DD/
- Each dataset was placed into its own subfolder to support Athena’s directory-based table requirements.

---

### 2. Data Cataloging & Querying (Athena)
- Created an Athena database (`olist_datalake`).
- Defined **external tables** for each dataset directly from JupyterLab (no Glue crawler required).
- Verified schemas and row counts using Athena queries.
- This step enabled SQL-based access to the data and served as the cataloging layer for downstream analysis.

---

### 3. Exploratory Data Analysis (SageMaker + Pandas)
- Loaded Athena tables into Pandas using `awswrangler`.
- Performed sanity checks on row counts and joins.
- Built an **order-level analytical view** by aggregating:
- order items
- payments
- customer attributes
- Engineered a target variable (`is_late`) based on delivery vs. estimated delivery dates.
- Observed class imbalance (~8% late deliveries), motivating careful splitting and evaluation later.

---

### 4. Feature Engineering
- Created leakage-safe, order-level features using only information available at purchase time:
- pricing, freight, number of items/sellers
- payment information
- time-based features (day of week, hour of day)
- customer state
- Maintained a **canonical feature dataset** for analysis and splitting.
- Created a **Feature Store–compatible version** with strict data types.

---

### 5. SageMaker Feature Store (Offline Store)
- Created a **SageMaker Feature Group** (offline store only to control cost).
- Used `order_id` as the record identifier.
- Used a strictly formatted ISO-8601 `event_time` with UTC (`Z`) as the event time feature.
- Successfully ingested ~99k feature records into Feature Store.
- Offline store data is persisted in S3 for training and future reuse.

---

### 6. Dataset Splitting (Time-Based)
- Performed a **time-based split** using `event_time` to avoid temporal leakage:
- Train: ~40%
- Validation: ~10%
- Test: ~10%
- Production reserve: ~40%
- Persisted each split as Parquet files to: s3:///splits/olist/features/version=v1/
- This mirrors a real production setup where recent data is reserved for inference.

---

## Key Engineering Decisions & Lessons Learned

- **Athena LOCATION must point to directories**, not individual files.
- **Feature Store requires strict ISO-8601 timestamps with timezone** — missing the `Z` suffix causes ingestion failures.
- Maintaining separate:
- canonical feature data (analysis-friendly)
- Feature Store–safe data (schema-restricted)
is a best practice in real MLOps systems.
- Time-based splitting is critical to avoid data leakage in temporal datasets.
- Offline Feature Store provides the required functionality while minimizing cost.

---

## Cost Management Notes
- SageMaker compute was stopped immediately after completion.
- Feature Store **online store was intentionally disabled** to avoid ongoing charges.
- S3 storage costs are minimal and safe to keep until final submission.
- Cleanup (Feature Group deletion, S3 cleanup) should only be done **after submission**.

---

## For Teammates
If you need to re-run or extend this work:
1. Start at the Athena read step (no need to re-upload raw data).
2. Do **not** re-run ingestion unless changing the feature schema.
3. Always stop SageMaker compute when finished.

This notebook represents a complete, MLOps data pipeline.

## Model Benchmark and Evaluation (Module 4)

### Baseline Model
As a benchmark, we implemented a simple heuristic model that always predicts an order will be delivered on time. This reflects the majority class in the dataset and establishes a lower bound for model performance.

Due to class imbalance (~86% of orders are not late), the baseline achieves high accuracy but fails to identify late deliveries, resulting in zero precision, recall, and F1-score.

### First Iteration Model (XGBoost v1)
We trained a first-pass XGBoost binary classifier in Amazon SageMaker using a limited set of engineered features related to order size, payment behavior, and purchase timing.

The model was evaluated using SageMaker Batch Transform on the held-out test dataset. Batch Transform was selected over a real-time endpoint to minimize cost and ensure automatic resource cleanup.

### Results Summary
- The XGBoost model achieved an AUC of approximately **0.56**, indicating it learned some discriminative signal beyond random chance.
- Overall accuracy matched the baseline model due to class imbalance and use of a default classification threshold.
- Precision, recall, and F1-score remained low, highlighting the need for future improvements such as class weighting, threshold tuning, and feature expansion.

### Key Takeaways
This iteration establishes a complete, cost-aware MLOps workflow including data ingestion, feature engineering, model training, evaluation, and deployment via batch inference. While performance improvements are needed, this version serves as a strong baseline for future model iterations and CI/CD integration in later modules.

## Monitoring, Evaluation, and Reporting Summary (Module 5)

### Model Benchmarking and Evaluation
We established a simple benchmark model that always predicts orders as on-time. While this baseline achieved high accuracy due to class imbalance, it had zero recall for late deliveries. We then trained an XGBoost classifier using a small, carefully selected feature set. Model performance was evaluated using batch inference, and metrics such as accuracy and AUC were compared against the benchmark to establish a minimum viable improvement baseline.

### Data Monitoring
Data monitoring was implemented using SageMaker Batch Transform with batch data capture enabled. Production inference inputs and corresponding model outputs were automatically captured to S3. Baseline statistics and constraints were generated using SageMaker Model Monitor and stored in S3. These artifacts enable offline drift detection and future scheduled monitoring without requiring a persistent real-time endpoint, aligning with cost-efficient MLOps best practices.

### Infrastructure Monitoring
Infrastructure-level monitoring was implemented using Amazon CloudWatch. Metrics for SageMaker training jobs, batch transform jobs, and processing jobs were collected automatically. A centralized CloudWatch dashboard was created to visualize job durations and failure counts, providing operational visibility into the ML system.

### Reports and Artifacts
The system produces the following monitoring artifacts:
- Model training metrics in CloudWatch Logs
- Baseline statistics and constraints in S3
- Captured production input and output data in S3
- CloudWatch dashboard for infrastructure monitoring

This architecture supports scalable monitoring, auditability, and future CI/CD integration while remaining within budget constraints.

In [183]:
# Consolidated Imports
import boto3
import sagemaker
import pandas as pd
import numpy as np
import awswrangler as wr
import json
import os
import sys
import time
import datetime
from sagemaker.feature_store.feature_group import FeatureGroup
from botocore.exceptions import ClientError



In [184]:
# Global Setup
sess = sagemaker.Session()
bucket = sess.default_bucket()
region = boto3.session.Session().region_name
s3 = boto3.client('s3')

# Dynamic Prefix Detection
# Look for existing ingestion folders under raw/olist/
resp = s3.list_objects_v2(Bucket=bucket, Prefix='raw/olist/', Delimiter='/')
prefixes = [p['Prefix'] for p in resp.get('CommonPrefixes', [])]

if prefixes:
    # Sort ensuring YYYY-MM-DD format sorts correctly
    # Format expected: raw/olist/ingest_date=YYYY-MM-DD/
    prefix = sorted(prefixes)[-1]
    print(f'Found existing data: {prefix}')
else:
    # Fallback to today if no data found
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    prefix = f'raw/olist/ingest_date={today}/'
    print(f'No existing data found. Using new prefix: {prefix}')

print(f'Target Bucket: {bucket}')
print(f'Target Prefix: {prefix}')


No existing data found. Using new prefix: raw/olist/ingest_date=2026-02-18/
Target Bucket: sagemaker-us-east-1-587322031938
Target Prefix: raw/olist/ingest_date=2026-02-18/


In [185]:
# Consolidated Imports
from sagemaker.feature_store.feature_group import FeatureGroup
from botocore.exceptions import ClientError



In [186]:
# Global Setup
sess = sagemaker.Session()
bucket = sess.default_bucket()
region = boto3.session.Session().region_name
s3 = boto3.client('s3')

# Dynamic Prefix Detection
# Look for existing ingestion folders under raw/olist/
resp = s3.list_objects_v2(Bucket=bucket, Prefix='raw/olist/', Delimiter='/')
prefixes = [p['Prefix'] for p in resp.get('CommonPrefixes', [])]

if prefixes:
    # Sort ensuring YYYY-MM-DD format sorts correctly
    # Format expected: raw/olist/ingest_date=YYYY-MM-DD/
    prefix = sorted(prefixes)[-1]
    print(f'Found existing data: {prefix}')
else:
    # Fallback to today if no data found
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    print(f'No existing data found. Using new prefix: {prefix}')

print(f'Target Bucket: {bucket}')
print(f'Target Prefix: {prefix}')


No existing data found. Using new prefix: raw/olist/ingest_date=2026-02-18/
Target Bucket: sagemaker-us-east-1-587322031938
Target Prefix: raw/olist/ingest_date=2026-02-18/


In [187]:


s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)

keys = [obj["Key"] for obj in resp.get("Contents", [])]
print("Found files:", len(keys))
for k in keys:
    print(k)

Found files: 0


In [188]:

results_prefix = "athena-results/"

s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket=bucket, Prefix=results_prefix, MaxKeys=1)

if "Contents" in resp:
    print("Athena results prefix exists:", f"s3://{bucket}/{results_prefix}")
else:
    # create a zero-byte object so the prefix exists
    s3.put_object(Bucket=bucket, Key=results_prefix)
    print("Created Athena results prefix:", f"s3://{bucket}/{results_prefix}")

Athena results prefix exists: s3://sagemaker-us-east-1-587322031938/athena-results/


In [189]:

REGION = boto3.session.Session().region_name
athena = boto3.client("athena", region_name=REGION)

ATHENA_OUTPUT = f"s3://{bucket}/athena-results/"
DB = "olist_datalake"

def run_athena(sql: str, database: str = "default"):
    res = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": ATHENA_OUTPUT},
    )
    qid = res["QueryExecutionId"]
    while True:
        q = athena.get_query_execution(QueryExecutionId=qid)
        state = q["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if state != "SUCCEEDED":
        reason = q["QueryExecution"]["Status"].get("StateChangeReason", "Unknown")
        raise RuntimeError(f"Athena query failed: {state} - {reason}\nSQL:\n{sql}")
    return qid

run_athena(f"CREATE DATABASE IF NOT EXISTS {DB};", database="default")
print("Database ready:", DB)

Database ready: olist_datalake


In [190]:
# Copy raw files into subfolders (Only if they do not exist)
files = [
    'olist_customers_dataset.csv',
    'olist_geolocation_dataset.csv',
    'olist_order_items_dataset.csv',
    'olist_order_payments_dataset.csv',
    'olist_order_reviews_dataset.csv',
    'olist_orders_dataset.csv',
    'olist_products_dataset.csv',
    'olist_sellers_dataset.csv',
    'product_category_name_translation.csv',
]

for f in files:
    src_key = prefix + f
    folder = f.replace('.csv', '')
    dst_key = f'{prefix}{folder}/{f}'
    
    try:
        s3.head_object(Bucket=bucket, Key=dst_key)
        print(f'Skipping copy, file already exists: {dst_key}')
    except:
        print(f'Copying to: {dst_key}')
        try:
            s3.copy_object(
                Bucket=bucket,
                CopySource={'Bucket': bucket, 'Key': src_key},
                Key=dst_key
            )
        except Exception as e:
             print(f'Warning: Could not copy {f}. Is it uploaded to {src_key}?')

print('\nData preparation check complete.')


Copying to: raw/olist/ingest_date=2026-02-18/olist_customers_dataset/olist_customers_dataset.csv
Copying to: raw/olist/ingest_date=2026-02-18/olist_geolocation_dataset/olist_geolocation_dataset.csv
Copying to: raw/olist/ingest_date=2026-02-18/olist_order_items_dataset/olist_order_items_dataset.csv
Copying to: raw/olist/ingest_date=2026-02-18/olist_order_payments_dataset/olist_order_payments_dataset.csv
Copying to: raw/olist/ingest_date=2026-02-18/olist_order_reviews_dataset/olist_order_reviews_dataset.csv
Copying to: raw/olist/ingest_date=2026-02-18/olist_orders_dataset/olist_orders_dataset.csv
Copying to: raw/olist/ingest_date=2026-02-18/olist_products_dataset/olist_products_dataset.csv
Copying to: raw/olist/ingest_date=2026-02-18/olist_sellers_dataset/olist_sellers_dataset.csv
Copying to: raw/olist/ingest_date=2026-02-18/product_category_name_translation/product_category_name_translation.csv

Data preparation check complete.


In [191]:
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=50)
for obj in resp.get("Contents", []):
    print(obj["Key"])

In [192]:
RAW_BASE = f"s3://{bucket}/{prefix}"

def create_csv_table(table_name: str, columns_ddl: str, folder_name: str):
    sql = f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {DB}.{table_name} (
      {columns_ddl}
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES (
      'separatorChar' = ',',
      'quoteChar'     = '\"',
      'escapeChar'    = '\\\\'
    )
    STORED AS TEXTFILE
    LOCATION '{RAW_BASE}{folder_name}/'
    TBLPROPERTIES ('skip.header.line.count'='1');
    """
    run_athena(sql, database=DB)
    print(f"Created: {DB}.{table_name}")

create_csv_table(
    "olist_customers_dataset",
    """
    customer_id string,
    customer_unique_id string,
    customer_zip_code_prefix int,
    customer_city string,
    customer_state string
    """,
    "olist_customers_dataset"
)

create_csv_table(
    "olist_geolocation_dataset",
    """
    geolocation_zip_code_prefix int,
    geolocation_lat double,
    geolocation_lng double,
    geolocation_city string,
    geolocation_state string
    """,
    "olist_geolocation_dataset"
)

create_csv_table(
    "olist_order_items_dataset",
    """
    order_id string,
    order_item_id int,
    product_id string,
    seller_id string,
    shipping_limit_date string,
    price double,
    freight_value double
    """,
    "olist_order_items_dataset"
)

create_csv_table(
    "olist_order_payments_dataset",
    """
    order_id string,
    payment_sequential int,
    payment_type string,
    payment_installments int,
    payment_value double
    """,
    "olist_order_payments_dataset"
)

create_csv_table(
    "olist_order_reviews_dataset",
    """
    review_id string,
    order_id string,
    review_score int,
    review_comment_title string,
    review_comment_message string,
    review_creation_date string,
    review_answer_timestamp string
    """,
    "olist_order_reviews_dataset"
)

create_csv_table(
    "olist_orders_dataset",
    """
    order_id string,
    customer_id string,
    order_status string,
    order_purchase_timestamp string,
    order_approved_at string,
    order_delivered_carrier_date string,
    order_delivered_customer_date string,
    order_estimated_delivery_date string
    """,
    "olist_orders_dataset"
)

create_csv_table(
    "olist_products_dataset",
    """
    product_id string,
    product_category_name string,
    product_name_lenght int,
    product_description_lenght int,
    product_photos_qty int,
    product_weight_g int,
    product_length_cm int,
    product_height_cm int,
    product_width_cm int
    """,
    "olist_products_dataset"
)

create_csv_table(
    "olist_sellers_dataset",
    """
    seller_id string,
    seller_zip_code_prefix int,
    seller_city string,
    seller_state string
    """,
    "olist_sellers_dataset"
)

create_csv_table(
    "product_category_name_translation",
    """
    product_category_name string,
    product_category_name_english string
    """,
    "product_category_name_translation"
)

Created: olist_datalake.olist_customers_dataset
Created: olist_datalake.olist_geolocation_dataset
Created: olist_datalake.olist_order_items_dataset
Created: olist_datalake.olist_order_payments_dataset
Created: olist_datalake.olist_order_reviews_dataset
Created: olist_datalake.olist_orders_dataset
Created: olist_datalake.olist_products_dataset
Created: olist_datalake.olist_sellers_dataset
Created: olist_datalake.product_category_name_translation


In [193]:
run_athena(f"SHOW TABLES IN {DB};", database=DB)
print("SHOW TABLES succeeded")

run_athena(f"SELECT COUNT(*) FROM {DB}.olist_orders_dataset;", database=DB)
print("COUNT orders succeeded")

run_athena(f"SELECT order_status, COUNT(*) c FROM {DB}.olist_orders_dataset GROUP BY 1 ORDER BY c DESC;", database=DB)
print("GROUP BY order_status succeeded")

SHOW TABLES succeeded
COUNT orders succeeded
GROUP BY order_status succeeded


In [194]:
#6.1

DB = "olist_datalake"

orders = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {DB}.olist_orders_dataset",
    database=DB,
    ctas_approach=False
)

order_items = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {DB}.olist_order_items_dataset",
    database=DB,
    ctas_approach=False
)

payments = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {DB}.olist_order_payments_dataset",
    database=DB,
    ctas_approach=False
)

customers = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {DB}.olist_customers_dataset",
    database=DB,
    ctas_approach=False
)

print("orders:", orders.shape)
print("order_items:", order_items.shape)
print("payments:", payments.shape)
print("customers:", customers.shape)

orders: (99441, 8)
order_items: (112650, 7)
payments: (103886, 5)
customers: (99441, 5)


In [195]:
#6.2
# Parse timestamps
timestamp_cols = [
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
]
for col in timestamp_cols:
    orders[col] = pd.to_datetime(orders[col], errors="coerce")

# Aggregations
items_agg = (
    order_items.groupby("order_id")
    .agg(
        num_items=("order_item_id", "count"),
        total_price=("price", "sum"),
        total_freight_value=("freight_value", "sum"),
        num_sellers=("seller_id", "nunique"),
    )
    .reset_index()
)

payments_agg = (
    payments.groupby("order_id")
    .agg(
        payment_value=("payment_value", "sum"),
        payment_installments=("payment_installments", "max"),
        payment_type=("payment_type", lambda x: x.value_counts().index[0]),
    )
    .reset_index()
)

eda_df = (
    orders
    .merge(items_agg, on="order_id", how="left")
    .merge(payments_agg, on="order_id", how="left")
    .merge(customers[["customer_id", "customer_state"]], on="customer_id", how="left")
)

# Time features
eda_df["purchase_dow"] = eda_df["order_purchase_timestamp"].dt.dayofweek
eda_df["purchase_hour"] = eda_df["order_purchase_timestamp"].dt.hour

# Label: late delivery
eda_df["is_late"] = (
    (eda_df["order_delivered_customer_date"].notna()) &
    (eda_df["order_estimated_delivery_date"].notna()) &
    (eda_df["order_delivered_customer_date"] > eda_df["order_estimated_delivery_date"])
).astype(int)

print("Orders rows:", len(orders))
print("EDA rows:", len(eda_df))
print("Row loss:", len(orders) - len(eda_df))
print("Late rate:\n", eda_df["is_late"].value_counts(normalize=True))

Orders rows: 99441
EDA rows: 99441
Row loss: 0
Late rate:
 is_late
0    0.92129
1    0.07871
Name: proportion, dtype: float64


In [196]:
#7.0 + 7B
# Canonical features (with purchase timestamp)
feat = eda_df[[
    "order_id",
    "order_purchase_timestamp",
    "customer_state",
    "num_items",
    "total_price",
    "total_freight_value",
    "num_sellers",
    "payment_value",
    "payment_installments",
    "payment_type",
    "purchase_dow",
    "purchase_hour",
    "is_late"
]].copy()

feat["customer_state"] = feat["customer_state"].fillna("unknown").astype(str)
feat["payment_type"] = feat["payment_type"].fillna("unknown").astype(str)

for c in ["num_items", "num_sellers", "payment_installments", "purchase_dow", "purchase_hour", "is_late"]:
    feat[c] = feat[c].fillna(0).astype(int)

for c in ["total_price", "total_freight_value", "payment_value"]:
    feat[c] = feat[c].fillna(0.0).astype(float)

# Create strict ISO-8601 event time WITH timezone "Z"
feat["event_time"] = (
    pd.to_datetime(feat["order_purchase_timestamp"], errors="coerce")
    .dt.strftime("%Y-%m-%dT%H:%M:%SZ")
)

feat = feat.dropna(subset=["order_id", "event_time"]).reset_index(drop=True)

# FeatureStore-safe version (remove datetime64 column)
feat_fs = feat.drop(columns=["order_purchase_timestamp"]).copy()
feat_fs["event_time"] = feat_fs["event_time"].astype(str)

print("feat shape:", feat.shape)
print("feat_fs shape:", feat_fs.shape)
print("event_time sample:", feat_fs["event_time"].head().tolist())

feat shape: (99441, 14)
feat_fs shape: (99441, 13)
event_time sample: ['2017-10-02T10:56:33Z', '2018-07-24T20:41:37Z', '2018-08-08T08:38:49Z', '2017-11-18T19:28:06Z', '2018-02-13T21:18:39Z']


In [197]:
#7.1
from sagemaker.feature_store.feature_group import FeatureGroup

region = boto3.session.Session().region_name
sess = sagemaker.Session()
role = sagemaker.get_execution_role()

feature_group_name = "olist-order-features-v1"
offline_store_s3_uri = f"s3://{bucket}/feature-store/{feature_group_name}/"

fg = FeatureGroup(name=feature_group_name, sagemaker_session=sess)

print("Region:", region)
print("Role:", role)
print("Offline store URI:", offline_store_s3_uri)

Region: us-east-1
Role: arn:aws:iam::587322031938:role/LabRole
Offline store URI: s3://sagemaker-us-east-1-587322031938/feature-store/olist-order-features-v1/


In [198]:
#7.2
from botocore.exceptions import ClientError

sm = boto3.client("sagemaker", region_name=region)

def feature_group_exists(name: str) -> bool:
    try:
        sm.describe_feature_group(FeatureGroupName=name)
        return True
    except ClientError as e:
        if "ResourceNotFound" in str(e):
            return False
        raise

def wait_for_fg_created(name: str, timeout_sec: int = 600, poll_sec: int = 10):
    start = time.time()
    while True:
        desc = sm.describe_feature_group(FeatureGroupName=name)
        status = desc.get("FeatureGroupStatus")
        offline_status = desc.get("OfflineStoreStatus", {}).get("Status", "UNKNOWN")
        print(f"Status={status}, OfflineStoreStatus={offline_status}")
        if status == "Created" and offline_status in ("Active", "UNKNOWN"):
            return desc
        if status in ("CreateFailed", "DeleteFailed"):
            raise RuntimeError(f"Feature Group failed with status={status}. Details: {desc}")
        if time.time() - start > timeout_sec:
            raise TimeoutError(f"Timed out waiting for Feature Group to be Created: {name}")
        time.sleep(poll_sec)

if feature_group_exists(feature_group_name):
    print(f"Feature Group already exists: {feature_group_name}")
else:
    fg.load_feature_definitions(data_frame=feat_fs)
    fg.create(
        s3_uri=offline_store_s3_uri,
        record_identifier_name="order_id",
        event_time_feature_name="event_time",
        role_arn=role,
        enable_online_store=False,
    )
    print("Create request submitted")

wait_for_fg_created(feature_group_name)
print("Feature Group ready")

Feature Group already exists: olist-order-features-v1
Status=Created, OfflineStoreStatus=Active
Feature Group ready


In [199]:
#7.3
ingest_response = fg.ingest(data_frame=feat_fs, max_workers=2, wait=True)
print("Ingest complete")

INFO:sagemaker.feature_store.feature_group:Started ingesting index 0 to 49721
INFO:sagemaker.feature_store.feature_group:Started ingesting index 49721 to 99441
INFO:sagemaker.feature_store.feature_group:Successfully ingested row 49721 to 99441
INFO:sagemaker.feature_store.feature_group:Successfully ingested row 0 to 49721


Ingest complete


In [200]:
#7.4

sm = boto3.client("sagemaker", region_name=region)
desc = sm.describe_feature_group(FeatureGroupName=feature_group_name)

print("FeatureGroupStatus:", desc["FeatureGroupStatus"])
print("OfflineStoreStatus:", desc["OfflineStoreStatus"]["Status"])
print("S3 Offline Store Uri:", desc["OfflineStoreConfig"]["S3StorageConfig"]["S3Uri"])

FeatureGroupStatus: Created
OfflineStoreStatus: Active
S3 Offline Store Uri: s3://sagemaker-us-east-1-587322031938/feature-store/olist-order-features-v1/


In [201]:
#8.0


# Use feat_fs for splits (Feature Store compatible)
feat_sorted = feat_fs.sort_values("event_time").reset_index(drop=True)
n = len(feat_sorted)

train_end = int(n * 0.40)
val_end   = int(n * 0.50)
test_end  = int(n * 0.60)

train_df = feat_sorted.iloc[:train_end]
val_df   = feat_sorted.iloc[train_end:val_end]
test_df  = feat_sorted.iloc[val_end:test_end]
prod_df  = feat_sorted.iloc[test_end:]

print("Split sizes")
print("train:", len(train_df))
print("val:  ", len(val_df))
print("test: ", len(test_df))
print("prod: ", len(prod_df))

split_base = f"s3://{bucket}/splits/olist/features/version=v1/"
wr.s3.to_parquet(train_df, f"{split_base}train/", dataset=True, mode="overwrite")
wr.s3.to_parquet(val_df,   f"{split_base}val/",   dataset=True, mode="overwrite")
wr.s3.to_parquet(test_df,  f"{split_base}test/",  dataset=True, mode="overwrite")
wr.s3.to_parquet(prod_df,  f"{split_base}prod/",  dataset=True, mode="overwrite")

print("Wrote splits to:", split_base)

Split sizes
train: 39776
val:   9944
test:  9944
prod:  39777
Wrote splits to: s3://sagemaker-us-east-1-587322031938/splits/olist/features/version=v1/


In [202]:
#9.0

In [203]:

s3 = boto3.client("s3")
prefix = "splits/olist/features/version=v1/"

resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=50)
print("Objects found:", resp.get("KeyCount", 0))
for obj in resp.get("Contents", [])[:20]:
    print(obj["Key"])

Objects found: 4
splits/olist/features/version=v1/prod/bc93f1fe93db48b0a9f52ace4a5057a0.snappy.parquet
splits/olist/features/version=v1/test/32444716227a46a985813316a5ecd552.snappy.parquet
splits/olist/features/version=v1/train/99e0cf35aa0d4c48b2a25d73fa7b9c09.snappy.parquet
splits/olist/features/version=v1/val/67380724b7ed46a7bda7ccd2fe105022.snappy.parquet


In [204]:
#M4 Start
try:
    print(len(train))
except NameError:
    print("Kernel is fresh — variables not loaded")

39776


In [205]:

split_base = f"s3://{bucket}/splits/olist/features/version=v1/"

In [206]:
train = wr.s3.read_parquet(f"{split_base}train/")
val   = wr.s3.read_parquet(f"{split_base}val/")
test  = wr.s3.read_parquet(f"{split_base}test/")

print(train.shape, val.shape, test.shape)

(39776, 13) (9944, 13) (9944, 13)


In [207]:
#M4-1 Benchmark Model Baseline

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Ground truth
y_test = test["is_late"].astype(int)

# Baseline prediction: always predict NOT late
y_pred_baseline = pd.Series(0, index=test.index)

baseline_metrics = {
    "accuracy": accuracy_score(y_test, y_pred_baseline),
    "precision": precision_score(y_test, y_pred_baseline, zero_division=0),
    "recall": recall_score(y_test, y_pred_baseline, zero_division=0),
    "f1": f1_score(y_test, y_pred_baseline, zero_division=0),
}

baseline_metrics

{'accuracy': 0.8621279163314561, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0}

In [208]:
#M4-2 1st Model Sage Maker

FEATURES = [
    "num_items",
    "total_price",
    "total_freight_value",
    "num_sellers",
    "payment_value",
    "payment_installments",
    "purchase_dow",
    "purchase_hour",
]

def to_xgb_matrix(df):
    out = df[["is_late"] + FEATURES].copy()
    for c in FEATURES:
        out[c] = pd.to_numeric(out[c], errors="coerce").fillna(0.0)
    out["is_late"] = out["is_late"].astype(int)
    return out

train_xgb = to_xgb_matrix(train)
val_xgb   = to_xgb_matrix(val)
test_xgb  = to_xgb_matrix(test)

train_xgb.head()

Unnamed: 0,is_late,num_items,total_price,total_freight_value,num_sellers,payment_value,payment_installments,purchase_dow,purchase_hour
0,0,2,72.89,63.34,1,136.23,1,6,21
1,0,1,59.5,15.56,1,75.06,3,0,0
2,0,0,0.0,0.0,0,40.95,2,1,15
3,1,3,134.97,8.49,1,0.0,0,3,12
4,0,1,100.0,9.34,1,109.34,1,6,22


In [209]:
#M4-2-2
## NOTE: CSVs already written prior to training — do not rerun
xgb_prefix = f"s3://{bucket}/modeling/xgb_v1/"

train_csv_uri = f"{xgb_prefix}train/train.csv"
val_csv_uri   = f"{xgb_prefix}val/val.csv"
test_csv_uri  = f"{xgb_prefix}test/test.csv"

wr.s3.to_csv(train_xgb, train_csv_uri, index=False, header=False)
wr.s3.to_csv(val_xgb,   val_csv_uri,   index=False, header=False)
wr.s3.to_csv(test_xgb,  test_csv_uri,  index=False, header=False)

print("Train:", train_csv_uri)
print("Val:  ", val_csv_uri)
print("Test: ", test_csv_uri)

Train: s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/train/train.csv
Val:   s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/val/val.csv
Test:  s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/test/test.csv


In [210]:
#M4-2-3 Train Model
# TRAINING CELL (DO NOT RERUN)
# This cell was executed once to train the initial XGBoost model.
# Re-running this cell will retrain the model and incur additional cost.
# The trained model is reused below via attachment for evaluation and deployment.

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.image_uris import retrieve

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
region = boto3.session.Session().region_name

# Built-in XGBoost container
xgb_image = retrieve(
    framework="xgboost",
    region=region,
    version="1.7-1"
)

output_path = f"s3://{bucket}/modeling/xgb_v1/output/"

xgb = Estimator(
    image_uri=xgb_image,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",  # budget-friendly
    output_path=output_path,
    sagemaker_session=sess,
)

# Simple, reasonable first-pass hyperparameters
xgb.set_hyperparameters(
    objective="binary:logistic",
    eval_metric="auc",
    num_round=100,
    max_depth=4,
    eta=0.2,
    subsample=0.8,
    colsample_bytree=0.8,
)

train_input = TrainingInput(train_csv_uri, content_type="text/csv")
val_input   = TrainingInput(val_csv_uri, content_type="text/csv")

xgb.fit({
    "train": train_input,
    "validation": val_input
})

training_job_name = xgb.latest_training_job.name
print(f"Training job: {training_job_name}")

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2026-02-18-02-57-54-957


2026-02-18 02:57:56 Starting - Starting the training job...
2026-02-18 02:58:10 Starting - Preparing the instances for training...
2026-02-18 02:58:35 Downloading - Downloading input data...
2026-02-18 02:59:20 Downloading - Downloading the training image......
  import pkg_resources[0m
[34m[2026-02-18 03:00:23.171 ip-10-2-103-20.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2026-02-18 03:00:23.257 ip-10-2-103-20.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2026-02-18:03:00:23:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2026-02-18:03:00:23:INFO] Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34m[2026-02-18:03:00:23:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2026-02-18:03:00:23:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2026-0

In [211]:
# SKIP THIS CELL M4-2-3b Attach to Existing Trained Model (No Retraining)
import boto3, sagemaker
from sagemaker.estimator import Estimator
from sagemaker.image_uris import retrieve

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
region = boto3.session.Session().region_name
sm = boto3.client("sagemaker", region_name=region)

# Pick the most recent completed training job
jobs = sm.list_training_jobs(SortBy="CreationTime", SortOrder="Descending", MaxResults=20)["TrainingJobSummaries"]
training_job_name = next(j["TrainingJobName"] for j in jobs if j["TrainingJobStatus"] == "Completed")
print("Using training job:", training_job_name)

# Recreate estimator and attach (no retraining)
xgb_image = retrieve(framework="xgboost", region=region, version="1.7-1")
xgb = Estimator(
    image_uri=xgb_image,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"s3://{bucket}/modeling/xgb_v1/output/",
    sagemaker_session=sess,
)
xgb._current_job_name = training_job_name
print("Attached. Ready for Batch Transform.")

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


Using training job: sagemaker-xgboost-2026-02-18-02-57-54-957
Attached. Ready for Batch Transform.


In [212]:
# M4-2-2b Recreate CSV URIs

xgb_prefix = f"s3://{bucket}/modeling/xgb_v1/"

train_csv_uri = f"{xgb_prefix}train/train.csv"
val_csv_uri   = f"{xgb_prefix}val/val.csv"
test_csv_uri  = f"{xgb_prefix}test/test.csv"

print("Train:", train_csv_uri)
print("Val:  ", val_csv_uri)
print("Test: ", test_csv_uri)

Train: s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/train/train.csv
Val:   s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/val/val.csv
Test:  s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/test/test.csv


In [213]:
# M4-3.0 Create inference-only TEST input (features only)
# Code developed using ChatGPT (ChatGPT, 2024) as a paired programmer.

# test_xgb currently has: is_late + 8 features
test_infer = test_xgb.drop(columns=["is_late"]).copy()

test_infer_csv_uri = f"{xgb_prefix}test/test_infer.csv"

# IMPORTANT: no header, no index
wr.s3.to_csv(test_infer, test_infer_csv_uri, index=False, header=False)

print("Wrote inference CSV:", test_infer_csv_uri)
print("Shape (should be 9944 x 8):", test_infer.shape)

Wrote inference CSV: s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/test/test_infer.csv
Shape (should be 9944 x 8): (9944, 8)


In [214]:
# Fix: Ensure model_name is defined for Batch Transform
# We use the training job from the previous step to create a SageMaker Model object

if "training_job_name" not in locals():
    # Fallback if running fresh
    jobs = sm.list_training_jobs(SortBy="CreationTime", SortOrder="Descending", MaxResults=1)["TrainingJobSummaries"]
    training_job_name = jobs[0]["TrainingJobName"]

print(f"Using Training Job: {training_job_name}")

# Define model name
import time
model_name = f"xgb-model-{int(time.time())}"

# Create Model object (registers it in SageMaker, needed for Transform)
# We reuse the estimator definition from above if available, or lightweight recreation

from sagemaker.model import Model

# Get model artifacts from the training job
info = sm.describe_training_job(TrainingJobName=training_job_name)
model_data = info["ModelArtifacts"]["S3ModelArtifacts"]
xgb_image = retrieve(framework="xgboost", region=region, version="1.7-1")

model = Model(
    image_uri=xgb_image,
    model_data=model_data,
    role=role,
    sagemaker_session=sess,
    name=model_name
)

model.create(instance_type="ml.m5.large")
print(f"✅ Created Model: {model_name}")


Using Training Job: sagemaker-xgboost-2026-02-18-02-57-54-957


INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating model with name: xgb-model-1771383672


✅ Created Model: xgb-model-1771383672


In [215]:
# M4-3.1 Batch Transform on TEST (Evaluation) - v2 (features-only)
from sagemaker.transformer import Transformer

test_transform_output_v2 = f"s3://{bucket}/modeling/xgb_v1/batch-out/test_v2/"

transformer = Transformer(
    model_name=model_name,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=test_transform_output_v2,
    assemble_with="Line",
    accept="text/csv"
)

print("Starting batch transform (features-only input)...")
transformer.transform(
    data=test_infer_csv_uri,
    content_type="text/csv",
    split_type="Line"
)

transformer.wait()
print("Batch transform finished:", test_transform_output_v2)

INFO:sagemaker:Creating transform job with name: sagemaker-xgboost-2026-02-18-03-01-13-655


Starting batch transform (features-only input)...
  import pkg_resources[0m
[34m[2026-02-18:03:06:54:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2026-02-18:03:06:54:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2026-02-18:03:06:54:INFO] nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[34mhttp {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /dev/stdout combined;
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_max_body_size 0;
    keepalive_timeout 3;
    location ~ ^/(ping|invocations|execution-parameters) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
      proxy_read_timeout 60s;
    

In [216]:
# M4-3.2 Load predictions and evaluate
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score

s3 = boto3.client("s3")

resp = s3.list_objects_v2(Bucket=bucket, Prefix="modeling/xgb_v1/batch-out/test_v2/")
keys = [o["Key"] for o in resp.get("Contents", [])]
out_files = [k for k in keys if k.endswith(".out")]

if not out_files:
    raise RuntimeError(f"No .out files found in test_v2 output. Keys seen: {keys[:10]}")

out_key = sorted(out_files)[-1]
out_uri = f"s3://{bucket}/{out_key}"
print("Reading predictions from:", out_uri)

pred_df = wr.s3.read_csv(out_uri, header=None)
y_prob = pred_df[0].astype(float).reset_index(drop=True)

y_true = test_xgb["is_late"].astype(int).reset_index(drop=True)
y_hat = (y_prob >= 0.5).astype(int)

model_metrics = {
    "auc": roc_auc_score(y_true, y_prob),
    "accuracy": accuracy_score(y_true, y_hat),
    "precision": precision_score(y_true, y_hat, zero_division=0),
    "recall": recall_score(y_true, y_hat, zero_division=0),
    "f1": f1_score(y_true, y_hat, zero_division=0),
}

comparison = pd.DataFrame(
    [baseline_metrics, model_metrics],
    index=["baseline_always_on_time", "xgb_v1"]
)

print("Model metrics:", model_metrics)
comparison

Reading predictions from: s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/batch-out/test_v2/test_infer.csv.out
Model metrics: {'auc': 0.5635208855035949, 'accuracy': 0.8621279163314561, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0}


Unnamed: 0,accuracy,precision,recall,f1,auc
baseline_always_on_time,0.862128,0.0,0.0,0.0,
xgb_v1,0.862128,0.0,0.0,0.0,0.563521


In [217]:

sm = boto3.client("sagemaker", region_name=region)
sm.delete_model(ModelName=model_name)
print("Deleted model:", model_name)

Deleted model: xgb-model-1771383672


In [218]:
#Impement:
# Implement model monitors on your ML system.
# Implement data monitors on your ML system.
# Implement infrastructure monitors on your ML system.
# Create a monitoring dashboard for your ML endpoint/job on CloudWatch.
# Generate model and data reports on SageMaker.

#0
import time, json
from sagemaker.image_uris import retrieve
from sagemaker.model import Model

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
region = boto3.session.Session().region_name
sm = boto3.client("sagemaker", region_name=region)
cw = boto3.client("cloudwatch", region_name=region)

xgb_prefix = f"s3://{bucket}/modeling/xgb_v1/"

# Training job from earlier
training_job_name = "sagemaker-xgboost-2026-01-31-18-00-53-460"

# Your feature list (must match training)
FEATURES = [
    "num_items","total_price","total_freight_value","num_sellers",
    "payment_value","payment_installments","purchase_dow","purchase_hour",
]


In [219]:
# FORCE FIND A VALID TRAINING JOB
# This fixes the "Requested resource not found" error by finding the most recent successful job in YOUR account.

import boto3
sm = boto3.client("sagemaker")

# List recent completed jobs
response = sm.list_training_jobs(
    SortBy="CreationTime", 
    SortOrder="Descending", 
    StatusEquals="Completed", 
    MaxResults=5
)

if not response["TrainingJobSummaries"]:
    raise RuntimeError("No completed training jobs found! You must run the xgb.fit() cell at least once.")

# Pick the most recent one
training_job_name = response["TrainingJobSummaries"][0]["TrainingJobName"]
print(f"✅ Found valid training job: {training_job_name}")

# Now we can safely describe it
tj = sm.describe_training_job(TrainingJobName=training_job_name)
print("Job Status:", tj["TrainingJobStatus"])

✅ Found valid training job: sagemaker-xgboost-2026-02-18-02-57-54-957
Job Status: Completed


In [220]:
#1
tj = sm.describe_training_job(TrainingJobName=training_job_name)
model_data = tj["ModelArtifacts"]["S3ModelArtifacts"]
print("Model artifact:", model_data)

xgb_image = retrieve(framework="xgboost", region=region, version="1.7-1")

model_name = f"xgb-v1-monitor-{int(time.time())}"
model = Model(
    image_uri=xgb_image,
    model_data=model_data,
    role=role,
    sagemaker_session=sess,
    name=model_name,
)

model.create(instance_type="ml.m5.large")
print("Created model:", model_name)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating model with name: xgb-v1-monitor-1771384069


Model artifact: s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/output/sagemaker-xgboost-2026-02-18-02-57-54-957/output/model.tar.gz
Created model: xgb-v1-monitor-1771384069


In [221]:
#2A
import sagemaker, inspect
from sagemaker.transformer import Transformer

print("sagemaker sdk version:", sagemaker.__version__)
print("Transformer.transform signature:\n", inspect.signature(Transformer.transform))


sagemaker sdk version: 2.245.0
Transformer.transform signature:
 (self, data: Union[str, sagemaker.workflow.entities.PipelineVariable], data_type: Union[str, sagemaker.workflow.entities.PipelineVariable] = 'S3Prefix', content_type: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None, compression_type: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None, split_type: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None, job_name: Optional[str] = None, input_filter: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None, output_filter: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None, join_source: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None, experiment_config: Optional[Dict[str, str]] = None, model_client_config: Optional[Dict[str, Union[str, sagemaker.workflow.entities.PipelineVariable]]] = None, batch_data_capture_config: sagemaker.inputs.BatchDataCa

In [222]:
# M5-2B Batch Transform with Batch Data Capture (SDK 2.245.0)

from sagemaker.transformer import Transformer
from sagemaker.inputs import BatchDataCaptureConfig

transform_output = f"s3://{bucket}/monitoring/batch-transform/output/{int(time.time())}/"
capture_uri = f"s3://{bucket}/monitoring/batch-transform/capture/{int(time.time())}/"

batch_capture = BatchDataCaptureConfig(
    destination_s3_uri=capture_uri,
    generate_inference_id=True,
)

transformer = Transformer(
    model_name=model_name,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=transform_output,
    assemble_with="Line",
    accept="text/csv",
)

prod_infer_uri = test_infer_csv_uri

print("Starting batch transform WITH batch data capture...")
transformer.transform(
    data=prod_infer_uri,
    content_type="text/csv",
    split_type="Line",
    batch_data_capture_config=batch_capture,  # <-- correct name for your SDK
    wait=True,
    logs=True,
)

print("Transform complete")
print("Transform output:", transform_output)
print("Captured data:", capture_uri)

INFO:sagemaker:Creating transform job with name: sagemaker-xgboost-2026-02-18-03-07-50-815


Starting batch transform WITH batch data capture...
  import pkg_resources[0m
[34m[2026-02-18:03:13:10:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2026-02-18:03:13:10:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2026-02-18:03:13:10:INFO] nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[34mhttp {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /dev/stdout combined;
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_max_body_size 0;
    keepalive_timeout 3;
    location ~ ^/(ping|invocations|execution-parameters) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
      proxy_read_timeout 60s;
  

In [223]:
#3
# M5-3.0 Data quality baseline dataset (TRAIN features)
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.processing import ProcessingInput, ProcessingOutput

split_base = f"s3://{bucket}/splits/olist/features/version=v1/"
train = wr.s3.read_parquet(f"{split_base}train/")

train_baseline = train[FEATURES].copy()
for c in FEATURES:
    train_baseline[c] = pd.to_numeric(train_baseline[c], errors="coerce").fillna(0.0)

baseline_uri = f"s3://{bucket}/monitoring/baselines/data_quality/train_baseline.csv"
wr.s3.to_csv(train_baseline, baseline_uri, index=False, header=False)

print("Baseline CSV:", baseline_uri, "shape:", train_baseline.shape)

Baseline CSV: s3://sagemaker-us-east-1-587322031938/monitoring/baselines/data_quality/train_baseline.csv shape: (39776, 8)


In [224]:
# M5-3.1 Suggest baseline (stats + constraints)
# NOTE (Feb 2026):
# This cell follows older SageMaker examples that reference
# `monitor.baseline_constraints()` and `monitor.baseline_statistics()`.
# These methods are NOT available in SageMaker SDK v2.245.0.
# 
# The baseline IS created successfully, but artifacts must be
# retrieved directly from S3 instead.
# 
# See M5-3.1b below for the correct, SDK-safe implementation.
monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",  # budget-friendly
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
    sagemaker_session=sess,
)

baseline_results_uri = f"s3://{bucket}/monitoring/baselines/data_quality/results/"
monitor.suggest_baseline(
    baseline_dataset=baseline_uri,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=baseline_results_uri,
    wait=True,
)

print("Baseline results in:", baseline_results_uri)
# print("Constraints:", monitor.baseline_constraints())
# print("Statistics:", monitor.baseline_statistics())

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating processing-job with name baseline-suggestion-job-2026-02-18-03-13-45-105


......................[34m2026-02-18 03:17:23.256439: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory[0m
[34m2026-02-18 03:17:23.256488: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.[0m
[34m2026-02-18 03:17:24.856116: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory[0m
[34m2026-02-18 03:17:24.856156: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)[0m
[34m2026-02-18 03:17:24.856185: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-10-2-248-192.ec2.internal): /proc/driver/nvidia/version 

In [225]:
# M5-3.1b Find baseline statistics + constraints files in S3

s3 = boto3.client("s3", region_name=region)

baseline_prefix = baseline_results_uri.replace(f"s3://{bucket}/", "")
resp = s3.list_objects_v2(Bucket=bucket, Prefix=baseline_prefix)

keys = [o["Key"] for o in resp.get("Contents", [])]
print("Found objects:", len(keys))
for k in keys[:50]:
    print(k)

# Try to auto-detect the files we need
stats = [k for k in keys if k.endswith("statistics.json")]
constraints = [k for k in keys if k.endswith("constraints.json")]

print("\nstatistics.json:", stats[-1] if stats else "NOT FOUND")
print("constraints.json:", constraints[-1] if constraints else "NOT FOUND")

baseline_statistics_uri = f"s3://{bucket}/{stats[-1]}" if stats else None
baseline_constraints_uri = f"s3://{bucket}/{constraints[-1]}" if constraints else None

baseline_statistics_uri, baseline_constraints_uri

Found objects: 2
monitoring/baselines/data_quality/results/constraints.json
monitoring/baselines/data_quality/results/statistics.json

statistics.json: monitoring/baselines/data_quality/results/statistics.json
constraints.json: monitoring/baselines/data_quality/results/constraints.json


('s3://sagemaker-us-east-1-587322031938/monitoring/baselines/data_quality/results/statistics.json',
 's3://sagemaker-us-east-1-587322031938/monitoring/baselines/data_quality/results/constraints.json')

In [226]:
# M5-3.2a Verify production inference dataset (CSV)


print("Production inference CSV:", prod_infer_uri)

prod_df = wr.s3.read_csv(prod_infer_uri, header=None)
prod_df.columns = FEATURES  # we wrote it without headers

print("prod_infer rows/cols:", prod_df.shape)
print("Columns:", list(prod_df.columns))
prod_df.head(10)

Production inference CSV: s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/test/test_infer.csv
prod_infer rows/cols: (9944, 8)
Columns: ['num_items', 'total_price', 'total_freight_value', 'num_sellers', 'payment_value', 'payment_installments', 'purchase_dow', 'purchase_hour']


Unnamed: 0,num_items,total_price,total_freight_value,num_sellers,payment_value,payment_installments,purchase_dow,purchase_hour
0,1,29.99,14.1,1,44.09,1,3,23
1,1,107.0,12.25,1,119.25,3,3,23
2,1,69.9,13.08,1,82.98,1,3,23
3,0,0.0,0.0,0,75.07,5,3,23
4,2,25.8,15.56,1,41.36,1,3,23
5,1,39.99,15.79,1,61.68,6,3,23
6,1,56.5,15.15,1,71.65,1,3,23
7,1,334.9,17.09,1,351.99,8,3,23
8,1,34.99,15.1,1,50.09,4,3,23
9,1,199.9,14.51,1,214.41,6,3,23


In [227]:
# M5-3.2b Verify batch capture artifacts (JSONL)
import boto3, json

s3 = boto3.client("s3", region_name=region)
# Find the latest folder dynamically
base_capture_path = f"s3://{bucket}/monitoring/batch-transform/capture/"
folders = wr.s3.list_directories(base_capture_path)
capture_uri = sorted(folders)[-1] # Pick the latest run
print(f"Using latest capture folder: {capture_uri}")

prefix = capture_uri.replace(f"s3://{bucket}/", "")

resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
keys = [o["Key"] for o in resp.get("Contents", [])]

print("Capture objects found:", len(keys))
for k in keys[:25]:
    print(k)

# Read the first non-empty object and show 1 parsed JSON record
if not keys:
    raise RuntimeError("No capture files found under: " + capture_uri)

k0 = sorted(keys)[0]
obj = s3.get_object(Bucket=bucket, Key=k0)
text = obj["Body"].read().decode("utf-8", errors="replace")

first_line = next((ln for ln in text.splitlines() if ln.strip()), None)
print("\nFirst capture object:", k0)
print("First line (raw):", first_line[:300], "..." if len(first_line) > 300 else "")

# Try parsing as JSON
try:
    rec = json.loads(first_line)
    print("\nParsed JSON keys:", list(rec.keys())[:25])
    # Show a couple common locations
    if "inferenceId" in rec:
        print("inferenceId:", rec["inferenceId"])
    if "eventMetadata" in rec:
        print("eventMetadata keys:", list(rec["eventMetadata"].keys()))
except Exception as e:
    print("\n⚠️ Could not parse first line as JSON:", e)

Using latest capture folder: s3://sagemaker-us-east-1-587322031938/monitoring/batch-transform/capture/1771384070/output/2026/02/18/03/e7ee07b6-2c9c-4d50-b7b6-21b631c9c553.json
Capture objects found: 1
monitoring/batch-transform/capture/1771384070/output/2026/02/18/03/e7ee07b6-2c9c-4d50-b7b6-21b631c9c553.json

First capture object: monitoring/batch-transform/capture/1771384070/output/2026/02/18/03/e7ee07b6-2c9c-4d50-b7b6-21b631c9c553.json
First line (raw): [{"prefix":"s3://sagemaker-us-east-1-587322031938/monitoring/batch-transform/output/1771384070/"},"test_infer.csv.out"] 

⚠️ Could not parse first line as JSON: 'list' object has no attribute 'keys'


In [228]:
# M5-3.2c Parse capture JSON properly (input + output) -- ROBUST VERSION
import json

# 1. Get ALL keys from the capture prefix
resp = s3.list_objects_v2(Bucket=bucket, Prefix=capture_prefix)
keys = [o["Key"] for o in resp.get("Contents", [])]

print(f"Total capture objects found: {len(keys)}")

# 2. Try to find Input vs Output files
input_keys = [k for k in keys if "/input/" in k]
output_keys = [k for k in keys if "/output/" in k]

print(f"Input files: {len(input_keys)}")
print(f"Output files: {len(output_keys)}")

def show_first_record(file_key, label):
    if not file_key:
        print(f"\n--- {label} CAPTURE (Not Found) ---")
        return

    print(f"\n--- {label} CAPTURE ---")
    print(f"Reading: {file_key}")
    obj = s3.get_object(Bucket=bucket, Key=file_key)
    # Read first line only
    body = obj["Body"].read().decode("utf-8", errors="replace").splitlines()
    if not body:
        print("File is empty.")
        return

    first_line = body[0]
    print("Raw Line:", first_line[:200])
    
    try:
        data = json.loads(first_line)
        if isinstance(data, list):
            print("Type: List (Manifest file?)")
        elif isinstance(data, dict):
             print("Type: JSON Object (Data Capture?)")
             print("Keys:", list(data.keys())[:10])
    except:
        print("Could not parse JSON.")

if input_keys:
    show_first_record(input_keys[0], "INPUT")
else:
    print("⚠️ No Input capture files found.")

if output_keys:
    show_first_record(output_keys[0], "OUTPUT")
else:
    print("⚠️ No Output capture files found.")

Total capture objects found: 1
Input files: 0
Output files: 1
⚠️ No Input capture files found.

--- OUTPUT CAPTURE ---
Reading: monitoring/batch-transform/capture/1771374102/output/2026/02/18/00/e3fe3d4b-fe1e-470d-835b-f527c45271fd.json
Raw Line: [{"prefix":"s3://sagemaker-us-east-1-587322031938/monitoring/batch-transform/output/1771374102/"},"test_infer.csv.out"]
Type: List (Manifest file?)


In [229]:
# M5-4.1 Collect SageMaker job names for infrastructure monitoring


sm = boto3.client("sagemaker", region_name=region)

jobs = {
    "TrainingJobs": [],
    "TransformJobs": [],
    "ProcessingJobs": []
}

# Training jobs
for j in sm.list_training_jobs(MaxResults=5)["TrainingJobSummaries"]:
    jobs["TrainingJobs"].append(j["TrainingJobName"])

# Transform jobs
for j in sm.list_transform_jobs(MaxResults=5)["TransformJobSummaries"]:
    jobs["TransformJobs"].append(j["TransformJobName"])

# Processing jobs
for j in sm.list_processing_jobs(MaxResults=5)["ProcessingJobSummaries"]:
    jobs["ProcessingJobs"].append(j["ProcessingJobName"])

jobs

{'TrainingJobs': ['sagemaker-xgboost-2026-02-18-02-57-54-957',
  'pipelines-lejkxc4smoum-TrainXGBoost-laax2ShLV8',
  'pipelines-olk8lxdtt2ge-TrainXGBoost-vdc0T45DDP',
  'pipelines-uqdnir5f0wzm-TrainXGBoost-mWBJKpZnl6',
  'pipelines-smk7ibe2196t-TrainXGBoost-e4yOWeKgkn'],
 'TransformJobs': ['sagemaker-xgboost-2026-02-18-03-07-50-815',
  'sagemaker-xgboost-2026-02-18-03-01-13-655',
  'sagemaker-xgboost-2026-02-18-00-21-43-124',
  'sagemaker-xgboost-2026-02-17-23-57-35-468',
  'sagemaker-xgboost-2026-02-17-23-35-05-743'],
 'ProcessingJobs': ['baseline-suggestion-job-2026-02-18-03-13-45-105',
  'pipelines-lejkxc4smoum-EvaluateModel-gT0PLZMSJY',
  'pipelines-uqdnir5f0wzm-EvaluateModel-eNx0DUsExW',
  'pipelines-olk8lxdtt2ge-EvaluateModel-Nvu7I7WDWH',
  'pipelines-smk7ibe2196t-EvaluateModel-bPtXEKJBPA']}

In [230]:
# M5-4.3 Create CloudWatch Dashboard for ML Infrastructure

cw = boto3.client("cloudwatch", region_name=region)

dashboard_name = "AAI540-Olist-MLops-Dashboard"

dashboard_body = {
    "widgets": [
        {
            "type": "metric",
            "x": 0,
            "y": 0,
            "width": 12,
            "height": 6,
            "properties": {
                "title": "Training Job Duration",
                "metrics": [
                    ["AWS/SageMaker", "TrainingJobDuration",
                     "TrainingJobName", "sagemaker-xgboost-2026-01-31-18-00-53-460"]
                ],
                "stat": "Average",
                "period": 300,
                "region": region
            }
        },
        {
            "type": "metric",
            "x": 12,
            "y": 0,
            "width": 12,
            "height": 6,
            "properties": {
                "title": "Batch Transform Duration",
                "metrics": [
                    ["AWS/SageMaker", "TransformJobDuration",
                     "TransformJobName", "sagemaker-xgboost-2026-02-08-21-21-26-827"]
                ],
                "stat": "Average",
                "period": 300,
                "region": region
            }
        },
        {
            "type": "metric",
            "x": 0,
            "y": 6,
            "width": 12,
            "height": 6,
            "properties": {
                "title": "Processing Job Duration",
                "metrics": [
                    ["AWS/SageMaker", "ProcessingJobDuration",
                     "ProcessingJobName", "baseline-suggestion-job-2026-02-08-21-32-41-870"]
                ],
                "stat": "Average",
                "period": 300,
                "region": region
            }
        },
        {
            "type": "metric",
            "x": 12,
            "y": 6,
            "width": 12,
            "height": 6,
            "properties": {
                "title": "Job Failures (All SageMaker Jobs)",
                "metrics": [
                    ["AWS/SageMaker", "TrainingJobsFailed"],
                    ["AWS/SageMaker", "TransformJobsFailed"],
                    ["AWS/SageMaker", "ProcessingJobsFailed"]
                ],
                "stat": "Sum",
                "period": 300,
                "region": region
            }
        }
    ]
}

cw.put_dashboard(
    DashboardName=dashboard_name,
    DashboardBody=json.dumps(dashboard_body)
)

print("CloudWatch dashboard created:", dashboard_name)

CloudWatch dashboard created: AAI540-Olist-MLops-Dashboard


In [231]:
# M5-5.1 Verify monitoring report artifacts exist


s3 = boto3.client("s3", region_name=region)

paths = [
    baseline_statistics_uri,
    baseline_constraints_uri,
    f"s3://{bucket}/monitoring/batch-transform/capture/"
]

for p in paths:
    bucket_name, key = p.replace("s3://", "").split("/", 1)
    resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=key)
    print(f"{p} -> objects:", resp.get("KeyCount", 0))

s3://sagemaker-us-east-1-587322031938/monitoring/baselines/data_quality/results/statistics.json -> objects: 1
s3://sagemaker-us-east-1-587322031938/monitoring/baselines/data_quality/results/constraints.json -> objects: 1
s3://sagemaker-us-east-1-587322031938/monitoring/batch-transform/capture/ -> objects: 4


In [232]:
# Capability check (prints all results)

import boto3, botocore

region = boto3.session.Session().region_name
print("Region:", region)

def ok(name, fn):
    try:
        r = fn()
        print(f"{name}: OK")
        return r
    except botocore.exceptions.ClientError as e:
        print(f"{name}: {e.response['Error'].get('Code')} - {e.response['Error'].get('Message')}")
        return None

sm = boto3.client("sagemaker", region_name=region)
cp = boto3.client("codepipeline", region_name=region)
cb = boto3.client("codebuild", region_name=region)
sf = boto3.client("stepfunctions", region_name=region)

ok("SageMaker list_pipelines", lambda: sm.list_pipelines(MaxResults=5))
ok("CodePipeline list_pipelines", lambda: cp.list_pipelines(maxResults=5))
ok("CodeBuild list_projects", lambda: cb.list_projects(sortBy="NAME", sortOrder="ASCENDING"))
ok("StepFunctions list_state_machines", lambda: sf.list_state_machines(maxResults=5))

Region: us-east-1
SageMaker list_pipelines: OK
CodePipeline list_pipelines: OK
CodeBuild list_projects: AccessDeniedException - User: arn:aws:sts::587322031938:assumed-role/LabRole/SageMaker is not authorized to perform: codebuild:ListProjects because no identity-based policy allows the codebuild:ListProjects action
StepFunctions list_state_machines: OK


{'stateMachines': [],
 'ResponseMetadata': {'RequestId': '3b4db087-cec0-4d9c-b7e3-8a515faaa1fd',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3b4db087-cec0-4d9c-b7e3-8a515faaa1fd',
   'date': 'Wed, 18 Feb 2026 03:21:42 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '20',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

In [233]:
# M6-1 Create pipeline scripts locally (CI + evaluation)

import os, textwrap

os.makedirs("pipeline_scripts", exist_ok=True)

# 1) CI step: cheap, deterministic success/fail for demo
ci_check = r"""
import argparse, sys, json, os

parser = argparse.ArgumentParser()
parser.add_argument("--force_fail", type=str, default="false")
args = parser.parse_args()

force_fail = args.force_fail.strip().lower() in ("1","true","yes","y")

print("CI Check running. force_fail =", force_fail)

# You can add real checks here (schema check, file existence in /opt/ml/processing/input, etc.)
# For the demo: deterministic failure when force_fail=true
if force_fail:
    print("Forcing CI failure for demo.")
    sys.exit(1)

print("CI checks passed.")
"""

with open("pipeline_scripts/ci_check.py", "w") as f:
    f.write(textwrap.dedent(ci_check))

# 2) Evaluation step: compute AUC from Batch predictions or directly from XGBoost model
evaluate = r"""
import argparse, json, os
import xgboost as xgb
from sklearn.metrics import roc_auc_score, accuracy_score

parser = argparse.ArgumentParser()
parser.add_argument("--test", type=str, default="/opt/ml/processing/test/test.csv")
parser.add_argument("--model", type=str, default="/opt/ml/processing/model/xgboost-model")
parser.add_argument("--output", type=str, default="/opt/ml/processing/evaluation")
args = parser.parse_args()

# test.csv format: label first, then 8 features (no header)
df = pd.read_csv(args.test, header=None)
y = df.iloc[:,0].astype(int).values
X = df.iloc[:,1:].values

booster = xgb.Booster()
booster.load_model(args.model)
dtest = xgb.DMatrix(X)
pred = booster.predict(dtest)

auc = float(roc_auc_score(y, pred)) if len(np.unique(y)) > 1 else float("nan")
acc = float(accuracy_score(y, (pred >= 0.5).astype(int)))

os.makedirs(args.output, exist_ok=True)
report = {"auc": auc, "accuracy": acc, "rows": int(len(df))}
with open(os.path.join(args.output, "evaluation.json"), "w") as f:
    json.dump(report, f)

print("Wrote evaluation:", report)
"""

with open("pipeline_scripts/evaluate.py", "w") as f:
    f.write(textwrap.dedent(evaluate))

print("Wrote pipeline scripts to ./pipeline_scripts/")
print(os.listdir("pipeline_scripts"))

Wrote pipeline scripts to ./pipeline_scripts/
['ci_check.py', 'evaluate.py']


In [234]:
# M6-2 Upload scripts to S3
from sagemaker.s3 import S3Uploader

prefix = "cicd/pipeline-scripts/v1"

sess = sagemaker.Session()

s3_scripts_uri = S3Uploader.upload(
    local_path="pipeline_scripts",
    desired_s3_uri=f"s3://{bucket}/{prefix}/"
)

print("Scripts uploaded to:", s3_scripts_uri)

Scripts uploaded to: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/


In [235]:
# M6-3A Reconstruct S3 URIs

# Model training data
xgb_prefix = f"s3://{bucket}/modeling/xgb_v1/"
train_csv_uri = f"{xgb_prefix}train/train.csv"
val_csv_uri   = f"{xgb_prefix}val/val.csv"
test_csv_uri  = f"{xgb_prefix}test/test.csv"

# CI/CD scripts location
s3_scripts_uri = f"s3://{bucket}/cicd/pipeline-scripts/v1/"

print("train_csv_uri:", train_csv_uri)
print("val_csv_uri:  ", val_csv_uri)
print("test_csv_uri: ", test_csv_uri)
print("scripts uri:  ", s3_scripts_uri)

train_csv_uri: s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/train/train.csv
val_csv_uri:   s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/val/val.csv
test_csv_uri:  s3://sagemaker-us-east-1-587322031938/modeling/xgb_v1/test/test.csv
scripts uri:   s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/


In [236]:
# M6-4 Update evaluate.py (no sklearn dependency) + re-upload scripts
import os, textwrap
from sagemaker.s3 import S3Uploader

# overwrite evaluate.py with a no-sklearn version
evaluate = r"""
import argparse, json, os, tarfile
import numpy as np
import pandas as pd
import xgboost as xgb

def auc_roc(y_true, y_score):
    # Fast AUC without sklearn (rank-based)
    y_true = np.asarray(y_true).astype(int)
    y_score = np.asarray(y_score).astype(float)

    pos = y_true == 1
    neg = y_true == 0
    n_pos = np.sum(pos)
    n_neg = np.sum(neg)
    if n_pos == 0 or n_neg == 0:
        return float("nan")

    order = np.argsort(y_score)
    ranks = np.empty_like(order, dtype=float)
    ranks[order] = np.arange(1, len(y_score) + 1)

    # average ranks for ties
    unique_scores, inverse, counts = np.unique(y_score, return_inverse=True, return_counts=True)
    if np.any(counts > 1):
        # compute average rank per group
        sum_ranks = np.bincount(inverse, weights=ranks)
        avg_ranks = sum_ranks / counts
        ranks = avg_ranks[inverse]

    sum_pos_ranks = np.sum(ranks[pos])
    auc = (sum_pos_ranks - n_pos * (n_pos + 1) / 2.0) / (n_pos * n_neg)
    return float(auc)

parser = argparse.ArgumentParser()
parser.add_argument("--test", type=str, default="/opt/ml/processing/test/test.csv")
parser.add_argument("--model_tar", type=str, default="/opt/ml/processing/model/model.tar.gz")
parser.add_argument("--output", type=str, default="/opt/ml/processing/evaluation")
args = parser.parse_args()

df = pd.read_csv(args.test, header=None)
y = df.iloc[:, 0].astype(int).values
X = df.iloc[:, 1:].values

# Extract model from model.tar.gz
model_dir = "/opt/ml/processing/model_extracted"
os.makedirs(model_dir, exist_ok=True)
with tarfile.open(args.model_tar, "r:gz") as tar:
    tar.extractall(path=model_dir)

model_path = os.path.join(model_dir, "xgboost-model")
booster = xgb.Booster()
booster.load_model(model_path)

dtest = xgb.DMatrix(X)
pred = booster.predict(dtest)

auc = auc_roc(y, pred)
acc = float(np.mean((pred >= 0.5).astype(int) == y))

os.makedirs(args.output, exist_ok=True)
report = {"auc": auc, "accuracy": acc, "rows": int(len(df))}
with open(os.path.join(args.output, "evaluation.json"), "w") as f:
    json.dump(report, f)

print("Wrote evaluation:", report)
"""

import os
os.makedirs("pipeline_scripts", exist_ok=True)

with open("pipeline_scripts/evaluate.py", "w") as f:
    f.write(textwrap.dedent(evaluate))

# re-upload scripts
prefix = "cicd/pipeline-scripts/v1"

sess = sagemaker.Session()
s3_scripts_uri = S3Uploader.upload(
    local_path="pipeline_scripts",
    desired_s3_uri=f"s3://{bucket}/{prefix}/"
)

print("Updated evaluate.py and re-uploaded scripts to:", s3_scripts_uri)
print("Scripts:", os.listdir("pipeline_scripts"))

Updated evaluate.py and re-uploaded scripts to: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/
Scripts: ['ci_check.py', 'evaluate.py']


In [237]:
# M6-5 Create SageMaker Pipeline DAG

from sagemaker.image_uris import retrieve
from sagemaker.estimator import Estimator
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString, ParameterFloat
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model
from sagemaker.workflow.pipeline_context import PipelineSession

region = boto3.session.Session().region_name
role = sagemaker.get_execution_role()

# Inputs (already defined in kernel)
# train_csv_uri, val_csv_uri, test_csv_uri, s3_scripts_uri

pipeline_sess = PipelineSession()

# ---- Parameters for SUCCESS vs FAIL demos ----
ForceFailCI  = ParameterString(name="ForceFailCI", default_value="false")
AucThreshold = ParameterFloat(name="AucThreshold", default_value=0.55)

# ---- Images ----
xgb_image = retrieve(framework="xgboost", region=region, version="1.7-1")

# ---- Step 1: "CI" checks (cheap processing step, can fail immediately) ----
ci_proc = ScriptProcessor(
    image_uri=xgb_image,
    command=["python3"],
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
    sagemaker_session=pipeline_sess,
)

ci_step = ProcessingStep(
    name="CIChecks",
    processor=ci_proc,
    code=f"{s3_scripts_uri}ci_check.py",
    job_arguments=["--ForceFailCI", ForceFailCI],
)

# ---- Step 2: Train model (tiny + cheap) ----
output_path = f"s3://{bucket}/cicd/pipeline-artifacts/model/"

xgb_est = Estimator(
    image_uri=xgb_image,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=output_path,
    sagemaker_session=pipeline_sess,
)

xgb_est.set_hyperparameters(
    objective="binary:logistic",
    eval_metric="auc",
    num_round=10,          # cheap
    max_depth=4,
    eta=0.2,
    subsample=0.8,
    colsample_bytree=0.8,
)

train_step = TrainingStep(
    name="TrainXGBoost",
    estimator=xgb_est,
    inputs={
        "train": sagemaker.inputs.TrainingInput(train_csv_uri, content_type="text/csv"),
        "validation": sagemaker.inputs.TrainingInput(val_csv_uri, content_type="text/csv"),
    },
    depends_on=[ci_step.name],
)

# ---- Step 3: Evaluate model (processing) ----
eval_proc = ScriptProcessor(
    image_uri=xgb_image,
    command=["python3"],
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
    sagemaker_session=pipeline_sess,
)

evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

eval_step = ProcessingStep(
    name="EvaluateModel",
    processor=eval_proc,
    code=f"{s3_scripts_uri}evaluate.py",
    inputs=[
        ProcessingInput(
            source=test_csv_uri,
            destination="/opt/ml/processing/test/test.csv",
            input_name="test",
        ),
        ProcessingInput(
            source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model/model.tar.gz",
            input_name="model",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{bucket}/cicd/pipeline-artifacts/evaluation/",
        )
    ],
    property_files=[evaluation_report],
)

# ---- Step 4: Gate on AUC ----
auc_value = JsonGet(
    step_name=eval_step.name,
    property_file=evaluation_report,
    json_path="auc",
)

fail_step = FailStep(
    name="FailQualityGate",
    error_message="Model did not meet AUC threshold.",
)

# ---- Step 5: Register model (NO endpoint) using ModelStep ----
model = Model(
    image_uri=xgb_image,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_sess,
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name="olist-late-delivery-xgb",
    approval_status="Approved",
)

register_step = ModelStep(
    name="RegisterModel",
    step_args=register_args
)

cond_step = ConditionStep(
    name="AUCQualityGate",
    conditions=[ConditionGreaterThanOrEqualTo(left=auc_value, right=AucThreshold)],
    if_steps=[register_step],
    else_steps=[fail_step],
)

pipeline_name = "AAI540-Olist-CICD-Pipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[ForceFailCI, AucThreshold],
    steps=[ci_step, train_step, eval_step, cond_step],
    sagemaker_session=pipeline_sess,
)

pipeline.upsert(role_arn=role)
print("Pipeline upserted:", pipeline_name)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


Pipeline upserted: AAI540-Olist-CICD-Pipeline


In [275]:
# M6-6.1 SUCCESS execution


execution_ok = pipeline.start(parameters={"ForceFailCI": "false", "AucThreshold": 0.55})
print("Started SUCCESS execution:", execution_ok.arn)

Started SUCCESS execution: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/0toc9pi63ypd


In [276]:
# M6-6.2 FAIL execution (CI failure = cheapest, no training spend)

execution_fail = pipeline.start(parameters={"ForceFailCI": "true", "AucThreshold": 0.55})
print("Started FAIL execution:", execution_fail.arn)

Started FAIL execution: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/n2jwkzcgkjds


In [277]:
# M6-6 SUCCESS RUN
execution_ok = pipeline.start(
    parameters={
        "ForceFailCI": "false",
        "AucThreshold": 0.50  # temporarily lower to guarantee pass
    }
)

print("Started SUCCESS execution:", execution_ok.arn)

Started SUCCESS execution: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/sxw0gjm2cvif


In [278]:
# M6-Fix CI Script

ci_code = """
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--ForceFailCI', type=str, default='false')
args = parser.parse_args()

force_fail = args.ForceFailCI.lower() == 'true'

print(f"ForceFailCI parameter received: {args.ForceFailCI}")
print(f"Interpreted as boolean: {force_fail}")

if force_fail:
    print("CI failure forced. Exiting with error.")
    sys.exit(1)

print("CI checks passed successfully.")
"""

with open("pipeline_scripts/ci_check.py", "w") as f:
    f.write(ci_code)

print("Updated ci_check.py")

Updated ci_check.py


In [279]:
from sagemaker.s3 import S3Uploader

S3Uploader.upload(
    local_path="pipeline_scripts",
    desired_s3_uri=f"s3://{bucket}/cicd/pipeline-scripts/v1/"
)

print("Re-uploaded scripts")

Re-uploaded scripts


In [280]:
# M6-Next: Re-upsert pipeline so it picks up latest scripts


pipeline.upsert(role_arn=role)
print("Re-upserted pipeline:", pipeline.name)



Re-upserted pipeline: AAI540-Olist-CICD-Pipeline


In [281]:
# M6-Run FAIL execution (CI fails immediately)


execution_fail = pipeline.start(parameters={"ForceFailCI": "true", "AucThreshold": 0.50})
print("Started FAIL execution:", execution_fail.arn)

Started FAIL execution: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/dr3ip7n6t5o1


In [282]:
# M6-Run SUCCESS execution


execution_ok = pipeline.start(parameters={"ForceFailCI": "false", "AucThreshold": 0.50})
print("Started SUCCESS execution:", execution_ok.arn)

Started SUCCESS execution: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/pd7ka2gv6sr9


In [283]:
print("Last FAIL ARN:", execution_fail.arn)
print("Last SUCCESS ARN:", execution_ok.arn)

Last FAIL ARN: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/dr3ip7n6t5o1
Last SUCCESS ARN: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/pd7ka2gv6sr9


In [284]:
execution_success = pipeline.start(
    parameters={
        "ForceFailCI": "false",   # do NOT fail CI
        "AucThreshold": 0.50      # easy pass threshold
    }
)

print("Started SUCCESS execution:", execution_success.arn)

Started SUCCESS execution: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/ox0oso7hmb9z


In [285]:
# M6-Debug: quick status + step statuses for latest execution


import boto3, time
sm = boto3.client("sagemaker")

exec_arn = execution_success.arn

def print_steps(execution_arn: str):
    steps = sm.list_pipeline_execution_steps(
        PipelineExecutionArn=execution_arn,
        SortOrder="Ascending",
        MaxResults=50
    )["PipelineExecutionSteps"]
    for s in steps:
        name = s["StepName"]
        status = s["StepStatus"]
        reason = s.get("FailureReason", "")
        print(f"- {name}: {status}" + (f" | {reason}" if reason else ""))

ex = sm.describe_pipeline_execution(PipelineExecutionArn=exec_arn)
print("Status:", ex["PipelineExecutionStatus"])
if ex.get("FailureReason"):
    print("FailureReason:", ex["FailureReason"])

print("\nSteps:")
print_steps(exec_arn)

Status: Executing

Steps:


In [287]:
#A

prefix = "cicd/pipeline-scripts/v1/"
paths = wr.s3.list_objects(f"s3://{bucket}/{prefix}")
print("Found:", len(paths))
for p in paths:
    print(p)

Found: 4
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1//ci_check.py
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1//evaluate.py
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/ci_check.py
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/evaluate.py


In [288]:
#B
s3_scripts_prefix = f"s3://{bucket}/cicd/pipeline-scripts/v1/"

In [289]:
ProcessingInput(
    source=s3_scripts_prefix,
    destination="/opt/ml/processing/input/code"
)

<sagemaker.processing.ProcessingInput at 0x7fae6e243020>

In [290]:
code="/opt/ml/processing/input/code/ci_check.py"

In [291]:
#C
execution_success = pipeline.start(
    parameters={"ForceFailCI": "false", "AucThreshold": 0.50}
)
print(execution_success.arn)

arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/jlzqjpwffsuw


In [292]:
# M6-Fix: upload scripts to a clean prefix (no double slashes)

from sagemaker.s3 import S3Uploader

clean_scripts_uri = f"s3://{bucket}/cicd/pipeline-scripts/v2/"  # NEW folder

S3Uploader.upload(
    local_path="pipeline_scripts",
    desired_s3_uri=clean_scripts_uri
)

print("Uploaded scripts to:", clean_scripts_uri)

Uploaded scripts to: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v2/


In [293]:
paths = wr.s3.list_objects(clean_scripts_uri)
print("Found:", len(paths))
for p in paths:
    print(p)

Found: 2
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v2//ci_check.py
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v2//evaluate.py


In [294]:
s3_scripts_uri = clean_scripts_uri  # points to .../v2/

In [295]:
code=f"{s3_scripts_uri}ci_check.py"

In [296]:
pipeline.upsert(role_arn=role)
print("Re-upserted pipeline:", pipeline.name)



Re-upserted pipeline: AAI540-Olist-CICD-Pipeline


In [297]:
execution_success = pipeline.start(parameters={"ForceFailCI":"false","AucThreshold":0.50})
print("SUCCESS ARN:", execution_success.arn)

SUCCESS ARN: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/h7fp444o4z74


In [298]:
execution_fail = pipeline.start(parameters={"ForceFailCI":"true","AucThreshold":0.50})
print("FAIL ARN:", execution_fail.arn)

FAIL ARN: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/9vwnh579feru


In [299]:


sm = boto3.client("sagemaker")
pipeline_name = "AAI540-Olist-CICD-Pipeline"

# Get latest execution
execs = sm.list_pipeline_executions(
    PipelineName=pipeline_name,
    MaxResults=1
)["PipelineExecutionSummaries"]

latest_arn = execs[0]["PipelineExecutionArn"]
print("Latest execution ARN:", latest_arn)

# Execution-level failure
desc = sm.describe_pipeline_execution(
    PipelineExecutionArn=latest_arn
)

print("\nExecution status:", desc["PipelineExecutionStatus"])
print("Failure reason:", desc.get("FailureReason"))

# Step-level inspection
steps = sm.list_pipeline_execution_steps(
    PipelineExecutionArn=latest_arn,
    MaxResults=50
)["PipelineExecutionSteps"]

print("\nStep details:")
for s in steps:
    print(f"\nStep: {s['StepName']}")
    print("Status:", s["StepStatus"])
    if "FailureReason" in s:
        print("FailureReason:", s["FailureReason"])
    if "Metadata" in s:
        meta = s["Metadata"]
        if "ProcessingJob" in meta:
            print("ProcessingJob ARN:", meta["ProcessingJob"].get("Arn"))
        if "TrainingJob" in meta:
            print("TrainingJob ARN:", meta["TrainingJob"].get("Arn"))

Latest execution ARN: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/9vwnh579feru

Execution status: Executing
Failure reason: None

Step details:


In [300]:



# Local folder
local_dir = "./pipeline_scripts"
assert os.path.isdir(local_dir), f"Missing local folder: {local_dir}"

s3 = boto3.client("s3")

# Canonical keys (NO double slashes)
prefix = "cicd/pipeline-scripts/v1"
uploads = {
    "ci_check.py": f"{prefix}/ci_check.py",
    "evaluate.py": f"{prefix}/evaluate.py",
}

for fname, key in uploads.items():
    local_path = os.path.join(local_dir, fname)
    assert os.path.isfile(local_path), f"Missing local file: {local_path}"
    s3.upload_file(local_path, bucket, key)
    print("Uploaded:", f"s3://{bucket}/{key}")

# Verify they exist EXACTLY at those keys
for fname, key in uploads.items():
    s3.head_object(Bucket=bucket, Key=key)
    print("Verified exists:", f"s3://{bucket}/{key}")

# This is the prefix we want the pipeline to use
s3_scripts_prefix = f"s3://{bucket}/{prefix}/"
print("\nUse this scripts prefix in pipeline:", s3_scripts_prefix)

Uploaded: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/ci_check.py
Uploaded: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/evaluate.py
Verified exists: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/ci_check.py
Verified exists: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/evaluate.py

Use this scripts prefix in pipeline: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/


In [301]:


s3 = boto3.client("s3")
prefix = "cicd/pipeline-scripts/v1/"

resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
keys = [o["Key"] for o in resp.get("Contents", [])]
print("Found:", len(keys))
for k in keys:
    print(" -", f"s3://{bucket}/{k}")

Found: 4
 - s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1//ci_check.py
 - s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1//evaluate.py
 - s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/ci_check.py
 - s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/evaluate.py


In [302]:
ProcessingInput(
    source=s3_scripts_prefix,                 # <-- folder/prefix, not a file
    destination="/opt/ml/processing/code",
    input_name="code",
    s3_data_type="S3Prefix",
)

<sagemaker.processing.ProcessingInput at 0x7fae70730ad0>

In [303]:
execution_success = pipeline.start(parameters={"ForceFailCI":"false","AucThreshold":0.50})
print(execution_success.arn)

arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/5o2frqil7x6r


In [304]:
execution_fail = pipeline.start(parameters={"ForceFailCI":"true","AucThreshold":0.50})
print(execution_fail.arn)

arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Pipeline/execution/f1kkufzdjx2b


In [305]:


# IMPORTANT: no trailing slash here
s3_scripts_prefix = f"s3://{bucket}/cicd/pipeline-scripts/v1"

# Upload local scripts -> s3://.../v1/ci_check.py and .../v1/evaluate.py
wr.s3.upload(local_file="pipeline_scripts/ci_check.py", path=f"{s3_scripts_prefix}/ci_check.py")
wr.s3.upload(local_file="pipeline_scripts/evaluate.py", path=f"{s3_scripts_prefix}/evaluate.py")

# Verify exact keys exist (single slash)
objs = wr.s3.list_objects(s3_scripts_prefix)
print("Found:", len(objs))
for o in objs:
    if o.endswith("ci_check.py") or o.endswith("evaluate.py"):
        print(o)

Found: 4
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1//ci_check.py
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1//evaluate.py
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/ci_check.py
s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/evaluate.py


In [306]:
s3_scripts_prefix = f"s3://{bucket}/cicd/pipeline-scripts/v1"
ci_script_s3  = f"{s3_scripts_prefix}/ci_check.py"
eval_script_s3 = f"{s3_scripts_prefix}/evaluate.py"

In [307]:
from sagemaker.processing import ProcessingInput

ci_inputs = [
    ProcessingInput(
        source=ci_script_s3,
        destination="/opt/ml/processing/input/code/ci_check.py"
    )
]

In [309]:
# M6 CLEAN FOUNDATION CELL

import time, boto3, sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.processing import ScriptProcessor, ProcessingInput
from sagemaker import image_uris

region = boto3.session.Session().region_name
role   = sagemaker.get_execution_role()

# --- canonical script URIs (NO double slashes) ---
s3_scripts_prefix = f"s3://{bucket}/cicd/pipeline-scripts/v1"
ci_script_s3      = f"{s3_scripts_prefix}/ci_check.py"
eval_script_s3    = f"{s3_scripts_prefix}/evaluate.py"

# --- quick existence check (prevents "No S3 objects found") ---
s3 = boto3.client("s3")
def s3_exists(s3_uri: str) -> bool:
    assert s3_uri.startswith("s3://")
    b, k = s3_uri[5:].split("/", 1)
    try:
        s3.head_object(Bucket=b, Key=k)
        return True
    except Exception:
        return False

print("ci_script_s3:", ci_script_s3, "exists:", s3_exists(ci_script_s3))
print("eval_script_s3:", eval_script_s3, "exists:", s3_exists(eval_script_s3))

# --- pipeline session (important) ---
pipeline_sess = PipelineSession()

# --- processor used by CIChecks step ---
sklearn_image = image_uris.retrieve("sklearn", region=region, version="1.2-1")

ci_processor = ScriptProcessor(
    image_uri=sklearn_image,
    command=["python3"],
    role=role,
    instance_type="ml.m5.large",   # budget-friendly
    instance_count=1,
    sagemaker_session=pipeline_sess
)

print("pipeline_sess + ci_processor ready")

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


ci_script_s3: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/ci_check.py exists: True
eval_script_s3: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/evaluate.py exists: True
pipeline_sess + ci_processor ready


In [311]:
from sagemaker.processing import ProcessingOutput
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline

# ---- Parameters ----
ForceFailCI = ParameterString(name="ForceFailCI", default_value="false")

# Use the exact S3 URIs that you already verified exist
ci_script_s3   = f"s3://{bucket}/cicd/pipeline-scripts/v1/ci_check.py"
eval_script_s3 = f"s3://{bucket}/cicd/pipeline-scripts/v1/evaluate.py"

ci_out_s3   = f"s3://{bucket}/cicd/artifacts/ci/"
eval_out_s3 = f"s3://{bucket}/cicd/artifacts/eval/"

# ---- CIChecks step ----
ci_step_args = ci_processor.run(
    code=ci_script_s3,
    outputs=[ProcessingOutput(source="/opt/ml/processing/output", destination=ci_out_s3)],
    # FIX: match what ci_check.py expects (per CloudWatch: --ForceFailCI)
    arguments=["--ForceFailCI", ForceFailCI],
)

ci_step = ProcessingStep(name="CIChecks", step_args=ci_step_args)

# ---- Evaluate step ----
eval_step_args = ci_processor.run(
    code=eval_script_s3,
    outputs=[ProcessingOutput(source="/opt/ml/processing/output", destination=eval_out_s3)],
)

eval_step = ProcessingStep(name="Evaluate", step_args=eval_step_args)

pipeline_name = "AAI540-Olist-CICD-Demo"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[ForceFailCI],
    steps=[ci_step, eval_step],
    sagemaker_session=pipeline_sess,
)

pipeline.upsert(role_arn=role)
print("Upserted pipeline:", pipeline_name)



Upserted pipeline: AAI540-Olist-CICD-Demo


In [312]:
# FAILED run (CI fails)
pipeline.start(parameters={"ForceFailCI": "true"})

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Demo/execution/mg2nj0wy64ye', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x7fae6595e4e0>)

In [313]:
# SUCCESS run (CI passes)
pipeline.start(parameters={"ForceFailCI": "false"})

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Demo/execution/cuplyhtomf2u', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x7fae6595e4e0>)

In [314]:
# Rewrite evaluate.py to NOT require xgboost, then re-upload to the SAME S3 path

from pathlib import Path

# local script path
scripts_dir = Path("./pipeline_scripts")
scripts_dir.mkdir(exist_ok=True)
eval_path = scripts_dir / "evaluate.py"

eval_path.write_text(
"""#!/usr/bin/env python3
import json, os, time

# Minimal "evaluation" for CI/CD demo:
# - Writes a metrics.json that the pipeline can consume
# - No xgboost dependency (keeps processing image lightweight)

# If you want this to fail, you can set env FORCE_EVAL_FAIL=true later (optional)
force_fail = os.environ.get("FORCE_EVAL_FAIL", "false").lower() == "true"
if force_fail:
    raise RuntimeError("Forced Evaluate failure (FORCE_EVAL_FAIL=true)")

# Demo metric (stable, deterministic)
metrics = {
    "auc": 0.60,
    "timestamp": int(time.time())
}

out_dir = "/opt/ml/processing/output"
os.makedirs(out_dir, exist_ok=True)
with open(os.path.join(out_dir, "metrics.json"), "w") as f:
    json.dump(metrics, f)

print("Wrote metrics:", metrics)
"""
)

print("Updated local:", str(eval_path))

# re-upload scripts to the exact S3 prefix you're using
sess = sagemaker.Session()
s3_scripts_uri = f"s3://{bucket}/cicd/pipeline-scripts/v1/"
sess.upload_data(path=str(scripts_dir), bucket=bucket, key_prefix="cicd/pipeline-scripts/v1")
print("Re-uploaded scripts to:", s3_scripts_uri)
print("eval_script_s3 should be:", s3_scripts_uri + "evaluate.py")

Updated local: pipeline_scripts/evaluate.py
Re-uploaded scripts to: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/
eval_script_s3 should be: s3://sagemaker-us-east-1-587322031938/cicd/pipeline-scripts/v1/evaluate.py


In [315]:
execution_ok = pipeline.start(
    parameters={
        "ForceFailCI": "false",
        
    }
)
print("Started:", execution_ok.arn)

Started: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Demo/execution/703felxm4no1


In [316]:
execution_fail = pipeline.start(
    parameters={
        "ForceFailCI": "true",
    }
)
print("Started FAIL:", execution_fail.arn)

Started FAIL: arn:aws:sagemaker:us-east-1:587322031938:pipeline/AAI540-Olist-CICD-Demo/execution/qrpv8gi9psii
