In [1]:
# === Visualizador robusto de Mentions/Entities sobre el texto del doc ===
from pathlib import Path
import json, re
from typing import List, Dict, Tuple
from IPython.display import display, HTML

# Highlight colour (hex RGB) per entity label. The mapping is handed to
# displaCy as its "colors" option and used as the <mark> background in the
# plain-HTML fallback renderer.
LABEL_COLORS = {
    "PERSON": "#ffcc80",  # light orange
    "ORG":    "#ff7f0e",  # orange
    "EMAIL":  "#9467bd",
    "URL":    "#d62728",
    "DATE":   "#2ca02c",
    "MONEY":  "#1f77b4",
    "TITLE":  "#bcbd22",
    "DEGREE": "#17becf",
    "LOC":    "#1abc9c",
    "PRODUCT":"#8c564b",
    "ID":     "#7f7f7f",
    "OTHER":  "#aaaaaa",
}

def _read_json(p: Path) -> dict:
    return json.loads(p.read_text(encoding="utf-8"))

def load_sentences(doc_id: str, sentences_dir="outputs_sentences") -> dict:
    """Load ``<sentences_dir>/<doc_id>_sentences.json``.

    Raises:
        FileNotFoundError: if that file does not exist (the path is the
            exception argument).
    """
    target = Path(sentences_dir).joinpath(f"{doc_id}_sentences.json")
    if target.exists():
        return _read_json(target)
    raise FileNotFoundError(target)

def load_mentions(doc_id: str, mentions_dir="outputs_mentions") -> dict:
    """Load ``<mentions_dir>/<doc_id>_mentions.json``.

    Raises:
        FileNotFoundError: if that file does not exist (the path is the
            exception argument).
    """
    target = Path(mentions_dir).joinpath(f"{doc_id}_mentions.json")
    if target.exists():
        return _read_json(target)
    raise FileNotFoundError(target)

def load_entities(doc_id: str, entities_dir="outputs_entities") -> dict:
    """Load ``<entities_dir>/<doc_id>_entities.json``.

    Raises:
        FileNotFoundError: if that file does not exist (the path is the
            exception argument).
    """
    target = Path(entities_dir).joinpath(f"{doc_id}_entities.json")
    if target.exists():
        return _read_json(target)
    raise FileNotFoundError(target)

def build_text_and_offsets(sentences_doc: dict) -> Tuple[str, List[int], List[str]]:
    """Join the document's sentences into one text and record where each starts.

    Reads the ``"text"`` of every item under ``sentences_doc["sentences"]``
    (missing text counts as ``""``) and joins them with a single newline.

    Returns:
        ``(text, offsets, sentences)`` where ``offsets[i]`` is the index in
        ``text`` at which ``sentences[i]`` begins.
    """
    sents = [entry.get("text", "") for entry in sentences_doc.get("sentences", [])]
    offsets = []
    position = 0
    for sentence in sents:
        offsets.append(position)
        # +1 accounts for the newline separator that follows each sentence.
        position += len(sentence) + 1
    return "\n".join(sents), offsets, sents

# ---------- Normalización suave para búsqueda ----------
def _norm(s: str) -> str:
    s = s.replace("\u00A0", " ")
    s = re.sub(r"\s+", " ", s, flags=re.M)
    return s.strip()

def _build_norm_map(original: str) -> Tuple[str, List[int]]:
    """
    Devuelve (texto_normalizado, map_idx) donde map_idx[i_norm] = i_original.
    Simplifica: colapsa runs de whitespace pero mantiene un mapeo claro.
    """
    norm_chars = []
    idx_map = []
    i = 0
    L = len(original)
    while i < L:
        ch = original[i]
        if ch.isspace():
            # colapsar espacios consecutivos a 1 espacio en la normalización
            norm_chars.append(" ")
            idx_map.append(i)
            while i < L and original[i].isspace():
                i += 1
            continue
        else:
            norm_chars.append(ch)
            idx_map.append(i)
            i += 1
    normed = "".join(norm_chars)
    # Recorta extremos si inicia/termina con espacio
    if normed and normed[0] == " ":
        normed = normed[1:]
        idx_map = idx_map[1:]
    if normed and normed[-1] == " ":
        normed = normed[:-1]
        idx_map = idx_map[:-1]
    return normed, idx_map

def _search_span_in_text(surface: str, doc_text: str) -> Tuple[int,int] | None:
    """Find ``surface`` in ``doc_text``, tolerating whitespace and case differences.

    Both strings are whitespace-normalized before matching and the match is
    case-insensitive. A whole-token occurrence (not directly preceded or
    followed by a word character) is preferred; if there is none, the first
    occurrence anywhere (even inside a longer word) is used.

    Returns:
        ``(start, end)`` indices into the original ``doc_text`` (``end``
        exclusive), or ``None`` when ``surface`` is empty/blank or not found.
    """
    if not surface:
        return None
    norm_surface = _norm(surface)
    if not norm_surface:
        return None
    norm_doc, idx_map = _build_norm_map(doc_text)

    literal = re.escape(norm_surface)
    # The bounded search must run first: any bounded match is also a plain
    # match, so trying it only after the plain search failed could never hit.
    # Lookarounds are used instead of \b so that surfaces that begin or end
    # with punctuation can still match.
    match = re.search(r"(?<!\w)" + literal + r"(?!\w)", norm_doc, flags=re.IGNORECASE)
    if match is None:
        match = re.search(literal, norm_doc, flags=re.IGNORECASE)
    if match is None:
        return None

    first, last = match.start(), match.end() - 1
    # Defensive guard: the map should have one entry per normalized character.
    if first >= len(idx_map) or last >= len(idx_map):
        return None
    # Map the first and last matched characters back to original indices.
    return (idx_map[first], idx_map[last] + 1)

# ---------- Construcción de spans para Mentions ----------
def spans_from_mentions(mentions_doc: dict, sentences_doc: dict, doc_text: str) -> List[Dict]:
    """Build non-overlapping highlight spans for the raw mentions.

    Each item of ``mentions_doc["entities"]`` is placed using its
    ``sentence_idx`` and two-element ``char_span`` (added to that sentence's
    start offset) when both are usable; otherwise its text is searched for in
    ``doc_text``. Mentions that cannot be located are dropped.

    Returns:
        A list of ``{"start", "end", "label", "text"}`` dicts sorted by start;
        where spans overlap, the earliest (and, on ties, longest) one wins.
    """
    offsets = build_text_and_offsets(sentences_doc)[1]
    found = []

    for mention in mentions_doc.get("entities", []):
        label = (mention.get("canonical_label") or mention.get("label") or "OTHER").upper()
        surface = (mention.get("text") or "").strip()
        sent_idx = mention.get("sentence_idx")
        local = mention.get("char_span")

        located = None
        has_indices = (
            isinstance(sent_idx, int)
            and isinstance(local, (list, tuple))
            and len(local) == 2
        )
        if has_indices:
            rel_start, rel_end = int(local[0]), int(local[1])
            if 0 <= sent_idx < len(offsets):
                base = offsets[sent_idx]
                located = (base + rel_start, base + rel_end)

        # Fallback: look for the mention text anywhere in the document.
        if located is None:
            located = _search_span_in_text(surface, doc_text)

        if located:
            found.append({"start": located[0], "end": located[1], "label": label, "text": surface})

    # Greedy overlap removal: earliest start first, longest first on ties.
    ordered = sorted(found, key=lambda sp: (sp["start"], sp["start"] - sp["end"]))
    kept = []
    frontier = -1
    for sp in ordered:
        if sp["start"] < frontier:
            continue
        kept.append(sp)
        frontier = sp["end"]
    return kept

# ---------- Construcción de spans para Entities (normalizadas) ----------
def spans_from_entities(entities_doc: dict, mentions_doc: dict, sentences_doc: dict, doc_text: str) -> List[Dict]:
    """Build non-overlapping highlight spans, one anchor per normalized entity.

    For each item of ``entities_doc["entities"]`` the anchor is the first of
    its ``mentions`` (looked up by id in ``mentions_doc``) that carries a
    usable ``sentence_idx``/``char_span``. If none does, the entity's
    ``name``, ``value`` and a few ``attrs`` values are searched for in
    ``doc_text`` in that order. Entities that cannot be located are dropped.

    Returns:
        A list of ``{"start", "end", "label", "text"}`` dicts sorted by start;
        where spans overlap, the earliest (and, on ties, longest) one wins.
    """
    # Mentions indexed by their id, stringified.
    mention_by_id = {str(item.get("id")): item for item in mentions_doc.get("entities", [])}
    offsets = build_text_and_offsets(sentences_doc)[1]
    attr_keys = ("org_core","org_key","email_norm","url_norm","given_name","family_name")

    found = []
    for entity in entities_doc.get("entities", []):
        label = (entity.get("type") or "OTHER").upper()
        location = None
        shown_text = None

        # 1) First linked mention that has sentence-relative indices.
        for mention_id in (entity.get("mentions") or []):
            mention = mention_by_id.get(str(mention_id))
            if not mention:
                continue
            shown_text = (mention.get("text") or "").strip()
            sent_idx = mention.get("sentence_idx")
            local = mention.get("char_span")
            usable = (
                isinstance(sent_idx, int)
                and isinstance(local, (list, tuple))
                and len(local) == 2
                and 0 <= sent_idx < len(offsets)
            )
            if usable:
                location = (offsets[sent_idx] + int(local[0]), offsets[sent_idx] + int(local[1]))
                break

        # 2) Fallback: search the document for an entity surface form.
        if not location:
            surfaces = [
                (entity.get("name") or "").strip(),
                (entity.get("value") or "").strip(),
            ]
            attrs = entity.get("attrs") or {}
            for key in attr_keys:
                extra = (attrs.get(key) or "").strip()
                if extra and extra not in surfaces:
                    surfaces.append(extra)
            for candidate in surfaces:
                hit = _search_span_in_text(candidate, doc_text)
                if hit:
                    shown_text = candidate
                    location = hit
                    break

        if location:
            found.append({"start": location[0], "end": location[1], "label": label, "text": shown_text})

    # Greedy overlap removal: earliest start first, longest first on ties.
    ordered = sorted(found, key=lambda sp: (sp["start"], sp["start"] - sp["end"]))
    kept = []
    frontier = -1
    for sp in ordered:
        if sp["start"] < frontier:
            continue
        kept.append(sp)
        frontier = sp["end"]
    return kept

# ---------- Render ----------
def render_spans(doc_text: str, spans: list[dict], title=None):
    """Display ``doc_text`` with the given spans highlighted.

    Tries spaCy's displaCy in manual mode first. If that raises for any
    reason (for example spaCy is not installed), falls back to a plain HTML
    rendering that wraps each span in a coloured ``<mark>``.

    Args:
        doc_text: the full document text.
        spans: dicts with ``"start"``, ``"end"`` and ``"label"`` keys; the
            indices refer to ``doc_text``.
        title: title passed to displaCy; the fallback does not show it.
    """
    try:
        from spacy import displacy
        payload = {
            "text": doc_text,
            "ents": [{"start": s["start"], "end": s["end"], "label": s["label"]} for s in spans],
            "title": title,
        }
        markup = displacy.render(payload, style="ent", options={"colors": LABEL_COLORS}, manual=True, jupyter=False)
        display(HTML(markup))
    except Exception:
        # Deliberate best-effort fallback: any displaCy failure degrades to
        # simple <mark> highlighting instead of aborting the cell.
        from html import escape

        pieces = []
        cursor = 0
        for span in sorted(spans, key=lambda item: item["start"]):
            start, end, label = span["start"], span["end"], span["label"]
            # Skip inverted spans and spans overlapping one already emitted;
            # they cannot be nested into well-formed markup here.
            if start < cursor or end < start:
                continue
            color = LABEL_COLORS.get(label, "#ffff00")
            # Document text is escaped so "<" or "&" in it cannot break or
            # inject markup.
            pieces.append(escape(doc_text[cursor:start]))
            pieces.append(
                f"<mark style=\"background:{color}; padding:0 2px; border-radius:3px\">"
                f"{escape(doc_text[start:end])} <small>({escape(str(label))})</small></mark>"
            )
            cursor = end
        pieces.append(escape(doc_text[cursor:]))
        body = "".join(pieces)
        display(HTML(f"<div style='font-family:monospace; white-space:pre-wrap'>{body}</div>"))


def visualize_mentions(doc_id: str,
                       sentences_dir="outputs_sentences",
                       mentions_dir="outputs_mentions",
                       title=None):
    """Load the sentences and mentions files of ``doc_id`` and display the
    document text with its mentions highlighted.

    ``title`` defaults to ``"<doc_id> · Mentions"`` when falsy.
    """
    sentences = load_sentences(doc_id, sentences_dir)
    full_text = build_text_and_offsets(sentences)[0]
    mentions = load_mentions(doc_id, mentions_dir)
    highlights = spans_from_mentions(mentions, sentences, full_text)
    heading = title or f"{doc_id} · Mentions"
    render_spans(full_text, highlights, heading)

def visualize_entities(doc_id: str,
                       sentences_dir="outputs_sentences",
                       mentions_dir="outputs_mentions",
                       entities_dir="outputs_entities",
                       title=None):
    """Load the sentences, mentions and entities files of ``doc_id`` and
    display the document text with one highlight per located entity.

    ``title`` defaults to ``"<doc_id> · Entities"`` when falsy.
    """
    sentences = load_sentences(doc_id, sentences_dir)
    full_text = build_text_and_offsets(sentences)[0]
    mentions = load_mentions(doc_id, mentions_dir)
    entities = load_entities(doc_id, entities_dir)
    highlights = spans_from_entities(entities, mentions, sentences, full_text)
    heading = title or f"{doc_id} · Entities"
    render_spans(full_text, highlights, heading)


In [2]:
# Replace with your real DOC-* identifier (without any file suffix).
doc_id = "DOC-0EBB567B9D70"

# 1) Show the raw mentions (NER/RE) highlighted over the full text
visualize_mentions(doc_id)    # as in your screenshot
visualize_entities(doc_id)    # normalized entities


In [3]:
# === JUPYTER CELL 2A: métricas usando el módulo normalizer.metrics ===
from pathlib import Path
import json
from normalizer.metrics import summary as entities_summary

# Read the entities JSON of the current document; doc_id is not assigned in
# this cell and must already be defined by an earlier one.
p = Path("outputs_entities") / f"{doc_id}_entities.json"
de = json.loads(p.read_text(encoding="utf-8"))

# Compute the summary with the helper imported from normalizer.metrics.
s = entities_summary(de)
s  # the notebook prints a dict with useful KPIs


{'merge_savings': {'input_mentions': 302,
  'entities': 201,
  'saved': 101,
  'saved_rate': 0.3344},
 'type_distribution': {'DATE': 2,
  'MONEY': 1,
  'ORG': 54,
  'PERSON': 88,
  'LOC': 55,
  'DEGREE': 1},
 'date_precision': {'year': 2},
 'money_currency': {},
 'email_domains_top5': [],
 'url_domains_top5': [],
 'person_single_token_suspects': ['Artaud',
  'Allí',
  'Diana',
  'Paz',
  'Pizarnik',
  'Lucidez',
  'también',
  'hará',
  'Acaso',
  'Sólo',
  'Partir',
  'He',
  'Tiritantes',
  'Perdidas',
  'Abrázalo',
  'Dile',
  'Acurrucado',
  'Estallará',
  'alejandra',
  'Mañana',
  'Arcano',
  'Pero',
  'Mis',
  'Recuerdo',
  'Detrás',
  'Le',
  'miedo',
  'Aurora',
  'Aprenderé',
  'Dama',
  'Tiresias',
  'Caen',
  'Posesiones',
  'Versos',
  'Pudiera',
  'Deja',
  'ángel',
  'Alguien',
  'ILENCIOS',
  'Quien',
  'Habla',
  'Voy',
  'resucito'],
 'person_org_conflicts': 0,
 'key_uniqueness': {'DATE': 1.0,
  'MONEY': 1.0,
  'ORG': 1.0,
  'PERSON': 1.0,
  'LOC': 1.0,
  'DEGREE': 1.

In [5]:
# === JUPYTER CELL 2B: resumen inline (sin normalizer.metrics) ===
import json
from pathlib import Path
from collections import Counter, defaultdict

def quick_entities_summary(de_doc: dict) -> dict:
    """Compute a small set of KPIs from an entities document.

    Args:
        de_doc: dict with an ``"entities"`` list and, optionally,
            ``meta.counters`` holding ``input_mentions`` and ``entities``.
            Missing or ``None`` values for ``entities``, ``meta``,
            ``counters`` and per-entity ``attrs`` are treated as empty.

    Returns:
        A dict with:
        - ``merge_savings``: input mention count, entity count (the counter if
          present, else the length of the entity list), how many were saved
          and the saved rate rounded to 4 decimals (0.0 with no mentions).
        - ``type_distribution``: number of entities per ``type``
          (``"OTHER"`` when the key is absent).
        - ``date_precision``: number of DATE entities per lower-cased
          ``attrs["precision"]`` (``"unknown"`` when absent).
        - ``money_currency``: per currency (``"UNK"`` when absent), the
          count/min/max/avg of numeric ``attrs["normalized_value"]`` of MONEY
          entities; non-numeric values are ignored.
    """
    ents = de_doc.get("entities") or []
    counters = (de_doc.get("meta") or {}).get("counters") or {}
    input_mentions = int(counters.get("input_mentions", 0))
    entities = int(counters.get("entities", len(ents)))
    saved = max(0, input_mentions - entities)
    saved_rate = (saved / input_mentions) if input_mentions else 0.0

    type_dist = Counter(e.get("type", "OTHER") for e in ents)

    date_prec = Counter()
    money_values = defaultdict(list)
    for e in ents:
        kind = e.get("type")
        # attrs may be absent or explicitly null; treat both as empty.
        attrs = e.get("attrs") or {}
        if kind == "DATE":
            date_prec[str(attrs.get("precision", "unknown")).lower()] += 1
        elif kind == "MONEY":
            val = attrs.get("normalized_value")
            if isinstance(val, (int, float)):
                money_values[str(attrs.get("currency", "UNK"))].append(float(val))

    # A currency only appears here after at least one value was appended, so
    # every list is non-empty.
    money_out = {
        cur: {"count": len(vals), "min": min(vals), "max": max(vals), "avg": sum(vals) / len(vals)}
        for cur, vals in money_values.items()
    }

    return {
        "merge_savings": {
            "input_mentions": input_mentions,
            "entities": entities,
            "saved": saved,
            "saved_rate": round(saved_rate, 4),
        },
        "type_distribution": dict(type_dist),
        "date_precision": dict(date_prec),
        "money_currency": money_out,
    }

# Read the entities JSON of the current document (doc_id must already be
# defined by an earlier cell) and show the inline summary as the cell output.
p = Path("outputs_entities") / f"{doc_id}_entities.json"
de = json.loads(p.read_text(encoding="utf-8"))
quick_entities_summary(de)


{'merge_savings': {'input_mentions': 302,
  'entities': 201,
  'saved': 101,
  'saved_rate': 0.3344},
 'type_distribution': {'DATE': 2,
  'MONEY': 1,
  'ORG': 54,
  'PERSON': 88,
  'LOC': 55,
  'DEGREE': 1},
 'date_precision': {'year': 2},
 'money_currency': {}}