In [9]:
# =========================
# Block 1: Imports + Config + Normalization Helpers
# =========================

from __future__ import annotations
import json
import os
import re
from typing import Any, Dict, List, Tuple, Optional

# -------------------------
# CONFIG: file paths (edit if your filenames differ)
# You can override any of these using environment variables with the same names.
# -------------------------
# Root of the promptKG data tree. Set PROMPTKG_DATA_DIR to relocate every default
# below at once; the per-file variables (P1_FILE, ...) still take precedence.
# With no environment overrides the resulting paths are identical to the previous
# hard-coded values.
DATA_ROOT = os.getenv("PROMPTKG_DATA_DIR", "/upb/users/b/balram/profiles/unix/cs/promptKG/data").rstrip("/")

P1_FILE = os.getenv("P1_FILE", f"{DATA_ROOT}/output/prompt1/dbpedia/ont_12_monument_output.jsonl")  # Prompt 1 outputs
P2_FILE = os.getenv("P2_FILE", f"{DATA_ROOT}/output/prompt2/dbpedia/ont_12_monument_output.jsonl")  # Prompt 2 outputs
P3_FILE = os.getenv("P3_FILE", f"{DATA_ROOT}/output/prompt3/dbpedia/ont_12_monument_output.jsonl")  # Prompt 3 outputs
ONT_FILE = os.getenv("ONT_FILE", f"{DATA_ROOT}/input/dbpedia/input_ontology/12_monument_ontology.json")  # Ontology (authority)
GT_FILE  = os.getenv("GT_FILE",  f"{DATA_ROOT}/input/dbpedia/ground_truth/ont_12_monument_ground_truth.jsonl")  # Optional: for metrics only

# -------------------------
# Normalization helpers (strings, keys) — NO date detection/normalization
# -------------------------

# Any run of whitespace (spaces, tabs, newlines) is collapsed to one space.
_WHITESPACE_RE = re.compile(r"\s+")
# Quote characters peeled from both ends of a surface string.
_QUOTES_TO_STRIP = "“”\"'`’"

def norm_surface(s: Any) -> str:
    """
    Normalize a surface string for display/storage (light touch).

    Steps:
    - None → "".
    - Non-strings are serialized with json.dumps (ensure_ascii=False) so that
      lists/dicts/numbers keep their structure.
    - Internal whitespace runs are collapsed to a single space.
    - Surrounding whitespace and quote characters are peeled off repeatedly
      until neither is left at either end.

    The repeated peeling matters for inputs such as '" foo "': stripping the
    quotes exposes new edge whitespace, which a single strip pass would leave
    in the result. It also makes the function idempotent
    (norm_surface(norm_surface(x)) == norm_surface(x)).

    Returns the normalized string; case is preserved.
    """
    if s is None:
        return ""
    if not isinstance(s, str):
        s = json.dumps(s, ensure_ascii=False)
    t = _WHITESPACE_RE.sub(" ", s)
    while True:
        peeled = t.strip().strip(_QUOTES_TO_STRIP)
        if peeled == t:
            break
        t = peeled
    return t

def norm_key(s: Any) -> str:
    """
    Normalize a value for equality comparisons across prompts.

    Steps:
    - None → "".
    - Non-strings are serialized with json.dumps (ensure_ascii=False).
    - Internal whitespace runs are collapsed to a single space.
    - Surrounding whitespace and quote characters are peeled off repeatedly
      until neither is left at either end (so '" foo "' yields 'foo', not
      ' foo ', and the result is stable under re-application).
    - The result is lowercased.
    """
    if s is None:
        return ""
    if not isinstance(s, str):
        s = json.dumps(s, ensure_ascii=False)
    t = _WHITESPACE_RE.sub(" ", s)
    while True:
        # Stripping quotes can expose whitespace (and vice versa), so alternate
        # until a pass changes nothing.
        peeled = t.strip().strip(_QUOTES_TO_STRIP)
        if peeled == t:
            break
        t = peeled
    return t.lower()

def canonical_triple_key(subj: Any, rel: Any, obj: Any) -> Tuple[str, str, str]:
    """
    Produce the (subject, relation, object) comparison key for a triple.

    Every component is treated as plain text; no date parsing is attempted.
    - subject and relation go through norm_key once
    - the object goes through norm_surface first and norm_key afterwards
    """
    object_surface = norm_surface(obj)
    return (norm_key(subj), norm_key(rel), norm_key(object_surface))


In [10]:
# =========================
# Block 2: Loaders + Per-ID Index (no ontology)
# =========================
# Purpose:
#   - Read P1/P2/P3 JSONL outputs safely.
#   - Normalize heterogeneous triple schemas into a common internal shape.
#   - Attach provenance (which prompt/file, source line).
#   - Preserve the common `input text` per id.
#   - Build a per-id index: {id: {"input_text": str|None, "p1":[...], "p2":[...], "p3":[...]}}
#   - (No ontology logic and no date parsing here; string normalization is delegated to the Block 1 helpers.)
#
# Notes:
#   - This block depends on Block 1 helpers: norm_surface, norm_key, canonical_triple_key
#   - We DO NOT change the evaluator rules here; we just prepare clean data for Block 3.

from __future__ import annotations
import json
import re
from typing import Any, Dict, List, Optional, Tuple

# -------------------------
# 1) JSONL reader (strict JSON; keeps parse errors as rows with _parse_error)
# -------------------------
def read_jsonl(path: str) -> List[Dict[str, Any]]:
    """
    Read a JSONL file into a list of row dicts.

    - Blank lines are skipped (they still count towards line numbers).
    - Every successfully parsed JSON object gets a "_line_no" key (1-based).
    - A line that is not valid JSON, or that is valid JSON but not an object
      (e.g. a list or a number), becomes an error row:
        {"_line_no": ..., "_parse_error": <message>, "_raw": <first 500 chars>}
    - The file is decoded as "utf-8-sig", so a leading UTF-8 byte-order mark is
      dropped instead of making the first line unparseable.
    - If the file is missing, a warning is printed and [] is returned.
      Other I/O errors propagate to the caller.
    """
    rows: List[Dict[str, Any]] = []
    try:
        with open(path, "r", encoding="utf-8-sig") as f:
            for ln, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except (ValueError, RecursionError) as e:
                    # json.JSONDecodeError is a ValueError; RecursionError covers
                    # pathologically nested input.
                    rows.append({
                        "_line_no": ln,
                        "_parse_error": str(e),
                        "_raw": line[:500],
                    })
                    continue
                if not isinstance(obj, dict):
                    # Downstream code calls row.get(...), so only objects are usable rows.
                    rows.append({
                        "_line_no": ln,
                        "_parse_error": f"expected a JSON object, got {type(obj).__name__}",
                        "_raw": line[:500],
                    })
                    continue
                obj["_line_no"] = ln
                rows.append(obj)
    except FileNotFoundError:
        print(f"[WARN] File not found: {path}")
    return rows


# -------------------------
# 2) Support coercion helpers
# -------------------------
def _extract_support_text(triple_obj: Dict[str, Any]) -> Optional[str]:
    """
    Return a unified support text string:
      - P1/P2: support is a string: return as-is (normalized lightly by caller if needed).
      - P3: support is a list of {quote, char_span}; concatenate all quotes with ' | '.
      - If support is missing/empty, return None.
    """
    sup = triple_obj.get("support")
    if sup is None:
        return None

    # P3 style: list of dicts with "quote"
    if isinstance(sup, list):
        quotes = []
        for item in sup:
            if isinstance(item, dict):
                q = item.get("quote")
                if isinstance(q, str) and q.strip():
                    quotes.append(q.strip())
            elif isinstance(item, str) and item.strip():
                # be tolerant if it's already strings
                quotes.append(item.strip())
        return " | ".join(quotes) if quotes else None

    # P1/P2 style: single string
    if isinstance(sup, str):
        s = sup.strip()
        return s if s else None

    # Unknown shape → None
    return None


# -------------------------
# 3) STRICT extractor (response -> json -> triples), with provenance & normalization
# -------------------------
def extract_triples_strict(row: Dict[str, Any], file_tag: str) -> List[Dict[str, Any]]:
    """
    Pull the triples out of one row and convert them to the internal shape.

    The row must follow row["response"]["json"]["triples"] -> list; if any
    level is missing or has the wrong type, [] is returned. Within the list,
    entries that are not dicts, or whose "triple" value is not a 3-element
    list, are skipped.

    Each emitted dict has:
      "s", "p", "o"    : original surfaces (strings kept verbatim, other
                         values passed through norm_surface)
      "confidence"     : float, or None when absent or not convertible
      "support_raw"    : the untouched "support" field
      "support_text"   : result of _extract_support_text
      "source_prompt"  : file_tag
      "canonical"      : canonical_triple_key(s, p, o)
    """
    # Walk response -> json -> triples, bailing out on the first wrong type.
    node: Any = row
    for key, expected_type in (("response", dict), ("json", dict), ("triples", list)):
        node = node.get(key)
        if not isinstance(node, expected_type):
            return []

    normalized: List[Dict[str, Any]] = []
    for item in node:
        if not isinstance(item, dict):
            continue

        parts = item.get("triple")
        if not isinstance(parts, list) or len(parts) != 3:
            continue

        # Strings are preserved exactly; anything else is rendered as text.
        subj, pred, obj = (
            part if isinstance(part, str) else norm_surface(part)
            for part in parts
        )

        raw_conf = item.get("confidence")
        try:
            confidence = None if raw_conf is None else float(raw_conf)
        except Exception:
            confidence = None

        normalized.append({
            "s": subj,
            "p": pred,
            "o": obj,
            "confidence": confidence,
            "support_raw": item.get("support"),
            "support_text": _extract_support_text(item),
            "source_prompt": file_tag,
            "canonical": canonical_triple_key(subj, pred, obj),
        })

    return normalized



# -------------------------
# 4) Load one prompt file into a per-id store
# -------------------------
def _pick_input_text(existing: Optional[str], candidate_row: Dict[str, Any]) -> Optional[str]:
    """
    Choose the non-empty 'input text' if available;
    keep the first non-empty value we encounter across rows.
    Accept both 'input text' and 'input_text' keys (be tolerant).
    """
    if existing and existing.strip():
        return existing
    # Prefer exact key 'input text' (as in your sample)
    txt = candidate_row.get("input text")
    if isinstance(txt, str) and txt.strip():
        return txt
    # Fallback variants
    for k in ("input_text", "input", "text", "source_text"):
        v = candidate_row.get(k)
        if isinstance(v, str) and v.strip():
            return v
    return existing  # unchanged (possibly None)


def load_prompt_outputs_strict(path: str, file_tag: str) -> Dict[str, Dict[str, Any]]:
    """
    Load one prompt's JSONL output and group it by row id.

    Result shape:
      {
        id: {
          "input_text": str|None,   # first non-blank text seen for this id
          "rows":   [...],          # the raw rows for this id
          "triples":[...]           # normalized triples tagged with file_tag
        },
        ...
      }
    Rows flagged with _parse_error, and rows whose "id" is missing or falsy,
    are ignored. Triples from several rows with the same id are concatenated
    in file order.
    """
    per_id: Dict[str, Dict[str, Any]] = {}

    for record in read_jsonl(path):
        record_id = record.get("id")
        if record.get("_parse_error") or not record_id:
            continue

        if record_id not in per_id:
            per_id[record_id] = {"input_text": None, "rows": [], "triples": []}
        entry = per_id[record_id]

        entry["rows"].append(record)
        entry["input_text"] = _pick_input_text(entry["input_text"], record)
        entry["triples"].extend(extract_triples_strict(record, file_tag))

    return per_id


# -------------------------
# 5) Dedup helper (per list of triples)
# -------------------------
def dedup_triples(triples: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Drop repeated triples, comparing by their canonical (s,p,o) key.

    The first triple seen for each key is the one kept, in original order.
    A triple without a "canonical" entry gets one computed via
    canonical_triple_key and stored back on the dict (the input is mutated
    in that case).
    """
    first_by_key: Dict[Any, Dict[str, Any]] = {}
    for triple in triples:
        key = triple.get("canonical")
        if not key:
            key = canonical_triple_key(triple.get("s"), triple.get("p"), triple.get("o"))
            triple["canonical"] = key
        # Dicts keep insertion order and setdefault never overwrites,
        # so this retains the earliest occurrence per key.
        first_by_key.setdefault(key, triple)
    return list(first_by_key.values())


# -------------------------
# 6) Build per-ID unified view across prompts
# -------------------------
def build_id_index(P1: Dict[str, Dict[str, Any]],
                   P2: Dict[str, Dict[str, Any]],
                   P3: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Combine three per-id maps (from load_prompt_outputs_strict) into:
      {
        id: {
          "input_text": first-nonempty among P1,P2,P3,
          "p1": [triples...],  # deduped
          "p2": [triples...],  # deduped
          "p3": [triples...],  # deduped
        }
      }

    Ordering of the result:
      - ids ending in digits come first, ordered by that trailing number;
      - ids without trailing digits follow, ordered as text;
      - the id text itself breaks ties, so the order does not depend on set
        iteration order.
    The sort key always has the same tuple shape, so a mix of ids with and
    without trailing digits can be sorted (comparing a bare int key with a
    bare str key raises TypeError).
    """
    def _id_sort_key(rid: Any) -> Tuple[int, int, str]:
        text = str(rid)
        m = re.search(r"(\d+)$", text)
        if m:
            return (0, int(m.group(1)), text)
        return (1, 0, text)

    ids = sorted(set(P1.keys()) | set(P2.keys()) | set(P3.keys()), key=_id_sort_key)

    out: Dict[str, Dict[str, Any]] = {}

    for rid in ids:
        input_text = None
        for src in (P1.get(rid), P2.get(rid), P3.get(rid)):
            if src:
                # _pick_input_text expects a row-like dict; adapt by keying to "input text"
                input_text = _pick_input_text(input_text, {"input text": src.get("input_text")})

        p1_tr = dedup_triples(P1.get(rid, {}).get("triples", []))
        p2_tr = dedup_triples(P2.get(rid, {}).get("triples", []))
        p3_tr = dedup_triples(P3.get(rid, {}).get("triples", []))

        out[rid] = {
            "input_text": input_text,
            "p1": p1_tr,
            "p2": p2_tr,
            "p3": p3_tr,
        }

    return out


# -------------------------
# 7) Summary printer
# -------------------------
def summarize_loaded(index_by_id: Dict[str, Dict[str, Any]]) -> None:
    """
    Print a one-line overview: how many ids were loaded and how many triples
    each prompt contributed (plus their sum).
    """
    counts = {"p1": 0, "p2": 0, "p3": 0}
    for bucket in index_by_id.values():
        for tag in counts:
            counts[tag] += len(bucket[tag])

    p1, p2, p3 = counts["p1"], counts["p2"], counts["p3"]
    total_ids = len(index_by_id)
    print(f"[Loaded] ids={total_ids} | triples: p1={p1}, p2={p2}, p3={p3} | total={p1+p2+p3}")


# -------------------------
# 8) Execute loads (strict) using paths from Block 1
# -------------------------
if __name__ == "__main__":
    # Read the three prompt outputs; the tag records which prompt a triple came from.
    P1 = load_prompt_outputs_strict(P1_FILE, "P1")
    P2 = load_prompt_outputs_strict(P2_FILE, "P2")
    P3 = load_prompt_outputs_strict(P3_FILE, "P3")

    # Merge them into the per-id view used by the evaluator cells below.
    INDEX_BY_ID = build_id_index(P1, P2, P3)
    summarize_loaded(INDEX_BY_ID)

    # Spot-check the first id (None when nothing was loaded).
    SAMPLE_ID = next(iter(INDEX_BY_ID), None)
    print("Sample ID:", SAMPLE_ID)
    if SAMPLE_ID:
        sample_triples = INDEX_BY_ID[SAMPLE_ID]
        for tag in ("p1", "p2", "p3"):
            print(f"  {tag} triples:", len(sample_triples[tag]))
        # Print one example triple, taken from the first prompt that has any.
        for tag in ("p1", "p2", "p3"):
            if not sample_triples[tag]:
                continue
            ex = sample_triples[tag][0]
            shown_fields = ("s", "p", "o", "confidence", "support_text", "source_prompt", "canonical")
            print("  example triple from", tag, ":", {field: ex[field] for field in shown_fields})
            break


[Loaded] ids=19 | triples: p1=0, p2=59, p3=57 | total=116
Sample ID: ont_12_monument_test_1
  p1 triples: 0
  p2 triples: 3
  p3 triples: 3
  example triple from p2 : {'s': 'The 14th New Jersey Volunteer Infantry Monument', 'p': 'location', 'o': 'Monocacy National Battlefield', 'confidence': 1.0, 'support_text': 'which is located in the Monocacy National Battlefield', 'source_prompt': 'P2', 'canonical': ('the 14th new jersey volunteer infantry monument', 'location', 'monocacy national battlefield')}


In [11]:
# =========================
# Block 3: Evaluator (Consensus + Evidence Scoring)
# =========================
# Purpose:
#   - Select the best triples per ID from P1, P2, P3 according to your rules.
#   - Rule A: consensus (same canonical s,p,o) appears in ≥2 prompts + subj/obj in input text (fuzzy ≥ 0.90).
#   - Rule B: single-prompt with evidence (support required), Evidence = 0.50*COLOC + 0.25*SUBJ_SUP + 0.25*OBJ_SUP + 0.10*SIM > 0.70.
#   - NO synonym mapping (e.g., "US" != "United States" unless fuzzy ≥ 0.90 vs input text).
#   - Outputs JSONL: {"id":"...","input_text":"...","triples":[{"s":"...","p":"...","o":"..."}]} ; if none → "triples": null
#   - Includes a debug mode to inspect the first N IDs with detailed reasoning per triple.

from __future__ import annotations
import json
import re
import difflib
from typing import Any, Dict, List, Tuple, Optional

# ---------- Matching & Similarity Helpers ----------

_WS_RE = re.compile(r"\s+")
_TOKEN_RE = re.compile(r"\w+", flags=re.UNICODE)

def _norm_for_match(s: Optional[str]) -> str:
    """Lowercase and collapse whitespace for fuzzy/substring checks."""
    if not isinstance(s, str):
        return ""
    t = s.strip().lower()
    t = _WS_RE.sub(" ", t)
    return t

def _tokens(s: str) -> List[str]:
    """
    Split text into word tokens (letters/digits/underscore) after it has been
    lowercased and whitespace-normalized by _norm_for_match.
    """
    normalized = _norm_for_match(s)
    return _TOKEN_RE.findall(normalized)

def jaccard_similarity(a: str, b: str) -> float:
    """
    Jaccard overlap |A ∩ B| / |A ∪ B| of the two strings' token sets.
    Returns 0.0 whenever either string produces no tokens.
    """
    left = set(_tokens(a))
    right = set(_tokens(b))
    if not (left and right):
        return 0.0
    # Both sets are non-empty here, so the union cannot be empty.
    return len(left & right) / len(left | right)

def fuzzy_in(needle: str, haystack: str, threshold: float = 0.90) -> Tuple[bool, float]:
    """
    Substring-aware fuzzy membership test.

    1. Both inputs are normalized with _norm_for_match; if either is empty the
       result is (False, 0.0).
    2. If the normalized needle is a literal substring of the normalized
       haystack the result is (True, 1.0).
    3. Otherwise the best difflib ratio is taken over sliding token windows of
       the haystack with sizes len(needle_tokens) .. len(needle_tokens)+2
       (capped at the haystack length). If that stays below `threshold`, the
       ratio against the whole haystack is tried as a last resort.

    Returns (passed, best_score) where passed is best_score >= threshold.

    NOTE(review): step 2 is a character-level substring test, so a short needle
    such as "us" also matches inside a longer word (e.g. "museum").
    NOTE(review): window segments are rebuilt from word tokens (punctuation
    removed) but compared against the needle with its punctuation intact, which
    lowers the score for needles containing punctuation. Both behaviors are
    kept as they were; confirm whether they are intended.
    """
    n = _norm_for_match(needle)
    h = _norm_for_match(haystack)
    if not n or not h:
        return (False, 0.0)

    if n in h:
        return (True, 1.0)

    n_tokens = _tokens(n)
    h_tokens = _tokens(h)

    # SequenceMatcher pre-computes and caches its analysis of the *second*
    # sequence. The needle is the fixed side of every comparison, so it is set
    # once as seq2 and only seq1 is swapped per candidate. Scores are identical
    # to building a fresh SequenceMatcher(None, candidate, n) each time.
    matcher = difflib.SequenceMatcher(None, "", n)

    # Fallback: global ratio if tokens missing
    if not n_tokens or not h_tokens:
        matcher.set_seq1(h)
        score = matcher.ratio()
        return (score >= threshold, score)

    best = 0.0
    win_min = max(1, len(n_tokens))
    win_max = min(len(h_tokens), len(n_tokens) + 2)
    for w in range(win_min, win_max + 1):
        for i in range(0, len(h_tokens) - w + 1):
            matcher.set_seq1(" ".join(h_tokens[i:i + w]))
            r = matcher.ratio()
            if r > best:
                best = r
            if best >= 1.0:
                break
        if best >= 1.0:
            break

    # If nothing matched well in windows, try a global ratio as a last resort
    if best < threshold:
        matcher.set_seq1(h)
        best = max(best, matcher.ratio())

    return (best >= threshold, best)

# ---------- Rule A & Rule B Evaluation ----------

def _choose_surface_variant(instances: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Given multiple instances of the same canonical triple (from different prompts),
    choose one representative triple to output.
    - Prefer highest confidence; if None, treat as 0.0.
    - If tie, keep the first.
    """
    def conf_or_zero(x):
        c = x.get("confidence")
        try:
            return float(c) if c is not None else 0.0
        except Exception:
            return 0.0

    best = max(instances, key=conf_or_zero)
    return best

def _collect_by_canonical(p1_tr: List[Dict[str, Any]],
                          p2_tr: List[Dict[str, Any]],
                          p3_tr: List[Dict[str, Any]]) -> Dict[Tuple[str,str,str], List[Dict[str, Any]]]:
    """
    Build a map: canonical (s,p,o) -> list of triple instances across prompts.
    """
    by_key: Dict[Tuple[str,str,str], List[Dict[str, Any]]] = {}
    for t in (p1_tr + p2_tr + p3_tr):
        k = t.get("canonical")
        if not k:
            k = canonical_triple_key(t.get("s"), t.get("p"), t.get("o"))
            t["canonical"] = k
        by_key.setdefault(k, []).append(t)
    return by_key

def _rule_a_select(by_key: Dict[Tuple[str,str,str], List[Dict[str, Any]]],
                   input_text: str,
                   threshold: float = 0.90,
                   debug: bool = False) -> List[Dict[str, Any]]:
    """
    Rule A: consensus selection.

    A canonical triple qualifies when
    - its instances come from at least two distinct prompts, and
    - the subject and the object of its representative instance (chosen by
      _choose_surface_variant) both pass fuzzy_in against the full input text
      at `threshold`.

    Returns the representative dicts of all qualifying triples, with their
    original s/p/o surfaces. With debug=True a PASS/FAIL line is printed for
    every consensus candidate.
    """
    chosen: List[Dict[str, Any]] = []
    for instances in by_key.values():
        source_prompts = {inst.get("source_prompt") for inst in instances}
        if len(source_prompts) < 2:
            continue

        rep = _choose_surface_variant(instances)
        subj_ok, subj_score = fuzzy_in(rep["s"], input_text, threshold)
        obj_ok, obj_score = fuzzy_in(rep["o"], input_text, threshold)

        passed = subj_ok and obj_ok
        if passed:
            chosen.append(rep)
        if debug:
            verdict = "PASS" if passed else "FAIL"
            print(f"  [Rule A {verdict}] {rep['s']} — {rep['p']} — {rep['o']}  "
                  f"(s={subj_score:.2f}, o={obj_score:.2f}) from {sorted(source_prompts)}")
    return chosen

def _present_in_support_and_input(s: str, support: str, input_text: str, thr: float) -> Tuple[bool, float, float]:
    """
    Check whether `s` fuzzy-matches BOTH the support text and the input text.

    Returns (found_in_both, support_score, input_score), where the scores are
    the fuzzy_in scores against the support and the input respectively.
    """
    support_hit, support_score = fuzzy_in(s, support, thr)
    input_hit, input_score = fuzzy_in(s, input_text, thr)
    found_in_both = support_hit and input_hit
    return (found_in_both, support_score, input_score)

def _rule_b_select(by_key: Dict[Tuple[str,str,str], List[Dict[str, Any]]],
                   input_text: str,
                   threshold: float = 0.90,
                   evidence_cut: float = 0.70,
                   debug: bool = False) -> List[Dict[str, Any]]:
    """
    Rule B: Single-prompt with evidence.
    - Only in 1 prompt; support required.
    - COLOC: subj & obj & rel all in support (>=thr) AND each in input (>=thr) → 1.0 else 0.0
    - SUBJ_SUP: subject in support (>=thr) AND object NOT in support AND relation NOT in support → 1.0 else 0.0
    - OBJ_SUP: object in support (>=thr) AND subject NOT in support AND relation NOT in support → 1.0 else 0.0
    - SIM: Jaccard(support_concat, input_text) in [0,1]
    - Evidence = 0.50*COLOC + 0.25*SUBJ_SUP + 0.25*OBJ_SUP + 0.10*SIM; keep if > evidence_cut

    Returns the triples (first instance per canonical key) whose evidence
    exceeds `evidence_cut`. With debug=True one line is printed per candidate.

    NOTE(review): as defined, COLOC, SUBJ_SUP and OBJ_SUP are mutually
    exclusive (COLOC needs subject AND object in the support, while SUBJ_SUP /
    OBJ_SUP each require the other one to be absent). The score therefore never
    exceeds 0.50 + 0.10*SIM <= 0.60, so with the default evidence_cut of 0.70
    this rule cannot select anything. The scoring is left exactly as specified;
    confirm the intended weights or threshold.
    """
    selected: List[Dict[str, Any]] = []

    for insts in by_key.values():
        prompts = {t.get("source_prompt") for t in insts}
        if len(prompts) != 1:
            continue  # not a single-prompt case

        t = insts[0]
        s, p, o = t["s"], t["p"], t["o"]
        sup = t.get("support_text")
        if not sup:
            if debug:
                print(f"  [Rule B REJECT] (no support) {s} — {p} — {o}")
            continue  # support required → reject

        # Match each term once against the support and once against the input;
        # every component below is derived from these six results.
        subj_in_sup, s_sup_sc = fuzzy_in(s, sup, threshold)
        obj_in_sup, o_sup_sc = fuzzy_in(o, sup, threshold)
        rel_in_sup, p_sup_sc = fuzzy_in(p, sup, threshold)
        subj_in_inp, s_inp_sc = fuzzy_in(s, input_text, threshold)
        obj_in_inp, o_inp_sc = fuzzy_in(o, input_text, threshold)
        rel_in_inp, p_inp_sc = fuzzy_in(p, input_text, threshold)

        # COLOC: subj+obj+rel in support & in input
        all_in_support = subj_in_sup and obj_in_sup and rel_in_sup
        all_in_input = subj_in_inp and obj_in_inp and rel_in_inp
        coloc = 1.0 if (all_in_support and all_in_input) else 0.0

        # SUBJ_SUP: subject in support AND object NOT in support AND relation NOT in support
        subj_sup = 1.0 if (subj_in_sup and not obj_in_sup and not rel_in_sup) else 0.0

        # OBJ_SUP: object in support AND subject NOT in support AND relation NOT in support
        obj_sup = 1.0 if (obj_in_sup and not subj_in_sup and not rel_in_sup) else 0.0

        # SIM: Jaccard(support, input_text), clamped defensively to [0, 1]
        sim = max(0.0, min(1.0, jaccard_similarity(sup, input_text)))

        evidence = (0.50 * coloc) + (0.25 * subj_sup) + (0.25 * obj_sup) + (0.10 * sim)
        evidence = max(0.0, min(1.0, evidence))

        passed = evidence > evidence_cut
        if passed:
            selected.append(t)
        if debug:
            verdict = "PASS" if passed else "FAIL"
            print(f"  [Rule B {verdict}] {s} — {p} — {o} | "
                  f"COLOC={coloc:.2f} (s_sup={s_sup_sc:.2f}, o_sup={o_sup_sc:.2f}, p_sup={p_sup_sc:.2f}; "
                  f"s_inp={s_inp_sc:.2f}, o_inp={o_inp_sc:.2f}, p_inp={p_inp_sc:.2f}) "
                  f"| SUBJ_SUP={subj_sup:.2f} | OBJ_SUP={obj_sup:.2f} | SIM={sim:.2f} "
                  f"| Evidence={evidence:.2f}")

    return selected

# ---------- Public API: Evaluate & Write ----------

def evaluate_ids(index_by_id: Dict[str, Dict[str, Any]],
                 out_jsonl_path: str,
                 limit_ids: Optional[int] = None,
                 debug: bool = True,
                 threshold: float = 0.90,
                 evidence_cut: float = 0.70) -> None:
    """
    Run evaluator over the cross-prompt index and write final filtered triples to JSONL.

    Parameters:
    - index_by_id: per-id records with "input_text", "p1", "p2", "p3".
    - out_jsonl_path: file to (over)write with one JSON object per id.
    - limit_ids: if provided, only process the first N IDs in sorted order.
    - debug: print per-ID details (Rule A/B decisions and evidence).
    - threshold: fuzzy-match threshold handed to Rule A and Rule B.
    - evidence_cut: minimum evidence (exclusive) handed to Rule B.

    ID order: ids ending in digits first, by that trailing number; ids without
    trailing digits afterwards, as text; the id text breaks ties. The sort key
    always has the same tuple shape, so a mix of both kinds of id can be sorted
    (comparing a bare int key with a bare str key raises TypeError).

    Output JSONL per line:
      {"id":"...","input_text":"...","triples":[{"s":"...","p":"...","o":"..."}]}
      If no triple selected for an ID → "triples": null
    """
    def sort_key(x: Any) -> Tuple[int, int, str]:
        text = str(x)
        m = re.search(r"(\d+)$", text)
        if m:
            return (0, int(m.group(1)), text)
        return (1, 0, text)

    all_ids = sorted(index_by_id.keys(), key=sort_key)
    ids = all_ids if limit_ids is None else all_ids[:limit_ids]

    # Open output file
    with open(out_jsonl_path, "w", encoding="utf-8") as fout:
        for rid in ids:
            rec = index_by_id[rid]
            input_text = rec.get("input_text") or ""

            # triples by prompt
            p1_tr = rec.get("p1", [])
            p2_tr = rec.get("p2", [])
            p3_tr = rec.get("p3", [])

            # Build consensus map
            by_key = _collect_by_canonical(p1_tr, p2_tr, p3_tr)

            if debug:
                print("\n" + "="*80)
                print(f"[ID] {rid}")
                print(f"  P1 triples: {len(p1_tr)} | P2: {len(p2_tr)} | P3: {len(p3_tr)}")
                print("  → Running Rule A (consensus ≥ 2 prompts) …")

            # Rule A selection
            sel_a = _rule_a_select(by_key, input_text, threshold=threshold, debug=debug)

            # Rule B selection
            if debug:
                print("  → Running Rule B (single-prompt with evidence) …")
            sel_b = _rule_b_select(by_key, input_text, threshold=threshold,
                                   evidence_cut=evidence_cut, debug=debug)

            # Merge and dedup final selections by canonical; pick the best surface variant
            final_map: Dict[Tuple[str,str,str], List[Dict[str, Any]]] = {}
            for t in (sel_a + sel_b):
                final_map.setdefault(t.get("canonical"), []).append(t)

            final_triples: List[Dict[str, str]] = []
            for insts in final_map.values():
                rep = _choose_surface_variant(insts)
                final_triples.append({"s": rep["s"], "p": rep["p"], "o": rep["o"]})

            if debug:
                print(f"  → Selected triples: {len(final_triples)}")
                for tt in final_triples:
                    print(f"    [SELECTED] {tt['s']} — {tt['p']} — {tt['o']}")

            # Write record
            out_obj = {
                "id": rid,
                "input_text": input_text,
                "triples": final_triples if final_triples else None
            }
            fout.write(json.dumps(out_obj, ensure_ascii=False) + "\n")

    if debug:
        print("\n[Evaluator] Done.")
        print(f"Wrote results to: {out_jsonl_path}")


In [12]:
if __name__ == "__main__":
    # Debug pass: needs INDEX_BY_ID from Block 2. Evaluates only the first five
    # ids and prints the per-triple Rule A / Rule B reasoning.
    evaluate_ids(
        INDEX_BY_ID,
        out_jsonl_path="evaluator_output_debug.jsonl",
        limit_ids=5,
        debug=True,
    )
    # When satisfied, run on all:
    # evaluate_ids(INDEX_BY_ID, out_jsonl_path="evaluator_output_all.jsonl", limit_ids=None, debug=False)



[ID] ont_12_monument_test_1
  P1 triples: 0 | P2: 3 | P3: 3
  → Running Rule A (consensus ≥ 2 prompts) …
  [Rule A PASS] The 14th New Jersey Volunteer Infantry Monument — location — Monocacy National Battlefield  (s=1.00, o=1.00) from ['P2', 'P3']
  → Running Rule B (single-prompt with evidence) …
  [Rule B FAIL] The 14th New Jersey Volunteer Infantry Monument — country — US | COLOC=0.00 (s_sup=0.25, o_sup=1.00, p_sup=0.53; s_inp=1.00, o_inp=1.00, p_inp=0.62) | SUBJ_SUP=0.00 | OBJ_SUP=1.00 | SIM=0.29 | Evidence=0.28
  [Rule B FAIL] Monocacy National Battlefield — district — US | COLOC=0.00 (s_sup=1.00, o_sup=0.00, p_sup=0.29; s_inp=1.00, o_inp=1.00, p_inp=1.00) | SUBJ_SUP=1.00 | OBJ_SUP=0.00 | SIM=0.18 | Evidence=0.27
  [Rule B FAIL] The 14th New Jersey Volunteer Infantry Monument — established — 11 July 1907 | COLOC=0.00 (s_sup=0.17, o_sup=1.00, p_sup=1.00; s_inp=1.00, o_inp=1.00, p_inp=1.00) | SUBJ_SUP=0.00 | OBJ_SUP=0.00 | SIM=0.25 | Evidence=0.03
  [Rule B FAIL] The 14th New Jerse

In [13]:
if __name__ == "__main__":
    import os

    # Destination of the final filtered triples. Like the input paths in
    # Block 1, it can be overridden through an environment variable of the
    # same name; the default is the original location.
    FINAL_OUT_FILE = os.getenv(
        "FINAL_OUT_FILE",
        "/upb/users/b/balram/profiles/unix/cs/promptKG/data/evaluation/final_triples/dbpedia/ont_12_monument_final_output_latest.jsonl",
    )

    # After Block 2 produced INDEX_BY_ID
    evaluate_ids(
        INDEX_BY_ID,
        out_jsonl_path=FINAL_OUT_FILE,
        limit_ids=None,     # process all IDs
        debug=False          # turn off debug printing
    )
