# Episode 03 – Evaluating WattBot RAG with Amazon Bedrock

In this episode, we re-run the WattBot evaluation pipeline using a **hosted LLM on Amazon Bedrock** instead of:

- a powerful notebook instance (Episode 01), or  
- a SageMaker Processing job (Episode 02).

The **retrieval and evaluation logic stays the same**. The only major change is how we generate answers:

- Before: we loaded an open‑source model (Qwen) on our own GPU instance.
- Now: we send prompts to a managed model on Amazon Bedrock (for example, Claude 3 Haiku or Sonnet) and pay **per token**.

This gives you a template for plugging WattBot (or other RAG systems) into hosted frontier models.


## Why use a hosted LLM for WattBot?

Amazon Bedrock exposes popular, high‑capacity models from providers such as **Anthropic (Claude)**, **Meta (Llama)**, **Mistral**, **Cohere**, and **Amazon Titan**.  
From a WattBot perspective, the pattern is similar to using other hosted APIs (including OpenAI APIs): you send a prompt and receive a completion, and you are **billed per token**, not per GPU‑hour.

There are real trade‑offs here:

- **Pros of hosted LLMs (Bedrock / OpenAI‑style APIs)**  
  - You can use state‑of‑the‑art models without provisioning or patching GPU instances.  
  - Scaling up/down is handled for you by the provider.  
  - For **small or one‑off evaluations** (like a single WattBot run over a modest question set), token‑based pricing is often cheaper and much simpler to budget.

- **Pros of running your own model on GPU instances (Episodes 01–02)**  
  - You have full control over which model you run (including custom fine‑tunes).  
  - If you run **many large batch jobs** or keep GPUs busy most of the time, paying by instance‑hour can be comparable to or cheaper than token‑based APIs.  
  - Throughput is bounded only by your own infrastructure, not by provider‑level rate limits or quotas.

In practice, the choice depends on:

- How many questions you need to answer.  
- How large the model is and how many tokens you expect per question.  
- How often you will repeat this evaluation.  
- Whether your team is comfortable managing GPU infrastructure.
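To make the "how many questions" trade-off concrete, here is a back-of-the-envelope sketch. Every price in it is a placeholder, not a real Bedrock or EC2 rate. It also assumes a GPU run costs a fixed number of instance-hours regardless of question count, which is only roughly true. Substitute current prices for your model, instance type, and region.

```python
from dataclasses import dataclass


@dataclass
class TokenPricing:
    # Placeholder prices in USD per 1,000 tokens; NOT real Bedrock rates.
    input_per_1k: float
    output_per_1k: float


def token_cost(n_questions: int, in_tokens: int, out_tokens: int, p: TokenPricing) -> float:
    """Total token cost for n_questions, given average tokens per question."""
    per_question = (in_tokens / 1000) * p.input_per_1k + (out_tokens / 1000) * p.output_per_1k
    return n_questions * per_question


def gpu_cost(hours: float, price_per_hour: float) -> float:
    """Cost of keeping one GPU instance up for `hours` (placeholder hourly rate)."""
    return hours * price_per_hour


def break_even_questions(
    in_tokens: int, out_tokens: int, p: TokenPricing, hours: float, price_per_hour: float
) -> float:
    """Question count at which token billing costs as much as the GPU run."""
    return gpu_cost(hours, price_per_hour) / token_cost(1, in_tokens, out_tokens, p)


# Hypothetical workload: ~3,000 prompt tokens (8 retrieved chunks) and ~300 output tokens per question.
pricing = TokenPricing(input_per_1k=0.001, output_per_1k=0.005)
print(f"Token cost for 300 questions: ${token_cost(300, 3000, 300, pricing):.2f}")
print(
    "Break-even vs. 2 GPU-hours at $1.50/h: "
    f"{break_even_questions(3000, 300, pricing, hours=2, price_per_hour=1.5):.0f} questions"
)
```

Below the break-even count, per-token billing is cheaper under these assumptions; above it, a busy GPU instance comes out ahead. Remember that WattBot also issues a second, smaller explanation request for each answered question.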

In this notebook we’ll keep the **RAG evaluation strategy identical** to earlier episodes and simply swap in a Bedrock model for generation.


## Setup: libraries, configuration, and Bedrock client

This notebook assumes you:

- Have already run Episodes 01–02 and uploaded the following artifacts to S3:
  - `wattbot_chunks.jsonl` (RAG chunks)
  - `embeddings.npy` (chunk embeddings)
  - `train_QA.csv` (WattBot training questions)
  - `metadata.csv` (document‑level metadata)
- Have **Amazon Bedrock** available in your region, with access granted to an Anthropic Claude 3 model.

You can adjust the S3 keys and model ID below to match your environment.


In [None]:
import os
import json
from typing import Dict, Any, List

import boto3
import pandas as pd
import numpy as np

from sentence_transformers import SentenceTransformer
from botocore.exceptions import ClientError

# ---- AWS configuration ----

region = "us-east-1"  # Update if needed

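# Claude 3 Haiku is a good starting point for batch evaluation.
# Swap for Sonnet/Opus if you have access and want higher quality.
# Note: some newer models can only be invoked through a cross-region inference
# profile ID (e.g. one prefixed with "us.") rather than the base model ID.
bedrock_model_id = "anthropic.claude-3-haiku-20240307-v1:0"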

# S3 bucket + keys where Episode 02 wrote the artifacts.
# TODO: Update these keys to match your pipeline.
bucket_name = "chris-rag"  # <-- change to your bucket
chunks_key = "wattbot/chunks/wattbot_chunks.jsonl"
embeddings_key = "wattbot/embeddings/embeddings.npy"
train_key = "wattbot/train/train_QA.csv"
metadata_key = "wattbot/metadata/metadata.csv"

# Local working directory for downloaded artifacts
local_data_dir = "data"
os.makedirs(local_data_dir, exist_ok=True)

# AWS clients
s3 = boto3.client("s3", region_name=region)
bedrock_runtime = boto3.client("bedrock-runtime", region_name=region)

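The evaluation loop sends up to two Bedrock requests per question, so throttling is likely on larger question sets. botocore already retries some errors, and you can raise that limit when creating the client, e.g. `boto3.client("bedrock-runtime", region_name=region, config=botocore.config.Config(retries={"max_attempts": 10, "mode": "adaptive"}))`. If you prefer explicit control, here is a minimal sketch of exponential backoff with jitter. The helper names and the set of "retryable" error codes are assumptions; the only botocore behavior it relies on is that a `ClientError` exposes `e.response["Error"]["Code"]`.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

# Assumed set of transient Bedrock error codes worth retrying; adjust for your workload.
RETRYABLE_CODES = {"ThrottlingException", "ServiceUnavailableException", "ModelNotReadyException"}


def error_code(exc: Exception) -> Optional[str]:
    """Extract the AWS error code from a botocore ClientError-like exception."""
    response = getattr(exc, "response", None)
    if isinstance(response, dict):
        return response.get("Error", {}).get("Code")
    return None


def with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 6,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying transient errors with capped exponential backoff plus jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            # Re-raise non-transient errors, or transient ones on the final attempt.
            if error_code(exc) not in RETRYABLE_CODES or attempt == max_attempts - 1:
                raise
            # "Full jitter": sleep a random time up to the capped exponential delay.
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))
    raise RuntimeError("unreachable")
```

Usage inside `call_bedrock_claude` would look like `response = with_backoff(lambda: bedrock_runtime.invoke_model(modelId=model_id, body=request))`.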

In [None]:
def download_from_s3(key: str, local_name: str) -> str:
    """Download a file from S3 to local_data_dir and return the local path."""
    local_path = os.path.join(local_data_dir, local_name)
    print(f"Downloading s3://{bucket_name}/{key} -> {local_path}")
    s3.download_file(bucket_name, key, local_path)
    return local_path


chunks_path = download_from_s3(chunks_key, "wattbot_chunks.jsonl")
emb_path = download_from_s3(embeddings_key, "embeddings.npy")
train_qa_path = download_from_s3(train_key, "train_QA.csv")
metadata_path = download_from_s3(metadata_key, "metadata.csv")

# Load artifacts
with open(chunks_path, "r", encoding="utf-8") as f:
    chunked_docs = [json.loads(line) for line in f]

chunk_embeddings = np.load(emb_path)
train_df = pd.read_csv(train_qa_path)

# Robust metadata load: handle possible non-UTF-8 characters
try:
    metadata_df = pd.read_csv(metadata_path)
except UnicodeDecodeError:
    metadata_df = pd.read_csv(metadata_path, encoding="latin1")

print(f"Chunks: {len(chunked_docs)}")
print(f"Train QAs: {len(train_df)}")
print("Embeddings shape:", chunk_embeddings.shape)

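`retrieve_top_k` looks up `chunked_docs` by row position in `chunk_embeddings`, so the two artifacts must line up one-to-one. A quick check before spending any tokens can catch a stale or mismatched upload. `check_artifacts` is a hypothetical helper, shown as a minimal sketch; once the embedder is loaded, `embedder.get_sentence_embedding_dimension()` from sentence-transformers is one way to obtain `expected_dim`.

```python
from typing import Any, Dict, List, Optional

import numpy as np


def check_artifacts(
    chunked_docs: List[Dict[str, Any]],
    chunk_embeddings: np.ndarray,
    expected_dim: Optional[int] = None,
) -> None:
    """Raise ValueError if chunks and embeddings are not aligned row-for-row."""
    if chunk_embeddings.ndim != 2:
        raise ValueError(f"Expected a 2-D embedding matrix, got shape {chunk_embeddings.shape}")
    n_rows, dim = chunk_embeddings.shape
    if n_rows != len(chunked_docs):
        raise ValueError(f"{len(chunked_docs)} chunks but {n_rows} embedding rows")
    if expected_dim is not None and dim != expected_dim:
        raise ValueError(f"Embedding dim {dim} != expected {expected_dim}")
    # All-zero rows would cause a divide-by-zero in cosine similarity.
    zero_rows = np.flatnonzero(np.linalg.norm(chunk_embeddings, axis=1) == 0)
    if zero_rows.size:
        raise ValueError(f"{zero_rows.size} all-zero embedding rows, e.g. row {zero_rows[0]}")
    if any("text" not in ch for ch in chunked_docs):
        raise ValueError("Some chunks are missing the 'text' field")
```

In this notebook you would call `check_artifacts(chunked_docs, chunk_embeddings)` right after loading the files.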

In [None]:
# Build doc_id -> url mapping from metadata
docid_to_url: Dict[str, str] = {}
for _, row in metadata_df.iterrows():
    doc_id = str(row.get("id", "")).strip()
    url = row.get("url", "")
    if doc_id and isinstance(url, str) and url.strip():
        docid_to_url[doc_id] = url.strip()

print(f"Metadata doc URLs: {len(docid_to_url)} entries")

# Load the same embedding model we used earlier
embedding_model_id = "thenlper/gte-large"
embedder = SentenceTransformer(embedding_model_id)


In [None]:
# ---------------------- similarity + retrieval ----------------------

def cosine_similarity_matrix(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Cosine similarity between two sets of vectors."""
    a_norm = a / np.linalg.norm(a, axis=1, keepdims=True)
    b_norm = b / np.linalg.norm(b, axis=1, keepdims=True)
    return np.matmul(a_norm, b_norm.T)


def retrieve_top_k(
    query_embedding: np.ndarray,
    chunk_embeddings: np.ndarray,
    chunked_docs: List[Dict[str, Any]],
    k: int = 8,
) -> List[Dict[str, Any]]:
    """Return the top-k chunks for a single query embedding."""
    query = query_embedding.reshape(1, -1)
    sims = cosine_similarity_matrix(query, chunk_embeddings)[0]

    top_idx = np.argsort(-sims)[:k]

    results = []
    for idx in top_idx:
        ch = chunked_docs[idx]
        results.append(
            {
                "score": float(sims[idx]),
                "text": ch["text"],
                "doc_id": ch.get("doc_id", ""),
                "title": ch.get("title", ""),
                "url": ch.get("url", ""),
                "page_num": ch.get("page_num", None),
                "page_label": ch.get("page_label", None),
            }
        )
    return results


def format_context_for_prompt(retrieved_chunks: List[Dict[str, Any]]) -> str:
    """Turn retrieved chunk dicts into a compact context string for the LLM."""
    lines = []
    for i, ch in enumerate(retrieved_chunks, start=1):
        # retrieve_top_k always sets these keys (possibly to "" or None), so
        # dict.get defaults would never apply; fall back explicitly instead.
        label = ch.get("doc_id") or f"chunk_{i}"
        page = ch.get("page_label")
        if page is None:
            page = ch.get("page_num")
        header = f"[{label}, page {page}]" if page is not None else f"[{label}]"
        txt = ch["text"].replace("\n", " ")
        lines.append(f"{header}: {txt}")
    return "\n".join(lines)


def retrieve_context_for_question(
    question: str,
    embedder: SentenceTransformer,
    chunk_embeddings: np.ndarray,
    chunked_docs: List[Dict[str, Any]],
    top_k: int = 8,
):
    """Embed one question and return (top-k retrieved chunks, question embedding)."""
    q_emb = embedder.encode([question], convert_to_numpy=True, normalize_embeddings=True)[0]
    retrieved = retrieve_top_k(q_emb, chunk_embeddings, chunked_docs, k=top_k)
    return retrieved, q_emb

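`retrieve_context_for_question` embeds and scores one question at a time. For a batch evaluation, you can instead encode all questions at once and get every top-k list from a single matrix product. This is a sketch, not part of the original pipeline: `batch_top_k` is a hypothetical helper that L2-normalizes both matrices (so the dot product is cosine similarity) and uses `np.argpartition` to avoid fully sorting every row.

```python
import numpy as np


def batch_top_k(query_embs: np.ndarray, chunk_embs: np.ndarray, k: int = 8):
    """Return (indices, scores) of the top-k chunks for each query, best first."""
    # Normalize rows so that the dot product equals cosine similarity.
    q = query_embs / np.linalg.norm(query_embs, axis=1, keepdims=True)
    c = chunk_embs / np.linalg.norm(chunk_embs, axis=1, keepdims=True)
    sims = q @ c.T  # shape: (n_queries, n_chunks)

    k = min(k, sims.shape[1])
    # argpartition places the k largest similarities (unordered) in the first k columns...
    part = np.argpartition(-sims, k - 1, axis=1)[:, :k]
    part_scores = np.take_along_axis(sims, part, axis=1)
    # ...then we sort only those k columns by descending score.
    order = np.argsort(-part_scores, axis=1)
    top_idx = np.take_along_axis(part, order, axis=1)
    top_scores = np.take_along_axis(part_scores, order, axis=1)
    return top_idx, top_scores
```

For example, `q_embs = embedder.encode(train_df["question"].tolist(), convert_to_numpy=True, normalize_embeddings=True)` followed by `batch_top_k(q_embs, chunk_embeddings, k=8)` retrieves for the whole question set at once; row `i` of each result corresponds to question `i`.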

In [None]:
# ---------------------- answer normalization ----------------------

def normalize_answer_value(raw_value: str) -> str:
    """Normalize answer_value according to WattBot conventions."""
    if raw_value is None:
        return "is_blank"

    s = str(raw_value).strip()

    if not s or s.lower() == "none":
        return "is_blank"

    if s.startswith("[") and s.endswith("]"):
        return s

    if s.lower() == "is_blank":
        return "is_blank"

    # If there is whitespace, keep only the first token
    if " " in s:
        first, *_ = s.split()
        s = first

    # Remove commas
    s = s.replace(",", "")

    try:
        val = float(s)
        if val.is_integer():
            return str(int(val))
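        # Up to 10 significant digits. Very small or very large non-integer
        # values can still be printed in exponent form (e.g. 1e-05).
        return f"{val:.10g}"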
    except ValueError:
        return s


## Calling Claude 3 on Amazon Bedrock

Next, we define a small helper that:

- Formats a request for the Claude 3 Messages API on Bedrock.
- Sends the request with the `invoke_model` method of the boto3 `bedrock-runtime` client.
- Returns the generated text string.

(The response also includes token usage; you can extend this function to track total
input/output tokens for cost estimation if you’d like.)
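One way to do that extension: the Claude Messages response body on Bedrock carries a `usage` object with `input_tokens` and `output_tokens`. The sketch below accumulates those counts across calls. `UsageTracker` is a hypothetical helper, and the per-1k prices passed to `estimated_cost` are placeholders you should replace with current Bedrock pricing for your model and region.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class UsageTracker:
    """Accumulate token counts from Claude Messages API response bodies."""

    calls: int = 0
    input_tokens: int = 0
    output_tokens: int = 0

    def add(self, model_response: Dict[str, Any]) -> None:
        # Claude responses include {"usage": {"input_tokens": ..., "output_tokens": ...}}.
        usage = model_response.get("usage", {})
        self.calls += 1
        self.input_tokens += int(usage.get("input_tokens", 0))
        self.output_tokens += int(usage.get("output_tokens", 0))

    def estimated_cost(self, input_per_1k: float, output_per_1k: float) -> float:
        """Estimated USD cost; both prices are caller-supplied placeholders."""
        return (self.input_tokens / 1000) * input_per_1k + (self.output_tokens / 1000) * output_per_1k
```

To wire it in, create one `tracker = UsageTracker()` and call `tracker.add(model_response)` inside `call_bedrock_claude` right after the response body is parsed with `json.loads`.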


In [None]:
def call_bedrock_claude(
    system_prompt: str,
    user_prompt: str,
    model_id: str = bedrock_model_id,
    max_tokens: int = 512,
    temperature: float = 0.3,
) -> str:
    """Call an Anthropic Claude 3 model on Bedrock and return the text response."""
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "temperature": temperature,
        "system": system_prompt,
        "messages": [
            {
                "role": "user",
                "content": [{"type": "text", "text": user_prompt}],
            }
        ],
    }

    request = json.dumps(body)
    try:
        response = bedrock_runtime.invoke_model(modelId=model_id, body=request)
    except ClientError as e:
        print(f"ERROR calling Bedrock model {model_id}: {e}")
        raise

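    model_response = json.loads(response["body"].read())
    # Claude returns a list of content blocks; concatenate any text blocks
    # (this also avoids an IndexError if the list is empty).
    text = "".join(
        block.get("text", "")
        for block in model_response.get("content", [])
        if block.get("type") == "text"
    )
    return text.strip()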

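Claude sometimes wraps its JSON in prose, which is why the answer phase searches for the first `{` and last `}`. With the Claude 3 Messages API you can also "prefill" the start of the assistant turn by ending `messages` with a partial assistant message; the model continues from that text, and the returned text does not repeat the prefill. The sketch below builds such a body; `build_prefilled_body` and `parse_prefilled_json` are hypothetical helpers, and prefill support may vary across model versions, so check the Anthropic documentation for the model you use.

```python
import json
from typing import Any, Dict


def build_prefilled_body(
    system_prompt: str,
    user_prompt: str,
    prefill: str = "{",
    max_tokens: int = 512,
    temperature: float = 0.3,
) -> Dict[str, Any]:
    """Claude Messages request body whose assistant turn starts with `prefill`."""
    return {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "temperature": temperature,
        "system": system_prompt,
        "messages": [
            {"role": "user", "content": [{"type": "text", "text": user_prompt}]},
            # The model continues generating from this partial assistant message.
            {"role": "assistant", "content": [{"type": "text", "text": prefill}]},
        ],
    }


def parse_prefilled_json(prefill: str, completion_text: str) -> Dict[str, Any]:
    """Re-attach the prefill (omitted from the response) and parse up to the last '}'."""
    text = prefill + completion_text
    return json.loads(text[: text.rfind("}") + 1])
```

You would send `json.dumps(build_prefilled_body(system_prompt, user_prompt))` as the `body` of `invoke_model`, then pass the returned text to `parse_prefilled_json("{", ...)`.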

In [None]:
# ---------------------- explanation helpers ----------------------

def build_explanation_prompt(question: str, answer: str, supporting_materials: str) -> str:
    return (
        "You are explaining answers for an energy, water, and carbon footprint assistant.\n\n"
        f"Question: {question}\n\n"
        f"Answer: {answer}\n\n"
        f"Supporting materials:\n{supporting_materials}\n\n"
        "In 1–3 sentences, explain how the supporting materials justify the answer. "
        "Be precise but concise."
    )


def explanation_system_prompt() -> str:
    return (
        "You are an AI assistant that explains how evidence supports answers about "
        "energy, water, and carbon footprint. Focus on clear, factual reasoning, "
        "and refer directly to the cited documents when appropriate."
    )


def bedrock_explanation_phase_for_question(
    qid: str,
    question: str,
    answer: str,
    supporting_materials: str,
    model_id: str = bedrock_model_id,
) -> str:
    sys_prompt = explanation_system_prompt()
    prompt = build_explanation_prompt(question, answer, supporting_materials)
    raw_explanation = call_bedrock_claude(
        system_prompt=sys_prompt,
        user_prompt=prompt,
        model_id=model_id,
        max_tokens=256,
    )
    return raw_explanation.strip()


# ---------------------- answer phase (JSON contract) ----------------------

def bedrock_answer_phase_for_question(
    qid: str,
    question: str,
    retrieved_chunks: List[Dict[str, Any]],
    model_id: str = bedrock_model_id,
):
    """Use Claude 3 on Bedrock to answer a single WattBot question given retrieved chunks."""
    context = format_context_for_prompt(retrieved_chunks)

    system_prompt = (
        "You are WattBot, a question-answering assistant for energy, water, and carbon footprint.\n"
        "You must answer questions using ONLY the provided context from scientific papers.\n"
        "If the context does not contain enough information to answer with high confidence,\n"
        "you must mark the question as unanswerable.\n\n"
        "You must respond with a single JSON object with the following keys:\n"
        "- answer: natural language answer, including numeric value and units if applicable.\n"
        "- answer_value: normalized numeric or categorical value with NO units or symbols;\n"
        "  use 'is_blank' if the question is unanswerable.\n"
        "- answer_unit: unit string (e.g., kWh, gCO2, %, is_blank).\n"
        "- ref_id: list of document IDs that support the answer.\n"
        "- is_blank: true if unanswerable, false otherwise.\n"
        "- supporting_materials: short quote or table/figure pointer from the context.\n"
    )

    user_prompt = (
        "Use the context below to answer the question. "
        "Return ONLY a JSON object, no extra commentary.\n\n"
        f"Question: {question}\n\n"
        f"Context:\n{context}\n"
    )

    raw_answer = call_bedrock_claude(
        system_prompt=system_prompt,
        user_prompt=user_prompt,
        model_id=model_id,
        max_tokens=512,
    )

    parsed = {
        "answer": "",
        "answer_value": "is_blank",
        "answer_unit": "is_blank",
        "ref_id": [],
        "is_blank": True,
        "supporting_materials": "is_blank",
    }

    try:
        first_brace = raw_answer.find("{")
        last_brace = raw_answer.rfind("}")
        if first_brace != -1 and last_brace != -1:
            json_str = raw_answer[first_brace : last_brace + 1]
        else:
            json_str = raw_answer

        candidate = json.loads(json_str)

        # Use `or` defaults so JSON nulls don't become the string "None".
        parsed["answer"] = str(candidate.get("answer") or "").strip()
        parsed["answer_value"] = normalize_answer_value(candidate.get("answer_value", "is_blank"))
        parsed["answer_unit"] = str(candidate.get("answer_unit") or "is_blank").strip() or "is_blank"

        ref_id = candidate.get("ref_id", [])
        if isinstance(ref_id, str):
            ref_ids = [ref_id.strip()] if ref_id.strip() else []
        elif isinstance(ref_id, list):
            ref_ids = [str(x).strip() for x in ref_id if x]
        else:
            ref_ids = []
        parsed["ref_id"] = ref_ids

        # bool("false") is True, so handle string flags explicitly.
        is_blank_flag = candidate.get("is_blank", False)
        if isinstance(is_blank_flag, str):
            is_blank_flag = is_blank_flag.strip().lower() in {"true", "1", "yes"}
        parsed["is_blank"] = bool(is_blank_flag)

        supp = candidate.get("supporting_materials") or "is_blank"
        parsed["supporting_materials"] = str(supp).strip() or "is_blank"

    except Exception as e:
        print(f"JSON parse error for question {qid}; defaulting to is_blank. Error: {e}")

    return (
        parsed["answer"],
        parsed["answer_value"],
        parsed["answer_unit"],
        parsed["is_blank"],
        parsed["ref_id"],
        parsed["supporting_materials"],
    )


In [None]:
def run_single_qa_bedrock(
    row: pd.Series,
    embedder: SentenceTransformer,
    chunk_embeddings: np.ndarray,
    chunked_docs: List[Dict[str, Any]],
    docid_to_url: Dict[str, str],
    top_k: int = 8,
    retrieval_threshold: float = 0.25,
    model_id: str = bedrock_model_id,
) -> Dict[str, Any]:
    """Full RAG + Bedrock pipeline for a single question."""
    qid = row["id"]
    question = row["question"]

    retrieved, q_emb = retrieve_context_for_question(
        question=question,
        embedder=embedder,
        chunk_embeddings=chunk_embeddings,
        chunked_docs=chunked_docs,
        top_k=top_k,
    )

    top_score = retrieved[0]["score"] if retrieved else 0.0

    (
        answer,
        answer_value,
        answer_unit,
        is_blank_llm,
        ref_ids,
        supporting_materials,
    ) = bedrock_answer_phase_for_question(
        qid=qid,
        question=question,
        retrieved_chunks=retrieved,
        model_id=model_id,
    )

    # Blank out the answer if the model declined OR retrieval was too weak to trust.
    is_blank = bool(is_blank_llm) or (top_score < retrieval_threshold)

    if is_blank:
        answer = "Unable to answer with confidence based on the provided documents."
        answer_value = "is_blank"
        answer_unit = "is_blank"
        ref_ids = []
        ref_id_str = "is_blank"
        ref_url_str = "is_blank"
        supporting_materials = "is_blank"
        explanation = ""
    else:
        answer_value = normalize_answer_value(answer_value)
        # Keep the model's answer_unit (the answer phase defaults it to "is_blank").

        if isinstance(ref_ids, list) and ref_ids:
            ref_id_str = ";".join(ref_ids)
            urls = []
            for rid in ref_ids:
                url = docid_to_url.get(str(rid), "")
                if url:
                    urls.append(url)
            ref_url_str = ";".join(urls) if urls else "is_blank"
        else:
            ref_id_str = "is_blank"
            ref_url_str = "is_blank"

        explanation = bedrock_explanation_phase_for_question(
            qid=qid,
            question=question,
            answer=answer,
            supporting_materials=supporting_materials,
            model_id=model_id,
        )

    return {
        "id": qid,
        "question": question,
        "answer": answer,
        "answer_value": answer_value,
        "answer_unit": answer_unit,
        "ref_id": ref_id_str,
        "ref_url": ref_url_str,
        "supporting_materials": supporting_materials,
        "explanation": explanation,
    }


## Run the WattBot evaluation with Bedrock

Now we can loop over all questions in `train_QA.csv`, run retrieval + Bedrock
generation, and write a `wattbot_solutions_bedrock.csv` file.

This mirrors the logic from Episode 02; the only difference is that the answer
and explanation phases call a hosted Claude 3 model instead of a local Qwen model.


In [None]:
results = []

# For quick smoke tests, you can slice train_df (e.g., train_df.head(5))
for _, row in train_df.iterrows():
    out = run_single_qa_bedrock(
        row=row,
        embedder=embedder,
        chunk_embeddings=chunk_embeddings,
        chunked_docs=chunked_docs,
        docid_to_url=docid_to_url,
        top_k=8,
        retrieval_threshold=0.25,
        model_id=bedrock_model_id,
    )
    results.append(out)

results_df = pd.DataFrame(results)

output_dir = "outputs"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "wattbot_solutions_bedrock.csv")

results_df.to_csv(output_path, index=False)
print(f"Wrote predictions to {output_path}")

results_df.head()


## Wrap‑up: comparing Bedrock to GPU‑based runs

At this point you should have three versions of the WattBot evaluation:

1. **Episode 01 – Notebook GPU instance** using a locally loaded open‑source model.  
2. **Episode 02 – SageMaker Processing job** running the same model in batch.  
3. **Episode 03 – Bedrock** using a hosted Claude 3 model with per‑token billing.

When deciding between these options in practice:

- Use **Bedrock or other hosted APIs** when:
  - You want to try the latest frontier models quickly.  
  - You only need to run a modest number of questions, or you are still prototyping.  
  - You prefer a simple, token‑based cost model and don’t want to manage GPU capacity.

- Use **self‑hosted models on GPU instances** when:
  - You expect to run large batches repeatedly (e.g., many thousands of questions).  
  - You want tight control over which architectures/checkpoints you run or fine‑tune.  
  - You already have institutional access to cost‑effective on‑prem or cloud GPUs.

The core **RAG evaluation logic stays identical** across all three episodes, which is the main takeaway:
once you have a clean retrieval + normalization pipeline (like WattBot’s), swapping out the generator
is mostly a matter of re‑implementing the answer and explanation phases (here, `bedrock_answer_phase_for_question` and `bedrock_explanation_phase_for_question`)
for each compute option you care about.
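One way to make that swap explicit is to hide the generator behind a small interface, so the answer and explanation phases only ever call `generate(system_prompt, user_prompt, max_tokens)`. The sketch below uses `typing.Protocol`; all class and function names are hypothetical, and the Bedrock implementation repeats the request body used by `call_bedrock_claude`. A fake generator such as `EchoGenerator` also lets you test the JSON parsing without spending tokens.

```python
import json
from typing import Any, Protocol


class TextGenerator(Protocol):
    def generate(self, system_prompt: str, user_prompt: str, max_tokens: int = 512) -> str: ...


class BedrockClaudeGenerator:
    """Claude 3 on Bedrock; `client` is a boto3 "bedrock-runtime" client."""

    def __init__(self, client: Any, model_id: str, temperature: float = 0.3):
        self.client = client
        self.model_id = model_id
        self.temperature = temperature

    def generate(self, system_prompt: str, user_prompt: str, max_tokens: int = 512) -> str:
        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "temperature": self.temperature,
            "system": system_prompt,
            "messages": [{"role": "user", "content": [{"type": "text", "text": user_prompt}]}],
        }
        response = self.client.invoke_model(modelId=self.model_id, body=json.dumps(body))
        payload = json.loads(response["body"].read())
        return "".join(b.get("text", "") for b in payload.get("content", [])).strip()


class EchoGenerator:
    """Offline stand-in that returns a canned reply; handy for testing parsers."""

    def __init__(self, reply: str):
        self.reply = reply

    def generate(self, system_prompt: str, user_prompt: str, max_tokens: int = 512) -> str:
        return self.reply


def answer_with(gen: TextGenerator, question: str, context: str) -> str:
    """Generator-agnostic answer phase (prompt text abbreviated for this sketch)."""
    user_prompt = f"Question: {question}\n\nContext:\n{context}\n"
    return gen.generate("You are WattBot. Reply with a single JSON object.", user_prompt)
```

A self-hosted Qwen version from Episodes 01–02 would be one more class with the same `generate` signature; nothing downstream of the generator needs to change.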
