# 03 — Pipeline E2E: Decomposer → Planner → Retriever → Grader

Run the first four nodes of the S4/S6 pipeline on a single claim and inspect the inputs and outputs of each step.

**Nodes covered:**
1. **Claim Decomposer** (function) — entities, PICO, sub-claims
2. **Retrieval Planner** (ReAct agent) — method selection per sub-claim
3. **Evidence Retriever** (ReAct agent) — multi-source search, reranking, dedup
4. **Evidence Grader** (ReAct agent) — study type, methodology, relevance, GRADE

Each step shows the state delta, i.e. the fields the node added to or updated in the pipeline state.

In [None]:
import sys
sys.path.insert(0, '..')

import warnings
warnings.filterwarnings('ignore')

import json
import pandas as pd
from IPython.display import display, Markdown

from src.models import FactCheckState
from src.functions.decomposer import run_decomposer
from systems.s4_langgraph.agents.retrieval_planner import run_retrieval_planner
from systems.s4_langgraph.agents.evidence_retriever import run_evidence_retriever
from systems.s4_langgraph.agents.evidence_grader import run_evidence_grader

In [None]:
# Choose a claim to test
CLAIM = "The MMR vaccine causes autism in children"

# Build the initial pipeline state. Every field starts empty/zeroed; the
# nodes run below fill in their own outputs. Fields are grouped by the step
# that produces them (key order is unchanged, so the printout below is too).
state: FactCheckState = {
    # Input
    "claim": CLAIM,
    # Step 1 outputs (Claim Decomposer)
    "pico": None,
    "sub_claims": [],
    "entities": {},
    # Step 2 output (Retrieval Planner)
    "retrieval_plan": {},
    # Step 3 output (Evidence Retriever), plus figures read by the grader
    "evidence": [],
    "extracted_figures": [],
    # Step 4 output (Evidence Grader)
    "evidence_quality": {},
    # Not produced by the four nodes in this notebook — presumably filled by
    # later pipeline stages; kept so the state is complete.
    "verdict": "",
    "confidence": 0.0,
    "explanation": "",
    "safety_flags": [],
    "is_dangerous": False,
    # Bookkeeping read by the trace/summary cells
    "agent_trace": [],
    "total_cost_usd": 0.0,
    "total_duration_seconds": 0.0,
}

print(f'Claim: "{CLAIM}"')
print(f"State keys: {list(state)}")

---
## Step 1: Claim Decomposer (function node)

**What it does:** Extracts medical entities with scispaCy NER, extracts the PICO elements (Population, Intervention, Comparison, Outcome), and decomposes the claim into atomic sub-claims.

**Inputs:** `claim` (string)  
**Outputs:** `entities`, `pico`, `sub_claims`

In [None]:
state = await run_decomposer(state)

# --- Entities ---
print("ENTITIES")
print("=" * 50)
entities = state["entities"]
for etype, elist in entities.items():
    if elist:
        print(f"  {etype}: {elist}")
if not any(v for v in entities.values()):
    print("  (none detected)")

In [None]:
# --- PICO ---
print("PICO EXTRACTION")
print("=" * 50)
pico = state["pico"]
if not pico:
    print("  (no PICO)")
else:
    # One row per PICO element; the attribute name is the lower-cased label.
    # Empty/None values are shown as "(null)".
    pico_rows = []
    for label in ("Population", "Intervention", "Comparison", "Outcome"):
        value = getattr(pico, label.lower())
        pico_rows.append({"Element": label, "Value": value or "(null)"})
    pico_df = pd.DataFrame(pico_rows)
    display(pico_df)

In [None]:
# --- Sub-claims ---
print("SUB-CLAIMS")
print("=" * 50)

# One row per sub-claim; P/I/C/O columns are only filled when the
# sub-claim carries its own PICO (missing elements shown as "").
sc_rows = []
for sub in state["sub_claims"]:
    row = {"id": sub.id, "text": sub.text}
    sub_pico = sub.pico
    if sub_pico:
        row.update(
            P=sub_pico.population or "",
            I=sub_pico.intervention or "",
            C=sub_pico.comparison or "",
            O=sub_pico.outcome or "",
        )
    sc_rows.append(row)

display(pd.DataFrame(sc_rows))

# The most recent trace entry is taken to be the decomposer's own record.
trace = state["agent_trace"][-1]
print(f"\nDuration: {trace.duration_seconds}s | Cost: ${trace.cost_usd:.4f}")

---
## Step 2: Retrieval Planner (ReAct agent)

**What it does:** Examines each sub-claim's characteristics and decides which retrieval methods to use.

**Inputs:** `sub_claims`, `entities`, `pico`  
**Outputs:** `retrieval_plan` (dict mapping sub-claim ID → list of methods)

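Uses a ReAct loop with Claude if `ANTHROPIC_API_KEY` is set; otherwise it falls back to rule-based keyword matching. The cell below reports which mode ran: a trace with zero reasoning steps means the rule-based fallback was used.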

In [None]:
state = await run_retrieval_planner(state)

print("RETRIEVAL PLAN")
print("=" * 60)
plan_rows = []
for sc_id, methods in state["retrieval_plan"].items():
    sc_text = next((sc.text for sc in state["sub_claims"] if sc.id == sc_id), "?")
    plan_rows.append({
        "sub_claim": f"{sc_id}: {sc_text[:60]}",
        "methods": ", ".join(methods),
        "count": len(methods),
    })

display(pd.DataFrame(plan_rows))

trace = state["agent_trace"][-1]
mode = "ReAct (LLM)" if trace.reasoning_steps > 0 else "Rule-based"
print(f"\nMode: {mode} | Duration: {trace.duration_seconds}s | Cost: ${trace.cost_usd:.4f}")

---
## Step 3: Evidence Retriever (ReAct agent)

**What it does:** Executes the retrieval plan by searching PubMed, Semantic Scholar, Cochrane, ClinicalTrials.gov, and DrugBank. It then re-ranks the results with a cross-encoder, deduplicates them, and links each evidence item to the relevant sub-claims.

**Inputs:** `retrieval_plan`, `sub_claims`, `pico`, `entities`  
**Outputs:** `evidence` (list of Evidence objects), updated `sub_claims` (with evidence IDs)

In [None]:
state = await run_evidence_retriever(state)

evidence = state["evidence"]
print(f"EVIDENCE RETRIEVED: {len(evidence)} items")
print("=" * 60)

# Per sub-claim summary
for sc in state["sub_claims"]:
    sc_ev = [e for e in evidence if e.id in sc.evidence]
    by_source = {}
    for ev in sc_ev:
        by_source[ev.source] = by_source.get(ev.source, 0) + 1
    source_str = ", ".join(f"{s}: {c}" for s, c in sorted(by_source.items())) if by_source else "none"
    print(f"  [{sc.id}] {sc.text[:55]}")
    print(f"    {len(sc_ev)} items — {source_str}")

trace = state["agent_trace"][-1]
mode = "ReAct (LLM)" if trace.reasoning_steps > 0 else "Rule-based"
print(f"\nMode: {mode} | Duration: {trace.duration_seconds}s | Cost: ${trace.cost_usd:.4f}")

In [None]:
# Show all evidence items as a table, highest cross-encoder quality score first.
# Missing study types are shown as "unknown", missing PMIDs as "".
ev_rows = [
    {
        "id": ev.id,
        "source": ev.source,
        "study_type": ev.study_type or "unknown",
        "quality_score": round(ev.quality_score, 3),
        "title": ev.title[:65],
        "pmid": ev.pmid or "",
    }
    for ev in sorted(evidence, key=lambda e: e.quality_score, reverse=True)
]

ev_df = pd.DataFrame(ev_rows)
print("All evidence items (sorted by cross-encoder quality score):")
display(ev_df)

In [None]:
# Inspect the highest-scoring evidence item in detail.
if evidence:
    # max() returns the first item with the top quality_score. That is the
    # same element sorted(..., reverse=True)[0] picks (reverse sorting is
    # stable), without sorting the whole list.
    top_ev = max(evidence, key=lambda e: e.quality_score)
    print(f"TOP EVIDENCE ITEM: {top_ev.id}")
    print("=" * 60)
    print(f"  Source:       {top_ev.source}")
    print(f"  Study type:   {top_ev.study_type}")
    print(f"  Quality:      {top_ev.quality_score:.4f}")
    print(f"  PMID:         {top_ev.pmid or '—'}")
    print(f"  Title:        {top_ev.title}")
    print(f"  URL:          {top_ev.url or '—'}")
    print("\n  Content (first 500 chars):")
    print(f"  {top_ev.content[:500]}")
else:
    print("No evidence retrieved.")

---
## Step 4: Evidence Grader (ReAct agent)

**What it does:** Evaluates each evidence item for study type, methodological quality, relevance to sub-claims, and applies the GRADE framework. Produces per-evidence quality scores and per-subclaim evidence summaries.

**Inputs:** `evidence`, `sub_claims`, `extracted_figures`  
**Outputs:** `evidence_quality` dict with:
- `per_evidence`: study type, hierarchy weight, methodology score, relevance, GRADE, evidence strength
- `per_subclaim`: evidence count, avg strength, direction summary, top evidence IDs

Uses a ReAct loop with Claude if `ANTHROPIC_API_KEY` is set; otherwise it falls back to rule-based grading based on the evidence items' existing metadata.

In [None]:
state = await run_evidence_grader(state)

eq = state["evidence_quality"]
per_ev = eq["per_evidence"]
per_sc = eq["per_subclaim"]

print(f"EVIDENCE GRADING RESULTS")
print(f"  Graded: {len(per_ev)} evidence items")
print(f"  Sub-claim summaries: {len(per_sc)}")

trace = state["agent_trace"][-1]
mode = "ReAct (LLM)" if trace.reasoning_steps > 0 else "Rule-based"
print(f"  Mode: {mode} | Duration: {trace.duration_seconds}s | Cost: ${trace.cost_usd:.4f}")

In [None]:
# Per-evidence grading table, strongest evidence first.

# Index Evidence objects by id for title lookup. setdefault keeps the FIRST
# object with a given id, matching a linear "find first" search.
ev_by_id = {}
for item in evidence:
    ev_by_id.setdefault(item.id, item)

ranked = sorted(
    per_ev.items(),
    key=lambda pair: pair[1].get("evidence_strength", 0),
    reverse=True,
)

grade_rows = []
for ev_id, info in ranked:
    ev_obj = ev_by_id.get(ev_id)
    title = ev_obj.title[:45] if ev_obj else "?"

    # The table only shows the relevance entry of the first sub-claim listed.
    relevance = info.get("relevance", {})
    first_rel = next(iter(relevance.values()), {}) if relevance else {}

    grade_rows.append({
        "id": ev_id,
        "title": title,
        "study_type": info.get("study_type", "?"),
        "hierarchy_wt": info.get("hierarchy_weight", 0),
        "methodology": info.get("methodology_score", 0),
        "direction": first_rel.get("direction", "?"),
        "relevance": first_rel.get("score", 0),
        "strength": info.get("evidence_strength", 0),
    })

grade_df = pd.DataFrame(grade_rows)
print("Per-Evidence Grading (sorted by strength):")
display(grade_df)

In [None]:
# Per sub-claim summary
print("PER SUB-CLAIM EVIDENCE SUMMARY")
print("=" * 70)

# Index sub-claims by id; setdefault keeps the FIRST match, like a linear search.
subclaim_by_id = {}
for sub in state["sub_claims"]:
    subclaim_by_id.setdefault(sub.id, sub)

sc_summary_rows = []
for sc_id, info in per_sc.items():
    sc_obj = subclaim_by_id.get(sc_id)
    sc_text = sc_obj.text[:50] if sc_obj else "?"
    ds = info.get("direction_summary", {})
    sc_summary_rows.append({
        "sub_claim": f"{sc_id}: {sc_text}",
        "evidence_count": info.get("evidence_count", 0),
        "avg_strength": round(info.get("avg_strength", 0), 3),
        # Count of evidence items per direction (missing directions -> 0).
        **{direction: ds.get(direction, 0) for direction in ("supports", "opposes", "neutral")},
    })

display(pd.DataFrame(sc_summary_rows))

# Top evidence per sub-claim (at most three ids each).
for sc_id, info in per_sc.items():
    top_ids = info.get("top_evidence_ids", [])
    if not top_ids:
        continue
    shown = ", ".join(top_ids[:3])
    print(f"\n  [{sc_id}] Top evidence: {shown}")

In [None]:
# Inspect the full grading record of the strongest evidence item.
if per_ev:
    # Highest evidence_strength wins (missing -> 0); on ties max() keeps the
    # first id in per_ev's iteration order.
    best_id = max(per_ev, key=lambda k: per_ev[k].get("evidence_strength", 0))
    best = per_ev[best_id]
    ev_obj = next((e for e in evidence if e.id == best_id), None)

    print(f"DETAILED GRADING: {best_id}")
    print("=" * 60)
    if ev_obj:
        print(f"  Title: {ev_obj.title}")
        print(f"  Source: {ev_obj.source} | PMID: {ev_obj.pmid or '—'}")

    print(f"\n  Study Type: {best.get('study_type', '?')}")
    print(f"  Hierarchy Weight: {best.get('hierarchy_weight', 0)}")
    print(f"  Methodology Score: {best.get('methodology_score', 0)}")
    print(f"  Evidence Strength: {best.get('evidence_strength', 0)}")

    # Relevance judgement for each sub-claim this item was graded against.
    rel = best.get("relevance", {})
    if rel:
        print("\n  Relevance:")
        for sc_id, r in rel.items():
            print(f"    {sc_id}: direction={r.get('direction', '?')}, score={r.get('score', 0):.2f}")
            if r.get("key_finding"):
                # Truncate long findings to keep the output readable.
                print(f"            finding: {r['key_finding'][:80]}")

    # GRADE framework fields, printed as-is.
    grade = best.get("grade", {})
    if grade:
        print("\n  GRADE Framework:")
        for k, v in grade.items():
            print(f"    {k}: {v}")
else:
    # Say so explicitly instead of silently printing nothing.
    print("No graded evidence.")

---
## Pipeline Summary

In [None]:
# Headline numbers for the whole four-node run.
print("PIPELINE SUMMARY")
print("=" * 60)
claim_text = state["claim"]
print(f'  Claim: "{claim_text}"')
print(f"  Sub-claims: {len(state['sub_claims'])}")
print(f"  Evidence items: {len(state['evidence'])}")
print(f"  Graded evidence: {len(per_ev)}")
print(f"  Total cost: ${state['total_cost_usd']:.4f}")
print(f"  Total duration: {state['total_duration_seconds']:.2f}s")

# Node trace table: one row per recorded agent/function node run.
trace_rows = [
    {
        "agent": node_trace.agent,
        "type": node_trace.node_type,
        "duration_s": node_trace.duration_seconds,
        "cost_usd": round(node_trace.cost_usd, 4),
        "tools": ", ".join(node_trace.tools_called) if node_trace.tools_called else "—",
        "reasoning_steps": node_trace.reasoning_steps,
    }
    for node_trace in state["agent_trace"]
]

print("\nAgent Traces:")
display(pd.DataFrame(trace_rows))

In [None]:
# Dump the raw evidence_quality dict for inspection. default=str makes
# json fall back to str() for any value it cannot serialize natively.
raw_quality = state["evidence_quality"]
print("Raw evidence_quality output (JSON):")
print(json.dumps(raw_quality, indent=2, default=str))

---
## Try Another Claim

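To test a different claim, either change `CLAIM` in the claim-setup cell and re-run all cells from that cell onwards (the state must be rebuilt, not just Step 1 re-run), or uncomment the quick-run cell below and edit `CLAIM2` to run all four nodes in a single cell.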

In [None]:
# Quick-run: full pipeline in one cell
# Uncomment and modify the claim to test a different one.
# Runs the same four nodes as Steps 1-4 on a freshly built state and prints a
# compact summary. It uses separate names (CLAIM2 / state2), so the `state`
# inspected in the cells above is left untouched.
# NOTE: unlike the summary cells above, the loop at the end indexes
# directly (info['direction_summary'], ds['supports'], ...), so a missing
# key raises KeyError instead of defaulting to 0.

# CLAIM2 = "Vitamin D prevents COVID-19 infection"
# 
# state2: FactCheckState = {
#     "claim": CLAIM2, "pico": None, "sub_claims": [], "entities": {},
#     "retrieval_plan": {}, "evidence": [], "extracted_figures": [],
#     "evidence_quality": {}, "verdict": "", "confidence": 0.0,
#     "explanation": "", "safety_flags": [], "is_dangerous": False,
#     "agent_trace": [], "total_cost_usd": 0.0, "total_duration_seconds": 0.0,
# }
# 
# state2 = await run_decomposer(state2)
# state2 = await run_retrieval_planner(state2)
# state2 = await run_evidence_retriever(state2)
# state2 = await run_evidence_grader(state2)
# 
# print(f"Claim: {state2['claim']}")
# print(f"Sub-claims: {len(state2['sub_claims'])}")
# print(f"Evidence: {len(state2['evidence'])}")
# print(f"Cost: ${state2['total_cost_usd']:.4f}")
# print(f"Duration: {state2['total_duration_seconds']:.1f}s")
# 
# for sc_id, info in state2['evidence_quality']['per_subclaim'].items():
#     ds = info['direction_summary']
#     print(f"  {sc_id}: {info['evidence_count']} items, "
#           f"strength={info['avg_strength']:.3f}, "
#           f"supports={ds['supports']}, opposes={ds['opposes']}, neutral={ds['neutral']}")