In [1]:
import os
import re
import math
import json
import unicodedata
from typing import Dict, List, Tuple, Union

import pandas as pd
from copy import copy
from rapidfuzz import fuzz
from ultralytics import YOLO
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter, range_boundaries
from openpyxl.styles.borders import Border, Side

from express import OCR_PATTERN as OCR_PATTERNS


MODEL_PATH      = r'C:\Users\juans\Documents\proarchitecg\version_2_docker\model_clasification_image_v2\runs\classify\person_cls\weights\best.pt'
IMAGES_ROOT     = r'D:\historias\dev\imagenes_por_doc\LETRA B\ACTA N° 71\1'
OCR_ROOT        = r'D:\historias\dev\ocr_por_doc\LETRA B\ACTA N° 71\1'
OUTPUT_FILE     = r'C:\Users\juans\Documents\proarchitecg\version_2_docker\resultados_completos_v_final.xlsx'
TEMPLATE_PATH   = r'C:\Users\juans\Downloads\dev_prev\FORMATO HOJA DE CONTROL DOCUMENTAL.xlsx'
OUTPUT_DIR_CTRL = r'C:\Users\juans\Downloads\define\answer'

CONF_THRESH = 0.5
SCORING_MIN = 2  # (no se usa en la versión actual; mantenido por compatibilidad)

# Diccionarios globales para patrones (se llenan en main)
OCR_COMPILED: Dict[str, List[re.Pattern]] = {}
OCR_SEEDS:    Dict[str, List[str]]        = {}

In [2]:
def normalize_text(txt: str) -> str:
    """
    Normaliza texto para matching:
    - NFKD y elimina diacríticos
    - Solo deja [a-z0-9 y espacio]
    - Colapsa espacios y pasa a minúsculas
    """
    if not isinstance(txt, str):
        txt = "" if txt is None else str(txt)
    nk = unicodedata.normalize("NFKD", txt)
    no_diac = "".join(ch for ch in nk if not unicodedata.combining(ch))
    ascii_txt = no_diac.encode("ASCII", "ignore").decode().lower()
    ascii_txt = re.sub(r"[^a-z0-9\s]", " ", ascii_txt)
    ascii_txt = re.sub(r"\s+", " ", ascii_txt).strip()
    return ascii_txt


def regex_to_seed(pat: Union[str, re.Pattern]) -> str:
    """
    Extrae una 'semilla' legible para fuzzy desde un patrón:
    - Si es re.Pattern toma .pattern
    - Elimina metacaracteres y normaliza
    """
    raw = pat.pattern if isinstance(pat, re.Pattern) else str(pat)
    raw = raw.replace(r"\b", " ")
    raw = re.sub(r"[^a-zA-Z0-9\s]", " ", raw)
    raw = re.sub(r"\s+", " ", raw).strip()
    return normalize_text(raw)


def prepare_patterns(raw_dict: Dict[str, Union[str, re.Pattern, List[Union[str, re.Pattern]]]]
                    ) -> Tuple[Dict[str, List[re.Pattern]], Dict[str, List[str]]]:
    """
    Acepta {label: [patrones]} como strings (normalizados) o re.Patterns.
    Devuelve:
      - compiled[label] -> list[re.Pattern] (sobre texto ya normalizado)
      - seeds[label]    -> list[str] para fuzzy
    """
    compiled: Dict[str, List[re.Pattern]] = {}
    seeds:    Dict[str, List[str]]        = {}

    for label, pats in raw_dict.items():
        arr = pats if isinstance(pats, list) else [pats]
        comp_list: List[re.Pattern] = []
        seed_list: List[str] = []

        for pat in arr:
            if isinstance(pat, re.Pattern):
                comp_list.append(pat)
                seed_list.append(regex_to_seed(pat))
            else:
                literal = normalize_text(pat)
                if not literal:
                    continue
                seed_list.append(literal)
                comp_list.append(re.compile(rf"\b{re.escape(literal)}\b"))

        if comp_list:
            compiled[label] = comp_list
            seeds[label]    = [s for s in seed_list if s]

    return compiled, seeds

# helpers

In [3]:
# %% [markdown]
# # 3) Helpers de IO / OCR

# %%
def normalize_basename(path: str) -> str:
    base = os.path.basename(path)
    stem, _ = os.path.splitext(base)
    return normalize_text(stem)


def load_ocr_records(jpath: str):
    """
    Carga un OCR JSON y devuelve lista de registros con posible forma:
      - [ {pagina, imagen, texto}, ... ]
      - { "paginas"/"pages"/"data"/"items"/"ocr": [ {...}, ... ] }
      - JSON Lines (una por línea)
      - Un único objeto {pagina, imagen, texto}
    """
    try:
        with open(jpath, encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError:
        # NDJSON / JSONL
        recs = []
        with open(jpath, encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    recs.append(json.loads(line))
                except json.JSONDecodeError:
                    pass
        return recs

    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        for key in ("paginas", "pages", "data", "items", "ocr"):
            if key in data and isinstance(data[key], list):
                return data[key]
        return [data]
    return []


def map_ocr_to_indexes(recs, texts_by_page: Dict[int, str], texts_by_file: Dict[str, str]):
    """
    Agrega registros OCR a dos índices:
      - texts_by_page[int] = texto
      - texts_by_file[nombre_archivo_normalizado] = texto
    """
    added_p, added_f = 0, 0
    for rec in recs:
        if not isinstance(rec, dict):
            continue

        txt = rec.get("texto", "") or rec.get("text", "") or ""

        # (1) mapear por número de página
        pg_int = None
        if "pagina" in rec:
            try:
                pg_int = int(str(rec["pagina"]).strip())
            except Exception:
                pg_int = None
        elif "page" in rec:
            try:
                pg_int = int(str(rec["page"]).strip())
            except Exception:
                pg_int = None
        if pg_int is not None:
            texts_by_page[pg_int] = txt
            added_p += 1

        # (2) mapear por nombre de archivo normalizado
        imgf = rec.get("imagen") or rec.get("image") or ""
        if imgf:
            texts_by_file[normalize_basename(imgf)] = txt
            added_f += 1
            # inferir página del nombre
            pg_from_img = extract_page_number(os.path.basename(imgf))
            if isinstance(pg_from_img, (int, float)) and not math.isnan(pg_from_img):
                texts_by_page[int(pg_from_img)] = txt
                added_p += 1

    return added_p, added_f


# Busqueda

In [4]:
# %% [markdown]
# # 4) Búsqueda de imágenes/JSON y helpers

# %%
def find_persona_images(root: str) -> Dict[str, List[str]]:
    """
    Recorre la carpeta raíz y agrupa imágenes por persona (nombre de carpeta base).
    """
    persona_images: Dict[str, List[str]] = {}
    for dirpath, _, files in os.walk(root):
        imgs = [os.path.join(dirpath, f)
                for f in files
                if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        if imgs:
            pname = normalize_text(os.path.basename(dirpath))
            persona_images.setdefault(pname, []).extend(imgs)
    return persona_images


def build_ocr_map(root: str) -> Dict[str, str]:
    """
    Indexa archivos .json (OCR) por nombre de archivo (sin extensión) normalizado.
    """
    json_map: Dict[str, str] = {}
    for dirpath, _, files in os.walk(root):
        for fn in files:
            if fn.lower().endswith('.json'):
                key = normalize_text(os.path.splitext(fn)[0])
                json_map[key] = os.path.join(dirpath, fn)
    return json_map


def match_json_for_persona(json_map: Dict[str, str], persona: str, thresh: int = 70):
    """
    Devuelve la ruta del JSON cuyo nombre más se parece a 'persona' (exacto o fuzzy).
    """
    if persona in json_map:
        return json_map[persona]
    best_score, best_key = 0, None
    for k in json_map:
        s = fuzz.partial_ratio(persona, k)
        if s > best_score:
            best_score, best_key = s, k
    if best_score >= thresh and best_key:
        return json_map[best_key]
    return None


def extract_page_number(fn: str) -> float:
    """
    Extrae un número de página desde 'pagina_12.png' u otro entero en el nombre.
    """
    m = re.search(r'pagina[_-]?(\d+)', fn, re.IGNORECASE) or re.search(r'(\d+)', fn)
    return int(m.group(1)) if m else math.nan


# %% [markdown]
# # 5) Predicción visual, fuzzy y clasificador

# %%
def visual_predict(model: YOLO, img_path: str, strict: bool = True):
    """
    Predicción visual (clasificador Ultralytics).
    """
    res = model.predict(source=img_path, device='cpu', task='classify', verbose=False)[0]
    probs = getattr(res, 'probs', None)
    arr = probs.data.tolist() if hasattr(probs, 'data') else list(probs or [])
    if not arr or (strict and max(arr) < CONF_THRESH):
        return None, (max(arr) if arr else 0.0)
    idx = arr.index(max(arr))
    return model.names[idx], max(arr)


def fuzzy_ocr_label(txt_norm: str,
                    label_seeds: Dict[str, List[str]],
                    threshold: int = 75) -> Tuple[str, float]:
    """
    Fuzzy con semillas 'planas' (sin regex). Usa partial_ratio.
    """
    best_label, best_score = "", 0
    for label, terms in label_seeds.items():
        for t in terms:
            if not t:
                continue
            s = fuzz.partial_ratio(txt_norm, t)
            if s > best_score:
                best_label, best_score = label, s
    return (best_label, best_score / 100.0) if best_score >= threshold else ("", 0.0)


def classify(text: str, model: YOLO, img_path: str):
    """
    Pipeline de clasificación:
      1) Visual estricto
      2) OCR scoring (cuenta matches por label)
      3) OCR regex rápido (primer match)
      4) Fuzzy (semillas planas)
    """
    txt = normalize_text(text or "")

    # 1) Visual
    lbl, cf = visual_predict(model, img_path, strict=True)
    if lbl:
        return lbl, cf, "visual"

    # 2) OCR scoring
    scores = {dt: 0 for dt in OCR_COMPILED}
    for dt, pats in OCR_COMPILED.items():
        for p in pats:
            if p.search(txt):
                scores[dt] += 1
    if scores:
        th = {dt: max(1, len(OCR_COMPILED[dt]) // 2) for dt in OCR_COMPILED}
        best_dt, cnt = max(scores.items(), key=lambda x: x[1])
        if cnt >= th[best_dt]:
            conf = cnt / max(1, len(OCR_COMPILED[best_dt]))
            return best_dt, conf, "ocr_scoring"

    # 3) OCR regex rápido
    for dt, pats in OCR_COMPILED.items():
        for p in pats:
            if p.search(txt):
                return dt, 1.0, "ocr_regex"

    # 4) Fuzzy
    lbl_f, cf_f = fuzzy_ocr_label(txt, OCR_SEEDS, threshold=75)
    if lbl_f:
        return lbl_f, cf_f, "ocr_fuzzy"

    return "", 0.0, "none"


In [5]:
# %% [markdown]
# # 6) Utilidades de Excel

# %%
def copy_row_format(ws, src_row: int, tgt_row: int, max_col: int = 13, row_height: float = 48):
    ws.row_dimensions[tgt_row].height = row_height
    thin = Side(border_style="thin", color="000000")
    full_border = Border(left=thin, right=thin, top=thin, bottom=thin)

    for col in range(1, max_col + 1):
        src = ws.cell(row=src_row, column=col)
        tgt = ws.cell(row=tgt_row, column=col)
        if src.has_style:
            tgt.font          = copy(src.font)
            tgt.fill          = copy(src.fill)
            tgt.number_format = copy(src.number_format)
            tgt.protection    = copy(src.protection)
            tgt.alignment     = copy(src.alignment)
        tgt.border = full_border

    for m in list(ws.merged_cells.ranges):
        if m.min_row == src_row == m.max_row:
            c1 = get_column_letter(m.min_col)
            c2 = get_column_letter(m.max_col)
            ws.merge_cells(f"{c1}{tgt_row}:{c2}{tgt_row}")


def remove_holes(ws, hole_ranges: List[str]):
    """
    Descombina y borra TODO en los rangos indicados (p.ej. ["B57:B59","C57:F59","D60:F60"])
    """
    parsed = []
    for rng in hole_ranges:
        min_col, min_row, max_col, max_row = range_boundaries(rng)
        parsed.append((min_row, max_row, min_col, max_col))

    to_unmerge = []
    for m in list(ws.merged_cells.ranges):
        for min_row, max_row, min_col, max_col in parsed:
            if not (m.max_row < min_row or m.min_row > max_row
                    or m.max_col < min_col or m.min_col > max_col):
                to_unmerge.append(m.coord)
                break
    for coord in to_unmerge:
        ws.unmerge_cells(coord)

    for rng in hole_ranges:
        for row in ws[rng]:
            for cell in row:
                cell.value = None


def generate_control_sheet(df_perso: pd.DataFrame, persona: str):
    """
    Genera y guarda la hoja de control para una persona (si no existe).
    """
    os.makedirs(OUTPUT_DIR_CTRL, exist_ok=True)
    out = os.path.join(OUTPUT_DIR_CTRL, f"{persona}_hoja_control.xlsx")
    if os.path.exists(out):
        print(f"⚠️ Ya existe hoja de control para '{persona}', omitiendo.")
        return

    wb = load_workbook(TEMPLATE_PATH)
    ws = wb.active
    START_ROW = 18

    # Quitar huecos específicos
    holes = ["B57:B59", "C57:F59", "D60:F60"]
    remove_holes(ws, holes)

    # Buscar pie “NOMBRE Y APELLIDOS”
    footer = None
    for row in ws.iter_rows(min_row=START_ROW, max_row=ws.max_row):
        for c in row:
            if isinstance(c.value, str) and "NOMBRE Y APELLIDOS" in c.value.upper():
                footer = c.row
                break
        if footer:
            break
    footer = footer or (START_ROW + 38)

    # Detectar filas con contenido en A
    content_rows = [
        r for r in range(START_ROW, footer)
        if ws.cell(row=r, column=1).value not in (None, "")
    ]
    if not content_rows:
        raise RuntimeError("No encontré filas con contenido en la plantilla.")
    last_content = content_rows[-1]

    # Insertar filas necesarias
    template_n = len(content_rows)
    n_pages    = len(df_perso)
    if n_pages > template_n:
        extras = n_pages - template_n
        ws.insert_rows(footer, amount=extras)
        for i in range(extras):
            dst = footer + i
            copy_row_format(ws, last_content, dst, max_col=13, row_height=48)

    # Volcar datos
    for idx, rec in enumerate(df_perso.sort_values('posicion').itertuples(), start=1):
        r = START_ROW + idx - 1
        ws.cell(row=r, column=1, value=idx)               # A
        ws.cell(row=r, column=5, value=rec.predicted)     # E
        ws.cell(row=r, column=6, value=int(rec.posicion)) # F
        ws.cell(row=r, column=7, value=int(rec.posicion)) # G

    wb.save(out)
    print("✅ Control inmediato:", out)


In [6]:
# %% [markdown]
# # 7) Main y ejecución

# %%
def main():
    global OCR_COMPILED, OCR_SEEDS  # para que classify() use los compilados

    os.makedirs(OUTPUT_DIR_CTRL, exist_ok=True)

    print("⏳ Cargando modelo YOLO...")
    model = YOLO(MODEL_PATH)
    print("✅ Modelo cargado.")

    print("⏳ Compilando patrones OCR...")
    OCR_COMPILED, OCR_SEEDS = prepare_patterns(OCR_PATTERNS)
    if not OCR_COMPILED:
        raise RuntimeError("OCR_COMPILED vacío: revisa doc_type_patterns.OCR_PATTERNS")
    print(f"✅ Patrones compilados: {len(OCR_COMPILED)} tipos.")

    # Indexación
    print("⏳ Indexando OCR JSON...")
    json_map = build_ocr_map(OCR_ROOT)
    print(f"✅ JSONs indexados: {len(json_map)}")

    print("⏳ Buscando imágenes por persona...")
    persona_images = find_persona_images(IMAGES_ROOT)
    print(f"✅ Personas detectadas: {len(persona_images)}")

    all_rows, gid = [], 1

    # Procesamiento por persona
    for pkey, img_paths in persona_images.items():
        print(f"\n👤 Procesando persona '{pkey}' con {len(img_paths)} imágenes…")

        # Índices para el OCR
        texts_by_page: Dict[int, str] = {}
        texts_by_file: Dict[str, str]  = {}

        # Cargar y mapear OCR JSON
        jpath = match_json_for_persona(json_map, pkey)
        if jpath:
            try:
                recs = load_ocr_records(jpath)
                added_p, added_f = map_ocr_to_indexes(recs, texts_by_page, texts_by_file)
                print(f"   🔎 OCR JSON: {os.path.basename(jpath)} | "
                      f"registros={len(recs)} | páginas={len(texts_by_page)} (+{added_p}) | "
                      f"archivos={len(texts_by_file)} (+{added_f})")
                if not texts_by_page and not texts_by_file and recs:
                    sample = recs[0]
                    print(f"   ℹ️ Estructura ejemplo del JSON: keys={list(sample.keys())[:8]}")
            except Exception as e:
                print(f"   ⚠️ Error leyendo/mapeando JSON '{jpath}': {e}")
        else:
            print("   ⚠️ No se encontró JSON OCR para esta persona (se continúa con visual/fuzzy/regex).")

        # Clasificar imágenes
        persona_rows = []
        for img in sorted(img_paths, key=extract_page_number):
            pg_val = extract_page_number(img)
            pg_int = int(pg_val) if isinstance(pg_val, (int, float)) and not math.isnan(pg_val) else None

            key_norm = normalize_basename(img)
            txt = texts_by_file.get(key_norm, "")
            if not txt and pg_int is not None:
                txt = texts_by_page.get(pg_int, "")

            if not txt:
                print(f"   ⚠️ OCR vacío para '{os.path.basename(img)}' (pg={pg_int}). "
                      f"Usaré solo visual + fuzzy/regex sobre texto vacío.")

            lbl, sc, ly = classify(txt, model, img)

            rec = {
                'id':        gid,
                'persona':   pkey,
                'imagen':    img,
                'posicion':  pg_val,
                'predicted': lbl,
                'score':     sc,
                'layer':     ly
            }
            all_rows.append(rec)
            persona_rows.append(rec)
            gid += 1

        # Generar hoja de control por persona inmediatamente
        if persona_rows:
            df_perso = pd.DataFrame(persona_rows)
            df_perso['correct'] = df_perso['persona'] == df_perso['predicted']  # compat
            generate_control_sheet(df_perso, pkey)

    # Consolidado global
    print("\n⏳ Generando DataFrame y guardando:", OUTPUT_FILE)
    df = pd.DataFrame(all_rows)
    if not df.empty:
        df['correct'] = df['persona'] == df['predicted']
        df.to_excel(OUTPUT_FILE, index=False)
        print("✅ Consolidado en", OUTPUT_FILE)
    else:
        print("⚠️ No se generaron filas; revisa rutas y extensiones.")

    print("🎉 Proceso completado.")


# %%
if __name__ == "__main__":
    main()


⏳ Cargando modelo YOLO...
✅ Modelo cargado.
⏳ Compilando patrones OCR...
✅ Patrones compilados: 37 tipos.
⏳ Indexando OCR JSON...
✅ JSONs indexados: 9
⏳ Buscando imágenes por persona...
✅ Personas detectadas: 9

👤 Procesando persona 'babilonia negrette harold' con 8 imágenes…
   🔎 OCR JSON: Babilonia Negrette Harold.json | registros=8 | páginas=8 (+16) | archivos=8 (+8)
⚠️ Ya existe hoja de control para 'babilonia negrette harold', omitiendo.

👤 Procesando persona 'bacca castro diego armando' con 18 imágenes…
   🔎 OCR JSON: Bacca Castro Diego Armando.json | registros=18 | páginas=18 (+36) | archivos=18 (+18)
⚠️ Ya existe hoja de control para 'bacca castro diego armando', omitiendo.

👤 Procesando persona 'bacca castro luis fernando' con 25 imágenes…
   🔎 OCR JSON: Bacca Castro Luis Fernando.json | registros=25 | páginas=25 (+50) | archivos=25 (+25)
⚠️ Ya existe hoja de control para 'bacca castro luis fernando', omitiendo.

👤 Procesando persona 'bacca de valencia adelaida del carmen 1' c