# Financial Compliance Agent Evaluation Pipeline (SageMaker Pipelines + Managed MLflow)

This notebook productionalizes the financial compliance agent evaluation flow into a
three-step Amazon SageMaker Pipeline:

1. **Data Preparation**  
   - Load ground-truth dataset from S3  
   - Basic validation and dataset profiling  
   - Log dataset metadata to **SageMaker managed MLflow**

2. **Agent Inference**  
   - Build the RAG + Web Search agent (Qwen on Amazon Bedrock)  
   - Run inference over all ground-truth prompts  
   - Extract normalized outputs (clean answers, retrieved contexts, tool usage)  
   - Persist an evaluation-ready dataset to S3  
   - Log inference-level metrics to MLflow

3. **Evaluation & Metrics**  
   - Compute semantic similarity (SAS)  
   - Compute tool-selection confusion matrix + accuracy  
   - Compute nDCG for RAG retrieval quality  
   - (Optionally) run LLM-as-a-judge factuality checks (planned; not implemented in the evaluation step of this notebook)  
   - Log all metrics and artifacts to **SageMaker managed MLflow**  
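   - Evaluate the `AccuracyThreshold` quality gate: compare the mean SAS score against the threshold and log the outcome as `quality_pass` (the result is logged only; the step does not fail the pipeline)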

In [None]:
#Cell 2 - Setup
import os
import boto3
import sagemaker

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterFloat,
    ParameterInteger,
)

from sagemaker.workflow.function_step import step  # @step decorator


# --- AWS / SageMaker bootstrap ---------------------------------------
session = boto3.Session()
region = session.region_name
if not region:
    # No region configured on the boto3 session: use AWS_REGION, else us-east-1.
    region = os.environ.get("AWS_REGION", "us-east-1")

# Execution role: resolved by the SageMaker SDK when it can be; otherwise taken from
# SAGEMAKER_ROLE_ARN, falling back to the account-specific default below.
# NOTE(review): the fallback ARN is tied to one AWS account; set SAGEMAKER_ROLE_ARN
# when running this notebook anywhere else.
try:
    role = sagemaker.get_execution_role()
except Exception:
    role = os.environ.get(
        "SAGEMAKER_ROLE_ARN",
        "arn:aws:iam::874604298668:role/service-role/AmazonSageMaker-ExecutionRole-20240122T092140e",
    )

# PipelineSession is handed to the Pipeline object when it is created.
pipeline_session = PipelineSession()

base_job_prefix = "financial-compliance-agent-eval"
default_bucket = pipeline_session.default_bucket()

print(f"Region: {region}")
print(f"Role: {role}")
print(f"Default bucket: {default_bucket}")

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


Region: us-east-1
Role: arn:aws:iam::874604298668:role/service-role/AmazonSageMaker-ExecutionRole-20240122T092140e
Default bucket: sagemaker-us-east-1-874604298668


In [None]:
#Cell 3 - Pipeline parameters
from sagemaker.workflow.parameters import ParameterString, ParameterFloat, ParameterInteger

# -------------------------------------------------------------------
# Pipeline parameters (each can be overridden when an execution starts)
# -------------------------------------------------------------------

# Common S3 root shared by the default input and output locations.
_s3_root = f"s3://{default_bucket}/{base_job_prefix}"

# Location of the ground-truth dataset (ground_truth.json).
DataInputS3Uri = ParameterString(
    name="DataInputS3Uri",
    default_value=f"{_s3_root}/data/ground_truth.json",
)

# Prefix under which the steps write intermediate datasets and artifacts.
BaseOutputS3Uri = ParameterString(
    name="BaseOutputS3Uri",
    default_value=f"{_s3_root}/artifacts",
)

# ARN of the SageMaker managed MLflow tracking server
# (format: arn:aws:sagemaker:REGION:ACCOUNT_ID:mlflow-tracking-server/NAME).
MLflowTrackingServerArn = ParameterString(
    name="MLflowTrackingServerArn",
    default_value="arn:aws:sagemaker:us-east-1:874604298668:mlflow-tracking-server/riv-eval",
)

# MLflow experiment that receives the runs of all three steps.
MLflowExperimentName = ParameterString(
    name="MLflowExperimentName",
    default_value="financial-compliance-agent-eval",
)

# Bedrock model ID used by the agent (Qwen by default; any chat model works).
ModelId = ParameterString(
    name="ModelId",
    default_value="qwen.qwen3-32b-v1:0",
)

# Prompt identifier; reserved for Bedrock Prompt Management and only logged for now.
PromptId = ParameterString(
    name="PromptId",
    default_value="financial-compliance-base-prompt",
)

# Threshold that the mean SAS score is compared against in the evaluation step.
AccuracyThreshold = ParameterFloat(
    name="AccuracyThreshold",
    default_value=0.8,
)

# Pause (seconds) after each agent call during inference.
RateLimitDelaySeconds = ParameterInteger(
    name="RateLimitDelaySeconds",
    default_value=10,
)

print("Pipeline parameters created.")


Pipeline parameters created.


In [None]:
#Cell 4 - S3 + MLFlow utilities 
import io
import json
from urllib.parse import urlparse

import boto3
import mlflow
import pandas as pd


def _parse_s3_uri(s3_uri: str):
    """Split s3://bucket/key into (bucket, key)."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"Invalid S3 URI: {s3_uri}")
    parsed = urlparse(s3_uri)
    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    return bucket, key


def read_json_records_from_s3(s3_uri: str) -> pd.DataFrame:
    """Download a JSON file in 'records' orientation from S3 and load it as a DataFrame.

    Args:
        s3_uri: ``s3://bucket/key`` location of the JSON file.

    Returns:
        DataFrame parsed from the object's bytes with ``orient="records"``.
    """
    bucket_name, object_key = _parse_s3_uri(s3_uri)
    response = boto3.client("s3").get_object(Bucket=bucket_name, Key=object_key)
    payload = response["Body"].read()
    # The whole object is buffered in memory before parsing.
    return pd.read_json(io.BytesIO(payload), orient="records")


def write_json_records_to_s3(df: pd.DataFrame, s3_uri: str):
    """Serialize a DataFrame to JSON 'records' and upload it to S3.

    Args:
        df: DataFrame to serialize with ``orient="records"``.
        s3_uri: ``s3://bucket/key`` destination; the object is written with a
            single ``put_object`` call.
    """
    bucket_name, object_key = _parse_s3_uri(s3_uri)
    s3_client = boto3.client("s3")
    s3_client.put_object(
        Bucket=bucket_name,
        Key=object_key,
        Body=df.to_json(orient="records").encode("utf-8"),
    )


def init_mlflow(tracking_server_arn: str, experiment_name: str):
    """Point MLflow at the given tracking server and activate an experiment.

    Args:
        tracking_server_arn: Tracking URI handed to ``mlflow.set_tracking_uri``;
            in this notebook it is the ARN supplied through the
            ``MLflowTrackingServerArn`` pipeline parameter.
        experiment_name: Name of the experiment to make active.

    NOTE(review): resolving a SageMaker tracking-server ARN presumably requires
    the SageMaker MLflow plugin in the runtime environment; confirm it is
    installed in the job image.
    """
    mlflow.set_tracking_uri(tracking_server_arn)
    mlflow.set_experiment(experiment_name=experiment_name)


In [None]:
#Cell 5 - Data prep (Step 1)

from typing import Optional


@step(
    name="data-preparation",
    instance_type="ml.t3.medium",
    keep_alive_period_in_seconds=3600,
)
def data_preparation_step(
    data_input_s3_uri: str,
    base_output_s3_uri: str,
    tracking_server_arn: str,
    experiment_name: str,
    pipeline_run_id: str,
) -> str:
    """Step 1: profile the ground-truth dataset and publish a normalized copy.

    Reads the dataset from ``data_input_s3_uri``, logs row counts to MLflow,
    adds any missing expected column (filled with NaN), and writes the result
    below ``base_output_s3_uri``.

    Args:
        data_input_s3_uri: S3 URI of the ground-truth JSON records file.
        base_output_s3_uri: S3 prefix that receives the normalized dataset.
        tracking_server_arn: MLflow tracking server to log to.
        experiment_name: MLflow experiment name.
        pipeline_run_id: Identifier used to name the MLflow run.

    Returns:
        S3 URI of the normalized dataset.
    """
    import numpy as np

    required_columns = ("prompt", "output", "tool_label", "context", "page")

    init_mlflow(tracking_server_arn, experiment_name)

    with mlflow.start_run(run_name=f"data-prep-{pipeline_run_id}"):
        df = read_json_records_from_s3(data_input_s3_uri)

        # Per-tool row counts; both are zero when the label column is absent.
        if "tool_label" in df:
            tool_labels = df["tool_label"]
            rag_rows = int((tool_labels == "rag").sum())
            web_rows = int((tool_labels == "web_search").sum())
        else:
            rag_rows = web_rows = 0

        mlflow.log_param("data_input_s3_uri", data_input_s3_uri)
        for metric_name, metric_value in (
            ("num_rows", len(df)),
            ("num_rag_rows", rag_rows),
            ("num_web_search_rows", web_rows),
        ):
            mlflow.log_metric(metric_name, metric_value)

        # Downstream steps expect these columns; absent ones are created as NaN.
        # NOTE(review): a missing "prompt" column is also NaN-filled rather than
        # rejected; confirm that is acceptable for the inference step.
        absent_columns = [c for c in required_columns if c not in df.columns]
        for column in absent_columns:
            df[column] = np.nan

        # NOTE(review): the output key is fixed, so executions sharing the same
        # base prefix overwrite each other's normalized dataset.
        output_prefix = base_output_s3_uri.rstrip("/")
        normalized_dataset_s3_uri = f"{output_prefix}/data/ground_truth_normalized.json"

        write_json_records_to_s3(df, normalized_dataset_s3_uri)
        mlflow.log_param("normalized_dataset_s3_uri", normalized_dataset_s3_uri)

    return normalized_dataset_s3_uri


In [10]:
#Cell 6 - Import helpers 
# Agent + RAG helpers from fc_agent_eval.py

# Project-local helpers used by the agent inference step (agent construction,
# prompt formatting, and extraction of retrieved documents / tool usage).
# NOTE(review): the inference step calls these inside the remote job; confirm
# that fc_agent_eval.py is importable in that job environment and not only in
# this notebook kernel.
from fc_agent_eval import (
    init_chroma_retriever,
    build_financial_agent,
    format_prompt,
    get_clean_docs,
    extract_combined_tools,
)

print("Imported fc_agent_eval helpers successfully.")

# Note:
# - fc_agent_eval.init_chroma_retriever() uses a DEFAULT_CHROMA_PERSIST_PATH
#   that is relative to the repo root: ../data/10k-vec-db
# - Make sure you've already built the Chroma DB once (locally or via a
#   separate job) so that the persisted index exists at that location.
#
# If you want the pipeline step to rebuild the index instead, you can
# call init_chroma_retriever(...) with pdf_paths + recreate=True inside
# the inference step.


Imported fc_agent_eval helpers successfully.


In [11]:
#Cell 7 - Inference (Step 2)
@step(
    name="agent-inference",
    # GPU instance. NOTE(review): Bedrock model calls are remote API requests and
    # do not use a local GPU; a GPU presumably only helps if fc_agent_eval runs
    # embeddings locally. Confirm, since a CPU instance may be sufficient.
    instance_type="ml.g5.xlarge",
    keep_alive_period_in_seconds=3600,
)
def agent_inference_step(
    dataset_s3_uri: str,
    model_id: str,
    prompt_id: str,  # kept for consistency, even if unused
    base_output_s3_uri: str,
    tracking_server_arn: str,
    experiment_name: str,
    pipeline_run_id: str,
    rate_limit_delay: int = 10,
) -> str:
    """
    Step 2: Run agent inference on all prompts, build an evaluation-ready
    dataset, and write it to S3.

    Failures of individual prompts do not abort the step: the failing row gets
    an empty message list plus the error text, and the number of failures is
    logged to MLflow as ``num_failed_prompts`` so a degraded run is visible.

    Args:
        dataset_s3_uri: S3 URI of the normalized ground-truth dataset.
        model_id: Bedrock model ID passed to ``build_financial_agent``.
        prompt_id: Logged as a run parameter only; not used for inference.
        base_output_s3_uri: S3 prefix that receives the evaluation dataset.
        tracking_server_arn: MLflow tracking server to log to.
        experiment_name: MLflow experiment name.
        pipeline_run_id: Identifier used to name the MLflow run.
        rate_limit_delay: Seconds to sleep after every agent call.

    Returns:
        eval_dataset_s3_uri (str): S3 URI of results with columns:
          [prompt, output, tool_label, context, page,
           clean_answers, extracted_contexts, tool_used, raw_answers]
    """
    import re
    import time
    from tqdm import tqdm

    init_mlflow(tracking_server_arn, experiment_name)

    run_name = f"inference-{pipeline_run_id}"
    with mlflow.start_run(run_name=run_name):
        df = read_json_records_from_s3(dataset_s3_uri)

        mlflow.log_param("dataset_s3_uri", dataset_s3_uri)
        mlflow.log_param("model_id", model_id)
        mlflow.log_param("prompt_id", prompt_id)
        mlflow.log_param("rate_limit_delay_seconds", rate_limit_delay)

        # --------------------------------------------------------------
        # Initialize Chroma RAG index (no rebuild by default)
        # --------------------------------------------------------------
        # Attaches to an already persisted index. To rebuild instead, pass
        # pdf_paths=[...], recreate=True.
        init_chroma_retriever(
            pdf_paths=None,      # or ["../data/AMZN-2023-10k.pdf"] if rebuilding
            recreate=False,
        )

        # --------------------------------------------------------------
        # Build Bedrock-backed financial agent (Qwen + RAG + web_search)
        # --------------------------------------------------------------
        agent = build_financial_agent(model_id=model_id)

        # --------------------------------------------------------------
        # Run inference over all prompts
        # --------------------------------------------------------------
        prompts = df["prompt"].tolist()
        answers = []
        failed_prompts = 0

        for p in tqdm(prompts, desc="Running agent inference"):
            try:
                prompt_msg_list = format_prompt(p)  # List[ChatMessage]
                res = agent.run(prompt_msg_list)
            except Exception as e:
                # Keep row alignment, but make the failure visible in the job
                # log and count it so it can be reported to MLflow.
                failed_prompts += 1
                print(f"[agent-inference] prompt failed: {e!r}")
                res = {"messages": [], "error": str(e)}
            answers.append(res)
            time.sleep(rate_limit_delay)

        mlflow.log_metric("num_prompts", float(len(prompts)))
        mlflow.log_metric("num_failed_prompts", float(failed_prompts))

        # Attach raw answers
        # NOTE(review): raw agent results may hold non-JSON-native objects;
        # confirm they serialize as intended when the dataset is written below.
        df["raw_answers"] = answers

        def get_final_text(answer):
            """Return the text of the last message, or "I don't know" as fallback."""
            try:
                msgs = answer["messages"]
                last = msgs[-1]
                # Haystack ChatMessage usually exposes .text
                return getattr(last, "text", None) or getattr(last, "content", None) or "I don't know"
            except Exception:
                return "I don't know"

        df["clean_answers"] = [get_final_text(a) for a in answers]

        # Extract retrieved contexts for RAG (using fc_agent_eval.get_clean_docs)
        df["extracted_contexts"] = [get_clean_docs(a) for a in answers]

        # Extract tool usage (using fc_agent_eval.extract_combined_tools)
        df["tool_used"] = [extract_combined_tools(a) for a in answers]

        # Per-tool usage counts. Characters outside MLflow's permitted metric-key
        # set (alphanumerics, "_", "-", ".", " ", "/") are replaced with "_" so an
        # unusual tool name cannot fail the run at logging time.
        tool_counts = df["tool_used"].value_counts().to_dict()
        for k, v in tool_counts.items():
            safe_key = re.sub(r"[^0-9A-Za-z_\-. /]", "_", str(k))
            mlflow.log_metric(f"tool_used_{safe_key}", float(v))

        # Write evaluation-ready dataset to S3
        eval_dataset_s3_uri = (
            f"{base_output_s3_uri.rstrip('/')}/eval/agent_eval_dataset.json"
        )
        write_json_records_to_s3(df, eval_dataset_s3_uri)
        mlflow.log_param("eval_dataset_s3_uri", eval_dataset_s3_uri)

    return eval_dataset_s3_uri


In [12]:
#Cell 8 Evaluation & Metrics (Step 3)
@step(
    name="agent-evaluation",
    instance_type="ml.m5.xlarge",
    keep_alive_period_in_seconds=3600,
)
def agent_evaluation_step(
    eval_dataset_s3_uri: str,
    tracking_server_arn: str,
    experiment_name: str,
    pipeline_run_id: str,
    accuracy_threshold: float,
) -> float:
    """
    Step 3: Compute evaluation metrics (SAS, tool selection accuracy, nDCG)
    and log everything to MLflow.

    The dataset is read back from JSON, so document columns hold plain JSON
    values. They are converted to Haystack ``Document`` objects before the nDCG
    evaluator runs. nDCG is skipped when there are no RAG-labelled rows.

    Args:
        eval_dataset_s3_uri: S3 URI of the dataset written by the inference step.
        tracking_server_arn: MLflow tracking server to log to.
        experiment_name: MLflow experiment name.
        pipeline_run_id: Identifier used to name the MLflow run.
        accuracy_threshold: Threshold the mean SAS score is compared against.
            The outcome is logged as ``quality_pass``; it does not fail the step.

    Returns:
        avg_sas_score (float): Average semantic answer similarity.
    """
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns

    from haystack.components.evaluators import SASEvaluator, DocumentNDCGEvaluator
    from haystack import Document

    def _as_documents(raw):
        """Coerce one JSON-decoded cell into a list of Haystack Documents.

        Accepts a single string, a single dict, or a list of strings/dicts.
        Dicts contribute their "content" value when present (presumably how
        serialized documents look; TODO confirm against get_clean_docs).
        Anything else (NaN, None, ...) yields an empty list.
        """
        if isinstance(raw, str):
            raw = [raw] if raw.strip() else []
        elif isinstance(raw, dict):
            raw = [raw]
        if not isinstance(raw, (list, tuple)):
            return []

        documents = []
        for item in raw:
            if item is None:
                continue
            if isinstance(item, Document):
                documents.append(item)
                continue
            if isinstance(item, dict) and item.get("content") is not None:
                item = item["content"]
            documents.append(Document(content=str(item)))
        return documents

    init_mlflow(tracking_server_arn, experiment_name)
    run_name = f"evaluation-{pipeline_run_id}"

    with mlflow.start_run(run_name=run_name):
        df = read_json_records_from_s3(eval_dataset_s3_uri)
        mlflow.log_param("eval_dataset_s3_uri", eval_dataset_s3_uri)

        # --------------------------------------------------------------
        # 1) Semantic Answer Similarity (SAS)
        # --------------------------------------------------------------
        gt = df["output"].tolist()
        pa = df["clean_answers"].tolist()

        sas_evaluator = SASEvaluator()
        sas_evaluator.warm_up()
        sas_result = sas_evaluator.run(
            ground_truth_answers=gt,
            predicted_answers=pa,
        )

        df["sas_score"] = sas_result["individual_scores"]
        avg_sas = float(sas_result["score"])

        mlflow.log_metric("sas_mean", avg_sas)
        mlflow.log_metric("sas_min", float(np.min(df["sas_score"])))
        mlflow.log_metric("sas_max", float(np.max(df["sas_score"])))

        # --------------------------------------------------------------
        # 2) Tool selection confusion matrix & accuracy
        # --------------------------------------------------------------
        tool_confusion = pd.crosstab(df["tool_label"], df["tool_used"])
        # Correct predictions sit on the diagonal; a label that was never
        # predicted has no matching column and contributes nothing.
        correct = 0
        for tool in tool_confusion.index:
            if tool in tool_confusion.columns:
                correct += tool_confusion.loc[tool, tool]
        total = tool_confusion.to_numpy().sum()
        tool_selection_accuracy = float(correct / total) if total > 0 else 0.0

        mlflow.log_metric("tool_selection_accuracy", tool_selection_accuracy)

        # Confusion matrix heatmap as artifact (an empty table cannot be drawn).
        if not tool_confusion.empty:
            fig, ax = plt.subplots(figsize=(6, 4))
            sns.heatmap(
                tool_confusion,
                annot=True,
                fmt="d",
                cmap="Blues",
                linewidths=0.5,
                linecolor="gray",
                cbar=True,
                ax=ax,
            )
            ax.set_title("Tool Selection Confusion Matrix")
            ax.set_xlabel("Tool Used by Agent")
            ax.set_ylabel("Ground Truth Tool Label")
            fig.tight_layout()
            mlflow.log_figure(fig, "tool_selection_confusion_matrix.png")
            plt.close(fig)

        # --------------------------------------------------------------
        # 3) RAG retrieval quality (nDCG)
        # --------------------------------------------------------------
        rag_df = df[df["tool_label"] == "rag"].copy()
        mlflow.log_metric("num_rag_rows_evaluated", float(len(rag_df)))

        if rag_df.empty:
            # With no RAG rows the evaluator has nothing to average and the
            # min/max reductions below would fail.
            print("No rows with tool_label == 'rag'; skipping nDCG evaluation.")
        else:
            # Both sides are rebuilt as content-only Documents so they are
            # constructed the same way for the evaluator.
            context_gt = [_as_documents(ctx) for ctx in rag_df["context"].tolist()]
            retrieved_docs = [
                _as_documents(ctx) for ctx in rag_df["extracted_contexts"].tolist()
            ]

            ndcg_evaluator = DocumentNDCGEvaluator()
            ndcg_result = ndcg_evaluator.run(
                ground_truth_documents=context_gt,
                retrieved_documents=retrieved_docs,
            )

            ndcg_scores = ndcg_result["individual_scores"]
            avg_ndcg = float(ndcg_result["score"])

            rag_df = rag_df.reset_index(drop=False).rename(columns={"index": "row_index"})
            rag_df["ndcg_score"] = [round(s, 3) for s in ndcg_scores]

            mlflow.log_metric("ndcg_mean", avg_ndcg)
            mlflow.log_metric("ndcg_min", float(np.min(ndcg_scores)))
            mlflow.log_metric("ndcg_max", float(np.max(ndcg_scores)))

            # Worst RAG cases CSV artifact
            worst_rag = (
                rag_df.sort_values("ndcg_score")
                     .loc[:, ["prompt", "ndcg_score"]]
                     .head(10)
            )
            worst_csv = worst_rag.to_csv(index=False)
            mlflow.log_text(worst_csv, "worst_rag_ndcg_cases.csv")

        # --------------------------------------------------------------
        # 4) Simple quality gate (SAS vs AccuracyThreshold)
        # --------------------------------------------------------------
        # The gate result is only logged (1.0 = pass, 0.0 = fail); nothing here
        # stops the pipeline when the threshold is missed.
        quality_pass = float(avg_sas >= accuracy_threshold)
        mlflow.log_metric("quality_pass", quality_pass)
        mlflow.log_metric("accuracy_threshold", float(accuracy_threshold))

    return avg_sas
 

In [14]:
# Cell 9 - Wire steps
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "financial-compliance-agent-eval-pipeline"

# -------------------------------------------------------------------
# Chain the three steps through their DelayedReturn outputs
# -------------------------------------------------------------------

# MLflow / run-identification arguments that every step receives unchanged.
_mlflow_run_kwargs = dict(
    tracking_server_arn=MLflowTrackingServerArn,
    experiment_name=MLflowExperimentName,
    pipeline_run_id=ExecutionVariables.PIPELINE_EXECUTION_ID,
)

# Step 1: data preparation
data_prep = data_preparation_step(
    data_input_s3_uri=DataInputS3Uri,
    base_output_s3_uri=BaseOutputS3Uri,
    **_mlflow_run_kwargs,
)

# Step 2: inference; consumes step 1's return value (the DelayedReturn is
# passed directly, no .properties lookup).
agent_infer = agent_inference_step(
    dataset_s3_uri=data_prep,
    model_id=ModelId,
    prompt_id=PromptId,
    base_output_s3_uri=BaseOutputS3Uri,
    rate_limit_delay=RateLimitDelaySeconds,
    **_mlflow_run_kwargs,
)

# Step 3: evaluation; consumes step 2's return value the same way.
agent_eval = agent_evaluation_step(
    eval_dataset_s3_uri=agent_infer,
    accuracy_threshold=AccuracyThreshold,
    **_mlflow_run_kwargs,
)

# -------------------------------------------------------------------
# Assemble the SageMaker Pipeline object
#   - only the leaf step (agent_eval) is listed explicitly
#   - upstream steps are inferred from the DelayedReturn graph
# -------------------------------------------------------------------
_pipeline_parameters = [
    DataInputS3Uri,
    BaseOutputS3Uri,
    MLflowTrackingServerArn,
    MLflowExperimentName,
    ModelId,
    PromptId,
    AccuracyThreshold,
    RateLimitDelaySeconds,
]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=_pipeline_parameters,
    steps=[agent_eval],
    sagemaker_session=pipeline_session,
)

print(f"Pipeline object created: {pipeline_name}")


Pipeline object created: financial-compliance-agent-eval-pipeline


In [None]:
#Cell 10 - upsert/ run
# Register / update the pipeline definition in SageMaker
pipeline_upsert_response = pipeline.upsert(role_arn=role)
print("Upsert response:", pipeline_upsert_response)

# Example: start an execution (you can also start from the SageMaker console).
# Any parameter left out of the dict falls back to the default declared on the
# pipeline parameter.
#
# MLflowTrackingServerArn is deliberately not overridden here. The value must
# be an MLflow tracking server ARN
# (arn:aws:sagemaker:REGION:ACCOUNT_ID:mlflow-tracking-server/NAME), not the IAM
# execution role ARN; passing the role ARN leaves the steps without a valid
# tracking server to connect to. To target another tracking server, add
# "MLflowTrackingServerArn": "<tracking-server-arn>" to the dict below.
execution = pipeline.start(
    parameters={
        "DataInputS3Uri": f"s3://{default_bucket}/{base_job_prefix}/data/ground_truth.json",
        "BaseOutputS3Uri": f"s3://{default_bucket}/{base_job_prefix}/artifacts",
        "MLflowExperimentName": "financial-compliance-agent-eval",
        "ModelId": "qwen.qwen3-32b-v1:0",
        "PromptId": "financial-compliance-base-prompt",
        "AccuracyThreshold": 0.8,
        "RateLimitDelaySeconds": 10,
    }
)

print("Started execution:", execution.arn)

KeyboardInterrupt: 