In [1]:
import json
import os
import re
import time
from datetime import datetime, timezone
from io import BytesIO
from urllib.parse import urlparse

import boto3
import numpy as np
import pandas as pd

import sagemaker
from sagemaker import Session, get_execution_role, clarify
from sagemaker.deserializers import JSONDeserializer, StringDeserializer
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_monitor import (
    CronExpressionGenerator,
    DataCaptureConfig,
    DefaultModelMonitor,
)
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.predictor import Predictor
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.serializers import CSVSerializer

from sklearn.metrics import (
    balanced_accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
)


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


### Setup

In [None]:
RANDOM_STATE = 42

RAW_CSV   = "star_classification.csv"
TRAIN_CSV = "data/splits/train.csv"
TEST_CSV  = "data/splits/test.csv"
PROD_CSV  = "data/splits/prod_monitor.csv"

# label mapping (used for eval + optional post-processing)
LABEL_TO_ID = {"GALAXY": 0, "STAR": 1, "QSO": 2}
ID_TO_LABEL = {v: k for k, v in LABEL_TO_ID.items()}

# SageMaker/AWS 
sess = sagemaker.Session()
region = sess.boto_region_name
boto_sess = boto3.Session(region_name=region)

s3 = boto_sess.client("s3")
sm = boto_sess.client("sagemaker")
cw = boto_sess.client("cloudwatch")

sts = boto_sess.client("sts")
account_id = sts.get_caller_identity()["Account"]

BUCKET = f"sagemaker-{region}-{account_id}"
PREFIX = "sagemaker-featurestore-demo"

S3_BASE = f"s3://{BUCKET}/{PREFIX}"
S3_RAW  = f"{S3_BASE}/raw/"
S3_SPLITS = f"{S3_BASE}/splits/"
S3_PREP = f"{S3_BASE}/prepared/"
S3_OUT  = f"{S3_BASE}/output/"
S3_MON  = f"{S3_BASE}/monitoring/"

def ensure_bucket(bucket: str):
    try:
        s3.head_bucket(Bucket=bucket)
    except Exception:
        if region == "us-east-1":
            s3.create_bucket(Bucket=bucket)
        else:
            s3.create_bucket(Bucket=bucket, CreateBucketConfiguration={"LocationConstraint": region})

ensure_bucket(BUCKET)

def parse_s3_uri(uri: str):
    p = urlparse(uri)
    if p.scheme != "s3":
        raise ValueError(f"Not an s3 uri: {uri}")
    return p.netloc, p.path.lstrip("/")

def s3_upload(local_path: str, s3_uri: str):
    b, k = parse_s3_uri(s3_uri)
    s3.upload_file(local_path, b, k)
    return s3_uri

def s3_read_csv(s3_uri: str) -> pd.DataFrame:
    b, k = parse_s3_uri(s3_uri)
    obj = s3.get_object(Bucket=b, Key=k)
    return pd.read_csv(BytesIO(obj["Body"].read()))

def s3_put_bytes(data: bytes, s3_uri: str, content_type: str):
    b, k = parse_s3_uri(s3_uri)
    s3.put_object(Bucket=b, Key=k, Body=data, ContentType=content_type)
    return s3_uri

def list_s3_objects(s3_prefix_uri: str):
    b, pfx = parse_s3_uri(s3_prefix_uri)
    if pfx and not pfx.endswith("/"):
        pfx += "/"
    out = []
    token = None
    while True:
        kwargs = {"Bucket": b, "Prefix": pfx, "MaxKeys": 1000}
        if token:
            kwargs["ContinuationToken"] = token
        resp = s3.list_objects_v2(**kwargs)
        out.extend(resp.get("Contents", []))
        if not resp.get("IsTruncated"):
            break
        token = resp.get("NextContinuationToken")
    return b, pfx, out


### Data Sources

In [3]:
df_train_raw = pd.read_csv(TRAIN_CSV)
df_test_raw  = pd.read_csv(TEST_CSV)
df_prod_raw  = pd.read_csv(PROD_CSV)

print("Local shapes:")
print("  train:", df_train_raw.shape)
print("  test: ", df_test_raw.shape)
print("  prod: ", df_prod_raw.shape)

Local shapes:
  train: (48000, 18)
  test:  (12000, 18)
  prod:  (40000, 18)


### Data Engineering

In [4]:
S3_RAW_CSV   = f"{S3_RAW}{os.path.basename(RAW_CSV)}"
S3_TRAIN_CSV = f"{S3_SPLITS}train.csv"
S3_TEST_CSV  = f"{S3_SPLITS}test.csv"
S3_PROD_CSV  = f"{S3_SPLITS}prod_monitor.csv"

s3_upload(RAW_CSV,   S3_RAW_CSV)
s3_upload(TRAIN_CSV, S3_TRAIN_CSV)
s3_upload(TEST_CSV,  S3_TEST_CSV)
s3_upload(PROD_CSV,  S3_PROD_CSV)

print("Uploaded to S3:")
print("  raw:  ", S3_RAW_CSV)
print("  train:", S3_TRAIN_CSV)
print("  test: ", S3_TEST_CSV)
print("  prod: ", S3_PROD_CSV)

Uploaded to S3:
  raw:   s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/raw/star_classification.csv
  train: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/splits/train.csv
  test:  s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/splits/test.csv
  prod:  s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/splits/prod_monitor.csv


### Feature Engineering

In [None]:
def fe_pipeline(df: pd.DataFrame):
    df2 = df.copy()

    # Normalize expected numeric columns
    bands = ["u", "g", "r", "i", "z"]
    for c in bands + ["alpha", "delta", "redshift"]:
        if c in df2.columns:
            df2[c] = pd.to_numeric(df2[c], errors="coerce")

    # Common cleaning for sentinel/invalid values (adjust if you want)
    for c in bands:
        if c in df2.columns:
            df2.loc[df2[c] <= -1000, c] = np.nan
    if "redshift" in df2.columns:
        df2.loc[df2["redshift"] < -0.1, "redshift"] = np.nan

    # Color indices (adjacent + remaining pairs)
    adj_pairs = [("u","g"), ("g","r"), ("r","i"), ("i","z")]
    all_pairs = []
    for i in range(len(bands)):
        for j in range(i+1, len(bands)):
            all_pairs.append((bands[i], bands[j]))

    for b1, b2 in all_pairs:
        if b1 in df2.columns and b2 in df2.columns:
            df2[f"{b1}_{b2}"] = df2[b1] - df2[b2]

    color_feats = [f"{b1}_{b2}" for (b1,b2) in all_pairs if f"{b1}_{b2}" in df2.columns]

    # Summary stats
    present_bands = [c for c in bands if c in df2.columns]
    if present_bands:
        df2["mean_mag"] = df2[present_bands].mean(axis=1)
        df2["mag_std"]  = df2[present_bands].std(axis=1)
        df2["mag_span"] = df2[present_bands].max(axis=1) - df2[present_bands].min(axis=1)

    # Minimal feature set
    base_feats = [c for c in ["alpha", "delta"] + bands + ["redshift"] if c in df2.columns]
    eng_feats  = color_feats + [c for c in ["mean_mag", "mag_std", "mag_span"] if c in df2.columns]
    feature_cols = base_feats + eng_feats

    # Require label + required numeric fields
    required = [c for c in (bands + ["redshift"]) if c in df2.columns]
    df2 = df2.dropna(subset=required + ["class"]).reset_index(drop=True)

    return df2, feature_cols

df_train_fe, feature_cols = fe_pipeline(df_train_raw)
df_test_fe,  _            = fe_pipeline(df_test_raw)
df_prod_fe,  _            = fe_pipeline(df_prod_raw)

print("Engineered shapes:")
print("  train:", df_train_fe.shape)
print("  test: ", df_test_fe.shape)
print("  prod: ", df_prod_fe.shape)

# Write engineered datasets to S3
def to_xgb_ready(df_fe: pd.DataFrame, feature_cols: list[str], label_to_id: dict):
    y = df_fe["class"].astype(str).str.strip().str.upper().map(label_to_id)
    if y.isna().any():
        bad = sorted(df_fe["class"][y.isna()].astype(str).unique().tolist())
        raise ValueError(f"Unmapped labels found in 'class': {bad}")

    X = df_fe[feature_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
    out = pd.concat([y.astype(int).rename("label"), X], axis=1)
    return out

train_ready = to_xgb_ready(df_train_fe, feature_cols, LABEL_TO_ID)
test_ready  = to_xgb_ready(df_test_fe,  feature_cols, LABEL_TO_ID)
prod_ready  = to_xgb_ready(df_prod_fe,  feature_cols, LABEL_TO_ID)

job_tag = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")

S3_FEATURE_COLS_JSON = f"{S3_PREP}{job_tag}/feature_cols.json"
S3_TRAIN_XGB = f"{S3_PREP}{job_tag}/train.csv"     
S3_VAL_XGB   = f"{S3_PREP}{job_tag}/validation.csv"  
S3_PROD_X    = f"{S3_PREP}{job_tag}/prod_features.csv" 

# Basic split for validation
val_frac = 0.2
val_n = int(len(train_ready) * val_frac)
train_part = train_ready.iloc[val_n:].reset_index(drop=True)
val_part   = train_ready.iloc[:val_n].reset_index(drop=True)

s3_put_bytes(json.dumps(feature_cols).encode("utf-8"), S3_FEATURE_COLS_JSON, "application/json")
s3_put_bytes(train_part.to_csv(index=False, header=False).encode("utf-8"), S3_TRAIN_XGB, "text/csv")
s3_put_bytes(val_part.to_csv(index=False, header=False).encode("utf-8"),   S3_VAL_XGB,   "text/csv")
s3_put_bytes(prod_ready.drop(columns=["label"]).to_csv(index=False, header=False).encode("utf-8"), S3_PROD_X, "text/csv")

print("Prepared training inputs:")
print("  feature_cols:", S3_FEATURE_COLS_JSON)
print("  train:", S3_TRAIN_XGB)
print("  val:  ", S3_VAL_XGB)


Engineered shapes:
  train: (47999, 31)
  test:  (12000, 31)
  prod:  (40000, 31)
Prepared training inputs:
  feature_cols: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/prepared/20260217-215414/feature_cols.json
  train: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/prepared/20260217-215414/train.csv
  val:   s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/prepared/20260217-215414/validation.csv


### Model Training & Evaluation (SageMaker built-in XGBoost) and Deployment

In [None]:
try:
    role = sagemaker.get_execution_role()
except Exception:
    role = boto3.client("iam").get_role(RoleName="LabRole")["Role"]["Arn"]

image = sagemaker.image_uris.retrieve("xgboost", region=region, version="1.7-1")

train_job_name = f"xgb-train-1-{job_tag}"
output_path = f"{S3_OUT}{train_job_name}/"

est = Estimator(
    image_uri=image,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size=50,
    input_mode="File",
    output_path=output_path,
    sagemaker_session=sess,
)

est.set_hyperparameters(
    objective="multi:softprob",
    num_class=3,
    max_depth=5,
    eta=0.2,
    subsample=0.8,
    num_round=200,
    verbosity=1,
)

est.fit(
    {
        "train": TrainingInput(S3_TRAIN_XGB, content_type="text/csv"),
        "validation": TrainingInput(S3_VAL_XGB, content_type="text/csv"),
    },
    job_name=train_job_name,
    logs=True,
)

print("Training job:", train_job_name)
print("Model artifacts:", est.model_data)


S3_TEST_X = f"{S3_PREP}{job_tag}/test_features.csv"
s3_put_bytes(test_ready.drop(columns=["label"]).to_csv(index=False, header=False).encode("utf-8"), S3_TEST_X, "text/csv")

bt_name = f"xgb-batch-{job_tag}"
bt_out  = f"{S3_OUT}{bt_name}/"

transformer = est.transformer(
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=bt_out,
    assemble_with="Line",
    accept="text/csv",
)

transformer.transform(data=S3_TEST_X, content_type="text/csv", split_type="Line")
transformer.wait()
print("Batch output prefix:", bt_out)

def read_first_output_file_text(s3_prefix_uri: str) -> str:
    b, pfx, objs = list_s3_objects(s3_prefix_uri)
    keys = [o["Key"] for o in objs if not o["Key"].endswith("_SUCCESS")]
    out_keys = [k for k in keys if k.endswith(".out")] or keys
    if not out_keys:
        raise RuntimeError(f"No batch output files found under: {s3_prefix_uri}")
    out_key = sorted(out_keys)[0]
    body = s3.get_object(Bucket=b, Key=out_key)["Body"].read().decode("utf-8", errors="replace")
    return body

# parsing probabilities from batch output (one line per record)
out_text = read_first_output_file_text(bt_out)

pred_probs = []
for ln in out_text.splitlines():
    ln = ln.strip()
    if not ln:
        continue

    if ln.startswith("[") and ln.endswith("]"):
        probs = json.loads(ln)
        pred_probs.append([float(x) for x in probs])
        continue

    if re.fullmatch(r"[-+0-9.eE]+(,[-+0-9.eE]+)+", ln):
        pred_probs.append([float(x) for x in ln.split(",")])
        continue

    raise ValueError(f"Unrecognized prediction line format: {ln[:200]}")

pred_probs = np.asarray(pred_probs, dtype=float)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker:Creating training-job with name: xgb-train-1-20260217-215414


2026-02-17 22:13:46 Starting - Starting the training job...
2026-02-17 22:14:02 Starting - Preparing the instances for training...
2026-02-17 22:14:25 Downloading - Downloading input data...
2026-02-17 22:14:51 Downloading - Downloading the training image...
  import pkg_resources[0m
[34m[2026-02-17 22:15:49.680 ip-10-2-103-81.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2026-02-17 22:15:49.758 ip-10-2-103-81.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2026-02-17:22:15:50:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2026-02-17:22:15:50:INFO] Failed to parse hyperparameter objective value multi:softprob to Json.[0m
[34mReturning the value itself[0m
[34m[2026-02-17:22:15:50:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2026-02-17:22:15:50:INFO] Running XGBoost Sagemaker in algorithm mode[0m
[34m[2026-02-17:22:15:50:INFO] Determined 0 GPU(s) available on the i

INFO:sagemaker:Creating model with name: sagemaker-xgboost-2026-02-17-22-16-59-894
INFO:sagemaker:Creating transform job with name: sagemaker-xgboost-2026-02-17-22-17-00-667


..............................
  import pkg_resources[0m
[34m[2026-02-17:22:21:56:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2026-02-17:22:21:56:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2026-02-17:22:21:56:INFO] nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[34mhttp {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /dev/stdout combined;
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_max_body_size 0;
    keepalive_timeout 3;
    location ~ ^/(ping|invocations|execution-parameters) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
      proxy_read_timeout 60s;
      proxy_pass http:/

In [11]:
data_capture_prefix = f"{S3_MON}datacapture/"

data_capture = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=data_capture_prefix,
    capture_options=["Input", "Output"],
)

endpoint_name = f"xgb-ep1-{job_tag}"

predictor = est.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture,
)

predictor.serializer = CSVSerializer()
predictor.deserializer = StringDeserializer()

print("Endpoint:", endpoint_name)
print("Data capture S3:", data_capture_prefix)

INFO:sagemaker:Creating model with name: sagemaker-xgboost-2026-02-17-22-23-10-552
INFO:sagemaker:Creating endpoint-config with name xgb-ep1-20260217-215414
INFO:sagemaker:Creating endpoint with name xgb-ep1-20260217-215414


------!Endpoint: xgb-ep1-20260217-215414
Data capture S3: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/monitoring/datacapture/


In [None]:
X_test = (
    df_test_fe[feature_cols]
    .apply(pd.to_numeric, errors="coerce")
    .fillna(0.0)
    .to_numpy(dtype=float)
)

y_true = (
    df_test_fe["class"]
    .astype(str).str.strip().str.upper()
    .map(LABEL_TO_ID)
    .to_numpy(dtype=int)
)

def parse_probs_from_response(resp_text: str) -> np.ndarray:
    """
    Returns array shape (n_rows, n_classes).
    Supports:
      1) {"predictions":[{"score":[...]}, ...]}
      2) {"predictions":[[...],[...],...]}
      3) line-based outputs: "p0,p1,p2" or "[p0,p1,p2]"
    """
    s = str(resp_text).strip()
    if s.startswith("{") or s.startswith("["):
        obj = json.loads(s)
        if isinstance(obj, dict) and "predictions" in obj:
            preds = obj["predictions"]
            if len(preds) == 0:
                return np.zeros((0, 0), dtype=float)
            if isinstance(preds[0], dict) and "score" in preds[0]:
                return np.asarray([p["score"] for p in preds], dtype=float)
            if isinstance(preds[0], list):
                return np.asarray(preds, dtype=float)

            raise ValueError(f"Unknown predictions element type: {type(preds[0])}")
        if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], list):
            return np.asarray(obj, dtype=float)

    probs = []
    for ln in s.splitlines():
        ln = ln.strip()
        if not ln:
            continue
        if ln.startswith("[") and ln.endswith("]"):
            probs.append(json.loads(ln))
        else:
            probs.append([float(x) for x in ln.split(",")])
    return np.asarray(probs, dtype=float)

#limiting batch size to keep payload redable and avoid timeouts
batch_size = 500 
all_probs = []

for i in range(0, len(X_test), batch_size):
    chunk = X_test[i:i + batch_size]
    payload = "\n".join(",".join(map(str, row)) for row in chunk)

    resp = predictor.predict(payload)
    probs = parse_probs_from_response(resp)

    if probs.shape[0] != len(chunk):
        raise ValueError(f"Row count mismatch: sent {len(chunk)} rows, got {probs.shape[0]} predictions")

    all_probs.append(probs)

pred_probs = np.vstack(all_probs)
y_pred = np.argmax(pred_probs, axis=1)

macro = f1_score(y_true, y_pred, average="macro")
bal = balanced_accuracy_score(y_true, y_pred)

print("Endpoint TEST metrics:")
print("macro_f1:", round(float(macro), 6))
print("balanced_acc:", round(float(bal), 6))


Endpoint TEST metrics:
macro_f1: 0.975357
balanced_acc: 0.971641


### Model Monitoring

In [None]:
monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=50,
    max_runtime_in_seconds=3600,
    sagemaker_session=sess,
)

baseline_prefix = f"{S3_MON}baselines/data-quality/"
report_prefix   = f"{S3_MON}reports/data-quality/"

statistics_path  = f"{baseline_prefix}statistics.json"
constraints_path = f"{baseline_prefix}constraints.json"

S3_TRAIN_FE_HEADER = f"{S3_PREP}{job_tag}/train_fe_header.csv"
s3_put_bytes(df_train_fe[["class"] + feature_cols].to_csv(index=False, header=True).encode("utf-8"),
             S3_TRAIN_FE_HEADER, "text/csv")

monitor.suggest_baseline(
    baseline_dataset=S3_TRAIN_FE_HEADER,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_prefix,
    wait=True,
)

dq_schedule_name = f"dq-{job_tag}"[:63]

monitor.create_monitoring_schedule(
    monitor_schedule_name=dq_schedule_name,
    endpoint_input=endpoint_name,
    output_s3_uri=report_prefix,
    statistics=statistics_path,
    constraints=constraints_path,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
)

print("Baseline:")
print("  statistics:", statistics_path)
print("  constraints:", constraints_path)
print("Monitoring:")
print("  schedule:", dq_schedule_name)
print("  reports: ", report_prefix)

def put_alarm(metric_name: str, threshold: float, period: int, eval_periods: int,
              alarm_name: str, stat: str = None, ext_stat: str = None,
              comparison: str = "GreaterThanThreshold"):
    if (stat is None) == (ext_stat is None):
        raise ValueError("Provide exactly one of stat or ext_stat")

    kwargs = dict(
        AlarmName=alarm_name,
        Namespace="AWS/SageMaker",
        MetricName=metric_name,
        Dimensions=[
            {"Name": "EndpointName", "Value": endpoint_name},
            {"Name": "VariantName", "Value": "AllTraffic"},
        ],
        Period=period,
        EvaluationPeriods=eval_periods,
        Threshold=float(threshold),
        ComparisonOperator=comparison,
        TreatMissingData="notBreaching",
    )
    if stat:
        kwargs["Statistic"] = stat
    else:
        kwargs["ExtendedStatistic"] = ext_stat

    cw.put_metric_alarm(**kwargs)

put_alarm(
    metric_name="ModelLatency",
    ext_stat="p95.0",
    threshold=500,          
    period=60,
    eval_periods=5,
    alarm_name=f"{endpoint_name}-latency-p95",
)

put_alarm(
    metric_name="Invocation5XXErrors",
    stat="Sum",
    threshold=1,
    period=60,
    eval_periods=1,
    alarm_name=f"{endpoint_name}-5xx",
)

print("CloudWatch alarms created (latency p95, 5XX errors).")

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating processing-job with name baseline-suggestion-job-2026-02-17-22-28-09-287


.................[34m2026-02-17 22:30:55.207886: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory[0m
[34m2026-02-17 22:30:55.207925: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.[0m
[34m2026-02-17 22:30:56.728351: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory[0m
[34m2026-02-17 22:30:56.728389: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)[0m
[34m2026-02-17 22:30:56.728412: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-10-2-249-196.ec2.internal): /proc/driver/nvidia/version does 

INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: dq-20260217-215414


Baseline:
  statistics: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/monitoring/baselines/data-quality/statistics.json
  constraints: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/monitoring/baselines/data-quality/constraints.json
Monitoring:
  schedule: dq-20260217-215414
  reports:  s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/monitoring/reports/data-quality/
CloudWatch alarms created (latency p95, 5XX errors).


In [None]:
endpoint_name = "xgb-ep1-20260217-215414"

sess = sagemaker.Session()
region = sess.boto_region_name
role = sagemaker.get_execution_role()

bucket = "sagemaker-us-east-1-579568333234"
prefix = "sagemaker-featurestore-demo"
S3_MON = f"s3://{bucket}/{prefix}/monitoring/"

job_tag = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")

baseline_prefix = f"{S3_MON}baselines/data-quality/{job_tag}/"
report_prefix   = f"{S3_MON}reports/data-quality/{job_tag}/"

baseline_local = "train_fe_header.csv"
baseline_df = df_train_fe[["class"] + feature_cols].copy()

baseline_df = baseline_df.loc[:, ~baseline_df.columns.duplicated()]
baseline_df.to_csv(baseline_local, index=False, header=True)

baseline_s3 = S3Uploader.upload(baseline_local, f"s3://{bucket}/{prefix}/prepared/{job_tag}/")
print("Baseline CSV S3:", baseline_s3)
print("Baseline prefix:", baseline_prefix)
print("Report prefix:", report_prefix)


Baseline CSV S3: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/prepared/20260217-235416//train_fe_header.csv
Baseline prefix: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/monitoring/baselines/data-quality/20260217-235416/
Report prefix: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/monitoring/reports/data-quality/20260217-235416/


In [51]:
endpoint_name = "xgb-ep1-20260217-215414"
bucket = "sagemaker-us-east-1-579568333234"
prefix = "sagemaker-featurestore-demo"

label_col = "class"
facet_col = "redshift"
positive_label = "STAR"

session = Session()
role = get_execution_role()

run_id = f"{datetime.utcnow():%Y%m%d-%H%M%S}"
baseline_s3_key = f"s3://{bucket}/{prefix}/clarify/{run_id}/baseline"
reports_s3_key  = f"s3://{bucket}/{prefix}/clarify/{run_id}/reports"

def make_unique_columns(cols):
    seen = {}
    out = []
    for c in map(str, cols):
        if c not in seen:
            seen[c] = 0
            out.append(c)
        else:
            seen[c] += 1
            out.append(f"{c}__dup{seen[c]}")
    return out


baseline_df = df_train_fe[[label_col, facet_col] + feature_cols].copy()
baseline_df = baseline_df.loc[:, ~baseline_df.columns.duplicated()]

if baseline_df.columns.duplicated().any():
    baseline_df.columns = make_unique_columns(baseline_df.columns)

baseline_df = baseline_df.sample(
    n=min(2000, len(baseline_df)),
    random_state=42
).reset_index(drop=True)

baseline_df["redshift_bin"] = pd.qcut(
    baseline_df["redshift"],
    q=4,
    labels=["very_low", "low", "high", "very_high"]
)

facet_col_for_bias = "redshift_bin"

feature_cols_clean = [c for c in feature_cols if c in baseline_df.columns]

X = (
    baseline_df[feature_cols_clean]
    .apply(pd.to_numeric, errors="coerce")
    .fillna(0.0)
    .to_numpy(dtype=float)
)


baseline_local = "baseline.csv"
baseline_df.to_csv(baseline_local, index=False, header=True)

baseline_full_uri = S3Uploader.upload(baseline_local, baseline_s3_key)

headers = list(baseline_df.columns)
if len(headers) != len(set(headers)):
    raise RuntimeError("Duplicate headers detected.")

print("Baseline uploaded:", baseline_full_uri)


pred = Predictor(endpoint_name=endpoint_name, sagemaker_session=session)
pred.serializer = CSVSerializer()
pred.deserializer = JSONDeserializer()

batch_size = 200
pred_labels = []

for i in range(0, len(X), batch_size):
    chunk = X[i:i+batch_size]
    payload = "\n".join(",".join(map(str, row)) for row in chunk)

    resp = pred.predict(payload)

    preds = resp["predictions"]
    probs = np.asarray([p["score"] for p in preds], dtype=float)
    ids = np.argmax(probs, axis=1)

    pred_labels.extend([ID_TO_LABEL[int(j)] for j in ids])

predicted_df = pd.DataFrame({"predicted_label": pred_labels})

pred_local = "predicted_label.csv"
predicted_df.to_csv(pred_local, index=False, header=False)

predicted_full_uri = S3Uploader.upload(pred_local, baseline_s3_key)

print("Predicted-label uploaded:", predicted_full_uri)


clarify_processor = clarify.SageMakerClarifyProcessor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=session,
)

data_config = clarify.DataConfig(
    s3_data_input_path=baseline_full_uri,
    s3_output_path=reports_s3_key,
    label=label_col,
    headers=headers,
    dataset_type="text/csv",
    predicted_label_dataset_uri=predicted_full_uri,
    predicted_label_headers=["predicted_label"],
    predicted_label="predicted_label",
)

bias_config = clarify.BiasConfig(
    label_values_or_threshold=[positive_label],
    facet_name=facet_col_for_bias,   # <- categorical bin
)

clarify_processor.run_bias(
    data_config=data_config,
    bias_config=bias_config,
    pre_training_methods="all",
    post_training_methods=None,
    wait=True,
    logs=True,
)


S3Downloader.download(f"{reports_s3_key}/analysis.json", ".")
S3Downloader.download(f"{reports_s3_key}/report.pdf", ".")

print("Downloaded: analysis.json, report.pdf")
print("Reports S3:", reports_s3_key)


  run_id = f"{datetime.utcnow():%Y%m%d-%H%M%S}"


Baseline uploaded: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/clarify/20260218-000315/baseline/baseline.csv


INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.clarify:Analysis Config: {'dataset_type': 'text/csv', 'headers': ['class', 'redshift', 'alpha', 'delta', 'u', 'g', 'r', 'i', 'z', 'u_g', 'u_r', 'u_i', 'u_z', 'g_r', 'g_i', 'g_z', 'r_i', 'r_z', 'i_z', 'mean_mag', 'mag_std', 'mag_span', 'redshift_bin'], 'label': 'class', 'predicted_label_dataset_uri': 's3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/clarify/20260218-000315/baseline/predicted_label.csv', 'predicted_label_headers': ['predicted_label'], 'predicted_label': 'predicted_label', 'label_values_or_threshold': ['STAR'], 'facet': [{'name_or_index': 'redshift_bin'}], 'methods': {'report': {'name': 'report', 'title': 'Analysis Report'}, 'pre_training_bias': {'methods': 'all'}}}
INFO:sagemaker:Creating processing-job with name Clarify-Bias-2026-02-18-00-03-15-829


Predicted-label uploaded: s3://sagemaker-us-east-1-579568333234/sagemaker-featurestore-demo/clarify/20260218-000315/baseline/predicted_label.csv
....................[34msagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml[0m
[34msagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml[0m
[34mWe are not in a supported iso region, /bin/sh exiting gracefully with no changes.[0m
[34mINFO:sagemaker-clarify-processing:Starting SageMaker Clarify Processing job[0m
[34mINFO:analyzer.data_loading.data_loader_util:Analysis config path: /opt/ml/processing/input/config/analysis_config.json[0m
[34mINFO:analyzer.data_loading.data_loader_util:Analysis result path: /opt/ml/processing/output[0m
[34mINFO:analyzer.data_loading.data_loader_util:This host is algo-1.[0m
[34mINFO:analyzer.data_loading.data_loader_util:This host is the leader.[0m
[34mINFO:analyzer.data_loading.data_loader_util:Number of host

In [52]:
from IPython.display import IFrame

IFrame("report.pdf", width=1000, height=700)
