<div class="title-slide">
  
# Module 3.1 -  Practical Use Case Evaluations   
<span style="font-size:20px; line-height:2;">
Dr. Hari Manassery Koduvely <br> 
Principal Data Scientist  <br>  
Cybersecurity Analytics <br>  
Ottawa, Canada <br>  
January 28, 2026


## Quick Recap of Sessions 1 & 2   
- <span style="font-size:18px;">**GenAI Application** evaluation as a Holistic Process.</span>  
  
- <span style="font-size:18px;">**Reference-Based** and **Reference-Free** Evaluations.</span>    
  
- <span style="font-size:18px;">**Classical Metrics** for Reference-Based Evaluations.</span>  
  
- <span style="font-size:18px;">**LLM-as-a-Judge** method</span>  
  
- <span style="font-size:18px;">**Prompt Guidelines** for LLM-as-a-Judge method</span>   
  
- <span style="font-size:18px;">**Biases** in the LLM-as-a-Judge method and their mitigations.</span>   
  
- <span style="font-size:18px;"> Introduction to Observability</span>   
  
- <span style="font-size:18px;"> How to instrument Observability using **OpenTelemetry**</span>   
  
- <span style="font-size:18px;"> **G-Eval** and **RAGAS** Frameworks</span>    
  
- <span style="font-size:18px;"> Open-source tool **DeepEval** </span>   

 

## Session 3 Learning Objectives  
- <span style="font-size:18px;">How to collect observability data from a real-world GenAI business application.</span>  
  
- <span style="font-size:18px;">How to use the observability data for evaluation using LLM-as-a-Judge Method.</span>  


## Business Use Case: A CyberWizard RAG Agent 
- <span style="font-size:18px;">An **Agentic RAG AI Assistant** that answers cybersecurity-related questions to assist **Cyber Security Analysts**.</span>   
  
- <span style="font-size:18px;">Depending on the user's question, the agent decides whether to:</span>  
    - <span style="font-size:18px;">Look up a **MITRE ATT&CK knowledge base** hosted locally in a vector database to get information about a particular attack TTP (tactics, techniques, and procedures).</span>  
    - <span style="font-size:18px;">Call the **VirusTotal API** to find whether a particular file hash belongs to a known malware family.</span>   
      
    - <span style="font-size:18px;">Call **National Vulnerability Database API** to find details about a known software vulnerability.</span>  
      
- <span style="font-size:18px;">**Summarize the result using an LLM** and return the answer to the user.</span>  
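
<span style="font-size:18px;">In the agent, this routing decision is made by the LLM itself. As a rough mental model only, the sketch below approximates it with hypothetical regular-expression rules: a hex digest of MD5, SHA-1 or SHA-256 length goes to VirusTotal, a CVE identifier goes to NVD, and everything else falls through to the MITRE ATT&CK vector search. `route_question` and its patterns are illustrative assumptions; the returned names match the tool functions defined later in `cyberwizard_rag_agent.py`.</span>

```python
import re

# Hypothetical rules for illustration only; the real agent lets the LLM choose the tool.
HASH_PATTERN = re.compile(r"\b(?:[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{64})\b")
CVE_PATTERN = re.compile(r"\bCVE-\d{4}-\d{4,}\b", re.IGNORECASE)


def route_question(question: str) -> str:
    """Return the tool name a simple rule-based router would pick for the question."""
    if HASH_PATTERN.search(question):
        return "virustotal_lookup"  # Looks like an MD5/SHA-1/SHA-256 file hash.
    if CVE_PATTERN.search(question):
        return "nvd_vulnerability_search"  # Mentions a known vulnerability ID.
    return "mitre_attack_search"  # Default: technique/tactic knowledge lookup.
```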

## What you'll learn
- <span style="font-size:18px;">How to set up a **RAG Agent** using the **Strands Agents SDK** on AWS with observability instrumented using **OpenTelemetry**.</span>    
   
- <span style="font-size:18px;">How to visualize agent traces on **Amazon CloudWatch** and **Amazon CloudTrail** and export the data to an S3 bucket for further analysis.</span>   
  
- <span style="font-size:18px;">How to use the **LLM-as-a-Judge** method to assess the **Tool Calling** and **RAG Retrieval** performance of the CyberWizard Agent.</span>   
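
<span style="font-size:18px;">To make the LLM-as-a-Judge step concrete for tool calling, here is a minimal sketch assuming a judge model that is instructed to reply with JSON. `build_tool_call_judge_prompt`, `parse_judge_verdict`, the 1 to 5 scale and the prompt wording are hypothetical choices; the actual call to the judge LLM is left out.</span>

```python
import json
import re
from typing import Dict, List

# Hypothetical rubric; double braces are literal braces after str.format().
JUDGE_TEMPLATE = """You are evaluating a cybersecurity assistant.
User question: {question}
Tools available: {tools}
Tool the agent called: {called_tool}

Rate from 1 (wrong tool) to 5 (best possible tool) how appropriate the tool choice was.
Respond only with JSON: {{"score": <int 1-5>, "reason": "<one sentence>"}}"""


def build_tool_call_judge_prompt(question: str, tools: List[str], called_tool: str) -> str:
    """Fill the rubric with one observed agent step (for example, taken from a trace)."""
    return JUDGE_TEMPLATE.format(question=question, tools=", ".join(tools), called_tool=called_tool)


def parse_judge_verdict(raw: str) -> Dict[str, object]:
    """Extract and validate the JSON verdict, tolerating extra text around it."""
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if not match:
        raise ValueError("No JSON object found in judge output")
    verdict = json.loads(match.group(0))
    score = int(verdict["score"])
    if not 1 <= score <= 5:
        raise ValueError(f"Score out of range: {score}")
    return {"score": score, "reason": str(verdict.get("reason", ""))}
```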


#### Reference: [Amazon Bedrock AgentCore Tutorials](https://github.com/awslabs/amazon-bedrock-agentcore-samples/tree/main/01-tutorials)  
![image.png](../Images/amazon_bedrock_agentcore_tutorials.png)


## Prerequisites
- <span style="font-size:18px;">Enable transaction search on Amazon CloudWatch:</span>   
  
    - <span style="font-size:18px;"> First-time users must enable CloudWatch Transaction Search to view Bedrock AgentCore spans and traces.</span>      
      
    - <span style="font-size:18px;">To enable transaction search, refer to the AWS [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Enable-TransactionSearch.html).</span>  
      
- <span style="font-size:18px;">A log group and log stream configured in Amazon CloudWatch; their names are added to the environment variables.</span>  
  
- <span style="font-size:18px;">An AWS account with Amazon Bedrock model access to Claude Haiku 4.5 (model ID: `global.anthropic.claude-haiku-4-5-20251001-v1:0`).</span>  
  
- <span style="font-size:18px;">AWS credentials configured using `aws configure`.</span>  
  
- <span style="font-size:18px;">A `.env` file updated with the required environment variables.</span>   

## 1. Setup and Installation

<span style="font-size:18px;"> Install the required dependencies listed in the `requirements.txt` file.</span>   

In [1]:
#!pip install --force-reinstall -U -r requirements.txt --quiet

In [7]:
# `!export` runs in a throwaway subshell, so use %env to set the variable for this kernel
%env AWS_PROFILE=628896215736_Fed_Developer

#### Deploying the Prerequisites

<span style="font-size:18px;"> Create a **log group** and a **log stream** for AgentCore observability. </span>  
  
<span style="font-size:18px;"> This can be done either using the script below or from the Amazon CloudWatch console. </span> 

In [None]:
import boto3
cloudwatch_client = boto3.client("logs", region_name="us-gov-east-1")
response = cloudwatch_client.create_log_group(
    logGroupName='agents/cyberwizard-rag-agent'
)
response

In [None]:
response = cloudwatch_client.create_log_stream(
    logGroupName='agents/cyberwizard-rag-agent',
    logStreamName='default'
)
response
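
<span style="font-size:18px;">Re-running the two cells above fails with `ResourceAlreadyExistsException` once the log group and stream exist. A minimal idempotent variant, assuming a boto3 CloudWatch Logs client (which exposes that error class on `client.exceptions`), could look like this; `ensure_log_group_and_stream` is a hypothetical helper name.</span>

```python
from typing import Any, Tuple


def ensure_log_group_and_stream(logs_client: Any, group_name: str, stream_name: str) -> Tuple[bool, bool]:
    """Create the log group and stream if missing; return (group_created, stream_created)."""
    already_exists = logs_client.exceptions.ResourceAlreadyExistsException
    group_created = stream_created = True
    try:
        logs_client.create_log_group(logGroupName=group_name)
    except already_exists:
        group_created = False  # Group was created on an earlier run.
    try:
        logs_client.create_log_stream(logGroupName=group_name, logStreamName=stream_name)
    except already_exists:
        stream_created = False  # Stream was created on an earlier run.
    return group_created, stream_created

# Usage with the client from the cell above:
# ensure_log_group_and_stream(cloudwatch_client, "agents/cyberwizard-rag-agent", "default")
```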

#### Enabling transaction search

<span style="font-size:18px;"> To run this example, you first need to enable CloudWatch Transaction Search.</span>     
<span style="font-size:18px;"> You can do so in the AWS console using this [link](https://console.aws.amazon.com/cloudwatch/home#xray:settings/transaction-search).</span>  

<span style="font-size:18px;"> On that page, choose **Edit** and set the option to ingest spans as structured logs in the OpenTelemetry format.</span>  
![image.png](../Images/transactional_search.png)
![image.png](../Images/transactional_search2.png)
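
<span style="font-size:18px;">The console toggle can also be scripted. The sketch below assumes the AWS X-Ray `UpdateTraceSegmentDestination` and `GetTraceSegmentDestination` operations as exposed by a boto3 `xray` client; please verify them against the current AWS documentation, which also describes a CloudWatch Logs resource policy that must allow X-Ray to write spans before the destination is switched. `enable_transaction_search` and its polling parameters are hypothetical.</span>

```python
import time
from typing import Any


def enable_transaction_search(xray_client: Any, poll_seconds: float = 5.0, max_polls: int = 12) -> str:
    """Point X-Ray trace segments at CloudWatch Logs and wait for the change to become ACTIVE."""
    current = xray_client.get_trace_segment_destination()
    status = current.get("Status", "UNKNOWN")
    if current.get("Destination") == "CloudWatchLogs" and status == "ACTIVE":
        return status  # Transaction Search is already on.
    if current.get("Destination") != "CloudWatchLogs":
        xray_client.update_trace_segment_destination(Destination="CloudWatchLogs")
    # The switch can report PENDING for a while, so poll until ACTIVE or give up.
    for _ in range(max_polls):
        status = xray_client.get_trace_segment_destination().get("Status", "UNKNOWN")
        if status == "ACTIVE":
            break
        time.sleep(poll_seconds)
    return status

# Usage (requires AWS credentials and the resource policy mentioned above):
# import boto3
# enable_transaction_search(boto3.client("xray", region_name="us-gov-east-1"))
```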

## 2. Environment Configuration
<span style="font-size:18px;"> To enable observability for your Strands agent and send telemetry data to Amazon CloudWatch, you'll need to configure the following environment variables.</span>     
<span style="font-size:18px;"> We use a `.env` file to manage these settings securely, keeping sensitive AWS credentials separate from your code while making it easy to switch between different environments.</span>  

<span style="font-size:18px;">**Ensure your AWS credentials are configured**.</span>  

<span style="font-size:18px;">We will create a `.env` file for configuring the environment variables. Use `Strands/.env.example` as a template.</span>

<span style="font-size:18px;">Required Environment Variables:</span>

| Variable | Value | Purpose |
|----------|-------|---------|
| `OTEL_PYTHON_DISTRO` | `aws_distro` | Use AWS Distro for OpenTelemetry (ADOT) |
| `OTEL_PYTHON_CONFIGURATOR` | `aws_configurator` | Set AWS configurator for ADOT SDK |
| `OTEL_EXPORTER_OTLP_PROTOCOL` | `http/protobuf` | Configure export protocol |
| `OTEL_EXPORTER_OTLP_LOGS_HEADERS` | `x-aws-log-group=<YOUR-LOG-GROUP>,x-aws-log-stream=<YOUR-LOG-STREAM>,x-aws-metric-namespace=<YOUR-NAMESPACE>` | Route logs to your CloudWatch log group and stream, and set the metric namespace |
| `OTEL_RESOURCE_ATTRIBUTES` | `service.name=<YOUR-AGENT-NAME>` | Identify your agent in observability data |
| `AGENT_OBSERVABILITY_ENABLED` | `true` | Activate ADOT pipeline |
| `AWS_REGION` | `<YOUR-REGION>` | AWS Region |
| `OTEL_TRACES_EXPORTER` | `otlp` | Export traces using the OTLP exporter |
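
<span style="font-size:18px;">A typo or a leftover placeholder in `OTEL_EXPORTER_OTLP_LOGS_HEADERS` is easy to miss once the agent is running. A minimal check, assuming the comma-separated `key=value` layout shown in the table, is sketched below; `parse_otel_kv_list` and `missing_log_headers` are hypothetical helpers. The same parser also handles `OTEL_RESOURCE_ATTRIBUTES`.</span>

```python
from typing import Dict, List

REQUIRED_LOG_HEADERS = ("x-aws-log-group", "x-aws-log-stream", "x-aws-metric-namespace")


def parse_otel_kv_list(value: str) -> Dict[str, str]:
    """Parse a 'k1=v1,k2=v2' string into a dict; values may themselves contain '='."""
    pairs: Dict[str, str] = {}
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        key, sep, val = item.partition("=")  # Split on the first '=' only.
        if not sep:
            raise ValueError(f"Malformed entry without '=': {item!r}")
        pairs[key.strip()] = val.strip()
    return pairs


def missing_log_headers(headers_value: str) -> List[str]:
    """List required headers that are absent, empty, or still a <PLACEHOLDER>."""
    parsed = parse_otel_kv_list(headers_value)
    return [h for h in REQUIRED_LOG_HEADERS if not parsed.get(h) or parsed[h].startswith("<")]
```

<span style="font-size:18px;">For example, calling `missing_log_headers(os.getenv("OTEL_EXPORTER_OTLP_LOGS_HEADERS", ""))` after `load_dotenv()` should return an empty list when all three headers are filled in. Note that the agent's `configure_aws_tracing` reads the service name from `OTEL_SERVICE_NAME`, not from `OTEL_RESOURCE_ATTRIBUTES`.</span>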

## 3. Load Environment Variables

<span style="font-size:18px;">Load the environment variables from the `.env` file:</span>  

In [3]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Display the OTEL-related environment variables
otel_vars = [
    "OTEL_PYTHON_DISTRO",
    "OTEL_PYTHON_CONFIGURATOR",
    "OTEL_EXPORTER_OTLP_PROTOCOL",
    "OTEL_EXPORTER_OTLP_LOGS_HEADERS",
    "OTEL_RESOURCE_ATTRIBUTES",
    "AGENT_OBSERVABILITY_ENABLED",
    "OTEL_TRACES_EXPORTER"
]

print("OpenTelemetry Configuration:")
for var in otel_vars:
    value = os.getenv(var)
    if value:
        print(f"{var}={value}")

OpenTelemetry Configuration:
OTEL_PYTHON_DISTRO=aws_distro
OTEL_PYTHON_CONFIGURATOR=aws_configurator
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
OTEL_EXPORTER_OTLP_LOGS_HEADERS=x-aws-log-group=agents/cyberwizard-rag-agent,x-aws-log-stream=default,x-aws-metric-namespace=bedrock-agentcore
OTEL_RESOURCE_ATTRIBUTES=service.name=cyberwizard-rag-agent
AGENT_OBSERVABILITY_ENABLED=true
OTEL_TRACES_EXPORTER=otlp


## 4. Create a Strands Agent in a Python file

<span style="font-size:18px;">CyberWizard Agent implementation is provided in `cyberwizard_rag_agent.py`.</span>  
  
<span style="font-size:18px;"> It is an agentic RAG AI assistant that answers cybersecurity-related questions to assist Cyber Threat Analysts.</span>  

<span style="font-size:18px;">The agent is configured with the following:</span>   

- <span style="font-size:18px;"> A system prompt that defines the agent's role as a Cybersecurity Expert.</span>    
  
- <span style="font-size:18px;"> Amazon Bedrock's Claude Haiku model as its Large Language Model.</span>    
  
- <span style="font-size:18px;"> A **Vector DB** search tool for retrieving MITRE ATT&CK framework information.</span>    
  
- <span style="font-size:18px;"> A tool to query the **VirusTotal** API.</span>    
  
- <span style="font-size:18px;"> A tool to query the **National Vulnerability Database** API.</span>    
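
<span style="font-size:18px;">The vector DB search tool ranks stored chunks by cosine similarity to the embedded question and keeps the top k. The self-contained sketch below reproduces that ranking on toy two-dimensional vectors standing in for the Titan embeddings; `top_k_by_cosine` is a hypothetical stand-in for the `_cosine_similarity_matrix` plus `np.argsort` steps inside `mitre_attack_search`.</span>

```python
from typing import List, Sequence, Tuple

import numpy as np


def top_k_by_cosine(
    embeddings: Sequence[Sequence[float]], query: Sequence[float], k: int = 3
) -> List[Tuple[int, float]]:
    """Return (row index, cosine similarity) pairs for the k rows closest to the query."""
    matrix = np.asarray(embeddings, dtype=np.float32)
    q = np.asarray(query, dtype=np.float32)
    q_norm = np.linalg.norm(q)
    if matrix.size == 0 or q_norm == 0:
        return []  # Nothing stored, or a zero query vector with no direction.
    row_norms = np.linalg.norm(matrix, axis=1)
    row_norms[row_norms == 0] = 1e-10  # Avoid division by zero for zero-norm rows.
    scores = (matrix @ q) / (row_norms * q_norm)
    # Highest similarity first; always keep at least one hit, as the agent does.
    order = np.argsort(scores)[::-1][: max(1, k)]
    return [(int(i), float(scores[i])) for i in order]


docs = [[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]]
print(top_k_by_cosine(docs, [1.0, 0.1], k=2))
```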

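<span style="font-size:18px;"> When the script is launched with the `opentelemetry-instrument` command, the AWS Distro for OpenTelemetry automatically sets up the tracer provider before running the Python code. The script also configures its own in-process AWS X-Ray exporter at import time; setting `STRANDS_DISABLE_CUSTOM_XRAY=true` skips that step.</span>  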
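
<span style="font-size:18px;">One possible way to launch the agent under the auto-instrumentation wrapper from the notebook is sketched below. It assumes the script's entry point (not shown in this section) calls `parse_arguments()`, so the flags mirror `--session-id`, `--question`, `--ingest-pdfs` and `--pdf-folder`; `build_agent_command` is a hypothetical helper.</span>

```python
import shlex
import uuid
from typing import List, Optional


def build_agent_command(
    question: str,
    session_id: Optional[str] = None,
    ingest_pdfs: bool = False,
    pdf_folder: str = "pdf_files",
) -> List[str]:
    """Assemble an argv list that runs the agent under `opentelemetry-instrument`."""
    # Generate a session ID when none is given so each run gets its own correlation key.
    session_id = session_id or f"session-{uuid.uuid4().hex[:12]}"
    cmd = [
        "opentelemetry-instrument",
        "python",
        "cyberwizard_rag_agent.py",
        "--session-id", session_id,
        "--question", question,
    ]
    if ingest_pdfs:
        # Ask the script to ingest PDFs into the local vector store before answering.
        cmd += ["--ingest-pdfs", "--pdf-folder", pdf_folder]
    return cmd


# Print a shell-ready version of the command for copy-pasting into a terminal.
print(shlex.join(build_agent_command("What is MITRE ATT&CK technique T1059?", session_id="demo-001")))
```

<span style="font-size:18px;">Passing the list directly to `subprocess.run(cmd, check=True)` avoids shell quoting issues; `shlex.join` is only used here to print a copy-pasteable version.</span>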

In [None]:
%%writefile cyberwizard_rag_agent.py
# General imports
import os
import logging
import argparse
import json
import atexit
from datetime import datetime
from io import StringIO
from pathlib import Path
from threading import Lock
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote_plus
from uuid import uuid4
import requests
import numpy as np
from pypdf import PdfReader

# AWS imports
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from aws_xray_exporter import AwsXRaySpanExporter

# Strands Agent SDK imports
from strands import Agent, tool
from strands.models import BedrockModel

# OpenTelemetry imports
from opentelemetry import baggage, context, trace
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.extension.aws.trace.aws_xray_id_generator import AwsXRayIdGenerator
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased


# Parse CLI arguments
def parse_arguments():
    """Parse the CLI arguments that control session metadata and ingestion."""
    parser = argparse.ArgumentParser(description='Strands Cybersecurity Agent with Session Tracking')
    parser.add_argument('--session-id', 
                       type=str, 
                       required=True,
                       help='Session ID to associate with this agent run')
    parser.add_argument('--question',
                       type=str,
                       required=False,
                       help='Cybersecurity investigation question for the agent to analyze')
    parser.add_argument('--ingest-pdfs',
                       action='store_true',
                       help='When set, ingest PDFs from the local folder before running the agent')
    parser.add_argument('--pdf-folder',
                       type=str,
                       default='pdf_files',
                       help='Folder containing PDFs to ingest into the vector store')
    return parser.parse_args()

# Set up the session ID in OpenTelemetry baggage
def set_session_context(session_id):
    """Set the session ID in OpenTelemetry baggage for trace correlation"""
    ctx = baggage.set_baggage("session.id", session_id)
    token = context.attach(ctx)
    logging.info(f"Session ID '{session_id}' attached to telemetry context")
    return token

###########################
#### Agent Code below: ####
###########################

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure Strands logging
logging.getLogger("strands").setLevel(logging.INFO)


# AWS S3 Log Handler
S3_BUCKET_ENV = "AGENT_LOG_BUCKET"
S3_PREFIX_ENV = "AGENT_LOG_PREFIX"
S3_BUFFER_BYTES_ENV = "AGENT_LOG_MAX_BUFFER"
_s3_log_handler = None

class S3LogHandler(logging.Handler):
    """Buffer logs locally and periodically upload them to a S3 bucket."""

    def __init__(
        self,
        bucket_name: str,
        prefix: str = "cloudwatch-export",
        session_id: Optional[str] = None,
        client=None,
        max_bytes: int = 262_144,
    ) -> None:
        super().__init__()
        self.bucket_name = bucket_name
        self.session_id = session_id or "unknown-session"
        self._prefix = prefix.strip("/") or "cloudwatch-export"
        self._client = client or boto3.client("s3")
        self._buffer = StringIO()
        self._lock = Lock()
        self._run_id = uuid4().hex
        self._sequence = 0
        self._max_bytes = max(4096, int(max_bytes))

    def set_session_id(self, session_id: Optional[str]) -> None:
        """Refresh the session identifier used in generated log keys."""
        if session_id:
            self.session_id = session_id

    def emit(self, record: logging.LogRecord) -> None:
        """Format a log record, buffer it, and flush if the buffer is full."""
        try:
            message = self.format(record)
        except Exception:  # pragma: no cover - fallback path
            message = record.getMessage()

        payload = None
        with self._lock:
            self._buffer.write(message + "\n")
            if self._buffer.tell() >= self._max_bytes:
                payload = self._drain_buffer_locked()

        if payload:
            self._upload_payload(payload)

    def flush(self) -> None:
        """Force a buffered upload regardless of the current buffer size."""
        payload = None
        with self._lock:
            payload = self._drain_buffer_locked()
        if payload:
            self._upload_payload(payload)

    def close(self) -> None:
        """Flush remaining bytes and release resources."""
        try:
            self.flush()
        finally:
            try:
                self._buffer.close()
            except Exception:  # pragma: no cover - defensive
                pass
            super().close()

    def _drain_buffer_locked(self) -> Optional[Tuple[str, bytes]]:
        """Return the current buffer contents as bytes and reset the buffer."""
        contents = self._buffer.getvalue()
        if not contents.strip():
            return None
        self._buffer.close()
        self._buffer = StringIO()
        key = self._build_key()
        return key, contents.encode("utf-8")

    def _build_key(self) -> str:
        """Generate a deterministic S3 object key for the current session."""
        self._sequence += 1
        now = datetime.utcnow()
        date_path = now.strftime("%Y/%m/%d")
        filename = f"{self._sanitize(self.session_id)}-{now.strftime('%H%M%S')}-{self._run_id}-{self._sequence:04d}.log"
        return f"{self._prefix}/{date_path}/{filename}"

    def _sanitize(self, value: str) -> str:
        """Replace unsafe characters in path components with hyphens."""
        safe = [ch if ch.isalnum() or ch in ("-", "_") else "-" for ch in value[:80]]
        return "".join(safe) or "session"

    def _upload_payload(self, payload: Tuple[str, bytes]) -> None:
        """Send the buffered payload to S3 and log the outcome locally."""
        key, data = payload
        try:
            self._client.put_object(Bucket=self.bucket_name, Key=key, Body=data)
            print(f"[S3LogHandler] Uploaded logs to s3://{self.bucket_name}/{key}")
        except Exception as exc:  # pragma: no cover - network errors
            print(f"[S3LogHandler] Failed to upload logs to s3://{self.bucket_name}/{key}: {exc}")


def configure_s3_log_archival(session_id: Optional[str]) -> Optional[S3LogHandler]:
    """Ensure the shared S3 log handler exists and is bound to the session."""
    global _s3_log_handler
    bucket = os.getenv(S3_BUCKET_ENV)
    if not bucket:
        return None

    max_buffer = os.getenv(S3_BUFFER_BYTES_ENV, "262144")
    try:
        max_buffer_bytes = int(max_buffer)
    except ValueError:
        logger.warning(
            "Invalid %s value '%s'. Using default buffer size.",
            S3_BUFFER_BYTES_ENV,
            max_buffer,
        )
        max_buffer_bytes = 262_144

    if _s3_log_handler is None:
        prefix = os.getenv(S3_PREFIX_ENV, "cloudwatch-export")
        handler = S3LogHandler(
            bucket_name=bucket,
            prefix=prefix,
            session_id=session_id,
            max_bytes=max_buffer_bytes,
        )
        handler.setLevel(logging.INFO)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s [%(name)s] %(message)s")
        )
        logging.getLogger().addHandler(handler)
        _s3_log_handler = handler
        atexit.register(handler.close)
        logger.info("S3 log archival enabled (bucket=%s, prefix=%s)", bucket, prefix)
    else:
        _s3_log_handler.set_session_id(session_id)

    return _s3_log_handler


def _parse_sample_ratio(value: str) -> float:
    """Safely coerce the OTEL sampling ratio into the valid [0, 1] range."""
    try:
        ratio = float(value)
    except (TypeError, ValueError):
        logger.warning("Invalid OTEL_TRACES_SAMPLER_ARG '%s'. Falling back to 1.0", value)
        return 1.0
    return min(1.0, max(0.0, ratio))

# Set up an in-process OpenTelemetry tracer that exports spans to AWS X-Ray
# and instruments HTTP (requests) and AWS SDK (botocore) calls.
def configure_aws_tracing() -> None:
    """Configure an in-process OpenTelemetry tracer provider with an AWS X-Ray exporter."""
    if os.getenv("STRANDS_DISABLE_CUSTOM_XRAY") in {"1", "true", "True"}:
        logger.info("Custom AWS X-Ray exporter disabled via STRANDS_DISABLE_CUSTOM_XRAY")
        return

    region = (
        os.getenv("AWS_TRACES_REGION")
        or os.getenv("AWS_REGION")
        or os.getenv("AWS_DEFAULT_REGION")
        or "us-east-1"
    )
    service_name = os.getenv("OTEL_SERVICE_NAME", "cyberwizard-rag-agent")
    resource_attrs = {SERVICE_NAME: service_name}
    if service_version := os.getenv("SERVICE_VERSION"):
        resource_attrs["service.version"] = service_version
    if service_namespace := os.getenv("SERVICE_NAMESPACE"):
        resource_attrs["service.namespace"] = service_namespace

    sample_ratio = _parse_sample_ratio(os.getenv("OTEL_TRACES_SAMPLER_ARG", "1.0"))

    tracer_provider = TracerProvider(
        resource=Resource.create(resource_attrs),
        id_generator=AwsXRayIdGenerator(),
        sampler=ParentBased(TraceIdRatioBased(sample_ratio)),
    )
    exporter = AwsXRaySpanExporter(region=region, service_name=service_name)
    tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(tracer_provider)
    logger.info(
        "AWS X-Ray tracing configured (region=%s, service=%s, sample_ratio=%s)",
        region,
        service_name,
        sample_ratio,
    )

    # Instrument outgoing HTTP (requests) and AWS SDK (botocore) calls once.
    # BaseInstrumentor exposes an `is_instrumented_by_opentelemetry` property;
    # getattr() keeps the check safe if an installed release lacks it.
    for name, instrumentor in (
        ("Requests", RequestsInstrumentor()),
        ("Botocore", BotocoreInstrumentor()),
    ):
        if getattr(instrumentor, "is_instrumented_by_opentelemetry", False):
            continue
        try:
            instrumentor.instrument()
        except Exception as exc:  # pragma: no cover - guard against re-entry issues
            logger.warning("%s instrumentation failed: %s", name, exc)


configure_aws_tracing()

# Local numpy-based Vector Store Implementation for RAG

DEFAULT_VECTOR_DIR = "vector_store"

def _resolve_vector_store_dir(override: Optional[str] = None) -> Path:
    """Resolve the folder that stores embeddings and metadata."""
    if override:
        return Path(override)

    for env_key in ("VECTOR_STORE_DIR", "MITRE_VECTOR_DB_PATH"):
        configured = os.getenv(env_key)
        if configured:
            return Path(configured)

    return Path(DEFAULT_VECTOR_DIR)


def _vector_store_paths(store_dir: Path) -> Tuple[Path, Path]:
    """Return the embeddings.npy and documents.jsonl paths, ensuring the folder exists."""
    store_dir.mkdir(parents=True, exist_ok=True)
    return store_dir / "embeddings.npy", store_dir / "documents.jsonl"


def _extend_vector_store(store_dir: Path, embeddings: List[List[float]], records: List[dict]) -> int:
    """Append new embeddings and records to the local vector store."""
    if not embeddings:
        return 0

    embeddings_path, documents_path = _vector_store_paths(store_dir)
    new_array = np.array(embeddings, dtype=np.float32)

    if embeddings_path.exists():
        existing = np.load(embeddings_path)
        existing = existing.astype(np.float32, copy=False)
        new_array = np.vstack([existing, new_array])

    np.save(embeddings_path, new_array)

    with open(documents_path, "a", encoding="utf-8") as doc_file:
        for record in records:
            doc_file.write(json.dumps(record, ensure_ascii=False) + "\n")

    return len(records)


def _load_vector_store(store_dir: Path) -> Tuple[Optional[np.ndarray], List[dict]]:
    """Load embeddings and associated metadata from disk."""
    embeddings_path, documents_path = _vector_store_paths(store_dir)

    if not embeddings_path.exists() or not documents_path.exists():
        return None, []

    embeddings = np.load(embeddings_path)
    embeddings = embeddings.astype(np.float32, copy=False)

    with open(documents_path, "r", encoding="utf-8") as doc_file:
        records = [json.loads(line) for line in doc_file if line.strip()]

    if len(records) != len(embeddings):
        logger.warning(
            "Vector store record count mismatch (records=%s, embeddings=%s). Truncating to smallest.",
            len(records),
            len(embeddings),
        )
        min_len = min(len(records), len(embeddings))
        embeddings = embeddings[:min_len]
        records = records[:min_len]

    return embeddings, records


def _cosine_similarity_matrix(matrix: np.ndarray, vector: np.ndarray) -> np.ndarray:
    """Compute cosine similarity between a matrix of embeddings and a single vector."""
    if matrix.size == 0:
        return np.array([])

    vector_norm = np.linalg.norm(vector)
    if vector_norm == 0:
        return np.zeros(matrix.shape[0])

    normalized_vector = vector / vector_norm
    matrix_norms = np.linalg.norm(matrix, axis=1)
    matrix_norms[matrix_norms == 0] = 1e-10

    scores = matrix @ normalized_vector
    return scores / matrix_norms


def _chunk_text(text: str, chunk_size: int = 1200, overlap: int = 200):
    """Yield overlapping text windows for downstream embedding."""
    cleaned = " ".join(text.split())
    if not cleaned:
        return
    text_len = len(cleaned)
    start = 0
    while start < text_len:
        end = min(text_len, start + chunk_size)
        yield cleaned[start:end]
        if end == text_len:
            break
        start = max(0, end - overlap)


def _embed_texts_with_bedrock(texts: List[str]) -> List[List[float]]:
    """Call an AWS Bedrock embedding model and return dense vectors."""
    if not texts:
        return []

    model_id = os.getenv("BEDROCK_EMBEDDING_MODEL_ID", "amazon.titan-embed-text-v2:0")
    # Prefer AWS_REGION (set in .env) and fall back to AWS_DEFAULT_REGION.
    region = os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION", "us-west-2")
    client = boto3.client("bedrock-runtime", region_name=region)

    embeddings: List[List[float]] = []
    for text in texts:
        payload = json.dumps({"inputText": text})
        try:
            response = client.invoke_model(
                body=payload,
                modelId=model_id,
                accept="application/json",
                contentType="application/json",
            )
        except (BotoCoreError, ClientError) as exc:
            logger.error("Embedding request failed: %s", exc)
            raise RuntimeError(f"Embedding request failed: {exc}") from exc

        body = response.get("body")
        data = json.loads(body.read()) if hasattr(body, "read") else json.loads(body)  # type: ignore[arg-type]
        embedding = data.get("embedding")
        if not embedding:
            raise RuntimeError("Embedding response missing 'embedding' field")
        embeddings.append(embedding)

    return embeddings


def ingest_pdf_folder(
    folder_path: str = "pdf_files",
    store_dir: Optional[str] = None,
    batch_size: int = 16,
    chunk_size: int = 1200,
    chunk_overlap: int = 200,
) -> str:
    """Ingest PDFs into the local numpy-based vector store."""
    source_dir = Path(folder_path)
    if not source_dir.exists():
        return f"PDF folder '{folder_path}' not found. Skipping ingestion."

    pdf_files = sorted(path for path in source_dir.iterdir() if path.suffix.lower() == ".pdf")
    if not pdf_files:
        return f"No PDF files discovered in '{folder_path}'."

    vector_dir = _resolve_vector_store_dir(store_dir)

    pending_records: List[dict] = []
    staged_records: List[dict] = []
    staged_embeddings: List[List[float]] = []

    def flush_batch():
        nonlocal pending_records
        if not pending_records:
            return
        batch = pending_records
        pending_records = []
        texts = [record["text"] for record in batch]
        embeddings = _embed_texts_with_bedrock(texts)
        staged_records.extend(batch)
        staged_embeddings.extend(embeddings)

    for pdf_path in pdf_files:
        try:
            reader = PdfReader(str(pdf_path))
        except Exception as exc:  # pragma: no cover - depends on local files
            logger.error("Failed to read %s: %s", pdf_path, exc)
            continue

        for page_number, page in enumerate(reader.pages, start=1):
            text = page.extract_text() or ""
            for chunk_index, chunk in enumerate(_chunk_text(text, chunk_size, chunk_overlap), start=1):
                snippet = chunk.strip()
                if not snippet:
                    continue
                record_id = f"pdf::{pdf_path.stem}::p{page_number}::c{chunk_index}::{uuid4().hex}"
                metadata = {
                    "source": "pdf",
                    "file": pdf_path.name,
                    "page": page_number,
                    "chunk": chunk_index,
                }
                pending_records.append({"id": record_id, "text": snippet, "metadata": metadata})
                if len(pending_records) >= batch_size:
                    flush_batch()

    flush_batch()
    added_chunks = _extend_vector_store(vector_dir, staged_embeddings, staged_records)
    store_label = str(vector_dir)
    return (
        f"Ingested {added_chunks} PDF chunks from {len(pdf_files)} files into vector store "
        f"'{store_label}'."
    )


def _format_json(metadata):
    """Return metadata as a compact JSON string suitable for logging."""
    try:
        return json.dumps(metadata, ensure_ascii=True)
    except (TypeError, ValueError):
        return str(metadata)


# Tool implementations for the Cybersecurity RAG Agent
# Tool: MITRE ATT&CK Search
@tool
def mitre_attack_search(query: str) -> str:
    """Search the local numpy-based MITRE ATT&CK vector store for techniques and mitigations."""
    store_dir = _resolve_vector_store_dir()
    top_k = int(os.getenv("MITRE_TOP_K", "3"))

    embeddings, records = _load_vector_store(store_dir)
    if embeddings is None or not records:
        return (
            f"Vector store '{store_dir}' is empty. Ingest MITRE or PDF data before searching."
        )

    query_embedding = _embed_texts_with_bedrock([query])[0]
    query_vector = np.array(query_embedding, dtype=np.float32)

    scores = _cosine_similarity_matrix(embeddings, query_vector)
    if scores.size == 0:
        return f"No MITRE ATT&CK matches for '{query}'."

    ranked_indices = np.argsort(scores)[::-1][:max(1, top_k)]

    formatted = []
    for rank, idx in enumerate(ranked_indices, start=1):
        record = records[idx]
        meta = record.get("metadata", {})
        snippet_text = (record.get("text") or "").strip().replace("\n", " ")
        technique = meta.get("technique_id") or meta.get("technique") or "Unknown Technique"
        tactic = meta.get("tactic") or meta.get("phase_name")
        reference = meta.get("url") or meta.get("reference") or meta.get("file")
        similarity = f"{scores[idx]:.4f}"

        formatted.append(
            f"{rank}. Technique: {technique}"
            f"{' | Tactic: ' + tactic if tactic else ''}\n"
            f"   Cosine similarity: {similarity}\n"
            f"   Insight: {snippet_text[:600]}\n"
            f"   Reference: {reference or _format_json(meta)}"
        )

    return "\n".join(formatted)

# Tool: VirusTotal Lookup
@tool
def virustotal_lookup(query: str) -> str:
    """Query VirusTotal for malware, hashes, domains, or URLs related to the input."""
    api_key = os.getenv("VIRUSTOTAL_API_KEY")
    if not api_key:
        return "VirusTotal API key not configured. Set VIRUSTOTAL_API_KEY to enable this tool."

    url = f"https://www.virustotal.com/api/v3/search?query={quote_plus(query)}"
    headers = {"x-apikey": api_key}

    try:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
    except requests.RequestException as exc:  # pragma: no cover - network call
        logger.error("VirusTotal lookup error: %s", exc)
        return f"VirusTotal lookup error: {exc}"

    payload = response.json()
    rows = payload.get("data", [])
    if not rows:
        return f"No VirusTotal matches found for '{query}'."

    formatted = []
    for idx, row in enumerate(rows[:5]):
        attributes = row.get("attributes", {})
        stats = attributes.get("last_analysis_stats", {})
        total = sum(stats.values()) or 1
        detection_ratio = f"{stats.get('malicious', 0)}/{total}"
        threat_label = attributes.get("popular_threat_classification", {}).get("suggested_threat_label")
        item_type = row.get("type", "unknown")
        vt_id = row.get("id", "unknown")
        gui_url = f"https://www.virustotal.com/gui/{item_type}/{vt_id}"

        formatted.append(
            f"{idx + 1}. Type: {item_type} | Detection ratio: {detection_ratio}\n"
            f"   Threat label: {threat_label or 'Not classified'}\n"
            f"   VT link: {gui_url}\n"
            f"   Query context: {query}"
        )

    return "\n".join(formatted)

# Tool: NVD Vulnerability Search

def _extract_cvss(metrics: dict) -> Tuple[str, str, str]:
    """Pick the most specific CVSS tuple (severity, score, vector) from NVD metrics."""
    metric_priority = ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2")
    for metric_name in metric_priority:
        metric_entries = metrics.get(metric_name)
        if metric_entries:
            entry = metric_entries[0]
            data = entry.get("cvssData", {})
            severity = entry.get("baseSeverity") or data.get("baseSeverity") or "UNKNOWN"
            score = data.get("baseScore") or entry.get("baseScore") or "N/A"
            vector = data.get("vectorString") or "N/A"
            return severity, str(score), vector
    return "UNKNOWN", "N/A", "N/A"


@tool
def nvd_vulnerability_search(query: str) -> str:
    """Search the National Vulnerability Database for CVEs related to the query."""
    params = {
        "keywordSearch": query,
        "resultsPerPage": os.getenv("NVD_RESULTS_PER_PAGE", "5"),
    }
    headers = {}
    api_key = os.getenv("NVD_API_KEY")
    if api_key:
        headers["apiKey"] = api_key

    try:
        response = requests.get(
            "https://services.nvd.nist.gov/rest/json/cves/2.0",
            params=params,
            headers=headers,
            timeout=30,
        )
        response.raise_for_status()
    except requests.RequestException as exc:  # pragma: no cover - network call
        logger.error("NVD lookup error: %s", exc)
        return f"NVD lookup error: {exc}"

    payload = response.json()
    vulns = payload.get("vulnerabilities", [])
    if not vulns:
        return f"No NVD entries found for '{query}'."

    formatted = []
    for idx, item in enumerate(vulns[:5]):
        cve = item.get("cve", {})
        cve_id = cve.get("id", "Unknown CVE")
        descriptions = cve.get("descriptions", [])
        description = next((d.get("value") for d in descriptions if d.get("lang") == "en"), "No English description available.")
        metrics = cve.get("metrics", {})
        severity, score, vector = _extract_cvss(metrics)
        published = cve.get("published")

        formatted.append(
            f"{idx + 1}. {cve_id}\n"
            f"   Severity: {severity} (score {score}, vector {vector})\n"
            f"   Published: {published or 'Unknown'}\n"
            f"   Summary: {description}"
        )

    return "\n".join(formatted)

# Bedrock LLM Model Initialization
def get_bedrock_model():
    """Instantiate the configured Bedrock text model for conversational responses."""
    model_id = os.getenv("BEDROCK_MODEL_ID", "global.anthropic.claude-haiku-4-5-20251001-v1:0")
    region = os.getenv("AWS_DEFAULT_REGION", "us-east-1")

    try:
        bedrock_model = BedrockModel(
            model_id=model_id,
            region_name=region,
            temperature=0.7,
            max_tokens=1024
        )
        logger.info(f"Successfully initialized Bedrock model: {model_id} in region: {region}")
        return bedrock_model
    except Exception as e:
        logger.error(f"Failed to initialize Bedrock model: {str(e)}")
        logger.error("Please ensure you have proper AWS credentials configured and access to the Bedrock model")
        raise


def main():
    """Entry point that ingests data, configures telemetry, and runs the agent."""
    # Parse command line arguments
    args = parse_arguments()
    s3_handler = configure_s3_log_archival(args.session_id)

    # Set session context for telemetry
    context_token = set_session_context(args.session_id)

    try:
        # Optionally ingest PDFs before initializing the conversational model
        ingest_summary = None
        if args.ingest_pdfs:
            try:
                ingest_summary = ingest_pdf_folder(folder_path=args.pdf_folder)
                logger.info(ingest_summary)
            except Exception as exc:  # pragma: no cover - depends on local data
                logger.error("PDF ingestion failed: %s", exc)
                ingest_summary = f"PDF ingestion failed: {exc}"

        # Initialize Bedrock model
        bedrock_model = get_bedrock_model()

        # Create cybersecurity agent
        cybersecurity_agent = Agent(
            model=bedrock_model,
            system_prompt="""You are a Cybersecurity Expert who answers analyst questions, selects the most relevant 
            data source, and synthesizes a concise incident response report. Use the MITRE ATT&CK tool for questions 
            about adversary behavior or attack techniques, the VirusTotal tool for malware, hashes, or domains, and the 
            NVD tool for software vulnerabilities or CVE lookups. Always explain which source you used and cite key 
            evidence.""",
            tools=[mitre_attack_search, virustotal_lookup, nvd_vulnerability_search],
            trace_attributes={
                "user.id": "user@domain.com",
                "tags": ["Strands", "Cybersecurity"],
            }
        )

        # Execute the cybersecurity investigation (uses --question if given, else a default query)
        query = args.question or """What is the MITRE ATT&CK technique for brute forcing RDP, and are there any known vulnerabilities?"""

        result = cybersecurity_agent(query)
        if ingest_summary:
            print("PDF ingestion:", ingest_summary)
        print("Result:", result)

    finally:
        # Detach context when done
        context.detach(context_token)
        logger.info(f"Session context for '{args.session_id}' detached")
        if s3_handler:
            s3_handler.close()


if __name__ == "__main__":
    main()


Overwriting cyberwizard_rag_agent.py
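The CVSS fallback order in `_extract_cvss` (v3.1, then v3.0, then v2) can be checked offline without calling NVD. Below is a stand-alone copy of that helper, renamed `extract_cvss`, exercised on hand-built metrics dicts. The dicts are hypothetical and only mimic the NVD CVE API 2.0 layout, in which (as far as we can tell) v3.x severity sits inside `cvssData` while v2 severity sits on the metric entry itself; that appears to be why the helper looks in both places.

```python
from typing import Tuple


def extract_cvss(metrics: dict) -> Tuple[str, str, str]:
    """Stand-alone copy of `_extract_cvss`: prefer CVSS v3.1, then v3.0, then v2."""
    for metric_name in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
        entries = metrics.get(metric_name)
        if entries:
            entry = entries[0]
            data = entry.get("cvssData", {})
            # Severity may live on the entry (v2) or inside cvssData (v3.x).
            severity = entry.get("baseSeverity") or data.get("baseSeverity") or "UNKNOWN"
            score = data.get("baseScore") or entry.get("baseScore") or "N/A"
            vector = data.get("vectorString") or "N/A"
            return severity, str(score), vector
    return "UNKNOWN", "N/A", "N/A"


# Hypothetical, hand-built metrics dicts (not real NVD records).
v2_only = {
    "cvssMetricV2": [
        {"baseSeverity": "HIGH",
         "cvssData": {"baseScore": 7.5, "vectorString": "AV:N/AC:L/Au:N/C:P/I:P/A:P"}}
    ]
}
v31_and_v2 = {
    "cvssMetricV31": [
        {"cvssData": {"baseScore": 9.8, "baseSeverity": "CRITICAL",
                      "vectorString": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"}}
    ],
    **v2_only,
}

print(extract_cvss(v2_only))     # only v2 present, so v2 is used
print(extract_cvss(v31_and_v2))  # v3.1 takes priority over v2
print(extract_cvss({}))          # no metrics: all placeholders
```

One side effect of the `or` chains worth knowing: a genuine score of `0.0` is falsy and would be reported as `"N/A"`.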


## 5. AWS OpenTelemetry Python Distro

<span style="font-size:18px;"> Now that the agent is set up and the environment is configured, let's look at how observability works.</span>  
   
<span style="font-size:18px;"> The [AWS OpenTelemetry Python Distro](https://pypi.org/project/aws-opentelemetry-distro/) automatically instruments the Strands agent to capture telemetry data without requiring code changes.</span>  

<span style="font-size:18px;">The distribution provides:</span>  
- <span style="font-size:18px;"> **Auto-instrumentation** for Strands agents hosted outside AgentCore Runtime (e.g., EC2, Lambda, etc.)</span>  
- <span style="font-size:18px;">**AWS-optimized configuration** for seamless CloudWatch integration</span>    

### Running Your Instrumented Agent

<span style="font-size:18px;"> To capture traces from your Strands agent, use the `opentelemetry-instrument` command instead of running Python directly.</span>  

```bash
opentelemetry-instrument python cyberwizard_rag_agent.py
```

<span style="font-size:18px;">This automatically applies instrumentation using the environment variables from the `.env` file.</span>

<span style="font-size:18px;">This command will:</span>

- <span style="font-size:18px;">Load your OTEL configuration from the .env file.</span>
  
- <span style="font-size:18px;">Automatically instrument Strands, Amazon Bedrock calls, agent tools and databases, and other requests made by the agent.</span>
  
- <span style="font-size:18px;">Send traces to CloudWatch.</span>
  
- <span style="font-size:18px;">Enable you to visualize the agent's decision-making process in the GenAI Observability dashboard.</span>  
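If you prefer to launch the instrumented agent from Python instead of a `!` shell line, the sketch below assembles the same `opentelemetry-instrument python cyberwizard_rag_agent.py --session-id ... --question ...` command plus an environment for it. The helpers `build_otel_env`, `build_command`, and `run_instrumented` are hypothetical. The `OTEL_*` and `AGENT_OBSERVABILITY_ENABLED` names follow AWS guidance for agents hosted outside AgentCore Runtime as we understand it, and the log-group value is a placeholder; in this notebook these settings come from the `.env` file instead.

```python
import os
import subprocess
from typing import Dict, List


def build_otel_env(service_name: str, log_group: str, log_stream: str) -> Dict[str, str]:
    """Copy the current environment and add ADOT/OpenTelemetry settings.

    Assumption: these variable names mirror AWS guidance for agents hosted
    outside AgentCore Runtime; verify them against the current AWS docs.
    """
    env = dict(os.environ)
    env.update({
        "AGENT_OBSERVABILITY_ENABLED": "true",
        "OTEL_PYTHON_DISTRO": "aws_distro",
        "OTEL_PYTHON_CONFIGURATOR": "aws_configurator",
        "OTEL_EXPORTER_OTLP_PROTOCOL": "http/protobuf",
        "OTEL_TRACES_EXPORTER": "otlp",
        "OTEL_RESOURCE_ATTRIBUTES": f"service.name={service_name}",
        "OTEL_EXPORTER_OTLP_LOGS_HEADERS": (
            f"x-aws-log-group={log_group},x-aws-log-stream={log_stream}"
        ),
    })
    return env


def build_command(script: str, session_id: str, question: str) -> List[str]:
    """argv for `opentelemetry-instrument python <script> --session-id ... --question ...`."""
    return [
        "opentelemetry-instrument", "python", script,
        "--session-id", session_id,
        "--question", question,
    ]


def run_instrumented(cmd: List[str], env: Dict[str, str]) -> int:
    """Run the wrapped agent and return its exit code."""
    return subprocess.run(cmd, env=env, check=False).returncode


# Usage (hypothetical log group; needs the distro installed and AWS credentials):
# env = build_otel_env("strands-cyber-rag", "/hypothetical/agent-log-group", "runtime-logs")
# cmd = build_command("cyberwizard_rag_agent.py", "1100",
#                     "What is the main technique used for bruteforce attack?")
# run_instrumented(cmd, env)
```

Passing the command as an argument list sidesteps shell quoting, which matters because analyst questions often contain spaces, quotes, or apostrophes.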

In [4]:
!opentelemetry-instrument python cyberwizard_rag_agent.py --session-id 1100 --question "What is the main technique used for bruteforce attack?"

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:__main__:AWS X-Ray tracing configured (region=us-east-1, service=strands-cyber-rag, sample_ratio=1.0)
INFO:__main__:S3 log archival enabled (bucket=cyberwizard-rag-agent, prefix=cloudwatch-export)
INFO:root:Session ID '1100' attached to telemetry context
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:__main__:Successfully initialized Bedrock model: global.anthropic.claude-haiku-4-5-20251001-v1:0 in region: us-east-1
INFO:strands.telemetry.metrics:Creating Strands MetricsClient

Tool #1: mitre_attack_search
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
Based on the MITRE ATT&CK framework, the main technique used for brute force attacks is **T1110 - Brute Force**.

## Key Details:

**Technique Name:** Brute Force (T1110)

**Definition:** Adversaries use brute force techniques to gain access to account

In [5]:
!opentelemetry-instrument python cyberwizard_rag_agent.py --session-id 1100 --question "Can you check this file hash on VirusTotal and tell me if it’s malicious? The hash is 44d88612fea8a8f36de82e1278abb02f"

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:__main__:AWS X-Ray tracing configured (region=us-east-1, service=strands-cyber-rag, sample_ratio=1.0)
INFO:__main__:S3 log archival enabled (bucket=cyberwizard-rag-agent, prefix=cloudwatch-export)
INFO:root:Session ID '1100' attached to telemetry context
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:__main__:Successfully initialized Bedrock model: global.anthropic.claude-haiku-4-5-20251001-v1:0 in region: us-east-1
INFO:strands.telemetry.metrics:Creating Strands MetricsClient
I'll check that file hash on VirusTotal for you.
Tool #1: virustotal_lookup
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
## VirusTotal Lookup Result

**Hash:** 44d88612fea8a8f36de82e1278abb02f

**Status:** ⚠️ **FLAGGED AS MALICIOUS**

*

In [6]:
!opentelemetry-instrument python cyberwizard_rag_agent.py --session-id 1100 --question "Query the NVD database for the CVSS severity of CVE‑2021‑44228"

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:__main__:AWS X-Ray tracing configured (region=us-east-1, service=strands-cyber-rag, sample_ratio=1.0)
INFO:__main__:S3 log archival enabled (bucket=cyberwizard-rag-agent, prefix=cloudwatch-export)
INFO:root:Session ID '1100' attached to telemetry context
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:__main__:Successfully initialized Bedrock model: global.anthropic.claude-haiku-4-5-20251001-v1:0 in region: us-east-1
INFO:strands.telemetry.metrics:Creating Strands MetricsClient

Tool #1: nvd_vulnerability_search
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
## CVE-2021-44228 CVSS Severity Report

**Source:** National Vulnerability Database (NVD)

### Key Findings:

**CVE-2021-44228 Severity: CRITICAL**
- **CVSS

## 7. GenAI Observability Dashboard: Understanding the Traces in AWS CloudWatch

<span style="font-size:18px;">Once the Strands agent runs with OpenTelemetry instrumentation, you can visualize and analyze its traces in the AWS CloudWatch GenAI Observability dashboard.</span> 

In the dashboard, navigate to the **Bedrock AgentCore** view and select the agent you just ran.

#### Overview Page:

![image.png](../Images/cloudwatch-1.png)


#### Trace View Page:

![image.png](../Images/cloudwatch-2.png)


Trace details:

![image.png](../Images/cloudwatch-3.png)



## 9. Evaluating the Results Using LLM-as-a-Judge  

<span style="font-size:18px;">Here we set up an evaluation pipeline using the **DeepEval** framework that will:</span>  
  
- <span style="font-size:18px;">Extract the agent response and tools used from CloudWatch observability data.</span>  

- <span style="font-size:18px;">If the query involves searching the MITRE ATT&CK knowledge base:</span>  

    - <span style="font-size:18px;">Evaluate whether the agent called the right tool (DeepEval's Tool Use metric).</span>  

    - <span style="font-size:18px;">Evaluate the answer with the RAGAS-style metrics Faithfulness, Context Relevance, and Answer Relevance.</span>   
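The extraction step boils down to grouping OTEL log records by trace ID and mapping GenAI event names onto test-case fields: `gen_ai.user.message` becomes the input, `gen_ai.choice` / `gen_ai.assistant.message` the answer, and `gen_ai.tool.message` the retrieval context. The sketch below shows that routing on hypothetical, heavily simplified records; `first_text` and `route_events` are illustrative names, and unlike the evaluator's `extract_texts` this version keeps only the first string found in each record.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def first_text(node) -> Optional[str]:
    """Depth-first search for the first non-empty string in a nested payload."""
    if isinstance(node, str):
        return node.strip() or None
    if isinstance(node, dict):
        node = list(node.values())
    if isinstance(node, list):
        for child in node:
            found = first_text(child)
            if found:
                return found
    return None


def route_events(records: List[dict]) -> Dict[str, dict]:
    """Group records by trace and map GenAI event names onto test-case fields."""
    traces: Dict[str, dict] = defaultdict(
        lambda: {"user": None, "answer": None, "contexts": []}
    )
    for rec in records:
        trace_id = rec.get("traceId")
        text = first_text(rec.get("body"))
        if not trace_id or not text:
            continue
        event = rec.get("attributes", {}).get("event.name")
        details = traces[trace_id]
        if event == "gen_ai.user.message" and details["user"] is None:
            details["user"] = text                # first user turn -> input
        elif event in ("gen_ai.choice", "gen_ai.assistant.message"):
            details["answer"] = text              # latest assistant text -> actual_output
        elif event == "gen_ai.tool.message" and text not in details["contexts"]:
            details["contexts"].append(text)      # tool results -> retrieval_context
    return dict(traces)


# Hypothetical records; real CloudWatch OTEL logs carry many more fields.
records = [
    {"traceId": "t1", "attributes": {"event.name": "gen_ai.user.message"},
     "body": {"content": "What is the main technique used for bruteforce attack?"}},
    {"traceId": "t1", "attributes": {"event.name": "gen_ai.tool.message"},
     "body": {"content": [{"text": "T1110 Brute Force (sample retrieved passage)"}]}},
    {"traceId": "t1", "attributes": {"event.name": "gen_ai.choice"},
     "body": {"message": {"content": "The main technique is T1110 - Brute Force."}}},
]

print(route_events(records)["t1"])
```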



![image.png](../Images/deepeval-ragas-1.png)

![image.png](../Images/deepeval-tooluse-1.png)
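Note that the evaluator below sets each test case's `expected_tools` equal to the tools it observed, so any expected-versus-called comparison will trivially agree. A cheap deterministic complement is to hand-label, per test question, which tool(s) *should* fire and compare against the trace. The sketch below is a hypothetical rule-of-thumb check (`tool_selection_score` is an illustrative name), not a DeepEval metric; the first three observed tools match the runs shown earlier, while the fourth row is an invented mis-route.

```python
from typing import Dict, List, Set, Tuple


def tool_selection_score(expected: Set[str], called: Set[str]) -> Dict[str, float]:
    """Recall: share of expected tools that were called.
    Precision: share of called tools that were expected (1.0 if nothing was called)."""
    hits = expected & called
    recall = len(hits) / len(expected) if expected else 1.0
    precision = len(hits) / len(called) if called else 1.0
    return {"recall": recall, "precision": precision}


# (question, hand-labelled expected tools, tools observed in the trace)
labelled: List[Tuple[str, Set[str], Set[str]]] = [
    ("What is the main technique used for bruteforce attack?",
     {"mitre_attack_search"}, {"mitre_attack_search"}),
    ("Check hash 44d88612fea8a8f36de82e1278abb02f on VirusTotal",
     {"virustotal_lookup"}, {"virustotal_lookup"}),
    ("CVSS severity of CVE-2021-44228",
     {"nvd_vulnerability_search"}, {"nvd_vulnerability_search"}),
    # Hypothetical mis-route: a two-part question where only one tool fired.
    ("Technique for brute forcing RDP, and any known vulnerabilities?",
     {"mitre_attack_search", "nvd_vulnerability_search"}, {"mitre_attack_search"}),
]

for question, expected, called in labelled:
    print(f"{question[:45]:<45} {tool_selection_score(expected, called)}")
```

Because the labels are written before the agent runs, this catches routing misses that an expected-equals-observed setup cannot; the LLM-judged Tool Use metric can then focus on subtler cases.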

In [None]:
%%writefile llm_as_a_judge_evaluator_agent.py
from __future__ import annotations

import argparse
import json
import os
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, Set, Tuple

from dotenv import load_dotenv

try:
    from tabulate import tabulate
except ImportError:  # pragma: no cover - optional dependency
    tabulate = None

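from deepeval.metrics import (
    AnswerRelevancyMetric,
    ContextualRelevancyMetric,
    FaithfulnessMetric,
)

# ToolUseMetric only exists in newer deepeval releases; fall back gracefully
# so that `rag` mode still works and `tool` mode can report it as unavailable.
try:
    from deepeval.metrics import ToolUseMetric
except ImportError:  # pragma: no cover - optional dependency
    ToolUseMetric = None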

try:
    from deepeval import evaluate as deepeval_evaluate
except ImportError:  # pragma: no cover - optional dependency
    deepeval_evaluate = None
from deepeval.test_case import ConversationalTestCase, LLMTestCase
from deepeval.test_case.conversational_test_case import Turn
from deepeval.test_case.llm_test_case import ToolCall

SKIP_TEXT_KEYS = {"toolUseId", "status", "role", "finish_reason", "id", "name"}
CaseBundle = Tuple[str, LLMTestCase, Dict[str, object], ConversationalTestCase]


def _iter_table_objects(log_path: Path) -> Iterable[Dict]:
    """Yield OTEL log dicts parsed from CloudWatch table exports."""
    decoder = json.JSONDecoder()
    text = log_path.read_text(encoding="utf-8")
    length = len(text)
    idx = 0
    while idx < length:
        brace = text.find("{", idx)
        if brace == -1:
            break
        try:
            payload, end = decoder.raw_decode(text, brace)
        except json.JSONDecodeError:
            idx = brace + 1
            continue
        if isinstance(payload, dict) and "resource" in payload and "timeUnixNano" in payload:
            yield payload
            idx = end
        else:
            idx = brace + 1


def iter_log_objects(log_path: Path) -> Iterable[Dict]:
    """Iterate JSON log objects from a trace file with table fallback."""
    text = log_path.read_text(encoding="utf-8")
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        yield from _iter_table_objects(log_path)
        return

    if isinstance(payload, list):
        for item in payload:
            if isinstance(item, dict):
                yield item
    elif isinstance(payload, dict):
        yield payload


def extract_texts(node) -> List[str]:
    """Collect unique text snippets from nested payload structures."""
    seen: Set[str] = set()
    texts: List[str] = []

    def _walk(value, key_hint: str | None = None) -> None:
        if isinstance(value, str):
            stripped = value.strip()
            if not stripped or key_hint in SKIP_TEXT_KEYS:
                return
            if stripped.startswith(("{", "[")):
                try:
                    parsed = json.loads(stripped)
                except json.JSONDecodeError:
                    pass
                else:
                    _walk(parsed, key_hint)
                    return
            if stripped not in seen:
                seen.add(stripped)
                texts.append(stripped)
        elif isinstance(value, dict):
            for key, child in value.items():
                if key in SKIP_TEXT_KEYS:
                    continue
                _walk(child, key)
        elif isinstance(value, list):
            for item in value:
                _walk(item, key_hint)

    _walk(node)
    return texts


def extract_tool_names(node) -> Set[str]:
    """Collect tool names referenced within nested payloads."""
    names: Set[str] = set()

    def _walk(value) -> None:
        if isinstance(value, dict):
            if "toolUse" in value and isinstance(value["toolUse"], dict):
                maybe = value["toolUse"].get("name")
                if isinstance(maybe, str) and maybe:
                    names.add(maybe)
            if "function" in value and isinstance(value["function"], dict):
                maybe = value["function"].get("name")
                if isinstance(maybe, str) and maybe:
                    names.add(maybe)
            if "name" in value and any(k in value for k in {"arguments", "input_parameters"}):
                maybe = value.get("name")
                if isinstance(maybe, str) and maybe:
                    names.add(maybe)
            for child in value.values():
                _walk(child)
        elif isinstance(value, list):
            for item in value:
                _walk(item)

    _walk(node)
    return names


def _ingest_strands_tracer_event(details: Dict[str, object], body: Dict) -> None:
    """Merge strands telemetry tracer messages into trace details."""
    messages = body.get("input", {}).get("messages", [])
    for message in messages:
        role = message.get("role")
        texts = extract_texts(message)
        if role == "user" and texts:
            if not details["user"]:
                details["user"] = texts[0]
        elif role == "tool":
            for ctx in texts:
                if ctx not in details["contexts"]:
                    details["contexts"].append(ctx)
        details["tool_names"].update(extract_tool_names(message))

    outputs = body.get("output", {}).get("messages", [])
    for message in outputs:
        role = message.get("role")
        texts = extract_texts(message)
        if role == "assistant" and texts:
            details["answer"] = texts[-1]
        elif role == "tool":
            for ctx in texts:
                if ctx not in details["contexts"]:
                    details["contexts"].append(ctx)
        details["tool_names"].update(extract_tool_names(message))


def collect_trace_data(log_path: Path) -> Dict[str, Dict[str, object]]:
    """Aggregate trace entries into user, answer, context, and tool data."""
    traces: Dict[str, Dict[str, object]] = defaultdict(
        lambda: {
            "user": None,
            "answer": None,
            "contexts": [],
            "tool_names": set(),
            "source_files": {log_path.name},
        }
    )

    for entry in iter_log_objects(log_path):
        trace_id = entry.get("traceId") or entry.get("attributes", {}).get("otelTraceID")
        if not trace_id or trace_id in {"0", ""}:
            continue
        event_name = entry.get("attributes", {}).get("event.name")
        body = entry.get("body")
        details = traces[trace_id]
        details["source_files"].add(log_path.name)
        if event_name == "gen_ai.user.message":
            texts = extract_texts(body)
            if texts and not details["user"]:
                details["user"] = texts[0]
        elif event_name == "gen_ai.choice":
            texts = extract_texts(body)
            if texts:
                details["answer"] = texts[-1]
        elif event_name == "gen_ai.assistant.message":
            texts = extract_texts(body)
            if texts:
                details["answer"] = texts[-1]
        elif event_name == "gen_ai.tool.message":
            for ctx in extract_texts(body):
                if ctx not in details["contexts"]:
                    details["contexts"].append(ctx)
        elif event_name == "strands.telemetry.tracer" and isinstance(body, dict):
            _ingest_strands_tracer_event(details, body)
        if body:
            details["tool_names"].update(extract_tool_names(body))

    return traces


def build_cases(log_path: Path) -> List[CaseBundle]:
    """Construct DeepEval test cases from collected trace data."""
    traces = collect_trace_data(log_path)
    bundles: List[CaseBundle] = []
    for trace_id, info in traces.items():
        if not info["user"] or not info["answer"]:
            continue
        retrieval_context = info["contexts"] or []
        tool_calls = [ToolCall(name=name) for name in sorted(info["tool_names"])]
        case = LLMTestCase(
            input=info["user"],
            actual_output=info["answer"],
            retrieval_context=retrieval_context,
            tools_called=tool_calls,
            additional_metadata={
                "trace_id": trace_id,
                "source_files": sorted(info["source_files"]),
            },
        )
        # Tool correctness metrics require an expected tool plan; default to observed tools.
        case.expected_tools = tool_calls  # type: ignore[attr-defined]

        turns = [
            Turn(role="user", content=info["user"], retrieval_context=retrieval_context or None),
            Turn(
                role="assistant",
                content=info["answer"],
                tools_called=tool_calls or None,
                retrieval_context=retrieval_context or None,
            ),
        ]
        conversational_case = ConversationalTestCase(
            turns=turns,
            context=retrieval_context or None,
            additional_metadata={
                "trace_id": trace_id,
                "source_files": sorted(info["source_files"]),
            },
        )

        bundles.append((trace_id, case, info, conversational_case))
    return bundles


def evaluate_rag(cases: List[CaseBundle]) -> None:
    """Measure RAG quality metrics for the provided cases."""
    eval_model_name = os.getenv("DEEPEVAL_MODEL_NAME", "gpt-4o-mini")
    metrics = [
        ("Faithfulness", FaithfulnessMetric(model=eval_model_name)),
        ("Answer Relevance", AnswerRelevancyMetric(model=eval_model_name)),
        ("Context Relevance", ContextualRelevancyMetric(model=eval_model_name)),
    ]
    rows = []
    for trace_id, case, info, _ in cases:
        row = {
            "trace": trace_id[:8],
            "tools": ", ".join(sorted(info["tool_names"])) or "None",
            "contexts": len(info["contexts"]),
        }
        for label, metric in metrics:
            try:
                score = metric.measure(case, _show_indicator=False)
            except Exception as exc:  # pragma: no cover - network / LLM issues
                score = float("nan")
                print(f"[WARN] {label} failed for trace {trace_id}: {exc}")
            row[label] = score
        rows.append(row)
    if tabulate:
        print(tabulate(rows, headers="keys", floatfmt=".3f"))
    else:
        for row in rows:
            print(row)


def evaluate_tool_usage(cases: List[CaseBundle]) -> None:
    """Summarize tool usage and score it with the ToolUseMetric."""
    tool_use_metric = None
    if ToolUseMetric:
        try:
            threshold = float(os.getenv("TOOL_USE_THRESHOLD", "0.5"))
        except ValueError:
            threshold = 0.5
        available_tools_env = os.getenv("TOOL_USE_AVAILABLE_TOOLS")
        if available_tools_env:
            available_tools = [name.strip() for name in available_tools_env.split(",") if name.strip()]
        else:
            available_tools = [
                "mitre_attack_search",
                "virustotal_lookup",
                "nvd_vulnerability_search",
            ]
        tool_use_metric = ToolUseMetric(
            available_tools=available_tools,
            threshold=threshold,
        )
    else:
        print("[INFO] ToolUseMetric unavailable; upgrade deepeval to a release that provides it")

    rows = []
    tool_frequency: Dict[str, int] = defaultdict(int)
    conversational_cases = []
    for trace_id, case, info, conv_case in cases:
        names = sorted(info["tool_names"])
        for name in names:
            tool_frequency[name] += 1
        row = {
            "trace": trace_id[:8],
            "tool_count": len(names),
            "tools": ", ".join(names) or "None",
        }
        if tool_use_metric:
            conversational_cases.append(conv_case)
            try:
                row["Tool Use"] = tool_use_metric.measure(conv_case, _show_indicator=False)
            except Exception as exc:  # pragma: no cover - LLM/network failures
                row["Tool Use"] = float("nan")
                print(f"[WARN] Tool Use metric failed for trace {trace_id}: {exc}")
        rows.append(row)

    if tabulate:
        print(tabulate(rows, headers="keys", floatfmt=".3f"))
    else:
        for row in rows:
            print(row)

    if tool_frequency:
        print("\nAggregated tool usage:")
        for name, count in sorted(tool_frequency.items(), key=lambda item: (-item[1], item[0])):
            print(f"- {name}: {count} trace(s)")
    else:
        print("\nNo tool usage captured in the provided trace data.")

    if tool_use_metric and deepeval_evaluate and conversational_cases:
        try:
            deepeval_evaluate(
                test_cases=conversational_cases,
                metrics=[tool_use_metric],
                show_indicator=False,
            )
        except TypeError:
            # Older deepeval versions may not support keyword args.
            try:
                deepeval_evaluate(conversational_cases, [tool_use_metric])
            except Exception as exc:  # pragma: no cover
                print(f"[WARN] Unable to run deepeval.evaluate: {exc}")
        except Exception as exc:  # pragma: no cover - network/LLM issues
            print(f"[WARN] ToolUseMetric evaluation failed: {exc}")
    elif tool_use_metric and not conversational_cases:
        print("[INFO] No conversational test cases available for ToolUseMetric evaluation.")
    elif tool_use_metric:
        print("[INFO] deepeval.evaluate not available; skipped evaluator run")


def parse_args() -> argparse.Namespace:
    """Parse CLI arguments for the evaluator script."""
    parser = argparse.ArgumentParser(
        description="Evaluate Strands agent traces using DeepEval metrics."
    )
    parser.add_argument(
        "log_path",
        help="Path to the OTEL trace log file (JSON or CloudWatch table export).",
    )
    parser.add_argument(
        "mode",
        choices=["rag", "tool"],
        help="Select 'rag' for RAG quality metrics or 'tool' for tool usage summaries.",
    )
    return parser.parse_args()


def main() -> None:
    """Entrypoint that loads configuration and runs the selected evaluation."""
    load_dotenv(override=False)
    args = parse_args()
    log_path = Path(args.log_path).expanduser()
    if not log_path.exists():
        raise FileNotFoundError(f"Trace file not found: {log_path}")
    cases = build_cases(log_path)
    if not cases:
        print("No completed traces found to evaluate.")
        return
    if args.mode == "rag":
        evaluate_rag(cases)
    else:
        evaluate_tool_usage(cases)


if __name__ == "__main__":
    main()

Writing llm_as_a_judge_evaluator_agent.py
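`_iter_table_objects` handles CloudWatch table exports, where JSON records are embedded in non-JSON text, by calling `json.JSONDecoder.raw_decode` at each `{` and skipping past a record once it decodes. A stand-alone version of that scanning idea on an in-memory string (the sample text is made up, and `iter_embedded_json` is an illustrative name) looks like this:

```python
import json
from typing import Dict, Iterator, Tuple


def iter_embedded_json(text: str, required_keys: Tuple[str, ...] = ("traceId",)) -> Iterator[Dict]:
    """Yield dicts decoded from JSON objects embedded anywhere in `text`."""
    decoder = json.JSONDecoder()
    idx = 0
    while True:
        brace = text.find("{", idx)
        if brace == -1:
            return
        try:
            obj, end = decoder.raw_decode(text, brace)
        except json.JSONDecodeError:
            idx = brace + 1      # not valid JSON at this brace; keep scanning
            continue
        if isinstance(obj, dict) and all(k in obj for k in required_keys):
            yield obj
            idx = end            # jump past the whole object, nested braces included
        else:
            idx = brace + 1      # valid JSON but not a record we want


sample = (
    "| @message |\n"
    '| {"traceId": "abc", "body": {"x": 1}} |\n'
    "| {not json} |\n"
    '| {"traceId": "def"} |\n'
)
print([rec["traceId"] for rec in iter_embedded_json(sample)])
```

Jumping to `end` after a match is what stops the scanner from re-yielding nested objects such as `{"x": 1}` as separate records.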


In [7]:
!python llm_as_a_judge_evaluator_agent.py ../Data-Agent-Rag/otel_traces_1.json rag

trace     tools                  contexts    Faithfulness    Answer Relevance    Context Relevance
--------  -------------------  ----------  --------------  ------------------  -------------------
6977d1df  mitre_attack_search           2           1.000               1.000                0.625
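Each of these scores is a ratio of judge verdicts. As DeepEval describes them, Faithfulness is the fraction of claims in the answer the judge deems truthful given the retrieval context, Answer Relevancy is the fraction of answer statements relevant to the input, and Contextual Relevancy is the fraction of retrieval-context statements relevant to the input. The toy computation below uses made-up yes/no verdicts to show the arithmetic; a value such as 0.625 would correspond, for instance, to 5 relevant statements out of 8. Returning 1.0 for an empty verdict list is a convention assumed here.

```python
from typing import List


def verdict_ratio(verdicts: List[str], positive: str = "yes") -> float:
    """Fraction of judge verdicts equal to `positive` (1.0 if there are none)."""
    if not verdicts:
        return 1.0
    return sum(v == positive for v in verdicts) / len(verdicts)


# Hypothetical verdicts for 8 retrieval-context statements.
context_verdicts = ["yes", "yes", "no", "yes", "no", "yes", "no", "yes"]
print(verdict_ratio(context_verdicts))  # 5 of 8 relevant -> 0.625
```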


In [4]:
%env TOOL_USE_AVAILABLE_TOOLS=mitre_attack_search,virustotal_lookup,nvd_vulnerability_search
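In Jupyter, every `!` line runs in its own short-lived shell, so an `export` made there disappears before the next cell runs. Setting the variable on the kernel process (with `os.environ` or the `%env` magic) makes it visible to every later `!` command, because child processes inherit the parent's environment. A quick self-check, which also applies the same comma-splitting the evaluator uses for `TOOL_USE_AVAILABLE_TOOLS` (`DEMO_ONLY_IN_SUBSHELL` is a throwaway name):

```python
import os
import subprocess
import sys

# An export inside a subshell does not reach the parent (kernel) process.
subprocess.run(["sh", "-c", "export DEMO_ONLY_IN_SUBSHELL=1"], check=True)
print("DEMO_ONLY_IN_SUBSHELL" in os.environ)

# Setting it on the current process does reach later child processes.
os.environ["TOOL_USE_AVAILABLE_TOOLS"] = "mitre_attack_search,virustotal_lookup,nvd_vulnerability_search"
child = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ.get('TOOL_USE_AVAILABLE_TOOLS', ''))"],
    capture_output=True, text=True, check=True,
)
inherited = child.stdout.strip()

# Same parsing as the evaluator: split on commas, strip, drop blanks.
available_tools = [name.strip() for name in inherited.split(",") if name.strip()]
print(available_tools)
```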

In [6]:
!python llm_as_a_judge_evaluator_agent.py ../Data-Agent-Rag/otel_traces_1.json tool

trace       tool_count  tools                Tool Use
--------  ------------  -------------------  ----------
6977d1df             1  mitre_attack_search

Aggregated tool usage:
- mitre_attack_search: 1 trace(s)
✨ You're running DeepEval's latest Tool Use Metric! (using gpt-4.1, strict=False, async_mode=True)...
Evaluating 1 test case(s) in parallel ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━   0% 0:00:00

## 10. Conclusion 

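Congratulations! You implemented a Strands Agents SDK agent backed by an Amazon Bedrock model, instrumented it for observability through Amazon CloudWatch, and evaluated its traces with DeepEval using LLM-as-a-Judge.

- CyberWizard RAG agent built with Strands.
- Full OpenTelemetry tracing
- Traces for Amazon Bedrock calls, Strands operations, etc.
- Service name: `strands-cyber-rag`
- LLM-as-a-Judge evaluation of RAG quality and tool use with DeepEval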

## 11. Next Steps

Now that you have a Strands agent with OpenTelemetry set up, you can:

1. **Add More Agents**: Create multi-agent architectures with different patterns.
2. **Add Tools to your agent**: Integrate search tools, API tools, or custom tools.
3. **[Set Up Alarms](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)**: Create alarms on the metrics that matter to your business, such as `latency`, `token input`, and `token output`.
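The alarm step can also be scripted. Below is a minimal sketch that builds the keyword arguments for boto3's CloudWatch `put_metric_alarm` call. `latency_alarm_kwargs` is a hypothetical helper, and the namespace and metric name are placeholders: look up the exact metric names your agent emits in the CloudWatch Metrics console before using it.

```python
from typing import Dict, Optional


def latency_alarm_kwargs(
    alarm_name: str,
    namespace: str,
    metric_name: str,
    threshold: float,
    sns_topic_arn: Optional[str] = None,
) -> Dict:
    """Keyword arguments for boto3 `cloudwatch.put_metric_alarm` (placeholder metric names)."""
    kwargs = {
        "AlarmName": alarm_name,
        "Namespace": namespace,
        "MetricName": metric_name,
        "Statistic": "Average",
        "Period": 300,                 # 5-minute aggregation buckets
        "EvaluationPeriods": 3,        # breach must persist across 3 periods
        "Threshold": threshold,        # expressed in the metric's own unit
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",
    }
    if sns_topic_arn:
        # An SNS topic with an email subscription delivers the notification.
        kwargs["AlarmActions"] = [sns_topic_arn]
    return kwargs


# Usage (needs AWS credentials and a real metric; names below are hypothetical):
# import boto3
# boto3.client("cloudwatch").put_metric_alarm(**latency_alarm_kwargs(
#     "cyberwizard-high-latency", "HypotheticalNamespace", "HypotheticalLatencyMetric", 5000.0))
```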
