# Multi-agent RAG workflow for R&D (battery tech example).

## Agents:
  1. Planner Agent  – frames the R&D question and decides retrieval queries.
  2. Retrieval Agent – semantic search over a small 'research KB' (RAG).
  3. Writer Agent   – writes a grounded R&D brief using retrieved docs.

Prereqs:

    `pip install openai numpy`

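Environment (the setup cell reads the key from Google Colab secrets via `userdata.get('OPENAI_API_KEY')`; outside Colab, export it as an environment variable and have the client read it with `os.getenv("OPENAI_API_KEY")`):

    `export OPENAI_API_KEY="sk-..."`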

To adapt this workflow to your own domain, replace the `KB_DOCS` list with your own internal R&D documents or notes.

In [None]:
import os
import json
from typing import List, Dict
import numpy as np
from openai import OpenAI

In [None]:
# -------------------- OpenAI setup --------------------

# Resolve the API key from whichever environment the notebook runs in:
# Google Colab keeps secrets in `userdata`, while a plain Python/Jupyter
# session uses the OPENAI_API_KEY environment variable described in the
# notebook header. Importing google.colab unconditionally would make the
# notebook unusable outside Colab.
try:
    from google.colab import userdata
except ImportError:
    _api_key = os.getenv("OPENAI_API_KEY")
else:
    _api_key = userdata.get('OPENAI_API_KEY')

# Shared client used by the embedding and chat calls below.
client = OpenAI(api_key=_api_key)

# Model used to embed KB documents and search queries.
EMBEDDING_MODEL = "text-embedding-3-small"
# Chat model used by the Planner and Writer agents.
CHAT_MODEL = "gpt-4.1-mini"

In [None]:
# -------------------- 0. Tiny R&D knowledge base --------------------
# A small in-memory corpus that stands in for a real document store.
# In real life, replace this with:
# - parsed PDFs of papers
# - internal lab reports
# - patent abstracts
# etc.
#
# Each entry is a dict with three string fields:
#   "id"    - short identifier; used to de-duplicate retrieval hits and shown
#             to the Writer Agent in square brackets (e.g. [paper1]) for citing
#   "title" - human-readable title printed next to retrieval scores
#   "text"  - the passage that is embedded for semantic search and passed to
#             the Writer Agent as context

KB_DOCS = [
    {
        "id": "paper1",
        "title": "Solid-state lithium metal batteries for EVs",
        "text": (
            "Solid-state lithium metal batteries offer high energy density, "
            "but face challenges with dendrite formation and interface stability. "
            "Recent progress has focused on sulfide-based solid electrolytes and "
            "interface engineering to improve cycling performance and safety."
        ),
    },
    {
        "id": "paper2",
        "title": "Sodium-ion batteries for low-cost energy storage",
        "text": (
            "Sodium-ion batteries provide lower energy density than lithium-ion, "
            "but benefit from abundant raw materials and lower cost. "
            "Hard carbon anodes and layered oxide cathodes are commonly studied, "
            "with applications in stationary storage and low-cost EV segments."
        ),
    },
    {
        "id": "paper3",
        "title": "Lithium-sulfur batteries: opportunities and challenges",
        "text": (
            "Lithium-sulfur batteries promise very high theoretical energy density. "
            "Key challenges include polysulfide shuttle, poor cycle life, and "
            "volume expansion. Research directions involve novel cathode hosts, "
            "electrolyte formulations, and protective interlayers."
        ),
    },
    {
        "id": "internal1",
        "title": "Internal lab report: solid-state pouch cell prototype",
        "text": (
            "Our 2024 solid-state pouch cell prototypes achieved 350 Wh/kg at cell level "
            "with acceptable safety performance. However, cycle life was limited to "
            "300 cycles at 80% capacity retention due to interface degradation."
        ),
    },
    {
        "id": "internal2",
        "title": "Internal cost study: sodium-ion vs LFP",
        "text": (
            "A cost comparison showed sodium-ion cells could be 20-30% cheaper than LFP "
            "for similar cycle life in low-range EVs. Main risks are supply chain maturity "
            "and limited field data for automotive qualification."
        ),
    },
]

In [None]:
# -------------------- 1. Vector store utilities (Retrieval Agent core) --------------------

def get_embedding(text: str) -> List[float]:
    """Embed ``text`` with the configured OpenAI embedding model.

    Sends a single request to the embeddings endpoint using
    ``EMBEDDING_MODEL`` and returns the vector of the first item in the
    response.
    """
    response = client.embeddings.create(input=text, model=EMBEDDING_MODEL)
    first_item = response.data[0]
    return first_item.embedding

# Embed every KB passage once, up front. Row i of KB_EMBEDDINGS and entry i of
# KB_IDS both correspond to KB_DOCS[i].
print("Building vector store for R&D KB...")
KB_EMBEDDINGS = np.array(list(map(get_embedding, (entry["text"] for entry in KB_DOCS))))
KB_IDS = list(entry["id"] for entry in KB_DOCS)
print(f"Vector store ready with {len(KB_DOCS)} documents.\n")


def search_knowledge_base(query: str, k: int = 3) -> List[Dict]:
    """Return the ``k`` KB documents most similar to ``query``.

    The query is embedded with ``get_embedding`` and compared against every
    row of ``KB_EMBEDDINGS`` using cosine similarity.

    Args:
        query: Free-text search query.
        k: Maximum number of documents to return. Values <= 0 yield an
            empty list; values larger than the KB size return all documents.

    Returns:
        A list of dicts with keys ``"id"``, ``"title"``, ``"score"`` (cosine
        similarity as a Python float) and ``"text"``, ordered from most to
        least similar.
    """
    # Without this guard, k == 0 turns the slice `[-k:]` below into `[0:]`
    # and returns *every* document; negative k is similarly wrong. Returning
    # early also avoids a pointless embedding API call.
    if k <= 0:
        return []

    q_emb = np.array(get_embedding(query))

    doc_norms = np.linalg.norm(KB_EMBEDDINGS, axis=1)
    q_norm = np.linalg.norm(q_emb)
    # The small epsilon keeps the division finite if any norm is zero.
    sims = KB_EMBEDDINGS @ q_emb / (doc_norms * q_norm + 1e-8)

    # argsort is ascending: take the last k indices, then reverse them so
    # the best match comes first.
    top_idx = sims.argsort()[-k:][::-1]

    return [
        {
            "id": KB_DOCS[i]["id"],
            "title": KB_DOCS[i]["title"],
            "score": float(sims[i]),
            "text": KB_DOCS[i]["text"],
        }
        for i in top_idx
    ]


def format_retrieval_results(results: List[Dict]) -> str:
    """Render retrieved documents as the plain-text context for the Writer Agent.

    Each document contributes three lines: a header of the form
    ``[id] title (score=0.123)``, the document text, and an empty separator
    line. All lines are joined with newlines; an empty ``results`` list
    produces an empty string.
    """
    per_doc_lines = (
        (f"[{doc['id']}] {doc['title']} (score={doc['score']:.3f})", doc["text"], "")
        for doc in results
    )
    return "\n".join(line for triple in per_doc_lines for line in triple)

In [None]:
# -------------------- 2. Planner Agent --------------------

# System prompt sent as the "system" message in call_planner_agent.
# It instructs the model to reply with a JSON object containing exactly the
# two keys the orchestrator reads afterwards: "retrieval_queries" (a list of
# search strings) and "answer_plan" (a short description of the report layout).
PLANNER_SYSTEM_PROMPT = """
You are the Planner Agent in an R&D assistant focused on next-generation battery technologies for EVs.

Your job:
- Understand the user's high-level R&D question.
- Break it into more concrete sub-questions or evaluation dimensions.
- Decide what to retrieve from the R&D knowledge base.
- Propose an answer structure for the final report.

You MUST respond ONLY in this JSON format (no extra text):

{
  "retrieval_queries": ["..."],
  "answer_plan": "..."
}

Where:
- retrieval_queries: 2-5 short search queries for the knowledge base.
- answer_plan: 3-6 sentences describing how the final answer should be structured
  (e.g., compare options by energy density, safety, cost, technology readiness).
"""

def _parse_planner_output(content: str, user_question: str) -> Dict:
    """Turn the Planner Agent's raw reply into a validated plan dict.

    Accepts plain JSON or JSON wrapped in a Markdown code fence. Anything
    that is not a JSON object falls back to a default plan built from
    ``user_question``; malformed individual fields are replaced by their
    fallback values.
    """
    fallback = {
        "retrieval_queries": [user_question],
        "answer_plan": "Summarize key chemistries, compare pros/cons, and recommend 1-2 focus areas.",
    }

    text = content.strip()
    if text.startswith("```"):
        # Drop the opening fence line (it may carry a language tag such as
        # ```json) and the closing fence, keeping only the payload.
        _, _, text = text.partition("\n")
        text = text.rstrip()
        if text.endswith("```"):
            text = text[:-3]

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Fallback if model deviates from JSON
        return fallback
    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a bare list): callers use .get().
        return fallback

    queries = data.get("retrieval_queries")
    if isinstance(queries, str):
        # A single query given as a bare string would otherwise be iterated
        # character by character downstream.
        queries = [queries]
    if isinstance(queries, list):
        queries = [q.strip() for q in queries if isinstance(q, str) and q.strip()]
    else:
        queries = []

    plan = data.get("answer_plan")
    if not isinstance(plan, str) or not plan.strip():
        plan = fallback["answer_plan"]

    # Preserve any extra keys the model returned; only normalise the two
    # fields the pipeline depends on.
    return {
        **data,
        "retrieval_queries": queries or fallback["retrieval_queries"],
        "answer_plan": plan,
    }


def call_planner_agent(user_question: str) -> Dict:
    """Ask the Planner Agent for retrieval queries and an answer plan.

    Sends ``PLANNER_SYSTEM_PROMPT`` plus the user's question to ``CHAT_MODEL``,
    prints the raw reply, and parses it.

    Args:
        user_question: The high-level R&D question from the user.

    Returns:
        A dict that always contains ``"retrieval_queries"`` (a non-empty list
        of strings) and ``"answer_plan"`` (a non-empty string). If the reply
        cannot be parsed as a JSON object, a default plan that searches for
        the question itself is returned.
    """
    messages = [
        {"role": "system", "content": PLANNER_SYSTEM_PROMPT},
        {"role": "user", "content": user_question},
    ]
    resp = client.chat.completions.create(
        model=CHAT_MODEL,
        messages=messages,
        temperature=0.2,
    )

    # message.content can be None (e.g. a refusal); treat that as an empty
    # reply so the JSON fallback applies instead of raising AttributeError.
    content = (resp.choices[0].message.content or "").strip()
    print("=== PLANNER AGENT OUTPUT ===")
    print(content)
    print("================================\n")

    return _parse_planner_output(content, user_question)

In [None]:
# -------------------- 3. Retrieval Agent (Python RAG) --------------------

def call_retrieval_agent(retrieval_queries: List[str], k_per_query: int = 3) -> List[Dict]:
    """
    Run the 'Retrieval Agent': plain Python logic on top of the embedding search.

    For every planner-supplied query it runs a semantic search returning up
    to ``k_per_query`` hits, then merges the hits across queries so each
    document appears once, carrying the highest score any query gave it.
    Progress and the merged list are printed; the merged list is returned.
    """
    best_hit_by_id: Dict[str, Dict] = {}

    for query in retrieval_queries:
        print(f"Retrieval Agent: searching for query -> {query!r}")
        for hit in search_knowledge_base(query, k=k_per_query):
            previous = best_hit_by_id.get(hit["id"])
            # A document matched by several queries keeps only its best hit.
            if previous is None or hit["score"] > previous["score"]:
                best_hit_by_id[hit["id"]] = hit

    merged = list(best_hit_by_id.values())

    print("\n=== RETRIEVAL AGENT MERGED RESULTS ===")
    for hit in merged:
        print(f"- {hit['id']} | {hit['title']} | score={hit['score']:.3f}")
    print("=======================================\n")

    return merged

In [None]:
# -------------------- 4. Writer Agent --------------------

# System prompt sent as the "system" message in call_writer_agent.
# It tells the model to follow the planner's answer plan, ground the briefing
# in the retrieved snippets, cite them by their bracketed document IDs, and
# state explicitly when the documents do not cover something.
WRITER_SYSTEM_PROMPT = """
You are the Writer Agent in an R&D assistant for next-generation battery technologies for EVs.

You receive:
- The original user question.
- An answer plan created by the Planner Agent.
- Retrieved R&D snippets with IDs like [paper1], [paper2], [internal1].

Your job:
- Follow the answer plan.
- Use the retrieved knowledge as the primary source of truth.
- Write a concise R&D-style briefing for a technical audience (engineers, scientists).
- Clearly compare options (e.g., solid-state, sodium-ion, lithium-sulfur).
- Highlight key advantages, challenges, and technology readiness.
- End with 2–3 concrete recommendations for which chemistries to prioritize.
- Cite document IDs like [paper1], [internal1] where appropriate.

If something is not covered by the documents, say so explicitly instead of guessing.
"""

def call_writer_agent(user_question: str, answer_plan: str, retrieved_context: str) -> str:
    """Have the Writer Agent draft the final R&D brief.

    Builds one user message containing the question, the planner's answer
    plan and the retrieved context, sends it together with
    ``WRITER_SYSTEM_PROMPT`` to ``CHAT_MODEL``, prints the reply between
    banner lines, and returns the reply with surrounding whitespace removed.
    """
    user_prompt = (
        f"User question:\n{user_question}\n\n"
        f"Answer plan from Planner Agent:\n{answer_plan}\n\n"
        f"Retrieved R&D knowledge:\n{retrieved_context}"
    )
    completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": WRITER_SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        model=CHAT_MODEL,
        temperature=0.3,
    )
    brief = completion.choices[0].message.content.strip()

    # Echo the brief to stdout, framed by the same banners as the other agents.
    for line in ("=== WRITER AGENT OUTPUT ===", brief, "================================\n"):
        print(line)

    return brief

In [None]:
# -------------------- 5. Orchestrator: Multi-Agent R&D RAG Pipeline --------------------

def run_multi_agent_rnd_rag(user_question: str) -> str:
    """
    High-level orchestration for the R&D assistant:
      1. Planner Agent  -> retrieval_queries + answer_plan
      2. Retrieval Agent -> relevant R&D docs
      3. Writer Agent   -> final R&D brief

    Args:
        user_question: The user's high-level R&D question.

    Returns:
        The final brief produced by the Writer Agent.
    """
    # 1) Planner
    planner_output = call_planner_agent(user_question)
    if not isinstance(planner_output, dict):
        # The planner's reply is model-generated JSON; if it parsed to
        # something other than an object, use the defaults below.
        planner_output = {}

    retrieval_queries = planner_output.get("retrieval_queries")
    if isinstance(retrieval_queries, str):
        # A bare string would be iterated character by character.
        retrieval_queries = [retrieval_queries]
    if not isinstance(retrieval_queries, list) or not retrieval_queries:
        # `.get(key, default)` only covers a missing key; an empty or null
        # value would otherwise skip retrieval entirely.
        retrieval_queries = [user_question]

    answer_plan = planner_output.get("answer_plan") or "Summarize options and recommend priorities."

    # 2) Retrieval
    retrieved_docs = call_retrieval_agent(retrieval_queries, k_per_query=3)
    context_block = format_retrieval_results(retrieved_docs)

    # 3) Writer
    final_answer = call_writer_agent(user_question, answer_plan, context_block)
    return final_answer

In [None]:
# -------------------- 6. Demo --------------------

if __name__ == "__main__":
    # Run the full planner -> retrieval -> writer pipeline on a sample question
    # and print the brief that would be returned to the user.
    demo_question = (
        "For EV applications in the next 5–8 years, which next-generation battery chemistries "
        "should our R&D team prioritize, and why?"
    )
    brief = run_multi_agent_rnd_rag(demo_question)
    print("\n=== FINAL ANSWER (RETURNED TO USER) ===", brief, sep="\n")