Requisitos (instalar antes)

Tener en cuenta que para el uso maximo con gpu es necesario instalar cuda con la version de python y drivers de la gráfica. 

In [None]:
#!pip install -q -U rank-bm25 sentence-transformers torch faiss-cpu numpy llama-cpp-python gradio

Descargar modelo mistral Q4 K M de 4.37gb desde el navegador escribiendo la url* https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF *
Ubicar en la misma carpeta que este archivo ipynb

In [1]:
# ============================================================
# RAG híbrido TIC 
# - Cita automática de evidencias usadas en la respuesta: [E1] [E3] ...
# - Comandos (desde la MISMA GUI):
#     :help
#     :mode agent|eval
#     :facet soft|strict
#     :facets on|off
#     :rerank on|off
#     :gen on|off
#     :exit
# ============================================================

import sys, subprocess, importlib, json, re, unicodedata, time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional

# -------------------- seguridad / límites --------------------
SAFE_CTX_MAX = 5200   # tamaño máximo de contexto como seguridad
SAFE_CTX_MIN = 800

def clamp_int(x, lo, hi, default):
    try:
        v = int(x)
    except Exception:
        return int(default)
    if v < lo:
        return int(lo)
    if v > hi:
        return int(hi)
    return int(v)

# -------------------- deps --------------------
def ensure(pkg, pip_name=None):
    try:
        return importlib.import_module(pkg)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", (pip_name or pkg), "-q"])
        return importlib.import_module(pkg)

rank_bm25 = ensure("rank_bm25", "rank-bm25")
st = ensure("sentence_transformers", "sentence-transformers")
torch = ensure("torch", "torch")
numpy_mod = ensure("numpy", "numpy")
faiss = ensure("faiss", "faiss-cpu")
gradio = ensure("gradio", "gradio")

from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer, CrossEncoder
import numpy as np
import gradio as gr

# llama.cpp
try:
    llama_cpp = ensure("llama_cpp", "llama-cpp-python")
    from llama_cpp import Llama
except Exception:
    Llama = None

# ============================ paths / modelos ============================

JSON_PATH = r"estructura_semantica_final_PTIC2.json"
GGUF_PATH = r"mistral-7b-instruct-v0.2.Q4_K_M.gguf"
PROMPT_PATH_AGENT = r"prompt_base.txt"
PROMPT_PATH_EVAL = r"prompt_base_ev.txt"

EMB_MODEL_NAME = "intfloat/multilingual-e5-base"
RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2"

# llama cfg
N_GPU_LAYERS = 5
N_THREADS = 8
N_CTX = 4096
MAX_TOKENS = 384
TEMPERATURE_AGENT = 0.2
TEMPERATURE_EVAL = 0.0

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[RAG] DEVICE = {DEVICE}")

engine = None  # global (se comparte con la GUI)

# ============================ utils ============================

def norm(s: Any) -> str:
    if s is None:
        return ""
    if isinstance(s, list):
        s = " ".join(map(str, s))
    s = str(s).lower().strip()
    s = unicodedata.normalize("NFD", s)
    s = "".join(c for c in s if unicodedata.category(c) != "Mn")
    s = re.sub(r"\s+", " ", s)
    toks = s.split()
    out = []
    for t in toks:
        out.append(t[:-1] if (len(t) > 3 and t.endswith("s")) else t)
    return " ".join(out)

def tokenize_simple(s: str) -> List[str]:
    return re.findall(r"[a-z0-9]+", norm(s))

def char_trigrams(s: str) -> set:
    s = re.sub(r"\s+", " ", s)
    s = f" {s} "
    if len(s) < 3:
        return {s}
    return {s[i:i+3] for i in range(len(s)-2)}

def jaccard_trigram(a: str, b: str) -> float:
    A, B = char_trigrams(a), char_trigrams(b)
    if not A or not B:
        return 0.0
    inter = len(A & B)
    union = len(A | B)
    return inter / union if union else 0.0

def jaccard_tokens(a: str, b: str) -> float:
    A, B = set(tokenize_simple(a)), set(tokenize_simple(b))
    if not A or not B:
        return 0.0
    inter = len(A & B)
    union = len(A | B)
    return inter / union if union else 0.0

def dedup_type_name(tipo: Optional[str], nombre: Optional[str]) -> Tuple[str, str]:
    t = (tipo or "").strip()
    n = (nombre or "").strip()
    if not n and not t:
        return "", ""
    tn = norm(t)
    nn = norm(n)
    base = n
    if tn and (nn.startswith(tn + " ") or nn == tn):
        if n.lower().startswith(t.lower()):
            base = n[len(t):].lstrip()
    display = (f"{t} {base}".strip() if t else base).strip()
    return base, display

CODE_RE = re.compile(r"\b[a-z]?\d{2,4}[a-z\d]*\b", re.IGNORECASE)

# ============================ carga docs ============================

def load_docs(json_path: str | Path) -> List[Dict[str, Any]]:
    data = json.load(open(json_path, "r", encoding="utf-8"))
    docs: List[Dict[str, Any]] = []

    def flatten_doc(d: Dict[str, Any], prefix: str = "", out: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        if out is None:
            out = {}
        for k, v in (d or {}).items():
            if str(k).startswith("_"):
                continue
            path = f"{prefix}.{k}" if prefix else str(k)
            if isinstance(v, dict):
                flatten_doc(v, path, out)
            elif isinstance(v, list):
                out[path] = v
            else:
                out[path] = v
        return out

    for e in data.get("espacios", []):
        attrs = e.get("attrs", {}) or {}
        personal = []
        for p in (attrs.get("Personal", []) or []):
            nm = p.get("ConNombre") or p.get("nombre") or p.get("Encargado")
            if nm:
                personal.append(str(nm))

        bloque = e.get("bloque") or e.get("bloque_id") or ""
        if isinstance(bloque, str) and bloque.startswith("BL-"):
            bloque = bloque.replace("BL-", "")
        if isinstance(bloque, str) and bloque.isdigit():
            bloque = f"A{bloque}"

        pieza = {
            "id": str(e.get("id") or e.get("codigo") or e.get("nombre")),
            "kind": "espacio",
            "codigo": e.get("codigo"),
            "tipo": e.get("tipo"),
            "nombre": e.get("nombre"),
            "bloque": bloque,
            "piso": attrs.get("piso"),
            "direccionRelativa": attrs.get("direccionRelativa"),
            "direccionOrientativa": attrs.get("direccionOrientativa"),
            "carrera": attrs.get("AsignadoACarrera"),
            "facultad": attrs.get("Facultad") or attrs.get("facultad"),
            "capacidad": attrs.get("ConCapacidad"),
            "encargados": personal,
            "aliases": [str(a) for a in (e.get("aliases") or [])],
            "attrs": attrs,
        }

        fields_raw = [
            pieza.get("tipo"),
            pieza.get("nombre"),
            pieza.get("codigo"),
            str(bloque),
            (f"piso {pieza.get('piso')}" if pieza.get("piso") is not None else ""),
            pieza.get("direccionRelativa"),
            pieza.get("direccionOrientativa"),
            pieza.get("carrera"),
            pieza.get("facultad"),
            pieza.get("capacidad"),
            " ".join(pieza.get("aliases") or []),
            " ".join(pieza.get("encargados") or []),
            json.dumps(pieza.get("attrs", {}), ensure_ascii=False),
        ]
        pieza["_raw_text"] = " ".join([str(x) for x in fields_raw if x])
        pieza["_fulltext"] = norm(" ".join([str(x) for x in fields_raw if x]))

        _, name_display = dedup_type_name(pieza.get("tipo"), pieza.get("nombre"))
        pieza["_name_norm"] = norm(name_display)
        pieza["_aliases_norm"] = [norm(a) for a in (pieza.get("aliases") or [])]

        code_ft = norm(pieza.get("codigo") or "")
        name_ft = norm(name_display)
        aliases_ft = norm(" ".join(pieza.get("aliases") or []))
        encargados_ft = norm(" ".join(pieza.get("encargados") or []))
        rest_ft = pieza["_fulltext"]

        def rep(text, k):
            return " ".join([text] * k) if text else ""

        pieza["_bm25_text"] = " ".join([
            rep(code_ft, 6),
            rep(name_ft, 4),
            rep(encargados_ft, 4),
            rep(aliases_ft, 3),
            rest_ft,
        ]).strip()

        pieza["_flat_attrs"] = flatten_doc({"attrs": attrs})
        docs.append(pieza)

    return docs

def collect_attr_catalog(docs: List[Dict[str, Any]]) -> Dict[str, set]:
    values: Dict[str, set] = {}
    for d in docs:
        if d.get("kind") != "espacio":
            continue
        for path, val in (d.get("_flat_attrs") or {}).items():
            if path.startswith("_"):
                continue
            values.setdefault(path, set())

            def add_val(x):
                s = norm(x)
                if s:
                    values[path].add(s)

            if isinstance(val, list):
                for x in val:
                    add_val(x)
            elif isinstance(val, dict):
                for k in val.keys():
                    add_val(k)
            else:
                add_val(val)
    return values

# ============================ índices ============================

def build_bm25(docs: List[Dict[str, Any]]):
    tokens_all = [d["_bm25_text"].split() for d in docs]
    return BM25Okapi(tokens_all)

def build_embeddings(docs: List[Dict[str, Any]], model_name: str, M: int = 32):
    t0 = time.time()
    model = SentenceTransformer(model_name, device=DEVICE)
    corpus = [d["_raw_text"] for d in docs]
    vecs = model.encode(corpus, batch_size=64, normalize_embeddings=True, show_progress_bar=False)
    vecs = np.asarray(vecs, dtype="float32")
    d = vecs.shape[1]
    index = faiss.IndexHNSWFlat(d, M, faiss.METRIC_INNER_PRODUCT)
    faiss.normalize_L2(vecs)
    try:
        index.hnsw.efConstruction = 200
    except Exception:
        pass
    index.add(vecs)
    print(f"[TIMING][build_embeddings] total={(time.time() - t0):.3f}s")
    return model, index

# ============================ facets ============================

TIPO_SYNONYMS = {
    "lab": "laboratorio", "laboratorio": "laboratorio", "laboratorios": "laboratorio",
    "sala": "sala", "salas": "sala",
    "oficina": "oficina", "oficinas": "oficina",
    "departamento": "departamento", "departamentos": "departamento",
    "ducto": "ducto", "ductos": "ducto",
    "cuarto": "cuarto", "cuartos": "cuarto",
    "infraestructuravertical": "infraestructuravertical",
    "escalera": "infraestructuravertical", "escaleras": "infraestructuravertical",
    "ascensor": "infraestructuravertical", "ascensores": "infraestructuravertical",
    "bano": "baño", "banio": "baño", "banos": "baño", "banios": "baño",
    "baño": "baño", "baños": "baño",
    "sshh": "baño", "ssh": "baño", "wc": "baño",
    "servicio": "baño", "servicios": "baño",
    "higienico": "baño", "higienicos": "baño",
    "sanitario": "baño", "sanitarios": "baño",
}

def parse_piso(text: str) -> Optional[int]:
    ORD = {
        "primer": 1, "primero": 1, "primera": 1, "1er": 1, "1ro": 1, "1ra": 1,
        "segundo": 2, "segunda": 2, "2do": 2, "2da": 2,
        "tercero": 3, "tercera": 3, "tercer": 3, "3er": 3, "3ro": 3, "3ra": 3,
        "cuarto": 4, "cuarta": 4, "4to": 4, "4ta": 4,
        "quinto": 5, "quinta": 5, "5to": 5, "5ta": 5,
    }
    t = norm(text)
    m = re.search(r"\bpiso\s+(\d+)\b", t) or re.search(r"\b(\d+)\s*(?:do|da|º|°|o|a)?\s+piso\b", t)
    if m:
        return int(m.group(1))
    for w, n in ORD.items():
        if re.search(rf"\bpiso\s+{re.escape(w)}\b", t) or re.search(rf"\b{re.escape(w)}\s+piso\b", t):
            return n
    return None

def extract_person_query(text: str) -> str:
    t = norm(text)
    t = re.sub(r"\b(ing\.?|msc\.?|phd\.?|dr\.?)\b", " ", t)
    stop = {
        "que","sabe","saber","sabes","sobre","acerca","del","de","la","el","los","las","en",
        "cuanto","cuantos","cuantas","laboratorio","lab","laboratorios","espacio","espacios",
        "aula","aulas","sala","salas","quien","quienes","esta","encargado","encargada",
    }
    toks = [tok for tok in re.findall(r"[a-z0-9]+", t) if tok not in stop]
    if len(toks) >= 2:
        return " ".join(toks[-3:])
    return " ".join(toks)

def parse_facets_universal(query: str, docs: List[Dict[str, Any]], attr_catalog: Dict[str, set]) -> Dict[str, Any]:
    qn = norm(query)
    facets: Dict[str, Any] = {}
    toks = re.findall(r"[a-záéíóúñ\.]+", qn)

    tipos = []
    for tok in toks:
        base = TIPO_SYNONYMS.get(tok, None)
        if base and base not in tipos:
            tipos.append(base)
    if tipos:
        facets["tipo"] = tipos

    mb = re.search(r"\b(?:bloque|edificio)\s*([a-z]?\d+)\b", qn)
    if mb:
        b = mb.group(1).upper()
        if b.isdigit():
            b = "A" + b
        facets["bloque"] = b

    p = parse_piso(qn)
    if p is not None:
        facets["piso"] = p

    q_persona = extract_person_query(qn)
    people = set()
    if q_persona:
        for d in docs:
            for nm in (d.get("encargados") or []):
                if nm and jaccard_tokens(nm, q_persona) >= 0.3:
                    people.add(nm)
    if people:
        facets["encargado"] = sorted(people)

    carreras = set()
    facults = set()
    for d in docs:
        car = d.get("carrera")
        if isinstance(car, list):
            for x in car:
                if norm(x) in qn:
                    carreras.add(x)
        elif car and norm(car) in qn:
            carreras.add(car)
        fac = d.get("facultad")
        if fac and norm(fac) in qn:
            facults.add(fac)
    if carreras:
        facets["carrera"] = sorted(carreras)
    if facults:
        facets["facultad"] = sorted(facults)

    return facets

# ============================ híbrido ============================

@dataclass
class RAGConfig:
    top_k: int = 12
    pool_k: int = 250
    bm25_cand: int = 500
    emb_cand: int = 300
    min_score_bm25: float = 0.0
    min_score_emb: float = 0.0
    ctx_chars: int = 2800  # default
    use_embeddings: bool = True
    use_reranker: bool = True
    use_facets: bool = True
    generate_enabled: bool = True
    faiss_M: int = 32
    faiss_ef_search: int = 150
    rerank_top_k: int = 150
    rerank_alpha: float = 0.85
    rerank_bonus_weight: float = 0.15
    kind_prior_weight: float = 0.08
    person_prior_weight: float = 0.12
    enable_fuzzy_bonus: bool = True
    fuzzy_min_sim: float = 0.30
    min_best_emb_for_any: float = 0.45
    min_emb_gap: float = 0.05
    facet_mode: str = "soft"   # soft | strict
    strict_keep_min: int = 3
    debug: bool = False

def build_bm25_candidates(qn: str, bm25: BM25Okapi, k: int, min_score: float, cfg: RAGConfig, docs: List[Dict[str, Any]]) -> List[Tuple[int, float]]:
    toks = tokenize_simple(qn)
    if not toks:
        return []
    scores = bm25.get_scores(toks)
    if float(np.max(scores)) <= 0.0:
        return []
    if cfg.enable_fuzzy_bonus and toks:
        tokq = " ".join(toks)
        bonus = np.zeros_like(scores, dtype="float32")
        for i, d in enumerate(docs):
            smax = 0.0
            smax = max(smax, jaccard_trigram(tokq, d.get("_name_norm", "")))
            smax = max(smax, jaccard_trigram(tokq, norm(d.get("codigo") or "")))
            if smax >= cfg.fuzzy_min_sim:
                bonus[i] = 0.15 * smax
        scores = scores + bonus

    order = np.argsort(scores)[::-1]
    out = []
    for i in order:
        s = float(scores[i])
        if s < min_score:
            continue
        out.append((int(i), s))
        if len(out) >= k:
            break
    return out

def build_embeddings_candidates(query: str, emb_model, emb_index, k: int, min_score: float, cfg: RAGConfig) -> List[Tuple[int, float]]:
    if emb_index is None or emb_model is None:
        return []
    qn = norm(query.strip())
    q = ("query: " + qn) if "e5" in EMB_MODEL_NAME.lower() else qn
    qv = emb_model.encode([q], normalize_embeddings=True, show_progress_bar=False)[0].astype("float32")
    import faiss as _faiss
    _faiss.normalize_L2(qv.reshape(1, -1))
    D, I = emb_index.search(qv.reshape(1, -1), k)
    return [(int(idx), float(s)) for idx, s in zip(I[0].tolist(), D[0].tolist()) if idx >= 0 and s >= min_score]

def rrf_fuse(list_of_lists: List[List[Tuple[int, float]]], k: int, k_rrf: int = 60) -> List[int]:
    ranks: Dict[int, float] = {}
    for l in list_of_lists:
        for r, (idx, _) in enumerate(l):
            ranks[idx] = ranks.get(idx, 0.0) + 1.0 / (k_rrf + r + 1.0)
    fused = sorted(ranks.items(), key=lambda kv: -kv[1])
    return [i for i, _ in fused[:k]]

def snippet_compacto(d: Dict[str, Any]) -> Optional[str]:
    if d.get("kind") != "espacio":
        return None
    partes = []
    if d.get("codigo"):
        partes.append(f"Código={d['codigo']}")
    _, name_display = dedup_type_name(d.get("tipo"), d.get("nombre"))
    if name_display:
        partes.append(f"Nombre={name_display}")

    ubic = []
    if d.get("bloque"):
        ubic.append(f"Bloque {d['bloque']}")
    if d.get("piso") is not None:
        ubic.append(f"Piso {d['piso']}")
    if ubic:
        partes.append("Ubicación=" + " ; ".join(ubic))

    dirp = []
    if d.get("direccionRelativa"):
        dirp.append(d["direccionRelativa"])
    if d.get("direccionOrientativa"):
        dirp.append(d["direccionOrientativa"])
    if dirp:
        partes.append("Dirección=" + " ; ".join(dirp))

    if d.get("carrera"):
        partes.append("Carrera=" + ("; ".join(d["carrera"]) if isinstance(d["carrera"], list) else str(d["carrera"])))
    if d.get("facultad"):
        partes.append(f"Facultad={d['facultad']}")
    if d.get("capacidad"):
        partes.append(f"Capacidad={d['capacidad']}")
    if d.get("encargados"):
        partes.append("Encargados=" + "; ".join(d["encargados"]))

    return " | ".join(partes)

def lexical_bonus(q: str, d: Dict[str, Any]) -> float:
    qn = norm(q)
    name = d.get("_name_norm") or ""
    code = norm(d.get("codigo") or "")
    return max([jaccard_trigram(qn, name), jaccard_trigram(qn, code)])

def value_matches(doc_val, wanted_list_norm: List[str]) -> bool:
    if not wanted_list_norm:
        return True
    if doc_val is None:
        return False
    cand = [norm(x) for x in (doc_val if isinstance(doc_val, list) else [doc_val])]
    wanted = set(norm(v) for v in wanted_list_norm)
    return any(v in wanted for v in cand)

def doc_facet_match_score(d: Dict[str, Any], facets: Dict[str, Any]) -> float:
    wants = 0
    hits = 0

    if "tipo" in facets:
        wants += 1
        hits += int(norm(d.get("tipo")) in [norm(x) for x in facets["tipo"]])

    if "bloque" in facets:
        wants += 1
        hits += int(norm(d.get("bloque")) == norm(facets["bloque"]))

    if "piso" in facets:
        wants += 1
        try:
            hits += int(int(d.get("piso")) == int(facets["piso"]))
        except Exception:
            pass

    if "carrera" in facets:
        wants += 1
        hits += int(value_matches(d.get("carrera"), facets["carrera"]))

    if "facultad" in facets:
        wants += 1
        hits += int(value_matches(d.get("facultad"), facets["facultad"]))

    if "encargado" in facets:
        wants += 1
        encs = [norm(x) for x in (d.get("encargados") or [])]
        qenc = [norm(x) for x in facets["encargado"]]
        hits += int(any(x in encs for x in qenc))

    if wants == 0:
        return 0.0
    return hits / wants

# ============================ engine ============================

class RAGEngine:
    def __init__(self, cfg: RAGConfig = None, evaluation_mode: bool = False):
        self.cfg = cfg or RAGConfig()
        self.evaluation_mode = evaluation_mode

        self.docs: List[Dict[str, Any]] = []
        self.attr_catalog: Dict[str, set] = {}
        self.bm25 = None
        self.emb_model = None
        self.emb_index = None
        self.reranker = None
        self.llm = None

        self.prompt_agent = Path(PROMPT_PATH_AGENT).read_text(encoding="utf-8") if Path(PROMPT_PATH_AGENT).exists() else "{{HECHOS}}\n\nPregunta: {{PREGUNTA}}"
        self.prompt_eval  = Path(PROMPT_PATH_EVAL).read_text(encoding="utf-8") if Path(PROMPT_PATH_EVAL).exists() else "{{HECHOS}}\n\nPregunta: {{PREGUNTA}}"

        self.last_evidence_map: Dict[str, Dict[str, Any]] = {}
        self.last_context_eids: List[str] = []
        self.last_facets: Dict[str, Any] = {}
        self.last_retriever_by_doc: Dict[int, str] = {}

        self.rebuild_all()

    # --- toggles con carga/descarga segura ---
    def set_reranker_enabled(self, enabled: bool) -> str:
        enabled = bool(enabled)
        self.cfg.use_reranker = enabled
        if not enabled:
            self.reranker = None
            return "[OK] rerank=OFF"
        if self.reranker is None:
            try:
                self.reranker = CrossEncoder(RERANKER_MODEL, device=DEVICE)
            except Exception as e:
                self.cfg.use_reranker = False
                self.reranker = None
                return f"[ERR] no se pudo cargar reranker: {e}"
        return "[OK] rerank=ON"

    def set_facets_enabled(self, enabled: bool) -> str:
        self.cfg.use_facets = bool(enabled)
        return f"[OK] faceted={'ON' if self.cfg.use_facets else 'OFF'}"

    def rebuild_all(self):
        t0 = time.time()
        self.docs = load_docs(JSON_PATH)
        self.attr_catalog = collect_attr_catalog(self.docs)
        self.bm25 = build_bm25(self.docs)

        if self.cfg.use_embeddings:
            self.emb_model, self.emb_index = build_embeddings(self.docs, EMB_MODEL_NAME, M=self.cfg.faiss_M)
            try:
                if hasattr(self.emb_index, "hnsw"):
                    self.emb_index.hnsw.efSearch = self.cfg.faiss_ef_search
            except Exception:
                pass
        else:
            self.emb_model, self.emb_index = None, None

        if self.cfg.use_reranker:
            try:
                self.reranker = CrossEncoder(RERANKER_MODEL, device=DEVICE)
            except Exception:
                self.reranker = None
                self.cfg.use_reranker = False
        else:
            self.reranker = None

        if Llama is not None and Path(GGUF_PATH).exists():
            self.llm = Llama(
                model_path=str(GGUF_PATH),
                n_ctx=N_CTX,
                n_threads=N_THREADS,
                n_gpu_layers=N_GPU_LAYERS,
                logits_all=False,
                verbose=False,
            )
        else:
            self.llm = None

        print(f"[OK] Índices listos en {time.time() - t0:.2f}s. Docs: {len(self.docs)} | LLM={'ON' if self.llm else 'OFF'} | Rerank={'ON' if self.cfg.use_reranker else 'OFF'}")

    def pin_exact_ids(self, query: str) -> List[int]:
        q_raw = query or ""
        qn = norm(q_raw)
        q_codes = set(t.lower() for t in CODE_RE.findall(q_raw))
        hits = []
        for i, d in enumerate(self.docs):
            code = (d.get("codigo") or "").lower().strip()
            if code and code in q_codes:
                hits.append(i)
                continue
            for al in (d.get("_aliases_norm") or []):
                if al and (al in qn or jaccard_trigram(al, qn) >= 0.82):
                    hits.append(i)
                    break
        return list(dict.fromkeys(hits))

    def retrieve_pool(self, query: str):
        qn = norm(query)
        bm25_list = build_bm25_candidates(qn, self.bm25, self.cfg.bm25_cand, self.cfg.min_score_bm25, self.cfg, self.docs)
        emb_list = build_embeddings_candidates(query, self.emb_model, self.emb_index, self.cfg.emb_cand, self.cfg.min_score_emb, self.cfg) if self.cfg.use_embeddings else []

        bm25_set = {idx for idx, _ in bm25_list}
        emb_set  = {idx for idx, _ in emb_list}

        retr: Dict[int, str] = {}
        for idx in (bm25_set | emb_set):
            in_bm25 = idx in bm25_set
            in_emb  = idx in emb_set
            if in_bm25 and in_emb:
                retr[idx] = "BM25 + E5+FAISS"
            elif in_bm25:
                retr[idx] = "BM25"
            else:
                retr[idx] = "E5+FAISS"

        emb_scores = [s for _, s in emb_list]
        best_emb = max(emb_scores, default=0.0)
        if emb_scores:
            sorted_emb = sorted(emb_scores, reverse=True)
            median_emb = sorted_emb[len(sorted_emb)//2]
            gap = best_emb - median_emb
        else:
            median_emb, gap = 0.0, 0.0

        if (not bm25_list) and emb_list:
            if best_emb < self.cfg.min_best_emb_for_any or gap < self.cfg.min_emb_gap:
                return [], {}, [], {"bm25_n": len(bm25_list), "emb_n": len(emb_list), "best_emb": best_emb, "median_emb": median_emb, "gap": gap}

        if not bm25_list and not emb_list:
            return [], {}, [], {"bm25_n": 0, "emb_n": 0, "best_emb": 0.0, "median_emb": 0.0, "gap": 0.0}

        fused_ids = rrf_fuse([bm25_list, emb_list], k=self.cfg.pool_k, k_rrf=60)
        pins = self.pin_exact_ids(query)
        for idx in pins:
            retr[idx] = retr.get(idx, "BM25")
        pool = list(dict.fromkeys(pins + fused_ids))[: self.cfg.pool_k]

        facets = parse_facets_universal(query, self.docs, self.attr_catalog) if self.cfg.use_facets else {}

        self.last_retriever_by_doc = retr
        stats = {"bm25_n": len(bm25_list), "emb_n": len(emb_list), "pins_n": len(pins), "pool_n": len(pool)}
        return pool, facets, pins, stats

    def rerank_order(self, query: str, idxs: List[int]) -> List[int]:
        if not self.cfg.use_reranker or not self.reranker or not idxs:
            return idxs
        R = min(self.cfg.rerank_top_k, len(idxs))
        to_rerank = idxs[:R]
        rest = idxs[R:]

        pairs = [(query, self.docs[i].get("_raw_text", "")) for i in to_rerank]
        s = np.asarray(self.reranker.predict(pairs, batch_size=(32 if DEVICE == "cuda" else 8)), dtype="float32")
        ce = (s - s.min()) / (s.max() - s.min() + 1e-8) if len(s) > 1 else np.array([1.0], dtype="float32")

        bonus = np.zeros_like(ce)
        prior = np.zeros_like(ce)
        for k, idx in enumerate(to_rerank):
            d = self.docs[idx]
            bonus[k] = lexical_bonus(query, d)
            prior[k] = self.cfg.kind_prior_weight

        final = self.cfg.rerank_alpha * ce + self.cfg.rerank_bonus_weight * bonus + prior
        order = np.argsort(final)[::-1]
        return [to_rerank[i] for i in order] + rest

    def apply_facets_mode(self, ranked_pool: List[int], facets: Dict[str, Any], pins: List[int]) -> List[int]:
        if not facets or (not self.cfg.use_facets):
            return list(dict.fromkeys(pins + ranked_pool))

        scored = [(i, doc_facet_match_score(self.docs[i], facets)) for i in ranked_pool]
        scored.sort(key=lambda kv: kv[1], reverse=True)
        ordered = [i for i, _ in scored]

        if self.cfg.facet_mode == "soft":
            return list(dict.fromkeys(pins + ordered))

        strict = [i for i, s in scored if s >= 0.999]
        strict = list(dict.fromkeys(pins + strict))
        if len(strict) >= max(1, self.cfg.strict_keep_min):
            return strict
        return list(dict.fromkeys(pins + ordered))

    def build_context_with_evidence(self, ordered_idxs: List[int], facets: Dict[str, Any]) -> str:
        self.last_evidence_map = {}
        self.last_context_eids = []
        self.last_facets = dict(facets or {})

        ctx_lines = []
        total = 0
        eid = 1

        max_docs = max(1, int(self.cfg.top_k))
        for idx in ordered_idxs[:max_docs]:
            s = snippet_compacto(self.docs[idx])
            if not s:
                continue
            line = f"[E{eid}] {s}"
            if total + len(line) + 1 > int(self.cfg.ctx_chars):
                break

            d = self.docs[idx]
            key = f"E{eid}"
            self.last_evidence_map[key] = {
                "eid": key,
                "doc_idx": idx,
                "snippet": s,
                "codigo": d.get("codigo"),
                "nombre": d.get("nombre"),
                "tipo": d.get("tipo"),
                "bloque": d.get("bloque"),
                "piso": d.get("piso"),
                "retriever": self.last_retriever_by_doc.get(idx, "—"),
            }
            self.last_context_eids.append(key)
            ctx_lines.append(line)
            total += len(line) + 1
            eid += 1

        head = ""
        if (not self.evaluation_mode) and facets:
            head = f"[FACETS] activos={list(facets.keys())}\n"
        return head + "\n".join(ctx_lines)

    def build_prompt(self, context_text: str, query: str) -> str:
        tpl = self.prompt_eval if self.evaluation_mode else self.prompt_agent
        return tpl.replace("{{HECHOS}}", context_text).replace("{{PREGUNTA}}", query)

    def generate(self, prompt: str) -> str:
        if not self.llm:
            return "Gen OFF (LLM no disponible)."
        temp = TEMPERATURE_EVAL if self.evaluation_mode else TEMPERATURE_AGENT
        out = self.llm(prompt=prompt, max_tokens=MAX_TOKENS, temperature=temp, top_p=1.0, repeat_penalty=1.1)
        return out["choices"][0]["text"].strip()

    def audit_evidence(self, eids: List[str], facets: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
        out = {}
        want_bloque = norm(facets.get("bloque")) if facets.get("bloque") else None
        want_piso = facets.get("piso", None)
        want_tipos = [norm(x) for x in facets.get("tipo", [])] if facets.get("tipo") else []

        for eid in eids:
            m = self.last_evidence_map.get(eid, {})
            ok = True
            reasons = []

            if want_bloque and norm(m.get("bloque")) != want_bloque:
                ok = False
                reasons.append(f"bloque={m.get('bloque')}≠{facets.get('bloque')}")

            if want_piso is not None and m.get("piso") is not None:
                try:
                    if int(m.get("piso")) != int(want_piso):
                        ok = False
                        reasons.append(f"piso={m.get('piso')}≠{want_piso}")
                except Exception:
                    pass

            if want_tipos and norm(m.get("tipo")) not in want_tipos:
                ok = False
                reasons.append(f"tipo={m.get('tipo')}∉{facets.get('tipo')}")

            out[eid] = {"ok": ok, "reasons": reasons}
        return out

    def _append_citations_inline(self, text: str, used: List[str]) -> str:
        used = [u for u in used if u in self.last_evidence_map]
        if not used:
            return (text or "").strip()
        cites = " ".join([f"[{u}]" for u in used])
        t = (text or "").rstrip()

        if t.endswith(cites):
            return t

        t = re.sub(r"(\s*(\[[eE]\d+\]\s*)+)$", "", t).rstrip()
        return (t + ("\n\n" if t else "") + cites).strip()

    def answer_stream(self, query: str):
        q = (query or "").strip()
        if not q:
            yield {"status": "Escribe una consulta.", "final": True, "answer": "Escribe una consulta.", "evidence_table": [], "facets": {}}
            return

        yield {"status": "1/5 Normalizando consulta…", "final": False}
        yield {"status": "2/5 Recuperando candidatos (BM25/embeddings)…", "final": False}
        pool, facets, pins, _stats = self.retrieve_pool(q)

        if not pool and not pins:
            yield {
                "status": "Listo. (sin resultados)",
                "final": True,
                "answer": "No consta en el contexto lo que se pregunta.",
                "evidence_table": [],
                "facets": facets,
            }
            return

        yield {"status": "3/5 Rerank + facetas…", "final": False}
        ranked = self.rerank_order(q, pool)
        ordered = self.apply_facets_mode(ranked, facets, pins)

        yield {"status": "4/5 Construyendo evidencias…", "final": False}
        ctx = self.build_context_with_evidence(ordered, facets)

        if not self.cfg.generate_enabled:
            used = self.last_context_eids[: min(8, len(self.last_context_eids))]
            audit = self.audit_evidence(used, facets) if facets else {eid: {"ok": True, "reasons": []} for eid in used}
            table = []
            for eid in used:
                m = self.last_evidence_map.get(eid, {})
                a = audit.get(eid, {"ok": True, "reasons": []})
                table.append({
                    "EID": eid,
                    "OK": "✅" if a["ok"] else "⚠",
                    "Motivos": "; ".join(a["reasons"]) if a["reasons"] else "",
                    "Retriever": m.get("retriever"),
                    "Bloque": m.get("bloque"),
                    "Piso": m.get("piso"),
                    "Tipo": m.get("tipo"),
                    "Código": m.get("codigo"),
                    "Nombre": m.get("nombre"),
                    "Snippet": m.get("snippet"),
                })

            ans = self._append_citations_inline(ctx, used)
            yield {"status": "Listo. (gen OFF)", "final": True, "answer": ans, "evidence_table": table, "facets": facets}
            return

        yield {"status": "5/5 Generando respuesta (LLM)…", "final": False}
        prompt = self.build_prompt(ctx, q)
        text = self.generate(prompt)

        used_nums = sorted(set(re.findall(r"\bE(\d+)\b", text)))
        used = [f"E{n}" for n in used_nums if f"E{n}" in self.last_evidence_map]
        if not used:
            used = self.last_context_eids[: min(8, len(self.last_context_eids))]

        audit = self.audit_evidence(used, facets) if facets else {eid: {"ok": True, "reasons": []} for eid in used}
        table = []
        for eid in used:
            m = self.last_evidence_map.get(eid, {})
            a = audit.get(eid, {"ok": True, "reasons": []})
            table.append({
                "EID": eid,
                "OK": "✅" if a["ok"] else "⚠",
                "Motivos": "; ".join(a["reasons"]) if a["reasons"] else "",
                "Retriever": m.get("retriever"),
                "Bloque": m.get("bloque"),
                "Piso": m.get("piso"),
                "Tipo": m.get("tipo"),
                "Código": m.get("codigo"),
                "Nombre": m.get("nombre"),
                "Snippet": m.get("snippet"),
            })

        text = self._append_citations_inline(text, used)
        yield {"status": "Listo.", "final": True, "answer": text, "evidence_table": table, "facets": facets}

# ============================ comandos (para GUI) ============================

HELP_TEXT = """Comandos:
  :help               -> muestra ayuda
  :mode agent|eval    -> modo agente / evaluación
  :facet soft|strict  -> faceted soft o strict
  :facets on|off      -> encender/apagar faceted (uso de facetas)
  :rerank on|off      -> encender/apagar rerank (CrossEncoder)
  :k N                -> top_k
  :gen on|off         -> LLM ON/OFF
  :exit               -> marcar salida (no apaga servidor; engine queda cargado)
"""

def parse_command(line: str) -> Tuple[str, List[str]]:
    parts = line.strip().split()
    cmd = parts[0].lower()
    args = parts[1:]
    return cmd, args

def _parse_on_off(s: str) -> Optional[bool]:
    s = (s or "").strip().lower()
    if s in {"on", "1", "true", "si", "sí", "enable", "enabled"}:
        return True
    if s in {"off", "0", "false", "no", "disable", "disabled"}:
        return False
    return None

def handle_command(engine: RAGEngine, line: str, ui: str = "gui") -> str:
    cmd, args = parse_command(line)

    if cmd == ":help":
        return HELP_TEXT

    if cmd == ":config":
        return str(engine.cfg)

    if cmd == ":rebuild":
        engine.rebuild_all()
        return "[OK] rebuild completo"

    if cmd == ":mode" and args:
        v = args[0].lower()
        if v in {"agent", "eval"}:
            engine.evaluation_mode = (v == "eval")
            return f"[OK] modo={'EVALUACIÓN' if engine.evaluation_mode else 'AGENTE'}"
        return "[ERR] Uso: :mode agent|eval"

    if cmd == ":facet" and args:
        v = args[0].lower()
        if v in {"soft", "strict"}:
            engine.cfg.facet_mode = v
            return f"[OK] facet_mode={v}"
        return "[ERR] Uso: :facet soft|strict"

    if cmd == ":facets" and args:
        flag = _parse_on_off(args[0])
        if flag is None:
            return "[ERR] Uso: :facets on|off"
        return engine.set_facets_enabled(flag)

    if cmd == ":rerank" and args:
        flag = _parse_on_off(args[0])
        if flag is None:
            return "[ERR] Uso: :rerank on|off"
        return engine.set_reranker_enabled(flag)

    if cmd == ":k" and args and args[0].isdigit():
        engine.cfg.top_k = int(args[0])
        return f"[OK] top_k={engine.cfg.top_k}"

    if cmd == ":gen" and args:
        flag = _parse_on_off(args[0])
        if flag is None:
            return "[ERR] Uso: :gen on|off"
        engine.cfg.generate_enabled = flag
        return f"[OK] gen={'ON' if engine.cfg.generate_enabled else 'OFF'}"

    if cmd == ":exit":
        return "__EXIT__"

    return "[ERR] Comando no reconocido. Usa :help"

# ============================ GUI ============================

GUI_CSS = """
#app-title { font-weight: 800; font-size: 18px; margin: 0 0 6px 0; }
#app-sub { opacity: .8; font-size: 12px; margin: 0 0 10px 0; }
"""

def launch_gui(share: bool = False, server_port: int = 7860):
    global engine
    if engine is None:
        engine = RAGEngine(cfg=RAGConfig())

    def hist_append_messages(hist, user_msg=None, bot_msg=None):
        h = list(hist or [])
        if user_msg is not None:
            h.append({"role": "user", "content": str(user_msg)})
        if bot_msg is not None:
            h.append({"role": "assistant", "content": str(bot_msg)})
        return h

    def set_last_assistant(hist, content: str):
        h = list(hist or [])
        if h and h[-1].get("role") == "assistant":
            h[-1]["content"] = str(content)
        else:
            h.append({"role": "assistant", "content": str(content)})
        return h

    def run_stream(user_in, hist, facet_mode_in, top_k_in, detail_in, gen_in):
        t = (user_in or "").strip()
        if not t:
            yield hist, "**Estado:** Escribe una consulta.", [], ""
            return

        if t.startswith(":"):
            msg = handle_command(engine, t, ui="gui")
            if msg == "__EXIT__":
                msg = "Sesión marcada como finalizada. Puedes cerrar la pestaña. (engine queda cargado.)"
            h = hist_append_messages(hist, user_msg=t, bot_msg=msg)
            yield h, "**Estado:** Listo.", [], ""
            return

        engine.cfg.facet_mode = str(facet_mode_in)
        engine.cfg.top_k = int(top_k_in)
        engine.cfg.generate_enabled = bool(gen_in)
        engine.cfg.ctx_chars = clamp_int(detail_in, SAFE_CTX_MIN, SAFE_CTX_MAX, engine.cfg.ctx_chars)

        h = hist_append_messages(hist, user_msg=t, bot_msg="Procesando…")
        yield h, "**Estado:** Iniciando…", [], f"Detalle efectivo: {engine.cfg.ctx_chars} chars (cap={SAFE_CTX_MAX})"

        last_rows = []
        for step in engine.answer_stream(t):
            status = step.get("status", "…")
            is_final = bool(step.get("final", False))

            if not is_final:
                yield h, f"**Estado:** {status}", last_rows, f"Detalle efectivo: {engine.cfg.ctx_chars} chars (cap={SAFE_CTX_MAX})"
                continue

            ans = (step.get("answer") or "").strip()
            facets = step.get("facets") or {}
            if facets:
                ans = ans + f"\n\nFacetas: {facets}"

            h = set_last_assistant(h, ans)

            table = (step.get("evidence_table") or [])[: int(engine.cfg.top_k)]
            rows = []
            for r in table:
                rows.append([
                    str(r.get("EID","")),
                    str(r.get("OK","")),
                    str(r.get("Motivos","")),
                    str(r.get("Retriever","")),
                    str(r.get("Bloque","")),
                    str(r.get("Piso","")),
                    str(r.get("Tipo","")),
                    str(r.get("Código","")),
                    str(r.get("Nombre","")),
                ])

            last_rows = rows
            yield h, f"**Estado:** {status}", rows, f"Detalle efectivo: {engine.cfg.ctx_chars} chars (cap={SAFE_CTX_MAX})"

    with gr.Blocks(title="Agente LLM de Localización de espacios del Edificio 2 - UNL") as demo:
        gr.HTML(f"<style>{GUI_CSS}</style>")
        gr.Markdown("## Agente LLM de Localización de espacios del Edificio 2 - UNL")
        gr.Markdown("Pregunta → respuesta con citas. Ayuda: `:help`")

        state_hist = gr.State([])

        with gr.Row():
            with gr.Column(scale=3):
                try:
                    chat = gr.Chatbot(label="Chat", height=360, type="messages")
                except TypeError:
                    chat = gr.Chatbot(label="Chat", height=360)

                user_text = gr.Textbox(label="Escribe tu consulta", lines=2)
                with gr.Row():
                    btn_send = gr.Button("Enviar")
                    btn_clear = gr.Button("Limpiar")

                status = gr.Markdown("**Estado:** Listo.")
                detail_eff = gr.Markdown(f"Detalle efectivo: {RAGConfig().ctx_chars} chars (cap={SAFE_CTX_MAX})")

            with gr.Column(scale=2):
                with gr.Accordion("Configuración", open=True):
                    facet_mode = gr.Radio(choices=["soft", "strict"], value="soft", label="Faceted (modo)")
                    top_k = gr.Slider(1, 30, value=12, step=1, label="Top-K (evidencias)")
                    detail = gr.Slider(800, 6000, value=2800, step=100, label="Detalle (evidencias)")
                    gen_on = gr.Checkbox(value=True, label="Generar con LLM")

                gr.Markdown("### Evidencias usadas")
                ev_df = gr.Dataframe(
                    headers=["EID","OK","Motivos","Retriever","Bloque","Piso","Tipo","Código","Nombre"],
                    datatype=["str","str","str","str","str","str","str","str","str"],
                    row_count=0,
                    col_count=(9, "fixed"),
                    interactive=False,
                )

        def clear_all():
            return [], [], "**Estado:** Listo.", [], f"Detalle efectivo: {engine.cfg.ctx_chars if engine else RAGConfig().ctx_chars} chars (cap={SAFE_CTX_MAX})"

        btn_send.click(
            run_stream,
            inputs=[user_text, state_hist, facet_mode, top_k, detail, gen_on],
            outputs=[chat, status, ev_df, detail_eff],
        )
        user_text.submit(
            run_stream,
            inputs=[user_text, state_hist, facet_mode, top_k, detail, gen_on],
            outputs=[chat, status, ev_df, detail_eff],
        )

        def sync_hist(chat_value):
            return chat_value

        chat.change(sync_hist, inputs=[chat], outputs=[state_hist])

        btn_clear.click(
            clear_all,
            outputs=[chat, state_hist, status, ev_df, detail_eff],
        )

    try:
        demo.queue(concurrency_count=1, max_size=64)
    except Exception:
        pass

    demo.launch(share=share, server_port=server_port)

# ============================ boot ============================

if __name__ == "__main__":
    launch_gui(share=False, server_port=7860)


[RAG] DEVICE = cuda
[TIMING][build_embeddings] total=7.750s
[OK] Índices listos en 12.68s. Docs: 26 | LLM=ON | Rerank=ON




* Running on local URL:  http://127.0.0.1:7860
* To create a public link, set `share=True` in `launch()`.


In [None]:
# ==================== BLOQUE 2: Evaluación (ROUGE-1 + Faithfulness) ====================
# Compatible con tu engine actual (RAGEngine(cfg=..., evaluation_mode=...))
# Requiere que este bloque se ejecute DESPUÉS de haber definido RAGConfig, RAGEngine, DEVICE, JSON_PATH, etc.

import json, csv, unicodedata, re
from collections import defaultdict, Counter
from typing import Any

# -------------------- archivo de evaluación --------------------
EVAL_FILE = "eval.jsonl"

CSV_OUT_DETALLE        = "resultados_eval_detallado.csv"
CSV_OUT_RESUMEN_GLOBAL = "resultados_eval_resumen_global.csv"
CSV_OUT_RESUMEN_TIPO   = "resultados_eval_resumen_por_tipo.csv"
CSV_OUT_DEBUG          = "resultados_eval_debug.csv"  # opcional (incluye ctx)

# ---------------------------------------------------------
# 1) Carga de casos de evaluación
# ---------------------------------------------------------
eval_items = []
with open(EVAL_FILE, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        eval_items.append(json.loads(line))

print(f"[EVAL] Casos cargados: {len(eval_items)}")

# ---------------------------------------------------------
# 2) Instanciar motor en modo evaluación
# ---------------------------------------------------------
# NOTA:
# - Si quieres evaluar el "sistema final", pon use_reranker=True y use_facets=True
# - Si quieres evaluar sin esos módulos (ablation), deja False como abajo.
cfg_eval = RAGConfig(
    top_k=12,
    pool_k=250,
    bm25_cand=500,
    emb_cand=300,
    min_score_bm25=0.0,
    min_score_emb=0.0,
    ctx_chars=2200,
    use_embeddings=True,
    use_reranker=True,     # <- cambia a True si evalúas con reranker
    use_facets=True,       # <- cambia a True si evalúas con facetas
    generate_enabled=True,  # <- True para generar con LLM
    rerank_top_k=(150 if DEVICE == "cuda" else 60),
    rerank_alpha=0.85,
    rerank_bonus_weight=0.15,
    kind_prior_weight=0.08,
    person_prior_weight=0.12,
    faiss_M=32,
    faiss_ef_search=150,
    facet_mode="strict",
    strict_keep_min=3,      # <- (antes lo tenías como min_faceted; en tu engine es strict_keep_min)
    debug=False,
)

engine_eval = RAGEngine(cfg=cfg_eval, evaluation_mode=True)

if not getattr(engine_eval, "llm", None):
    print("[WARN] LLM no disponible (no se cargó GGUF / llama_cpp). La evaluación generará 'Gen OFF'.")

# ---------------------------------------------------------
# 3) Utilidades de normalización + ROUGE-1 clásico
# ---------------------------------------------------------
def strip_accents(s: str) -> str:
    s = unicodedata.normalize("NFD", s)
    return "".join(c for c in s if unicodedata.category(c) != "Mn")

def normalize_basic(text: str) -> str:
    """
    Normalización ligera para ROUGE-1:
    - lower
    - sin acentos
    - sin puntuación (deja letras/números/espacios)
    - colapsa espacios
    - quita 'respuesta:' al inicio (si aparece)
    """
    if text is None:
        return ""
    t = str(text).strip().lower()
    t = strip_accents(t)
    t = re.sub(r"^respuesta\s*:\s*", "", t)
    t = re.sub(r"[^a-z0-9áéíóúñ]+", " ", t)
    t = re.sub(r"\s+", " ", t).strip()
    return t

def rouge1_scores(pred: str, gold: str):
    """
    ROUGE-1 clásico (unigrams):
    Devuelve: precision, recall, f1, pred_only_tokens, gold_only_tokens
    """
    ng = normalize_basic(gold)
    npred = normalize_basic(pred)

    gold_tokens = ng.split() if ng else []
    pred_tokens = npred.split() if npred else []

    if not gold_tokens and not pred_tokens:
        return 1.0, 1.0, 1.0, [], []
    if not gold_tokens or not pred_tokens:
        return 0.0, 0.0, 0.0, pred_tokens, gold_tokens

    gold_counts = Counter(gold_tokens)
    pred_counts = Counter(pred_tokens)

    overlap = 0
    for tok, c in pred_counts.items():
        overlap += min(c, gold_counts.get(tok, 0))

    precision = overlap / sum(pred_counts.values()) if pred_counts else 0.0
    recall    = overlap / sum(gold_counts.values()) if gold_counts else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0

    pred_only = sorted((pred_counts - gold_counts).elements())
    gold_only = sorted((gold_counts - pred_counts).elements())

    return precision, recall, f1, pred_only, gold_only

# ---------------------------------------------------------
# 4) Faithfulness = cobertura de evidencia en contexto
# ---------------------------------------------------------
STOPWORDS = {
    "el","la","los","las","un","una","unos","unas",
    "de","del","al","a","y","o","u","en","por","para","con",
    "que","qué","quien","quién","donde","dónde","como","cómo",
    "es","son","está","estan","esta","están",
    "laboratorio","laboratorios","sala","salas","departamento","departamentos",
    "baño","banio","banos","banios","cuarto","cuartos",
    "piso","pisos","bloque","bloques","lado","lados",
    "tiene","hay","se","encuentra","queda",
    "si","sí","no","respuesta"
}

def tokenize_content(text: str):
    t = normalize_basic(text)
    toks = t.split()
    return [tok for tok in toks if tok not in STOPWORDS]

def faithfulness_score(pred: str, context: str):
    """
    Faithfulness simple:
    score = (#tokens_de_respuesta_que_aparecen_en_contexto) / (#tokens_de_respuesta)
    Devuelve: score, supported_tokens, unsupported_tokens
    """
    ans_tokens = tokenize_content(pred)
    ctx_tokens = set(tokenize_content(context))

    if not ans_tokens:
        return 1.0, [], []

    supported = [t for t in ans_tokens if t in ctx_tokens]
    unsupported = [t for t in ans_tokens if t not in ctx_tokens]
    score = len(supported) / len(ans_tokens) if ans_tokens else 0.0
    return score, supported, unsupported

# ---------------------------------------------------------
# 5) Bucle de evaluación (ejecuta pipeline real del engine)
# ---------------------------------------------------------
global_rouge_f1_sum = 0.0
global_rouge_p_sum  = 0.0
global_faith_sum    = 0.0
n_cases = 0

per_type_scores = defaultdict(list)  # type -> list[(precision, f1, faith)]
rows_for_csv = []
debug_rows_for_csv = []

for idx, item in enumerate(eval_items, start=1):
    qid   = item.get("id", f"case_{idx}")
    q     = item.get("question") or item.get("query") or ""
    gold  = item.get("gold", "")
    qtype = item.get("type", "OTRO")

    print(f"\n==================== Caso {idx} / {len(eval_items)} ====================")
    print(f"ID: {qid}")
    print(f"PREGUNTA: {q}")
    print(f"GOLD: {gold}")

    # ---- pipeline manual para capturar contexto + pred ----
    pool, facets, pins, _stats = engine_eval.retrieve_pool(q)
    ranked = engine_eval.rerank_order(q, pool)
    ordered = engine_eval.apply_facets_mode(ranked, facets, pins)
    ctx = engine_eval.build_context_with_evidence(ordered, facets)
    prompt = engine_eval.build_prompt(ctx, q)
    pred = engine_eval.generate(prompt)

    # ---- métricas ----
    r_p, r_r, r_f1, pred_only, gold_only = rouge1_scores(pred, gold)
    faith, supported_tokens, unsupported_tokens = faithfulness_score(pred, ctx)

    global_rouge_f1_sum += r_f1
    global_rouge_p_sum  += r_p
    global_faith_sum    += faith
    n_cases += 1

    per_type_scores[qtype].append((r_p, r_f1, faith))

    print(f"ROUGE-1 precision: {r_p:.4f}")
    print(f"ROUGE-1 F1       : {r_f1:.4f}")
    print(f"Faithfulness     : {faith:.4f}")
    if unsupported_tokens:
        print(f"TOKENS NO RESPALDADOS: {unsupported_tokens}")

    avg_r_f1 = global_rouge_f1_sum / n_cases
    avg_r_p  = global_rouge_p_sum  / n_cases
    avg_f    = global_faith_sum    / n_cases
    print("----------------------------------------------")
    print(f"PROMEDIO ROUGE-1 PRECISION HASTA AHORA: {avg_r_p:.4f}")
    print(f"PROMEDIO ROUGE-1 F1 HASTA AHORA       : {avg_r_f1:.4f}")
    print(f"PROMEDIO FAITHFULNESS HASTA AHORA     : {avg_f:.4f}")

    rows_for_csv.append({
        "id": qid,
        "type": qtype,
        "question": q,
        "gold": gold,
        "pred": pred,
        "rouge1_precision": r_p,
        "rouge1_recall": r_r,
        "rouge1_f1": r_f1,
        "faithfulness": faith,
        "tiene_tokens_no_respaldo": int(len(unsupported_tokens) > 0),
    })

    debug_rows_for_csv.append({
        "id": qid,
        "type": qtype,
        "question": q,
        "gold": gold,
        "pred": pred,
        "rouge1_f1": r_f1,
        "faithfulness": faith,
        "rouge_pred_only_tokens": " ".join(pred_only),
        "rouge_gold_only_tokens": " ".join(gold_only),
        "faith_supported_tokens": " ".join(supported_tokens),
        "faith_unsupported_tokens": " ".join(unsupported_tokens),
        "context_used": ctx,
        "facets_detected": json.dumps(facets, ensure_ascii=False),
        "stats": json.dumps(_stats, ensure_ascii=False),
    })

# ---------------------------------------------------------
# 6) Resumen global y por tipo
# ---------------------------------------------------------
print("\n==================== RESUMEN FINAL ====================")
print(f"Casos evaluados: {n_cases}")
avg_rouge_f1 = global_rouge_f1_sum / n_cases if n_cases else 0.0
avg_rouge_p  = global_rouge_p_sum  / n_cases if n_cases else 0.0
avg_faith    = global_faith_sum    / n_cases if n_cases else 0.0
print(f"ROUGE-1 precision promedio: {avg_rouge_p:.4f}")
print(f"ROUGE-1 F1 promedio       : {avg_rouge_f1:.4f}")
print(f"Faithfulness promedio     : {avg_faith:.4f}")

resumen_global = [{
    "casos_evaluados": n_cases,
    "rouge1_precision_promedio": avg_rouge_p,
    "rouge1_f1_promedio": avg_rouge_f1,
    "faithfulness_promedio": avg_faith,
}]

print("\n==================== PROMEDIOS POR TIPO ====================")
resumen_por_tipo = []
for t, vals in per_type_scores.items():
    if not vals:
        continue
    p_mean = sum(v[0] for v in vals) / len(vals)
    f1_mean = sum(v[1] for v in vals) / len(vals)
    faith_mean = sum(v[2] for v in vals) / len(vals)
    resumen_por_tipo.append({
        "type": t,
        "casos": len(vals),
        "rouge1_precision_promedio": p_mean,
        "rouge1_f1_promedio": f1_mean,
        "faithfulness_promedio": faith_mean,
    })
    print(
        f"Tipo {t}: casos={len(vals)} | "
        f"ROUGE-1 precision={p_mean:.4f} | "
        f"ROUGE-1 F1={f1_mean:.4f} | "
        f"Faithfulness={faith_mean:.4f}"
    )

# ---------------------------------------------------------
# 7) Guardar CSVs
# ---------------------------------------------------------
if rows_for_csv:
    detalle_fields = list(rows_for_csv[0].keys())
    with open(CSV_OUT_DETALLE, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=detalle_fields)
        writer.writeheader()
        writer.writerows(rows_for_csv)
    print(f"[EVAL] Resultados detallados guardados en: {CSV_OUT_DETALLE}")

if resumen_global:
    with open(CSV_OUT_RESUMEN_GLOBAL, "w", encoding="utf-8", newline="") as f:
        fieldnames = list(resumen_global[0].keys())
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(resumen_global)
    print(f"[EVAL] Resumen global guardado en: {CSV_OUT_RESUMEN_GLOBAL}")

if resumen_por_tipo:
    with open(CSV_OUT_RESUMEN_TIPO, "w", encoding="utf-8", newline="") as f:
        fieldnames = list(resumen_por_tipo[0].keys())
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(resumen_por_tipo)
    print(f"[EVAL] Resumen por tipo guardado en: {CSV_OUT_RESUMEN_TIPO}")

if debug_rows_for_csv:
    debug_fields = list(debug_rows_for_csv[0].keys())
    with open(CSV_OUT_DEBUG, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=debug_fields)
        writer.writeheader()
        writer.writerows(debug_rows_for_csv)
    print(f"[EVAL] Debug detallado guardado en: {CSV_OUT_DEBUG}")


[EVAL] Casos cargados: 69
[TIMING][build_embeddings] total=18.131s
[OK] Índices listos en 23.55s. Docs: 26 | LLM=ON | Rerank=ON

ID: NOMBRE_A211
PREGUNTA: ¿Cómo se llama el laboratorio A211?
GOLD: Laboratorio Integrado de Manufactura.
ROUGE-1 precision: 1.0000
ROUGE-1 F1       : 1.0000
Faithfulness     : 1.0000
----------------------------------------------
PROMEDIO ROUGE-1 PRECISION HASTA AHORA: 1.0000
PROMEDIO ROUGE-1 F1 HASTA AHORA       : 1.0000
PROMEDIO FAITHFULNESS HASTA AHORA     : 1.0000

ID: NOMBRE_A212
PREGUNTA: ¿Cómo se llama el laboratorio A212?
GOLD: Laboratorio de Energia y Fluidos.
ROUGE-1 precision: 1.0000
ROUGE-1 F1       : 1.0000
Faithfulness     : 1.0000
----------------------------------------------
PROMEDIO ROUGE-1 PRECISION HASTA AHORA: 1.0000
PROMEDIO ROUGE-1 F1 HASTA AHORA       : 1.0000
PROMEDIO FAITHFULNESS HASTA AHORA     : 1.0000

ID: NOMBRE_A221
PREGUNTA: ¿Cómo se llama el laboratorio A221?
GOLD: Laboratorio de Desarrollo de Software.
ROUGE-1 precision: 1.0

In [3]:
#Limpieza de csv para el análisis estadístico
import csv
import re
import unicodedata

CSV_IN  = "resultados_eval_detallado.csv"
CSV_OUT = "resultados_evaluacion_final_mejorado.csv"

CUT_MARKERS = [
    r"\bPREGUNTA\s*:\s*",
    r"\bHECHOS\s*:\s*",
    r"\bFACETS\s*:\s*",
    r"\bCONTEXTO\s*:\s*",
]

def normalize_unicode(s: str) -> str:
    # Normaliza unicode y elimina controles raros
    s = unicodedata.normalize("NFC", s)
    s = re.sub(r"[\u0000-\u001F\u007F]", " ", s)  # control chars
    s = re.sub(r"\s+", " ", s).strip()
    return s

def clean_text(x) -> str:
    if x is None:
        return ""
    s = str(x)

    # 1) Unifica saltos de línea
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = re.sub(r"\n+", " ", s)

    # 2) Si hubo “fuga” del prompt (PREGUNTA/HECHOS/etc), corta desde ahí
    for m in CUT_MARKERS:
        hit = re.search(m, s, flags=re.IGNORECASE)
        if hit:
            s = s[:hit.start()].strip()

    # 3) Limpieza estética: separadores “;” -> coma (más humano)
    s = re.sub(r"\s*;\s*", ", ", s)

    # 4) Quita prefijos típicos
    s = re.sub(r"^(respuesta\s*:\s*)", "", s, flags=re.IGNORECASE)

    # 5) Normaliza unicode/espacios finales
    s = normalize_unicode(s)

    return s

rows_out = []
with open(CSV_IN, "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        pred_raw = row.get("pred", "")
        gold_raw = row.get("gold", "")

        pred = clean_text(pred_raw)
        gold = clean_text(gold_raw)

        # bandera simple de “fuga” (por si quieres reportarlo como limitación)
        leak = int(bool(re.search(r"(PREGUNTA\s*:|HECHOS\s*:|FACETS\s*:)", str(pred_raw), flags=re.IGNORECASE)))

        r_f1 = float(row.get("rouge1_f1", 0) or 0)
        faith = float(row.get("faithfulness", 0) or 0)

        rows_out.append({
            "id": clean_text(row.get("id")),
            "tipo_pregunta": clean_text(row.get("type")),
            "pregunta": clean_text(row.get("question")),
            "respuesta_referencia": gold,
            "respuesta_agente": pred,
            # métricas (dos formatos: 0-1 y %)
            "rouge1_f1": round(r_f1, 4),
            "rouge1_f1_pct": round(100.0 * r_f1, 2),
            "faithfulness": round(faith, 4),
            "faithfulness_pct": round(100.0 * faith, 2),
            "prompt_leak": leak,
        })

# (Opcional) ordena por id para que sea más presentable
rows_out.sort(key=lambda r: (r["id"] or ""))

with open(CSV_OUT, "w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(
        f,
        fieldnames=[
            "id","tipo_pregunta","pregunta",
            "respuesta_referencia","respuesta_agente",
            "rouge1_f1","rouge1_f1_pct",
            "faithfulness","faithfulness_pct",
            "prompt_leak"
        ],
        delimiter=";"
    )
    writer.writeheader()
    writer.writerows(rows_out)

print(f"CSV mejorado generado en: {CSV_OUT}")


CSV mejorado generado en: resultados_evaluacion_final_mejorado.csv


In [4]:
#Calculo de la media, desviación estandar e IC a 95%
import csv, math

PATH = "resultados_evaluacion_final_mejorado.csv"

vals = []
with open(PATH, "r", encoding="utf-8") as f:
    reader = csv.DictReader(f, delimiter=";")
    for r in reader:
        vals.append(float(r["rouge1_f1"]))

n = len(vals)
mean = sum(vals) / n
var = sum((x - mean)**2 for x in vals) / n
std = math.sqrt(var)

z = 1.96
ci_low = mean - z * std / math.sqrt(n)
ci_high = mean + z * std / math.sqrt(n)

print(f"N = {n}")
print(f"Media ROUGE-1 F1 = {mean:.4f}")
print(f"Desviación estándar = {std:.4f}")
print(f"IC 95% = [{ci_low:.4f}, {ci_high:.4f}]")


N = 69
Media ROUGE-1 F1 = 0.8762
Desviación estándar = 0.3157
IC 95% = [0.8018, 0.9507]
