In [3]:
# Matt Thompson
# AAI 540 
# Assignment 5.1

com.fasterxml.jackson.core.JsonParseException: Unexpected character ('━' (code 9473 / 0x2501)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.5/1.5 MB 50.9 MB/s eta 0:00:00"; line: 1, column: 7]


In [None]:
import subprocess
import sys

# Install the SageMaker Python SDK for the interpreter running this kernel.
# `--user` targets the per-user site-packages, which is often robust to a
# read-only environment site-packages directory.
print("Starting SageMaker SDK installation...")
pip_command = [sys.executable, "-m", "pip", "install", "sagemaker", "--user"]
try:
    subprocess.check_call(pip_command)
except subprocess.CalledProcessError as install_error:
    print(f"Installation failed: {install_error}")
else:
    print("Installation attempt complete. Please RESTART YOUR KERNEL NOW.")

In [None]:
# %%time

!pip install sagemaker

from datetime import datetime, timedelta, timezone
import json
import os
import re
import boto3
from time import sleep
from threading import Thread
import pandas as pd
from sagemaker import get_execution_role, session, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer
from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig

session = Session()

In [None]:
# IAM role that SageMaker jobs and endpoints in this notebook run under.
role = get_execution_role()
print("RoleArn:", role)

# Region of the active SageMaker session (used for image URI lookups later).
region = session.boto_region_name
print("Region:", region)


In [None]:
# --- S3 layout ---------------------------------------------------------------
# Captured inferences, ground truth and bias baselining artifacts all live under
# one prefix in the session's default bucket.
bucket = session.default_bucket()
print("Demo Bucket:", bucket)

prefix = "sagemaker/Churn-ModelBiasMonitor-20201201"

# Where the endpoint writes captured request/response data.
data_capture_prefix = f"{prefix}/datacapture"
s3_capture_upload_path = f"s3://{bucket}/{data_capture_prefix}"

# Ground truth root (reused for both Model Quality and Bias); the timestamp
# suffix gives every notebook run its own folder.
ground_truth_upload_path = (
    f"s3://{bucket}/{prefix}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
)

# Model bias baseline inputs and results.
bias_baseline_prefix = f"{prefix}/bias_baselining"
bias_baseline_data_prefix = f"{bias_baseline_prefix}/data"
bias_baseline_results_prefix = f"{bias_baseline_prefix}/results"
bias_baseline_data_uri, bias_baseline_results_uri = (
    f"s3://{bucket}/{sub_prefix}"
    for sub_prefix in (bias_baseline_data_prefix, bias_baseline_results_prefix)
)

# Model Monitor container image for this region.
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)
print("Image URI:", monitor_image_uri)
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Bias Report path: {bias_baseline_results_uri}")

In [None]:
# Upload some test files
S3Uploader.upload("test_data/upload-test-file.txt", f"s3://{bucket}/test_upload")
print("Success! You are all set to proceed.")

In [None]:
## Upload the pretrained model to S3
s3_key = f"s3://{bucket}/{prefix}"
model_url = S3Uploader.upload("model/xgb-churn-prediction-model.tar.gz", s3_key)
model_url

In [None]:
# Wrap the uploaded XGBoost artifact in a SageMaker Model served by the
# XGBoost 0.90-1 container (presumably the version the artifact was trained
# with — confirm before changing it).
model_name = f"DEMO-xgb-churn-pred-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
image_uri = image_uris.retrieve(framework="xgboost", version="0.90-1", region=region)
model = Model(
    image_uri=image_uri,
    model_data=model_url,
    role=role,
    # Without `name`, `model_name` above was never used and the SDK generated
    # an arbitrary model name at deploy time.
    name=model_name,
    sagemaker_session=session,
)

In [None]:
endpoint_name = f"DEMO-xgb-churn-model-bias-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("EndpointName =", endpoint_name)

# Record 100% of requests/responses so the monitor sees every inference.
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=s3_capture_upload_path,
)

# Host the model on one ml.m5.xlarge instance with data capture switched on.
deployment_options = {
    "initial_instance_count": 1,
    "instance_type": "ml.m5.xlarge",
    "endpoint_name": endpoint_name,
    "data_capture_config": data_capture_config,
}
model.deploy(**deployment_options)

In [None]:
from sagemaker.predictor import Predictor

# Client-side handle on the deployed endpoint; payloads are sent as text/csv
# via CSVSerializer.
predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=CSVSerializer(),
    sagemaker_session=session,
)

In [None]:
# Section 3 (markdown placeholder): build the bias baseline dataset, upload it, and run the bias baselining job.

In [None]:
from sagemaker.clarify import BiasConfig, DataConfig, ModelConfig, ModelPredictedLabelConfig
from sagemaker.model_monitor import ModelBiasMonitor, BiasAnalysisConfig
from sagemaker.model_monitor.dataset_format import DatasetFormat

# --- 3.1 & 3.2 Build the Bias Baseline Dataset ---
# SageMaker Clarify computes post-training bias itself: it runs the model
# (through a shadow endpoint built from `ModelConfig`) on a dataset of
# ground-truth label + model input features. The baseline file therefore holds
# raw validation rows, not pre-computed predictions.
churn_cutoff = 0.8  # predicted probability above this counts as churn (label 1)
validate_dataset_bias = "validation_bias_baseline.csv"
# 0-based column of the sensitive attribute in a *full* validation row, where
# column 0 is the label; in the feature list it is at sensitive_feature_index - 1.
# ASSUMPTION: this column is 'SeniorCitizen' encoded as 0/1 — TODO confirm
# against the dataset schema.
sensitive_feature_index = 19
sensitive_feature_name = "SeniorCitizen"
label_header = "label"
limit = 200  # rows to baseline on; kept small for a quick job

baseline_rows = []
with open("test_data/validation.csv", "r") as f:
    for row in f:
        label, *rest = row.strip().split(",", 1)
        input_features = rest[0].split(",") if rest else []
        try:
            float(label)
        except ValueError:
            continue  # header or blank line: first column is not a numeric label
        if len(input_features) < sensitive_feature_index:
            continue  # too few columns to contain the sensitive feature
        baseline_rows.append([label] + input_features)
        if len(baseline_rows) >= limit:  # stop at exactly `limit` rows
            break

if not baseline_rows:
    raise ValueError("test_data/validation.csv has no usable rows for the bias baseline")

# Column names for Clarify: only the label and the sensitive feature need real
# names; the other features get positional placeholders.
feature_headers = [f"feature_{k}" for k in range(len(baseline_rows[0]) - 1)]
feature_headers[sensitive_feature_index - 1] = sensitive_feature_name
all_headers = [label_header] + feature_headers

with open(f"test_data/{validate_dataset_bias}", "w") as baseline_file:
    # No header row in the file; column names are supplied via `all_headers`.
    baseline_file.writelines(",".join(cols) + "\n" for cols in baseline_rows)
print(f"Done creating Bias Baseline Dataset ({len(baseline_rows)} rows)!")

# Sanity check: the endpoint accepts the same feature rows Clarify will send.
sample_probability = float(predictor.predict(",".join(baseline_rows[0][1:])))
print(f"Sample prediction: probability={sample_probability}, churn={sample_probability > churn_cutoff}")


# --- 3.3 Upload the bias baseline dataset ---
baseline_dataset_bias_uri = S3Uploader.upload(
    f"test_data/{validate_dataset_bias}", bias_baseline_data_uri
)
print(f"Uploaded Bias Baseline Data: {baseline_dataset_bias_uri}")


# --- 3.4 Configure Clarify and run the bias baselining job ---
bias_data_config = DataConfig(
    s3_data_input_path=baseline_dataset_bias_uri,
    s3_output_path=bias_baseline_results_uri,
    label=label_header,
    headers=all_headers,
    dataset_type="text/csv",
)
bias_config = BiasConfig(
    label_values_or_threshold=[1],  # positive outcome: churn
    facet_name=sensitive_feature_name,
    # Group of interest: SeniorCitizen == 1.
    # NOTE(review): if Clarify treats this numeric column as continuous, the
    # value acts as a threshold (group = values > 1); use [0] in that case.
    facet_values_or_threshold=[1],
)
bias_model_config = ModelConfig(
    model_name=model.name,  # SageMaker model registered by model.deploy()
    instance_count=1,
    instance_type="ml.m5.xlarge",
    content_type="text/csv",
    accept_type="text/csv",
)
# The endpoint returns a probability; Clarify turns it into a label with the same cutoff.
bias_predicted_label_config = ModelPredictedLabelConfig(probability_threshold=churn_cutoff)

# Analysis config (facet, label, headers) that a monitoring schedule can reuse.
bias_analysis_config = BiasAnalysisConfig(bias_config, headers=all_headers, label=label_header)

# Create the Model Bias Monitoring Object
churn_model_bias_monitor = ModelBiasMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session,
)

# Name and execute the bias baselining job (blocks until it finishes).
bias_baseline_job_name = f"DEMO-xgb-churn-bias-baseline-job-{datetime.utcnow():%Y-%m-%d-%H%M}"
print(f"Starting Bias Baseline Job: {bias_baseline_job_name}")

bias_job = churn_model_bias_monitor.suggest_baseline(
    data_config=bias_data_config,
    bias_config=bias_config,
    model_config=bias_model_config,
    model_predicted_label_config=bias_predicted_label_config,
    job_name=bias_baseline_job_name,
    wait=True,
    logs=False,
)
print("Bias Baseline Job finished.")


# --- 3.5 Explore the results of the baselining job ---
bias_baseline_job = churn_model_bias_monitor.latest_baselining_job
if bias_baseline_job is None:
    raise RuntimeError("No bias baselining job found; run the baselining step (3.4) first.")

# A Clarify baselining job produces a single analysis.json, exposed through
# suggested_constraints(). baseline_statistics() is not supported for Clarify
# jobs, and the report has no "bias_constraints" key.
bias_analysis = bias_baseline_job.suggested_constraints().body_dict
post_training_metrics = bias_analysis.get("post_training_bias_metrics", {})

# Flatten facet -> facet groups -> metrics into one row per metric.
bias_constraints_df = pd.DataFrame(
    [
        {"facet": facet, "value_or_threshold": group.get("value_or_threshold"), **metric}
        for facet, groups in post_training_metrics.get("facets", {}).items()
        for group in groups
        for metric in group.get("metrics", [])
    ]
)

print("--- Bias Baseline Statistics (Disparate Impact) ---")
di_post_training = None
if {"name", "value"} <= set(bias_constraints_df.columns):
    di_values = bias_constraints_df.loc[bias_constraints_df["name"] == "DI", "value"]
    if not di_values.empty:
        di_post_training = di_values.iloc[0]
print(f"Post-training Disparate Impact (DI): {di_post_training}")
# A DI value significantly less than 0.8 or greater than 1.25 may indicate bias.

print("\n--- Bias Constraints ---")
print(bias_constraints_df.head())

In [None]:
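# Section 4 (markdown placeholder): send inference traffic and synthetic ground truth, then schedule hourly model bias monitoring.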

In [None]:
# --- 4.1 Generate prediction data for Model Bias Monitoring ---
# This part starts a background thread that sends traffic to the endpoint.
from threading import Event

# Call traffic_stop_event.set() to stop the traffic thread.
traffic_stop_event = Event()


def invoke_endpoint(ep_name, file_name, stop_event=None):
    """Send every line of `file_name` to endpoint `ep_name` as a text/csv request.

    Lines are numbered from 0 and that number is sent as the InferenceId, which
    is what the synthetic ground-truth records use as their eventId.

    Args:
        ep_name: name of the SageMaker endpoint to invoke.
        file_name: CSV file with one request payload per line.
        stop_event: optional threading.Event; when set, stop after the current row.
    """
    with open(file_name, "r") as f:
        for i, row in enumerate(f):
            session.sagemaker_runtime_client.invoke_endpoint(
                EndpointName=ep_name,
                ContentType="text/csv",
                Body=row.rstrip("\n"),
                InferenceId=str(i),  # unique ID per row
            )["Body"].read()  # drain the response body
            # Pace requests; Event.wait returns early (True) once a stop is requested.
            if stop_event is None:
                sleep(0.5)
            elif stop_event.wait(0.5):
                return


def invoke_endpoint_forever(stop_event=traffic_stop_event, retry_delay_seconds=5, max_consecutive_failures=10):
    """Replay the test inputs against the endpoint until `stop_event` is set.

    Failures (e.g. the endpoint was deleted) are retried after a delay instead of
    in a tight silent loop; after `max_consecutive_failures` the thread gives up
    and reports the last error.
    """
    failures = 0
    while not stop_event.is_set():
        try:
            invoke_endpoint(endpoint_name, "test_data/test-dataset-input-cols.csv", stop_event)
            failures = 0
        except Exception as err:
            failures += 1
            if failures >= max_consecutive_failures:
                print(f"Stopping inference traffic after {failures} consecutive failures: {err}")
                return
            stop_event.wait(retry_delay_seconds)


# daemon=True so the background thread does not keep the kernel process alive.
thread = Thread(target=invoke_endpoint_forever, daemon=True)
thread.start()
print("Inference traffic thread started.")


# --- 4.2 View captured data (optional but good practice) ---
# Poll S3 up to 30 times, one second apart, for this endpoint's capture files
# and show the newest one.
print("Waiting for captures to show up", end="")
for _ in range(30):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if not capture_files:
        print(".", end="", flush=True)
        sleep(1)
        continue
    print("\nFound Capture Files:")
    print(capture_files[-1])
    break
print()


# --- 4.3 Generate synthetic ground truth ---
# This part starts the thread to upload ground truth data.
import random


def ground_truth_with_id(inference_id):
    """Build one synthetic ground-truth record for `inference_id`.

    The label comes from a private RNG seeded with `inference_id`, so a given ID
    always gets the same label; about 70% of IDs are labelled "1". Using a
    dedicated `random.Random` (instead of `random.seed`) leaves the global RNG
    state untouched.

    Args:
        inference_id: InferenceId the record is matched against (sent as eventId).

    Returns:
        dict with groundTruthData / eventMetadata / eventVersion entries.
    """
    rand = random.Random(inference_id).random()
    return {
        "groundTruthData": {
            "data": "1" if rand < 0.7 else "0",  # positive label ~70% of the time
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }

def upload_ground_truth(records, upload_time):
    """Write `records` as a JSON Lines object under `ground_truth_upload_path`.

    The key is `<root>/YYYY/MM/DD/HH/MMSS.jsonl`, built from `upload_time`, so
    each upload lands in an hour-partitioned folder.
    """
    body = "\n".join(map(json.dumps, records))
    destination = f"{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    S3Uploader.upload_string_as_file_body(body, destination)

# Synthetic ground-truth records per upload (eventIds 0..N-1). Presumably sized
# to match the rows replayed by the traffic thread — TODO confirm against
# test_data/test-dataset-input-cols.csv.
NUM_GROUND_TRUTH_RECORDS = 334

from threading import Event

# Call ground_truth_stop_event.set() to stop the generator thread.
ground_truth_stop_event = Event()


def generate_fake_ground_truth_forever(stop_event=ground_truth_stop_event, interval_seconds=60 * 60):
    """Upload a batch of synthetic ground truth now and then every `interval_seconds`.

    Runs until `stop_event` is set. An upload error is reported and retried at
    the next interval rather than silently killing the thread.

    Args:
        stop_event: threading.Event that ends the loop when set.
        interval_seconds: pause between uploads (default: one hour).
    """
    while not stop_event.is_set():
        fake_records = [ground_truth_with_id(i) for i in range(NUM_GROUND_TRUTH_RECORDS)]
        try:
            # The S3 key is partitioned by UTC date/hour, so pass a UTC timestamp.
            upload_ground_truth(fake_records, datetime.now(timezone.utc))
        except Exception as err:
            print(f"Ground truth upload failed; retrying next interval: {err}")
        # Event.wait doubles as an interruptible sleep.
        stop_event.wait(interval_seconds)


# daemon=True so this never-ending thread does not keep the kernel process alive.
gt_thread = Thread(target=generate_fake_ground_truth_forever, daemon=True)
gt_thread.start()
print("Ground truth generation thread started.")


# --- 4.4 Create a bias monitoring schedule ---
from sagemaker.model_monitor import CronExpressionGenerator
from sagemaker.model_monitor import EndpointInput

## Monitoring schedule name
churn_bias_monitor_schedule_name = (
    f"DEMO-xgb-churn-bias-monitoring-schedule-{datetime.utcnow():%Y-%m-%d-%H%M}"
)

# Scheduled bias reports get their own prefix instead of being written into
# the baselining job's results folder.
bias_reports_uri = f"s3://{bucket}/{prefix}/bias_reports"

# Which captured records each run analyses, and how the endpoint's probability
# output is turned into a predicted label.
endpointInput_bias = EndpointInput(
    endpoint_name=predictor.endpoint_name,
    destination="/opt/ml/processing/input_data",
    # Each hourly run looks at the last hour of captured traffic.
    start_time_offset="-PT1H",
    end_time_offset="-PT0H",
    # Same cutoff used for the baseline: probability above it means churn.
    probability_threshold_attribute=churn_cutoff,
)

print(f"Creating Bias Monitoring Schedule: {churn_bias_monitor_schedule_name}")

# `analysis_config` is left unset: the SDK then reuses the analysis
# configuration (facet, label, headers) of the monitor's latest baselining job.
response_bias = churn_model_bias_monitor.create_monitoring_schedule(
    monitor_schedule_name=churn_bias_monitor_schedule_name,
    endpoint_input=endpointInput_bias,
    ground_truth_input=ground_truth_upload_path,
    output_s3_uri=bias_reports_uri,
    constraints=bias_baseline_job.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)


# --- 4.5 Examine monitoring schedule executions (Wait for first execution) ---
# Poll the schedule every 10 s until it reports a last execution. There is no
# timeout; with an hourly cron the first execution may take up to roughly an
# hour to appear.
print("Waiting for first execution of Bias Monitor", end="")
bias_execution = None
while True:
    bias_execution = churn_model_bias_monitor.describe_schedule().get(
        "LastMonitoringExecutionSummary"
    )
    if bias_execution:
        break
    print(".", end="", flush=True)
    sleep(10)
print("\nBias Execution found! Status:", bias_execution['MonitoringExecutionStatus'])

# Get all executions
# Poll every 10 s until list_executions() returns at least one execution.
bias_executions = churn_model_bias_monitor.list_executions()

while not bias_executions:
    bias_executions = churn_model_bias_monitor.list_executions()
    print(".", end="", flush=True)
    sleep(10)

# Take the last entry as the latest execution (assumes the list is ordered
# oldest-first — TODO confirm).
latest_bias_execution = bias_executions[-1]

# While the schedule's latest execution is Pending/InProgress: block on the
# processing job, print its status, then re-read the execution list and the
# schedule summary to decide whether to keep waiting.
status = bias_execution["MonitoringExecutionStatus"]
while status in ["Pending", "InProgress"]:
    print("Waiting for execution to finish", end="")
    latest_bias_execution.wait(logs=False)
    latest_job = latest_bias_execution.describe()
    print()
    print(f"{latest_job['ProcessingJobName']} job status:", latest_job["ProcessingJobStatus"])
    # Wait for the next stage if it's a multi-stage job
    sleep(30)
    latest_bias_execution = churn_model_bias_monitor.list_executions()[-1]
    bias_execution = churn_model_bias_monitor.describe_schedule()["LastMonitoringExecutionSummary"]
    status = bias_execution["MonitoringExecutionStatus"]

print("Bias Execution status is:", status)

In [None]:
# Section 5 (markdown placeholder): inspect the constraint violations reported by the latest bias monitoring execution.

In [None]:
# --- 5.1 View violations generated by monitoring schedule ---
latest_bias_execution = churn_model_bias_monitor.list_executions()[-1]
execution_outputs = latest_bias_execution.describe()["ProcessingOutputConfig"]["Outputs"]
report_uri = execution_outputs[0]["S3Output"]["S3Uri"]
print("Bias Report Uri:", report_uri)

# Show full cell contents; violation descriptions can be long.
pd.options.display.max_colwidth = None
try:
    violations_body = latest_bias_execution.constraint_violations().body_dict
    bias_violations = violations_body["violations"]
    bias_violations_df = pd.json_normalize(bias_violations)
    print("\n--- Model Bias Constraint Violations ---")
    print(bias_violations_df.head(10))
except Exception as err:
    print(f"No bias violations found yet or error accessing violations: {err}")

In [None]:
# Cleanup (markdown placeholder): stop the background threads and delete the endpoint, model and monitoring schedule.

In [None]:
# --- Clean up resources (optional) ---
# Briefly wait for the background threads. They loop until told to stop, so
# join(timeout=1) usually just returns; restarting the kernel stops them reliably.
thread.join(timeout=1)
gt_thread.join(timeout=1)

# Delete the monitoring schedule FIRST: the endpoint cannot be deleted while a
# monitoring schedule is still attached to it. The SDK method is
# delete_monitoring_schedule().
churn_model_bias_monitor.delete_monitoring_schedule()
# Schedule deletion finishes asynchronously; give it time before removing the endpoint.
sleep(60)

# Delete the endpoint and then the model
predictor.delete_endpoint()
model.delete_model()

# Delete the CloudWatch Alarm (if one were created specifically for bias)
# For simplicity, no specific CW alarm cell was added for bias, but it would go here.

print("Cleanup complete.")