# Experimentation jupyter notebook for SageMaker AI Endpoint Inference Monitoring with SageMaker AI MLflow
This is a notebook to experiment with the concepts used in SageMaker AI Endpoint Inference Monitoring custom solution.

> IMPORTANT: This notebook is provided for education and experimentation. See the CDK code for the production-ready solution.
> This Jupyter notebook was tested in SageMaker AI Studio.

In [None]:
# Install required dependencies. Ignore any warnings and residual dependency errors.
# The leading `!` runs this line as a shell command from the notebook; packages are read from requirements.txt.
!pip install --force-reinstall -U -r requirements.txt --quiet  --no-warn-conflicts

# Import dependencies and set required variables

In [None]:
import boto3
import sagemaker
from sagemaker.jumpstart.model import JumpStartModel
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
import mlflow
from datetime import datetime

# Resolve the SageMaker session, execution role, region and default bucket used throughout.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_bucket = sagemaker_session.default_bucket()

for setting_label, setting_value in (
    ("SageMaker role", role),
    ("Region", region),
    ("Default bucket", sagemaker_bucket),
):
    print(f"{setting_label}: {setting_value}")


In [None]:
# One timestamp shared by every resource name created in this run. Two separate
# datetime.now() calls could straddle a second boundary and give the S3 capture
# prefix and the endpoint different suffixes.
run_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

# S3 folder key to store data capture jsonl files
data_capture_s3_key = f"llama-3-1-8b-instruct-{run_timestamp}"

# Llama 3.1 8B Instruct from JumpStart
model_id = "meta-textgeneration-llama-3-1-8b-instruct"
model_version = "*"  # Use latest version

# Endpoint name (the endpoint is deployed with data capture enabled in a later cell)
endpoint_name = f"llama-3-1-8b-instruct-{run_timestamp}"

# Enter your desired S3 bucket
bucket = sagemaker_bucket  # REPLACE if not default bucket

print(f"SageMaker data capture: {data_capture_s3_key}")
print(f"SageMaker endpoint name: {endpoint_name}")

## Data capture configuration
> Note: We use the default bucket configured with SageMaker Studio.

In [None]:
# Capture 100% of requests and responses under this run's S3 prefix.
capture_destination_uri = f's3://{bucket}/{data_capture_s3_key}'
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=capture_destination_uri,
    capture_options=["REQUEST", "RESPONSE"],
    json_content_types=["application/json"],
)

print("Data capture will be saved to:", data_capture_config.destination_s3_uri)

## Deploy LLM to SageMaker AI Endpoint
1. Load the LLM from SageMaker JumpStart
2. Deploy the JumpStart LLM to a SageMaker AI endpoint

In [None]:
print(f"Deploying {model_id}...")

# Create the JumpStart model object; it is deployed to an endpoint with model.deploy().
model = JumpStartModel(
    sagemaker_session=sagemaker_session,
    role=role,
    model_version=model_version,
    model_id=model_id,
)

In [None]:
# Deploy the model to a real-time endpoint with the data capture config attached.
deploy_kwargs = dict(
    initial_instance_count=1,
    instance_type="ml.g5.2xlarge",  # GPU instance for Llama
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,  # Enable the data capture config
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
    accept_eula=True,
)
predictor = model.deploy(**deploy_kwargs)

print(f"✅ Endpoint deployed successfully: {endpoint_name}")

## Configure SageMakerAI MLflow app connection

In [None]:

# Name of the SageMaker AI MLflow app to use; change it to target a different app.
mlflow_app_name = 'DefaultMLFlowApp'

# List the MLflow apps visible to this account/region.
sm_client = boto3.client('sagemaker', region_name=region)
mlflow_list = sm_client.list_mlflow_apps()
app_summaries = mlflow_list['Summaries']

print(f'Found {len(app_summaries)} MLflow app(s) in your account:')
for summary in app_summaries:
    print(f'  - {summary["Name"]}')

# ARN of the first app whose name matches, or None when there is no match.
mlflow_app_arn = next(
    (summary['Arn'] for summary in app_summaries if summary['Name'] == mlflow_app_name),
    None,
)

if not mlflow_app_arn:
    raise ValueError(f'MLflow app "{mlflow_app_name}" not found. Please check the name or create one in SageMaker Studio.')

print(f'\n Using MLflow app: {mlflow_app_name}')
print(f'  ARN: {mlflow_app_arn}')

In [None]:
# Point MLflow at the SageMaker AI MLflow app, then create or reuse the experiment.
mlflow.set_tracking_uri(mlflow_app_arn)
mlflow_experiment_name = endpoint_name  # We will keep it same as the endpoint name. You can replace.

# Look the experiment up explicitly rather than wrapping create_experiment in a
# bare `except:`, which would also swallow auth, network and tracking-server errors.
experiment = mlflow.get_experiment_by_name(mlflow_experiment_name)
if experiment is None:
    experiment_id = mlflow.create_experiment(mlflow_experiment_name)
    print(f' Created new MLflow experiment: {mlflow_experiment_name}')
    print(f'  Experiment ID: {experiment_id}')
else:
    experiment_id = experiment.experiment_id
    print(f' Using existing MLflow experiment: {mlflow_experiment_name}')
    print(f'  Experiment ID: {experiment_id}')

# Activate the experiment in both cases so subsequent runs and traces land in it.
mlflow.set_experiment(mlflow_experiment_name)

print('\n You can view your experiments in the SageMaker Studio MLflow App UI')

## Test your sagemaker endpoint

In [None]:
# Create a function to invoke your sagemaker endpoint
import json
import boto3
from datetime import datetime

import time

sagemaker_runtime = boto3.client('sagemaker-runtime')

def invoke_sagemaker_endpoint(prompt, endpoint_name, max_tokens=512, temperature=0.7):
    """Invoke a SageMaker endpoint, print the latency and response, and return both.

    Args:
        prompt: Text sent in the request's ``inputs`` field.
        endpoint_name: Name of the SageMaker real-time endpoint to call.
        max_tokens: Value for the ``max_new_tokens`` generation parameter.
        temperature: Sampling temperature (``top_p`` is fixed at 0.9, sampling enabled).

    Returns:
        Tuple ``(result, latency)``: the JSON-decoded response body and the
        ``invoke_endpoint`` round-trip time in milliseconds.
    """
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": max_tokens,
            "temperature": temperature,
            "top_p": 0.9,
            "do_sample": True
        }
    }

    # perf_counter is monotonic, so wall-clock adjustments cannot skew the
    # measurement the way a difference of datetime.now() values can.
    start = time.perf_counter()
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=json.dumps(payload)
    )
    latency = (time.perf_counter() - start) * 1000

    result = json.loads(response['Body'].read().decode('utf-8'))

    print(f"Latency: {latency:.2f}ms")
    print(f"Response: {result}")

    return result, latency


In [None]:
# Send a few sample prompts to the endpoint; data capture records these calls.
test_prompts = [
    "What is machine learning?",
    "Explain the difference between supervised and unsupervised learning.",
    "Write a Python function to calculate fibonacci numbers.",
]

separator = '=' * 60
for prompt in test_prompts:
    print(f"\n{separator}")
    print(f"Prompt: {prompt}")
    print(separator)
    response, latency = invoke_sagemaker_endpoint(prompt, endpoint_name)
    print()


## Verify data capture jsonl file creation
> IMPORTANT: It takes a few seconds for the data capture JSONL file to land in the S3 bucket.
> 
> If you don't see a JSONL file, retry after a few seconds.

In [None]:
# List the data capture objects written under this run's S3 prefix.
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(
    Bucket=bucket,
    Prefix=data_capture_s3_key,
    MaxKeys=50
)

capture_objects = response.get('Contents', [])
if capture_objects:
    print(f"Found {len(capture_objects)} capture files")
    for obj in capture_objects:
        print(f"   {obj['Key']}")
    # Keep the last listed key (list_objects_v2 returns keys in ascending order).
    s3_file_key = capture_objects[-1]['Key']
    # Only report a "final" file when one was actually found.
    print(f"Final data capture json file: {s3_file_key}")
else:
    print("No capture files yet. Retry after few seconds")

## Log MLflow traces from SageMaker DataCapture 

In [None]:
import base64
import awswrangler as wr
import pandas as pd

def _decode_capture_payload(captured):
    """Return the JSON-decoded ``data`` of one Data Capture input/output entry.

    ``captured`` is ``captureData['endpointInput']`` or
    ``captureData['endpointOutput']``. When its ``encoding`` is ``"BASE64"``,
    ``data`` is base64-decoded to UTF-8 text first; any other (or missing)
    encoding is parsed as JSON text directly.
    """
    data = captured['data']
    if captured.get('encoding', 'JSON') == 'BASE64':
        data = base64.b64decode(data).decode('utf-8')
    return json.loads(data)


def log_single_inference_event(event_data, additional_trace_attr):
    """Log one captured inference event as an MLflow span.

    Args:
        event_data: One record of a SageMaker Data Capture JSONL file, with
            ``captureData`` (``endpointInput`` / ``endpointOutput``) and
            ``eventMetadata`` entries.
        additional_trace_attr: Extra span attributes. Must contain
            ``s3_file_key``, which is also used as the span name.

    Returns:
        None. The span is recorded through ``mlflow.start_span``.
    """
    print(f'Received {event_data}')
    capture_data = event_data['captureData']

    # Request and response are decoded the same way, honoring each entry's own
    # ``encoding`` field.
    input_data = _decode_capture_payload(capture_data['endpointInput'])
    endpoint_output = capture_data['endpointOutput']
    output_data = _decode_capture_payload(endpoint_output)
    # .get() so a missing content type reaches the ERROR-status branch below
    # instead of raising KeyError.
    output_type = endpoint_output.get('observedContentType')
    print(f"Trace input data: {input_data}\n")
    print(f"Trace output data: {output_data}\n")

    # Create trace data
    with mlflow.start_span(name=additional_trace_attr['s3_file_key']) as span:
        span.set_inputs(
            {
                "prompt": input_data.get('inputs', ''),
                "parameters": input_data.get('parameters', {})
            }
        )
        span.set_attributes(event_data['eventMetadata'])
        span.set_attributes(additional_trace_attr)
        if output_type is None:
            span.set_status("ERROR")
        span.set_outputs(output_data)

In [None]:
# Helper processing function
def process_capture_file_events(bucket, s3_file_key):
    """Log every record of one Data Capture JSONL file as MLflow traces.

    Reads ``s3://{bucket}/{s3_file_key}`` with awswrangler (one record per line)
    and calls ``log_single_inference_event`` for each record inside a single
    MLflow run of the experiment named by ``mlflow_experiment_name``.

    Args:
        bucket: S3 bucket holding the capture file.
        s3_file_key: Object key of the JSONL capture file.
    """
    path = f's3://{bucket}/{s3_file_key}'
    # define custom metadata as needed
    additional_trace_attr = {
        "s3_bucket.name": bucket,
        "s3_file_key": s3_file_key,
        "sagemaker.endpoint_name": endpoint_name,
    }
    # Read before opening the run so a missing or unreadable object does not
    # leave an empty run behind.
    df = wr.s3.read_json(path=path, lines=True)

    mlflow.set_experiment(mlflow_experiment_name)
    with mlflow.start_run(run_name="sagemaker_inference_logging"):
        for _, row in df.iterrows():
            log_single_inference_event(row.to_dict(), additional_trace_attr)


In [None]:
# Run: log every event in the selected capture file as MLflow traces
process_capture_file_events(bucket=bucket, s3_file_key=s3_file_key)

# MLflow LLM Evaluation

In [None]:
import os

# Expose the notebook's execution role as AWS_ROLE_ARN for the Amazon Bedrock evaluator model to assume.
execution_role_arn = sagemaker.get_execution_role()
os.environ["AWS_ROLE_ARN"] = execution_role_arn
print(execution_role_arn)

In [None]:
# Amazon Bedrock model (MLflow `bedrock:/` URI) used as the LLM-as-a-judge evaluator.
MLFLOW_EVALUATION_MODEL_ID = "bedrock:/global.anthropic.claude-sonnet-4-20250514-v1:0"

# Inference parameters handed to the judges as `parameters=`.
MLFLOW_EVALUATION_MODEL_PARAM = dict(
    temperature=0,  # 0 for deterministic
    max_tokens=512,
    anthropic_version="bedrock-2023-05-31",
    top_p=0.9,  # Add top_p for more controlled generation
    # Intended to stop generation once the judge's JSON closes.
    # NOTE(review): Anthropic models generally do not echo the matched stop
    # sequence, and a "}" inside the rationale would stop early; confirm the
    # judge output still parses.
    stop_sequences=["}"],
)

## Define MLflow score metrics

#### Create custom MLflow scorer function

In [None]:
from typing import Literal
from mlflow.genai import scorer
from mlflow.genai.judges import make_judge

@scorer
def tokens_words(outputs) -> int:
    """Approximate the number of words in the response.

    ``outputs`` is the trace output logged from the endpoint response. The
    scorer looks for a ``generated_text`` string, either directly in a dict or
    in the first element of a list of dicts (presumably the JumpStart
    text-generation response shapes; confirm). Returns 0 when no text is found.
    """
    if isinstance(outputs, list) and outputs:
        outputs = outputs[0]
    try:
        text = outputs['generated_text']
    except (KeyError, TypeError, IndexError):
        return 0
    if not isinstance(text, str):
        return 0
    # Split on whitespace to count words; len(text) would count characters.
    return len(text.split())

# Create a judge that evaluates coherence using MLflow template-based scorers.
# Adjacent string literals are concatenated with no separator, so the first
# sentence must end with ".\n" or it runs straight into "Question:".
coherence_judge = make_judge(
    name="coherence",
    instructions=(
        "Evaluate if the response is coherent, maintaining a constant tone "
        "and following a clear flow of thoughts/concepts.\n"
        "Question: {{ inputs }}\n"
        "Response: {{ outputs }}\n"
    ),
    feedback_value_type=Literal["coherent", "somewhat coherent", "incoherent"],
    model=MLFLOW_EVALUATION_MODEL_ID,
)

#### Define all the MLflow scorers
We include predefined MLflow scorers and then load all the scorers together.

In [None]:
from mlflow.genai.scorers import Guidelines, Safety, RelevanceToQuery, Fluency

# Keyword arguments shared by every Bedrock-backed built-in judge below.
_judge_kwargs = dict(
    model=MLFLOW_EVALUATION_MODEL_ID,
    parameters=MLFLOW_EVALUATION_MODEL_PARAM,
)

# (name, guideline) pairs for the guidelines-based judges, in evaluation order.
_guideline_specs = [
    ("follows_objective",
     "The generated response must follow the objective in the request."),
    ("professional_tone",
     "The response must be in a professional tone."),
    ("no_harmful_advice",
     "The response MUST NOT cause harm to any human."),
]

scorers = [
    # MLflow built-in genai scorers
    Safety(**_judge_kwargs),
    RelevanceToQuery(**_judge_kwargs),
    Fluency(**_judge_kwargs),
    # MLflow built-in guidelines-based LLM-as-a-judge
    *(
        Guidelines(name=judge_name, guidelines=guideline, **_judge_kwargs)
        for judge_name, guideline in _guideline_specs
    ),
    # MLflow built-in template-based LLM-as-a-judge
    coherence_judge,
    # Custom defined scorers
    tokens_words,
]

### Perform MLflow GenAI Evaluation

#### Load MLflow traces for GenAI Evaluation

In [None]:
# Fetch the traces logged for the selected capture file; their span name is the S3 key.
trace_filter = f"name = '{s3_file_key}'"
traces = mlflow.search_traces(filter_string=trace_filter)

In [None]:
# Show the traces returned by search_traces (the cell's last expression is displayed).
traces

#### Perform the evaluation job
> Note: This is the first pass at trace evaluation. If you see an error, skip ahead to the second pass.
>
> Ignore All WARNINGS
>
> During the first pass you might encounter MLflow OSS Error `RestException: BAD_REQUEST: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "entity_associations_pk"`

In [None]:
# First pass: score every loaded trace with all configured scorers.
results = mlflow.genai.evaluate(scorers=scorers, data=traces)

### Second pass: run evaluation in batches
> To handle the MLflow issue https://github.com/mlflow/mlflow/issues/21002
>
> Ignore All WARNINGS.

In [None]:
# Second pass: re-run the evaluation over the same traces.
# NOTE(review): this evaluates all traces in a single call; it is not split into batches.
results = mlflow.genai.evaluate(
    data=traces,
    scorers=scorers,
)
print("Evaluations completed successfully ✅")

### See results in SageMaker AI MLflow App
1. Go to the MLflow experiment and view the results

# Load testing CDK deployment (Optional)
We reuse samples from the `FreedomIntelligence/medical-o1-reasoning-SFT` dataset as a proxy for a load testing dataset.

Each sample contains:

- A medical **Question**.
- A detailed **Response** that we treat as the expected answer.
- An optional chain‑of‑thought field (`Complex_CoT`) that we ignore for scoring, but which could be used in more advanced evaluation setups.

You can replace this dataset with your own dataset (for example, human‑annotated medical Q&A pairs corresponding to your domain and compliance requirements).

> Note: Load testing with a large dataset sample takes significant time. Start with a small sample size.

In [None]:
from datasets import load_dataset
import pandas as pd

# Number of dataset rows to pull for load testing; start small.
num_samples = 100

train_slice = f"train[:{num_samples}]"
full_dataset = load_dataset(
    "FreedomIntelligence/medical-o1-reasoning-SFT",
    "en",
    split=train_slice,
)

# Preview the first sample.
full_dataset[0]

In [None]:
import json
import boto3
from datetime import datetime

sagemaker_runtime = boto3.client('sagemaker-runtime')


def invoke_smai_endpoint(prompt, endpoint_name, max_tokens=512, temperature=0.7):
    """Send ``prompt`` to a SageMaker endpoint and return the JSON-decoded reply.

    Args:
        prompt: Text placed in the request's ``inputs`` field.
        endpoint_name: Name of the SageMaker real-time endpoint to call.
        max_tokens: Value for the ``max_new_tokens`` generation parameter.
        temperature: Sampling temperature (``top_p`` is fixed at 0.9, sampling enabled).

    Returns:
        The response body parsed from JSON.
    """
    generation_params = {
        "max_new_tokens": max_tokens,
        "temperature": temperature,
        "top_p": 0.9,
        "do_sample": True,
    }
    request_body = json.dumps({"inputs": prompt, "parameters": generation_params})

    raw_response = sagemaker_runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=request_body,
    )
    return json.loads(raw_response['Body'].read().decode())


In [None]:
# Load test: send each sampled question to the endpoint and count the calls.
count = 0  # stays 0 when the dataset slice is empty
for count, sample in enumerate(full_dataset, start=1):
    response = invoke_smai_endpoint(sample["Question"], endpoint_name)
print(f"✨ Load testing complete: {count}")