In [17]:
import sagemaker
import boto3
import pandas as pd
import numpy as np
import os
import json

from sagemaker.model_monitor import DataCaptureConfig, DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
# Setup SageMaker session
sagemaker_session = sagemaker.Session()
sagemaker_client = boto3.client("sagemaker")

role = sagemaker.get_execution_role()
print(role)

arn:aws:iam::837028399719:role/iti113-team2-sagemaker-iti113-team2-domain-iti113-team2-Role


----
#### Generate the Baseline

In [24]:
bucket_name = 'iti113-team2-bucket'
base_folder = 'Team2'

monitor_output_path = f"s3://{bucket_name}/{base_folder}/monitoring"

# Create a DefaultModelMonitor instance
monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.t3.large',
    sagemaker_session=sagemaker_session
)

# Run the baseline job
baseline_data = f's3://{bucket_name}/{base_folder}/processing/train/v1/train.csv'
baseline_job = monitor.suggest_baseline(
    baseline_dataset=baseline_data,
    dataset_format={'csv': {'header': True}},
    output_s3_uri=f"{monitor_output_path}/baseline",
    wait=True,
    logs=True
)

INFO:sagemaker:Creating processing-job with name baseline-suggestion-job-2025-08-25-14-21-55-214


.................[34m2025-08-25 14:24:41.081820: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory[0m
[34m2025-08-25 14:24:41.081871: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.[0m
[34m2025-08-25 14:24:43.044505: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory[0m
[34m2025-08-25 14:24:43.044545: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)[0m
[34m2025-08-25 14:24:43.044574: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-10-0-184-154.ap-southeast-1.compute.internal): /proc/driver/n

------
#### Re-attach to the job as a ProcessingJob

In [19]:
import sagemaker
from sagemaker.processing import ProcessingJob
from sagemaker.model_monitor import Constraints

sagemaker_session = sagemaker.Session()

baseline_job = None
# !!!! The job name from your successful run
baseline_job_name = "baseline-suggestion-job-2025-08-25-14-21-55-214"

try:
    # Re-attach as a ProcessingJob
    baseline_job = ProcessingJob.from_processing_name(
        processing_job_name=baseline_job_name,
        sagemaker_session=sagemaker_session
    )
    print(f"Successfully re-attached to job: {baseline_job_name}")

except Exception as e:
    print(f"Failed to re-attach to the job. Error: {e}")


# Now, load and inspect the constraints from S3
if baseline_job:
    # Find the output configuration in the job description
    output_config = baseline_job.describe()['ProcessingOutputConfig']['Outputs']

    # Find the S3 URI for the baseline output
    baseline_output_uri = None
    for output in output_config:
        if output['OutputName'] == 'monitoring_output':
            baseline_output_uri = output['S3Output']['S3Uri']
            break

    if baseline_output_uri:
        # Construct the full path to the constraints file
        constraints_s3_uri = f"{baseline_output_uri}/constraints.json"
        print(f"\nLoading constraints from: {constraints_s3_uri}")

        # Load the constraints file from S3
        suggested_constraints = Constraints.from_s3_uri(constraints_s3_uri)

        # Now print the dictionary, just like in the other example
        print("\n--- Sample of Generated Constraints ---")
        from pprint import pprint
        pprint(suggested_constraints.body_dict)
        # ===================================================================
    else:
        print("Could not find the baseline output path in the job description.")

Successfully re-attached to job: baseline-suggestion-job-2025-08-25-14-21-55-214

Loading constraints from: s3://iti113-team2-bucket/Team2/monitoring/baseline/constraints.json

--- Sample of Generated Constraints ---
{'features': [{'completeness': 1.0,
               'inferred_type': 'Fractional',
               'name': 'restingBP',
               'num_constraints': {'is_non_negative': False}},
              {'completeness': 1.0,
               'inferred_type': 'Fractional',
               'name': 'serumcholestrol',
               'num_constraints': {'is_non_negative': False}},
              {'completeness': 1.0,
               'inferred_type': 'Fractional',
               'name': 'maxheartrate',
               'num_constraints': {'is_non_negative': False}},
              {'completeness': 1.0,
               'inferred_type': 'Fractional',
               'name': 'oldpeak',
               'num_constraints': {'is_non_negative': False}},
              {'completeness': 1.0,
               '

----
#### Schedule the Monitoring Job

In [29]:
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor
from botocore.exceptions import ClientError
import pandas as pd
import boto3
import sagemaker
from sagemaker.model_monitor import EndpointInput

sagemaker_client = boto3.client("sagemaker", region_name="ap-southeast-1")

# Confirm endpoint is InService
response = sagemaker_client.describe_endpoint(EndpointName="Team2-predictor-endpoint")
print("Endpoint status:", response['EndpointStatus'])  # should be "InService"

sagemaker_session = sagemaker.Session()
print("SageMaker default region:", sagemaker_session.boto_region_name)
role = sagemaker.get_execution_role()

bucket_name = 'iti113-team2-bucket'
base_folder = 'Team2'

monitor_output_path = f"s3://{bucket_name}/{base_folder}/monitoring"
baseline_output_uri = f"{monitor_output_path}/baseline"

#schedule_name
schedule_name = "Team2-drift-schedule-main"

# Initialize model monitor
monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.t3.large',
    sagemaker_session=sagemaker_session
)

try:
    # Check if schedule exists
    sagemaker_client.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
    print(f"Found existing monitoring schedule: '{schedule_name}'")

    # Attach monitor to the schedule
    monitor.attach(schedule_name)
    monitor.monitoring_schedule_name = schedule_name
    print(f"Successfully attached monitor object to the schedule.")

except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceNotFound':
        print(f"No schedule named '{schedule_name}' found. Creating a new one.")

        endpoint_input = EndpointInput(
            endpoint_name="Team2-predictor-endpoint",
            destination="/opt/ml/processing/input"
        )
        
        # Create monitoring schedule
        monitor.create_monitoring_schedule(
            monitor_schedule_name=schedule_name,
            endpoint_input=endpoint_input,  # just pass the name directly
            output_s3_uri=f"{monitor_output_path}/reports",
            statistics=f"{baseline_output_uri}/statistics.json",
            constraints=f"{baseline_output_uri}/constraints.json",
            schedule_cron_expression=CronExpressionGenerator.hourly(),
            enable_cloudwatch_metrics=True,
        )
        print(f"Monitoring schedule '{schedule_name}' created successfully.")
        
        # Attach after creation
        # monitor.attach(monitoring_schedule_name=schedule_name)
        
    else:
        print("An unexpected error occurred while checking for the schedule.")
        raise e

Endpoint status: InService
SageMaker default region: ap-southeast-1
No schedule named 'Team2-drift-schedule-main' found. Creating a new one.
Monitoring schedule 'Team2-drift-schedule-main' created successfully.


In [30]:
# Now describe schedule details
try:
    schedule_details = monitor.describe_schedule()
    print(f"Schedule status: {schedule_details['MonitoringScheduleStatus']}")
except Exception as e:
    print(f"Could not retrieve schedule details: {e}")

Schedule status: Scheduled


-----

#### Simulate, Detect, and Analyze Drift

In [31]:
import sagemaker
import boto3
import pandas as pd
import numpy as np
import time
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
import json

# === CONFIGURATION ===
bucket_name = 'iti113-team2-bucket'
base_folder = 'Team2'
endpoint_name = "Team2-predictor-endpoint"
aws_region = "ap-southeast-1"

# === SETUP CLIENTS ===
boto_session = boto3.Session(region_name=aws_region)
sagemaker_session = sagemaker.Session(boto_session=boto_session)
sagemaker_client = boto3.client("sagemaker", region_name=aws_region)
s3_client = boto3.client('s3', region_name=aws_region)

s3_process_test_path = f"s3://{bucket_name}/{base_folder}/processing/test/v1/test.csv"
df = pd.read_csv(s3_process_test_path)
df = df.drop("target", axis=1)

# Select a few rows to manipulate (simulate drift)
# drifted_data = df.head(5).copy()
drifted_data = df.head(100).copy()

# Manually introduce DRIFT
print("Original average restingBP:", int(drifted_data['restingBP'].mean()))
drifted_data['restingBP'] = 1
print("New average age:", drifted_data['restingBP'].mean())

# 2. Categorical Drift: 
# if 'restingelectro' in drifted_data.columns:
#     drifted_data['restingelectro'] = 1  # Assuming 1 maps to 'ST-abnormality'
#     print("\n'restingelectro' distribution changed to single category 'ST-abnormality'.")
# else:
#     print("\nWarning: 'restingelectro' column not found.")

# Print sample payloads
# print("\nSample drifted payloads:")
# print(drifted_data.head())

print("\nNumber of features:", drifted_data.shape[1])

# Create SageMaker predictor
predictor = sagemaker.predictor.Predictor(
    endpoint_name=endpoint_name,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
    sagemaker_session=sagemaker_session
)

# === SEND PREDICTION PAYLOADS ===
print(f"\nSending {len(drifted_data)} drifted requests to endpoint: {endpoint_name}")
for i, row in drifted_data.iterrows():
    payload = {"data": [row.to_dict()]}
    try:
        response = predictor.predict(payload)
        print(f"[{i}] ✅ Response: {response}")
        time.sleep(0.1)  # Optional delay to avoid throttling
    except Exception as e:
        print(f"[{i}] ❌ Error sending request: {e}")

print("\n✅ All drifted requests sent.")

Original average restingBP: 0
New average age: 1.0

Number of features: 25

Sending 100 drifted requests to endpoint: Team2-predictor-endpoint
[0] ✅ Response: {'predictions': [1]}
[1] ✅ Response: {'predictions': [0]}
[2] ✅ Response: {'predictions': [1]}
[3] ✅ Response: {'predictions': [0]}
[4] ✅ Response: {'predictions': [1]}
[5] ✅ Response: {'predictions': [0]}
[6] ✅ Response: {'predictions': [0]}
[7] ✅ Response: {'predictions': [1]}
[8] ✅ Response: {'predictions': [1]}
[9] ✅ Response: {'predictions': [1]}
[10] ✅ Response: {'predictions': [0]}
[11] ✅ Response: {'predictions': [1]}
[12] ✅ Response: {'predictions': [1]}
[13] ✅ Response: {'predictions': [1]}
[14] ✅ Response: {'predictions': [1]}
[15] ✅ Response: {'predictions': [1]}
[16] ✅ Response: {'predictions': [1]}
[17] ✅ Response: {'predictions': [1]}
[18] ✅ Response: {'predictions': [0]}
[19] ✅ Response: {'predictions': [1]}
[20] ✅ Response: {'predictions': [1]}
[21] ✅ Response: {'predictions': [1]}
[22] ✅ Response: {'predictions'

In [6]:
# from datetime import datetime
# import boto3

# client = boto3.client("sagemaker", region_name="ap-southeast-1")

# # Optional: give the job a timestamped name
# current_time = datetime.utcnow().strftime('%Y-%m-%d-%H-%M-%S')

# # Trigger a monitoring job manually
# response = client.start_monitoring_schedule(
#     MonitoringScheduleName=schedule_name
# )

# print(f"✅ Manual monitoring job started for schedule: '{schedule_name}' at {current_time}")


✅ Manual monitoring job started for schedule: 'Team2-drift-schedule-main' at 2025-08-25-13-14-53


  current_time = datetime.utcnow().strftime('%Y-%m-%d-%H-%M-%S')


In [38]:
#test invoke endpoint
sagemaker_runtime_client = boto3.client("sagemaker-runtime", region_name=aws_region)

response = sagemaker_runtime_client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/json",
    Body=json.dumps({"data": drifted_data.head(2).to_dict(orient="records")})
)
print(f"\nContentType: {response["ContentType"]}")
print(response["Body"].read().decode("utf-8"))


ContentType: application/json
{"predictions": [1, 0]}


----
#### Manual start monitoring

In [49]:
sagemaker_client.start_monitoring_schedule(MonitoringScheduleName="Team2-drift-schedule-main")

{'ResponseMetadata': {'RequestId': 'fcf93ee5-3104-49a2-a816-4803f425399a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'fcf93ee5-3104-49a2-a816-4803f425399a',
   'content-type': 'application/x-amz-json-1.1',
   'date': 'Mon, 25 Aug 2025 15:44:35 GMT',
   'content-length': '0'},
  'RetryAttempts': 0}}

In [57]:
response = sagemaker_client.describe_monitoring_schedule(
    MonitoringScheduleName="Team2-drift-schedule-main"
)

summary = response.get("LastMonitoringExecutionSummary", {})
print("Schedule Status:", response.get("MonitoringScheduleStatus"))
print("Latest Execution Status:", summary.get("MonitoringExecutionStatus"))
if summary.get("FailureReason"):
    print("Failure Reason:", summary["FailureReason"])


Schedule Status: Scheduled
Latest Execution Status: Failed
Failure Reason: AlgorithmError: Error: Encoding mismatch: Encoding is JSON for endpointInput, but Encoding is BASE64 for endpointOutput. We currently only support the same type of input and output encoding at the moment., exit code: 255


In [58]:
# monitoring_jobs = sagemaker_session.list_monitoring_executions(monitoring_schedule_name="Team2-drift-schedule-main")
# print(monitoring_jobs)

----
#### Check scheduled monitor

In [59]:
#Alternative: read from schedule monitoring

import json
import boto3
from sagemaker.model_monitor import MonitoringExecution

# Check Schedule and Execution Status
desc = monitor.describe_schedule()
print(f"Schedule Name: {desc['MonitoringScheduleName']}")
print(f"Schedule Status: {desc['MonitoringScheduleStatus']}")
print(f"Endpoint Name: {desc['EndpointName']}")

executions = monitor.list_executions()
if executions:
    latest_execution: MonitoringExecution = executions[0]
    status = latest_execution.describe()['ProcessingJobStatus']
    print(f"Latest Execution Status: {status}")

    FailureReason = latest_execution.describe()['FailureReason']
    print(f"FailureReasons: {FailureReason}")

    # Check for Completion and Get Report
    if status in ['CompletedWithViolations', 'Completed']:

        try:
            # Get the report object from the SDK
            violations_report = latest_execution.constraint_violations()

            if violations_report:
                violations_list = violations_report.body_dict.get("violations", [])

                if violations_list:
                    print("\nDRIFT DETECTED! Details:")
                    for v in violations_list:
                        print(f" - {v['feature_name']}: {v['description']}")
                else:
                    # This handles the case where the report exists but is empty
                    print("\nNo violations found in the report body.")
            else:
                # This handles when the job completes but no violations file is found at all
                print("\nNo violations found. The job completed successfully without detecting drift.")

        except Exception as e:
            # This is a fallback for other potential errors
            print(f"\nAn error occurred while trying to retrieve the report: {e}")

    else:
        print(f"Monitoring job did not complete successfully. Current status: {status}")
else:
    print("No monitoring jobs have run yet. Please wait and re-run later.")

Schedule Name: Team2-drift-schedule-main
Schedule Status: Scheduled
Endpoint Name: Team2-predictor-endpoint
Latest Execution Status: Failed
FailureReasons: AlgorithmError: Error: Encoding mismatch: Encoding is JSON for endpointInput, but Encoding is BASE64 for endpointOutput. We currently only support the same type of input and output encoding at the moment., exit code: 255
Monitoring job did not complete successfully. Current status: Failed


-----
#### Check if DataCaptureConfig enabled

In [60]:
import boto3

sagemaker_client = boto3.client('sagemaker')

endpoint_name = "Team2-predictor-endpoint"

# Get endpoint config name
endpoint_response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
endpoint_config_name = endpoint_response["EndpointConfigName"]

config_response = sagemaker_client.describe_endpoint_config(
    EndpointConfigName=endpoint_config_name
)

capture_config = config_response.get("DataCaptureConfig")

if capture_config and capture_config.get("EnableCapture"):
    print("✅ Data capture is enabled.")
else:
    print("❌ Data capture is NOT enabled. Monitoring cannot be scheduled.")

✅ Data capture is enabled.


#### Clean Up

In [61]:
# Clean up resources
try:
    # Delete the Monitoring Schedule
    monitor.delete_monitoring_schedule()
    print(f"Monitoring schedule '{schedule_name}' deleted.")
except Exception as e:
    print(f"Could not delete schedule: {e}")

Monitoring schedule 'Team2-drift-schedule-main' deleted.
