# 🧩 Limpieza de Leyes — Parser general (MX) → JSON estructurado

**Objetivo:** convertir archivos de texto *limpios* (derivados de PDF) de leyes/decretos mexicanos en un **JSON homogéneo**, listo para análisis a gran escala y construcción de un corpus.

**Flujo de alto nivel**
1) **Entrada:** `clean_XXXX.txt` (texto limpio por documento)  
2) **Partición:** Decreto / Parte normativa / Transitorios  
3) **Extracción:** Capítulos → Artículos → Fracciones (+ Transitorios anidados)  
4) **Validación:** reglas básicas de estructura y contenido  
5) **Salida:** `Refined/json/<base>.json` + `manifest.csv` / `manifest.jsonl`

**Salidas clave**
- `Refined/json/`: JSON por documento y manifiestos
- `Refined/errores/`: reportes de validación
- `Refined/logs.txt`: bitácora (se crea en pasos posteriores)

> Nota: Este cuaderno es incremental. Primero configuramos rutas (paso 1), luego funciones y parsing, y por último procesamiento por lotes y QA.


In [2]:
# === Paso 1: Configuración de rutas del proyecto ===
from pathlib import Path
import os
from datetime import datetime

# ============== Directory Configuration ==============

# Base working directory (current notebook location)
BASE_DIR     = Path.cwd()

# === INPUT PATHS ===
RAW_DIR      = BASE_DIR / "Raw"                 # Source PDF files location
CATALOG_CSV  = RAW_DIR / "index.csv"            # Metadata catalog for PDFs

# === OUTPUT ROOT ===
OUTPUT_DIR   = BASE_DIR / "Refined"             # All processed outputs go here
TEMP_DIR     = BASE_DIR / "temp"                # Temporary processing files

# === INTERMEDIATE PROCESSING DIRECTORIES ===
RAW_TXT_DIR  = OUTPUT_DIR / TEMP_DIR / "raw_txt"   # Step 1: Raw PDF text extraction
CLEAN_DIR    = OUTPUT_DIR / TEMP_DIR / "clean"     # Step 2: Cleaned text files

# === DOCUMENT TYPE SEPARATION (Step 3 outputs) ===
LEY_DIR      = OUTPUT_DIR / "leyes"             # Main law content
DECR_DIR     = OUTPUT_DIR / "decretos"          # Government decree sections  
TRANS_DIR    = OUTPUT_DIR / "transitorios"      # Transitional provisions

# === FINAL OUTPUTS ===
JSON_DIR     = OUTPUT_DIR / "json"              # Structured JSON documents
ERRORES_DIR  = OUTPUT_DIR / "errores"           # Error logs and validation reports
LOGS_PATH    = OUTPUT_DIR / "logs.txt"          # Bitácora (se usará en el paso 3)

# Create all necessary directories (parents=True creates nested paths)
for d in [OUTPUT_DIR, TEMP_DIR, RAW_TXT_DIR, CLEAN_DIR, LEY_DIR, DECR_DIR, TRANS_DIR, JSON_DIR, ERRORES_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# --- utilidades cortas de sanity-check ---
def _check_writable(dirpath: Path) -> bool:
    try:
        probe = dirpath / ".write_test.tmp"
        probe.write_text("ok", encoding="utf-8")
        probe.unlink(missing_ok=True)
        return True
    except Exception:
        return False

def _summary():
    items = [
        ("Base directory", BASE_DIR),
        ("Input PDFs", RAW_DIR),
        ("Catalog", CATALOG_CSV),
        ("Temp/raw_txt", RAW_TXT_DIR),
        ("Temp/clean", CLEAN_DIR),
        ("Leyes", LEY_DIR),
        ("Decretos", DECR_DIR),
        ("Transitorios", TRANS_DIR),
        ("Final JSON output", JSON_DIR),
        ("Errores", ERRORES_DIR),
    ]
    print("Directory structure created successfully!\n")
    for k, v in items:
        print(f"{k:>18}: {v}")
    print("\nWritable checks:")
    for label, path in [("OUTPUT_DIR", OUTPUT_DIR), ("JSON_DIR", JSON_DIR), ("ERRORES_DIR", ERRORES_DIR)]:
        print(f"  - {label:<12} {'OK' if _check_writable(path) else 'NO WRITE PERMISSION'}")

_summary()


Directory structure created successfully!

    Base directory: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14
        Input PDFs: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\Raw
           Catalog: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\Raw\index.csv
      Temp/raw_txt: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\temp\raw_txt
        Temp/clean: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\temp\clean
             Leyes: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\Refined\leyes
          Decretos: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\Refined\decretos
      Transitorios: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\Refined\transitorios
 Final JSON output: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\Refined\json
           Errores: c:\Users\braul\Documents\_ITAMLaptop\Datalab\DataMakers\Leyes\14\

In [3]:
# === Paso 2: Dependencias y chequeos de entorno (ampliado y documentado) ===

# Librerías estándar
import sys, os, re, json, unicodedata
from pathlib import Path
from datetime import datetime

# (Opcionales) Utilidades de análisis/QA tabular
try:
    import pandas as pd
except Exception as e:
    print("Aviso: pandas no disponible. Algunas vistas de QA usarán listas/dicts.")

def _pkg_version(name: str) -> str:
    """
    Devuelve la versión instalada de un paquete o indicadores útiles.
    - 'not-installed' si no está instalado.
    - 'unknown' si no se pudo consultar.
    """
    try:
        from importlib.metadata import version, PackageNotFoundError
    except Exception:
        return "unknown"
    try:
        return version(name)
    except PackageNotFoundError:
        return "not-installed"

# ---- Resumen del entorno (ayuda a reproducir/parchear) ----
print("=== Environment ===")
print("Python:", sys.version.split()[0])
print("pandas:", _pkg_version("pandas"))
print("pydantic:", _pkg_version("pydantic"))

# -----------------------------------------------------------------------------
# Utilidades de normalización (acentos fuera + minúsculas)
#   - strip_accents: remueve diacríticos para facilitar matching insensible a acentos
#   - norm_lower: aplica strip_accents y lower; úsalo antes de comparar con listas simples
# -----------------------------------------------------------------------------
def strip_accents(s: str) -> str:
    """
    Elimina acentos/diacríticos usando Unicode NFD.
    'ARTÍCULO' -> 'ARTICULO'
    """
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )

def norm_lower(s: str) -> str:
    """
    Normaliza a minúsculas y sin acentos:
    'Sección Única' -> 'seccion unica'
    """
    return strip_accents(s).lower()

# -----------------------------------------------------------------------------
# Patrones base HIER (rápidos) para detectar que una línea "parece" jerárquica:
#   Estos son términos-canarios que suelen abrir encabezados formales.
#   Se aplican sobre texto normalizado (norm_lower(line)).
# -----------------------------------------------------------------------------
HIER = [
    "disposiciones preliminares", "disposiciones generales",
    "libro", "titulo", "capitulo", "seccion",
    "articulo", "art.",                        # incluye “art.” como abreviatura
    "capitulo unico", "seccion unica", "titulo preliminar",
]

# Compilamos un regex que matchea cualquiera de los términos al inicio de línea.
# OJO: este HIER_RE se debe usar contra norm_lower(line) (no contra la línea cruda).
HIER_RE = re.compile(
    r"^\s*(?:%s)\b" % "|".join(re.escape(h) for h in HIER),
    flags=re.I
)

# -----------------------------------------------------------------------------
# Patrones robustos para 'Transitorios':
#   - Soporta escritura espaciada 'T R A N S I T O R I O S'
#   - Reconoce variantes con/ sin acentos y con prefijo 'DE LOS ARTÍCULOS ...'
# -----------------------------------------------------------------------------
TRANS_RE = re.compile(
    r"^\s*(?:"
    r"t\s*r\s*a\s*n\s*s\s*i\s*t\s*o\s*r\s*i\s*o\s*s"   # T R A N S I T O R I O S (espaciado)
    r"|t\s*r\s*a\s*n\s*s\s*i\s*t\s*o\s*r\s*i\s*o"      # T R A N S I T O R I O  (espaciado)
    r"|transitorios?"                                  # transitorio / transitorios
    r"|articulos?\s+transitorios?"                     # articulos transitorios / artículos transitorios
    r"|de\s+los\s+articulos?\s+transitorios?"          # de los artículos transitorios
    r")\b",
    re.I
)

# =============================================================================
# REFUERZO: Encabezados jerárquicos "formales" con ordinales (romanos, dígitos y palabras)
#    Ejemplos cubiertos:
#      - "Sección décima segunda Del servicio y atención al público"
#      - "SECCION DECIMA SEGUNDA ..."
#      - "CAPÍTULO IV De los Efectos ..."
#      - "Capitulo 4 Disposiciones Generales"
#      - "TÍTULO IX Disposiciones finales"
#      - "Libro Segundo Del Proceso"
# -----------------------------------------------------------------------------
# 1) Romanos y dígitos para numerales simples
ROMAN_NUM  = r"[IVXLCDM]+"     # IV, IX, XII...
ARABIC_NUM = r"\d{1,3}"        # 1, 2, 12, 123

# 2) Ordinales en palabras (masculino/femenino), con compuestos:
#    - Masculino: PRIMERO, DÉCIMO PRIMERO, VIGÉSIMO TERCERO, ÚNICO, ...
#    - Femenino:  PRIMERA, DÉCIMA SEGUNDA, VIGÉSIMA CUARTA, ÚNICA, ...
#    Incluimos variantes con/sin acento: SÉPTIMO/SEPTIMO, DÉCIMO/DECIMO, VIGÉSIMO/VIGESIMO, ÚNICO/UNICO...
ORD_MASC = (
    r"(?:"
    r"ÚNICO|UNICO|PRIMERO|SEGUNDO|TERCERO|CUARTO|QUINTO|SEXTO|S[ÉE]PTIMO|OCTAVO|NOVENO|"
    r"D[ÉE]CIMO(?:\s+(?:PRIMERO|SEGUNDO|TERCERO|CUARTO|QUINTO|SEXTO|S[ÉE]PTIMO|OCTAVO|NOVENO))?|"
    r"VIG[ÉE]SIMO(?:\s+(?:PRIMERO|SEGUNDO|TERCERO|CUARTO))?"
    r")"
)

ORD_FEM = (
    r"(?:"
    r"ÚNICA|UNICA|PRIMERA|SEGUNDA|TERCERA|CUARTA|QUINTA|SEXTA|S[ÉE]PTIMA|OCTAVA|NOVENA|"
    r"D[ÉE]CIMA(?:\s+(?:PRIMERA|SEGUNDA|TERCERA|CUARTA|QUINTA|SEXTA|S[ÉE]PTIMA|OCTAVA|NOVENA))?|"
    r"VIG[ÉE]SIMA(?:\s+(?:PRIMERA|SEGUNDA|TERCERA|CUARTA))?"
    r")"
)

# 3) Encabezados robustos por tipo de nivel (permiten Romano | Dígito | Ordinal en palabras)
SECCION_HEADER_RE  = re.compile(rf"^\s*SECC?I[ÓO]N\s+(?:{ROMAN_NUM}|{ARABIC_NUM}|{ORD_FEM})\b.*", re.I)
CAPITULO_HEADER_RE = re.compile(rf"^\s*CAP[ÍI]TULO\s+(?:{ROMAN_NUM}|{ARABIC_NUM}|{ORD_MASC})\b.*", re.I)
TITULO_HEADER_RE   = re.compile(rf"^\s*T[ÍI]TULO\s+(?:{ROMAN_NUM}|{ARABIC_NUM}|{ORD_MASC})\b.*", re.I)
LIBRO_HEADER_RE    = re.compile(rf"^\s*LIBRO\s+(?:{ROMAN_NUM}|{ARABIC_NUM}|{ORD_MASC})\b.*", re.I)

# 4) Unión en un solo patrón que reconoce "línea de encabezado jerárquico formal"
HIER_HEADER_RE = re.compile(
    r"(?:"
    + SECCION_HEADER_RE.pattern + "|"
    + CAPITULO_HEADER_RE.pattern + "|"
    + TITULO_HEADER_RE.pattern   + "|"
    + LIBRO_HEADER_RE.pattern
    + r")", re.I
)

def is_hier_header(line: str) -> bool:
    """
    True si la línea parece encabezado de jerarquía.
    Combina:
      - HIER_RE aplicado a la línea normalizada (rápido, canario)
      - HIER_HEADER_RE (robusto, valida 'SECCIÓN|CAPÍTULO|TÍTULO|LIBRO + numeral/ordinal')
    """
    return bool(HIER_RE.match(norm_lower(line))) or bool(HIER_HEADER_RE.match(line))

# -----------------------------------------------------------------------------
# Autotests (puedes comentar al terminar de integrar)
#   Muestran cómo responden los patrones frente a casos frecuentes.
# -----------------------------------------------------------------------------
print("\n=== Legal structure patterns loaded ===")
print(f"Hierarchy base terms: {len(HIER)}")
print("Matching: case-insensitive + accent-insensitive via norm_lower(...)")
print("Transitional sections: normal + spaced + 'de los artículos transitorios'")

_tests = [
    "Sección décima segunda Del servicio y atención al público",
    "SECCION DECIMA SEGUNDA Del servicio y atencion al publico",
    "Sección primera Disposiciones iniciales",
    "CAPÍTULO IV De los Efectos de la Declaración de Ausencia",
    "Capitulo 4 Disposiciones Generales",
    "TÍTULO IX Disposiciones finales",
    "Libro Segundo Del Proceso",
    "Artículo 3.- Del objeto",
    "T R A N S I T O R I O S",
    "DE LOS ARTÍCULOS TRANSITORIOS",
]
print("\n=== Quick tests ===")
for s in _tests:
    ls = norm_lower(s)                     # normalizamos para HIER_RE base
    base_hit  = bool(HIER_RE.match(ls))    # canario rápido
    strong_hit = bool(HIER_HEADER_RE.match(s))  # formal (sin normalizar, pero con clases acento)
    is_trans = bool(TRANS_RE.match(s))
    print(f"{s!r:85}  HIER_BASE={base_hit:<5}  HIER_HEADER={strong_hit:<5}  TRANS={is_trans:<5}")


=== Environment ===
Python: 3.11.5
pandas: 2.2.3
pydantic: 2.11.5

=== Legal structure patterns loaded ===
Hierarchy base terms: 11
Matching: case-insensitive + accent-insensitive via norm_lower(...)
Transitional sections: normal + spaced + 'de los artículos transitorios'

=== Quick tests ===
'Sección décima segunda Del servicio y atención al público'                            HIER_BASE=1      HIER_HEADER=1      TRANS=0    
'SECCION DECIMA SEGUNDA Del servicio y atencion al publico'                            HIER_BASE=1      HIER_HEADER=1      TRANS=0    
'Sección primera Disposiciones iniciales'                                              HIER_BASE=1      HIER_HEADER=1      TRANS=0    
'CAPÍTULO IV De los Efectos de la Declaración de Ausencia'                             HIER_BASE=1      HIER_HEADER=1      TRANS=0    
'Capitulo 4 Disposiciones Generales'                                                   HIER_BASE=1      HIER_HEADER=1      TRANS=0    
'TÍTULO IX Disposiciones finale

In [4]:
# === Paso 3: Logger minimalista (mejorado y documentado) ===
from typing import Optional, Literal
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
import os, json, traceback

# ──────────────────────────────────────────────────────────────────────────────
# Paths de log
#   - LOGS_PATH sale de la celda de rutas (Paso 1). Si no existe, hacemos fallback.
#   - Además generamos un .jsonl opcional (uno por línea en JSON, fácil de parsear).
# ──────────────────────────────────────────────────────────────────────────────
try:
    LOGS_PATH  # definido en Paso 1
except NameError:
    LOGS_PATH = Path.cwd() / "Refined" / "logs.txt"
    LOGS_PATH.parent.mkdir(parents=True, exist_ok=True)

LOGS_JSONL_PATH = LOGS_PATH.with_suffix(".jsonl")  # p.ej. Refined/logs.jsonl

# ──────────────────────────────────────────────────────────────────────────────
# Niveles de log y umbral configurable
#   - Puedes ajustar el umbral con la variable de entorno LEY_LOG_LEVEL (DEBUG|INFO|WARN|ERROR)
#   - Ej.: os.environ["LEY_LOG_LEVEL"] = "DEBUG"
# ──────────────────────────────────────────────────────────────────────────────
Level = Literal["DEBUG", "INFO", "WARN", "ERROR", "START", "SUCCESS", "STEP"]
_LEVELS_ORDER = {"DEBUG":10, "INFO":20, "WARN":30, "ERROR":40, "START":15, "SUCCESS":25, "STEP":18}

def _get_env_log_level() -> str:
    lvl = os.getenv("LEY_LOG_LEVEL", "INFO").upper().strip()
    return lvl if lvl in _LEVELS_ORDER else "INFO"

_CURRENT_LEVEL_NAME = _get_env_log_level()
_CURRENT_LEVEL = _LEVELS_ORDER[_CURRENT_LEVEL_NAME]

def set_log_level(level_name: str) -> None:
    """Permite cambiar el umbral en tiempo de ejecución: 'DEBUG'|'INFO'|'WARN'|'ERROR'"""
    global _CURRENT_LEVEL_NAME, _CURRENT_LEVEL
    level_name = level_name.upper().strip()
    if level_name in _LEVELS_ORDER:
        _CURRENT_LEVEL_NAME = level_name
        _CURRENT_LEVEL = _LEVELS_ORDER[level_name]
        print(f"[logger] Nivel de log ajustado a {_CURRENT_LEVEL_NAME}")
    else:
        print(f"[logger] Nivel no reconocido: {level_name}. Se mantiene {_CURRENT_LEVEL_NAME}")

def get_log_level() -> str:
    return _CURRENT_LEVEL_NAME

# ──────────────────────────────────────────────────────────────────────────────
# Salida en JSONL (activable por env LEY_LOG_JSONL=1); útil para análisis posterior.
#   - Cada línea es un JSON con: ts, level, file_base, msg
# ──────────────────────────────────────────────────────────────────────────────
def _jsonl_enabled() -> bool:
    return os.getenv("LEY_LOG_JSONL", "0").strip() in {"1", "true", "TRUE", "yes", "YES"}

# ──────────────────────────────────────────────────────────────────────────────
# Utilidades internas de escritura (texto y jsonl)
# ──────────────────────────────────────────────────────────────────────────────
def _write_text_line(path: Path, line: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:
        f.write(line)

def _write_jsonl(path: Path, rec: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")

def _should_log(level: str) -> bool:
    # START/STEP/SUCCESS también respetan el umbral (insertados entre INFO y DEBUG)
    return _LEVELS_ORDER.get(level, 999) >= _CURRENT_LEVEL

# ──────────────────────────────────────────────────────────────────────────────
# API principal de logging
# ──────────────────────────────────────────────────────────────────────────────
def log(
    msg: str,
    level: Level = "INFO",
    file_base: Optional[str] = None,
    also_print: bool = True,
) -> None:
    """
    Escribe una línea con timestamp en Refined/logs.txt y opcionalmente en stdout.
    - level: 'DEBUG'|'INFO'|'WARN'|'ERROR'|'START'|'SUCCESS'|'STEP'
    - file_base: ej. '0008' para identificar el documento
    - also_print: si True, imprime en stdout además de registrar en archivo
    """
    if not _should_log(level):
        return

    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    prefix = f"[{ts}] {level}"
    if file_base:
        prefix += f" [{file_base}]"
    line = f"{prefix} {msg}\n"

    # Texto plano
    _write_text_line(LOGS_PATH, line)

    # JSONL estructurado (si está activado por env)
    if _jsonl_enabled():
        rec = {"ts": ts, "level": level, "file_base": file_base, "msg": msg}
        _write_jsonl(LOGS_JSONL_PATH, rec)

    if also_print:
        # stdout (útil cuando corres en ipynb o en CI)
        print(line, end="")

# Azúcar sintáctico (helpers convenientes)
def log_debug(msg: str, base: Optional[str] = None):   log(msg, "DEBUG",   base)
def log_info(msg: str, base: Optional[str] = None):    log(msg, "INFO",    base)
def log_warn(msg: str, base: Optional[str] = None):    log(msg, "WARN",    base)
def log_error(base: str, err: str):                    log(err, "ERROR",   base)
def log_start(base: str, msg: str = "INICIO procesamiento"): log(msg, "START", base)
def log_success(base: str, extra: str = ""):
    txt = f"SUCCESS {extra}".strip()
    log(txt, "SUCCESS", base)

def log_exception(base: str, e: Exception, msg_prefix: str = "", include_trace: bool = True) -> None:
    """
    Registra una excepción con su tipo y mensaje; opcionalmente agrega el traceback completo.
    """
    parts = [msg_prefix] if msg_prefix else []
    parts.append(f"{type(e).__name__}: {e}")
    if include_trace:
        parts.append(traceback.format_exc().strip())
    log("\n".join(parts), "ERROR", base)

# ──────────────────────────────────────────────────────────────────────────────
# Context manager con cronometraje:
#   Uso:
#     with log_step("Parsear parte normativa", base="0008"):
#         ... trabajo ...
# ──────────────────────────────────────────────────────────────────────────────
@contextmanager
def log_step(step_name: str, base: Optional[str] = None):
    t0 = datetime.now()
    log(f"{step_name} ...", level="STEP", file_base=base)
    try:
        yield
        dt = (datetime.now() - t0).total_seconds()
        log(f"{step_name} ✓ ({dt:.2f}s)", level="STEP", file_base=base)
    except Exception as e:
        # Guardamos tiempo consumido antes del error y re-lanzamos
        dt = (datetime.now() - t0).total_seconds()
        log(f"{step_name} ✗ ({dt:.2f}s) -> {type(e).__name__}: {e}", level="ERROR", file_base=base)
        raise

# ──────────────────────────────────────────────────────────────────────────────
# Smoke test (puedes dejarlo o comentarlo luego)
# ──────────────────────────────────────────────────────────────────────────────
log_info("BOOT OK — logger activo")
log_debug("Este DEBUG solo aparece si LEY_LOG_LEVEL=DEBUG")


[2025-09-08 09:21:11] INFO BOOT OK — logger activo


In [5]:
# === Paso 4: Utilidades de texto (normalización ligera y helpers de líneas) ===
from typing import Iterable, Iterator, List, Optional, Tuple
import unicodedata
import re

# Nota:
# - NO alteramos el contenido "semántico" del documento, solo ofrecemos funciones
#   para normalizar espacios invisibles, iterar líneas útiles y detectar encabezados
#   de forma consistente.
# - Estas utilidades se usan en los pasos de split y parseo posteriores.

# Caracteres "espacio" típicos en PDFs/Word que conviene estandarizar.
#   \u00A0 = NBSP (espacio duro), \u2007 = figure space, \u202F = narrow no-break space
_SPACE_ALIASES = {
    "\u00A0": " ",  # NBSP
    "\u2007": " ",  # figure space
    "\u202F": " ",  # narrow NBSP
}

# Guiones/dashes variados que aparecen mezclados: los reducimos a "-" o "—" según necesidad.
# Aquí, por seguridad, NO hacemos reemplazo por defecto; dejamos utilidades por si las necesitamos.
_DASH_ALIASES = {
    "\u2010": "-",  # hyphen
    "\u2011": "-",  # non-breaking hyphen
    "\u2012": "-",  # figure dash
    "\u2013": "-",  # en dash
    "\u2014": "—",  # em dash (lo mantenemos como em dash)
    "\u2212": "-",  # minus sign (a veces aparece en lugar de guion)
}

def replace_space_aliases(s: str) -> str:
    """Reemplaza espacios 'raros' por espacio normal. No toca tabs ni saltos de línea."""
    if not s:
        return s
    for bad, good in _SPACE_ALIASES.items():
        s = s.replace(bad, good)
    return s

def replace_dash_aliases(s: str) -> str:
    """Unifica guiones/dashes a un set mínimo. Útil si vas a detectar tokens con guion."""
    if not s:
        return s
    for bad, good in _DASH_ALIASES.items():
        s = s.replace(bad, good)
    return s

def normalize_line_ws(line: str, squeeze_inner: bool = True) -> str:
    """
    Normaliza espacios de UNA línea:
      - Reemplaza NBSP y similares por espacio normal.
      - Opcional: colapsa secuencias de espacios internos a 1 (no toca tabs).
      - Stripa extremos.
    """
    if line is None:
        return ""
    out = replace_space_aliases(line)
    # No cambiamos tabs por defecto, pero puedes activarlo si tu limpieza lo requiere:
    # out = out.replace("\t", " ")
    if squeeze_inner:
        out = re.sub(r"[ ]{2,}", " ", out)
    return out.strip()

def clean_text_light(raw: str) -> str:
    """
    Limpieza MUY ligera del texto completo (no refluye párrafos, no quita saltos).
    - Normaliza alias de espacios y guiones comunes.
    - Quita \r (CR) de Windows.
    - Aplica strip de espacios a cada línea.
    """
    if raw is None:
        return ""
    txt = raw.replace("\r", "")
    # Opcional: si tus fuentes traen muchos guiones raros, descomenta:
    # txt = replace_dash_aliases(txt)
    lines = [normalize_line_ws(ln, squeeze_inner=True) for ln in txt.split("\n")]
    return "\n".join(lines)

def is_blank(s: Optional[str]) -> bool:
    """True si la cadena es None, vacía o solo espacios (tras normalizar espacios invisibles)."""
    if s is None:
        return True
    return len(replace_space_aliases(s).strip()) == 0

def nonempty_lines(text: str) -> List[str]:
    """Devuelve las líneas NO vacías, ya con normalización ligera de espacios."""
    if not text:
        return []
    return [ln for ln in clean_text_light(text).split("\n") if not is_blank(ln)]

def iter_nonempty_lines(text: str) -> Iterator[Tuple[int, str]]:
    """Itera (índice, línea) para líneas no vacías, con índice relativo al texto limpio."""
    if not text:
        return iter(())
    for i, ln in enumerate(clean_text_light(text).split("\n")):
        if not is_blank(ln):
            yield i, ln

def next_nonempty_index(lines: List[str], start: int) -> Optional[int]:
    """
    Dada una lista de líneas (ya 'limpias'), devuelve el índice de la siguiente línea no vacía
    a partir de 'start' (incluido). Si no hay, devuelve None.
    """
    n = len(lines)
    j = max(0, start)
    while j < n and is_blank(lines[j]):
        j += 1
    return j if j < n else None

def prev_nonempty_index(lines: List[str], start: int) -> Optional[int]:
    """Versión hacia atrás: busca la anterior línea no vacía antes de 'start' (exclusivo)."""
    j = min(len(lines) - 1, start - 1)
    while j >= 0 and is_blank(lines[j]):
        j -= 1
    return j if j >= 0 else None

# ──────────────────────────────────────────────────────────────────────────────
# Wrappers de detección para reutilizar patrones del Paso 2
#   - Usan HIER_RE / HIER_HEADER_RE / TRANS_RE si están definidos.
#   - Son tolerantes: si el patrón aún no existe (celda no ejecutada), devuelven False.
# ──────────────────────────────────────────────────────────────────────────────
def is_hier_candidate(line: str) -> bool:
    """
    Heurística rápida: ¿Esta línea 'parece' ser encabezado jerárquico?
    Usa las funciones/patrones del Paso 2 si existen; si no, False.
    """
    try:
        # is_hier_header combina HIER_RE + HIER_HEADER_RE
        return bool(is_hier_header(line))
    except NameError:
        # Fallback mínimo si alguien llama esto antes del Paso 2
        return False

def is_trans_header(line: str) -> bool:
    """¿Es encabezado de Transitorios? Usa TRANS_RE del Paso 2 si existe; si no, False."""
    try:
        return bool(TRANS_RE.match(line))
    except NameError:
        return False

def soft_wrap_join(text: str, enable: bool = False) -> str:
    """
    Experimental: une "soft line-wraps" dentro de párrafos.
    POR DEFECTO está desactivado (enable=False) porque tus clean_*.txt ya vienen por párrafo.
    Si lo activas, intenta:
      - Unir líneas que NO parezcan encabezados
      - Mantener saltos antes/depués de encabezados (Capítulo/Sección/Artículo/Transitorios)
    """
    if not enable or not text:
        return text
    out_lines: List[str] = []
    buf: List[str] = []

    def flush_buf():
        if buf:
            # Une con un espacio simple para evitar palabras pegadas
            out_lines.append(normalize_line_ws(" ".join(buf), squeeze_inner=True))
            buf.clear()

    lines = clean_text_light(text).split("\n")
    for ln in lines:
        if is_blank(ln):
            flush_buf()
            out_lines.append("")  # conserva línea en blanco
            continue
        # Si detectamos un encabezado jerárquico o de transitorios, cerramos el párrafo previo
        if is_hier_candidate(ln) or is_trans_header(ln):
            flush_buf()
            out_lines.append(ln)
            continue
        # Caso normal: agregamos a buffer del mismo párrafo
        buf.append(ln)
    flush_buf()
    return "\n".join(out_lines)

def peek_lines(lines: List[str], i: int, k: int = 2) -> List[Tuple[int, str]]:
    """
    Devuelve una ventana de contexto de ±k líneas alrededor de i (para debug).
    Útil cuando queremos loggear un encabezado y ver su vecindad.
    """
    n = len(lines)
    lo, hi = max(0, i - k), min(n, i + k + 1)
    return list(zip(range(lo, hi), lines[lo:hi]))

# ──────────────────────────────────────────────────────────────────────────────
# Pruebas rápidas (puedes comentarlas una vez integrado)
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    _demo = (
        "   SECCIÓN   décima   segunda   Del servicio y atención al público  \n"
        "Este es un párrafo de ejemplo con   espacios   raros.\n"
        " \n"
        "CAPÍTULO IV De los Efectos de la Declaración de Ausencia\n"
        "Otro párrafo.\n"
        "T R A N S I T O R I O S\n"
    )
    print("— Limpieza ligera —")
    print(clean_text_light(_demo))
    print("\n— is_hier_candidate / is_trans_header —")
    for idx, ln in iter_nonempty_lines(_demo):
        print(f"[{idx:02d}] HIER={is_hier_candidate(ln)} TRANS={is_trans_header(ln)}  | {ln}")


— Limpieza ligera —
SECCIÓN décima segunda Del servicio y atención al público
Este es un párrafo de ejemplo con espacios raros.

CAPÍTULO IV De los Efectos de la Declaración de Ausencia
Otro párrafo.
T R A N S I T O R I O S


— is_hier_candidate / is_trans_header —
[00] HIER=True TRANS=False  | SECCIÓN décima segunda Del servicio y atención al público
[01] HIER=False TRANS=False  | Este es un párrafo de ejemplo con espacios raros.
[03] HIER=True TRANS=False  | CAPÍTULO IV De los Efectos de la Declaración de Ausencia
[04] HIER=False TRANS=False  | Otro párrafo.
[05] HIER=False TRANS=True  | T R A N S I T O R I O S


## Paso 5

In [6]:
# === Paso 5: Patrones para Artículos / Fracciones / Ordinales (Transitorios) ===
import re
from typing import Optional, Tuple

# ──────────────────────────────────────────────────────────────────────────────
# ARTÍCULOS (permanentes)
#  - Encabezados válidos: "Artículo 1", "Artículo 1.", "ARTÍCULO 12", "Art. 3",
#    "Artículo 5o.", "Artículo 5º", "Artículo 10-Bis", "ART. 20 A", "Artículo 2 Ter".
#  - Reglas del corpus: guardamos el número de artículo como **entero** (1,2,3...).
#    Sufijos (Bis/Ter/Quáter/A/B/...) no se guardan en el campo "Artículo" pero
#    los capturamos como metadato opcional para **deduplicación** posterior.
#    (Así evitamos fundir "1" y "1 Bis" en un solo artículo al hacer merges).
# ──────────────────────────────────────────────────────────────────────────────

# Indicador ordinal a veces impreso tras el dígito: "1o.", "1º", "1°"
_ORDINAL_MARK = r"(?:[oº°]\.?)?"

# Sufijos comunes en MX/ES (con variantes de acento), y letras A–Z con/ sin guion
#   - bis/ter/quáter/quater/quinquies/sexies/septies/octies/nonies/decies...
#   - también "A", "B", "C" y formas "1-A", "1 A"
_ART_SUFFIX = (
    r"(?:"
    r"(?:"  # lista de cultismos
    r"bis|ter|qu[áa]ter|quinquies|sexies|septies|octies|nonies|decies"
    r")"
    r"|[A-Z])"  # letras sueltas (A, B, C...)
)

# Separador entre número y sufijo (opcional): espacio o guion
_NUM_SUFFIX_SEP = r"(?:\s*-\s*|\s+)"

# Núcleo del encabezado de artículo: número + (ordinal-mark) + (sufijo opcional)
_ART_NUM_CORE = rf"(?P<num>\d{{1,4}}){_ORDINAL_MARK}(?:{_NUM_SUFFIX_SEP}(?P<suf>{_ART_SUFFIX}))?"

# Prefijo "Artículo"/"Art." con o sin acentos/puntos
_ART_LABEL = r"(?:art[íi]?culo|art\.?)"

# Puntuación/finalización de encabezado (tolerante)
_END_PUNCT = r"(?:[\.\-–—:)\]]\s*)?"

# Regex final para encabezados de Artículo (match al **inicio** de línea)
ARTICLE_RE = re.compile(
    rf"^\s*{_ART_LABEL}\s+{_ART_NUM_CORE}\s*{_END_PUNCT}",
    re.IGNORECASE
)

def is_article_header(line: str) -> bool:
    """True si la línea parece encabezado de Artículo."""
    return bool(ARTICLE_RE.match(line))

def extract_article_num_suffix(line: str) -> Optional[Tuple[int, Optional[str]]]:
    """
    Devuelve (numero_entero, sufijo|None) si la línea es encabezado de artículo.
    - numero_entero: int (ej. 10 para '10', '10º', '10o.')
    - sufijo: str|None (ej. 'Bis', 'A', 'Ter'); se devuelve normalizado en Mayúsculas iniciales.
    """
    m = ARTICLE_RE.match(line)
    if not m:
        return None
    num = int(m.group("num"))
    suf = m.group("suf")
    if suf:
        # Normaliza sufijo: 'quater' -> 'Quater', 'quáter' -> 'Quáter', 'a' -> 'A'
        suf = suf.strip()
        suf = suf[0].upper() + suf[1:].lower() if len(suf) > 1 else suf.upper()
    return num, suf

# ──────────────────────────────────────────────────────────────────────────────
# FRACCIONES dentro de artículos
#  - Formatos frecuentes: "I.", "II)", "III.-", "IV —", "V—", "Fracción VI.", etc.
#  - Permitimos prefijo "Fracción " opcional y capturamos el romano como "Fracción".
# ──────────────────────────────────────────────────────────────────────────────

# Romano "bien formado" (hasta miles, suficientemente amplio para la práctica)
_ROMAN = r"M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})"
# Delimitadores típicos tras el romano
_DELIM = r"(?:[\.\-–—:)\]]\s*|\s{2,})"

FRACCION_RE = re.compile(
    rf"^\s*(?:fracci[oó]n\s+)?(?P<roman>{_ROMAN})\s*{_DELIM}",
    re.IGNORECASE
)

def is_fraction_header(line: str) -> Optional[str]:
    """
    Si la línea inicia una fracción, devuelve el romano normalizado (ej. 'IV').
    Si no, devuelve None.
    """
    m = FRACCION_RE.match(line)
    if not m:
        return None
    return m.group("roman").upper()

# ──────────────────────────────────────────────────────────────────────────────
# ORDINALES de Transitorios (encabezados de artículos transitorios)
#  - Reconoce 'Único/Única', 'Primero/Primera', ..., 'Décima Segunda', 'Vigésimo Cuarto', etc.
#  - También forma con prefijo 'Artículo': 'Artículo Primero', 'Artículo Único'.
# ──────────────────────────────────────────────────────────────────────────────

# Ordinales en palabras (masc/fem) con compuestos; compatibilidad sin acento.
_ORD_MASC = (
    r"(?:"
    r"ÚNICO|UNICO|PRIMERO|SEGUNDO|TERCERO|CUARTO|QUINTO|SEXTO|S[ÉE]PTIMO|OCTAVO|NOVENO|"
    r"D[ÉE]CIMO(?:\s+(?:PRIMERO|SEGUNDO|TERCERO|CUARTO|QUINTO|SEXTO|S[ÉE]PTIMO|OCTAVO|NOVENO))?|"
    r"VIG[ÉE]SIMO(?:\s+(?:PRIMERO|SEGUNDO|TERCERO|CUARTO))?"
    r")"
)
_ORD_FEM = (
    r"(?:"
    r"ÚNICA|UNICA|PRIMERA|SEGUNDA|TERCERA|CUARTA|QUINTA|SEXTA|S[ÉE]PTIMA|OCTAVA|NOVENA|"
    r"D[ÉE]CIMA(?:\s+(?:PRIMERA|SEGUNDA|TERCERA|CUARTA|QUINTA|SEXTA|S[ÉE]PTIMA|OCTAVA|NOVENA))?|"
    r"VIG[ÉE]SIMA(?:\s+(?:PRIMERA|SEGUNDA|TERCERA|CUARTA))?"
    r")"
)
_ORD_TOKEN = rf"(?:{_ORD_MASC}|{_ORD_FEM})"

# 'Artículo' opcional delante del ordinal textual
ORD_WITH_PREFIX_RE = re.compile(
    rf"^\s*(?:art[íi]?culo\s+)?(?P<ord>{_ORD_TOKEN})\b",
    re.IGNORECASE
)

def extract_transitory_ordinal(line: str) -> Optional[str]:
    """
    Devuelve el ordinal textual normalizado (capitalizado) si la línea
    inicia un artículo transitorio (p. ej. 'Único', 'Primero', 'Décima Segunda').
    Si no, devuelve None.
    """
    m = ORD_WITH_PREFIX_RE.match(line)
    if not m:
        return None
    raw = m.group("ord").strip()
    # Normaliza: 'primero' -> 'Primero', 'dECIMA sEGUNDA' -> 'Décima Segunda' (best-effort)
    def _cap_token(tok: str) -> str:
        if not tok:
            return tok
        # conserva mayúsculas/acentos razonablemente
        return tok[0].upper() + tok[1:].lower()
    return " ".join(_cap_token(t) for t in raw.split())


# --- REEMPLAZO en Paso 5: Romanos NO vacíos + FRACCION_RE estricta ---

# Requiere al menos un símbolo romano (lookahead); evita coincidencias vacías.
_ROMAN_NZ = r"(?=[MDCLXVI])M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})"

# Delimitadores tras el romano (puntuación habitual o espacios largos en “fracciones en línea”)
_DELIM = r"(?:[\.\-–—:)\]]\s*|\s{2,})"

FRACCION_RE = re.compile(
    rf"^\s*(?:fracci[oó]n\s+)?(?P<roman>{_ROMAN_NZ})\s*{_DELIM}",
    re.IGNORECASE
)


# ──────────────────────────────────────────────────────────────────────────────
# PRUEBAS RÁPIDAS (puedes comentar una vez integradas)
# ──────────────────────────────────────────────────────────────────────────────
_tests_articles = [
    "Artículo 1. Disposiciones generales",
    "ARTÍCULO 12 De las definiciones",
    "Art. 3) Del objeto",
    "Artículo 5o. De la finalidad",
    "Artículo 5º De la finalidad",
    "Artículo 10-Bis. Procedimiento",
    "ARTÍCULO 20 A — Plazos",
    "Artículo 2 Ter: Requisitos",
]
print("\n=== Tests: Artículos ===")
for s in _tests_articles:
    print(f"{s!r:40}  -> is_header={is_article_header(s)}  extract={extract_article_num_suffix(s)}")

_tests_fracciones = [
    "I.- Requisitos",
    "II) Plazos",
    "III. Autoridades competentes",
    "Fracción IV.— Obligaciones",
    "V — Sanciones",
    "VI] (caso raro pero visto) Texto",
]
print("\n=== Tests: Fracciones ===")
for s in _tests_fracciones:
    print(f"{s!r:35}  -> frac={is_fraction_header(s)}")

_tests_ord = [
    "Artículo Único.",
    "Único",
    "Artículo Primero",
    "Primero.",
    "Décima Segunda",
    "Artículo vigésimo cuarto",
]
print("\n=== Tests: Ordinales Transitorios ===")
for s in _tests_ord:
    print(f"{s!r:28}  -> ord={extract_transitory_ordinal(s)}")
    




=== Tests: Artículos ===
'Artículo 1. Disposiciones generales'     -> is_header=True  extract=(1, None)
'ARTÍCULO 12 De las definiciones'         -> is_header=True  extract=(12, 'D')
'Art. 3) Del objeto'                      -> is_header=True  extract=(3, None)
'Artículo 5o. De la finalidad'            -> is_header=True  extract=(5, 'D')
'Artículo 5º De la finalidad'             -> is_header=True  extract=(5, 'D')
'Artículo 10-Bis. Procedimiento'          -> is_header=True  extract=(10, 'Bis')
'ARTÍCULO 20 A — Plazos'                  -> is_header=True  extract=(20, 'A')
'Artículo 2 Ter: Requisitos'              -> is_header=True  extract=(2, 'Ter')

=== Tests: Fracciones ===
'I.- Requisitos'                     -> frac=I
'II) Plazos'                         -> frac=II
'III. Autoridades competentes'       -> frac=III
'Fracción IV.— Obligaciones'         -> frac=IV
'V — Sanciones'                      -> frac=V
'VI] (caso raro pero visto) Texto'   -> frac=VI

=== Tests: Ordinales Trans

In [7]:
# === Paso 6: Lectura de archivos y naming (robusto y documentado) ===
from pathlib import Path
from typing import List, Tuple, Optional, Iterable, Set
import re, os

# Nota: esta celda asume que ya ejecutaste:
#  - Paso 1 (rutas: BASE_DIR, CLEAN_DIR, etc.)
#  - Paso 3 (logger: log_info, log_error, log_start, etc.)  ← si no, usa print.

# ──────────────────────────────────────────────────────────────────────────────
# 1) Extractor de "base" numérico estable
#    - Preferimos la primera secuencia de 3–6 dígitos del nombre (stem).
#    - Si no hay dígitos, usamos un slug reproducible del stem.
# ──────────────────────────────────────────────────────────────────────────────

_BASE_ID_RE = re.compile(r"(\d{3,6})(?!\d)")

def slugify_stem(path: Path) -> str:
    """
    Convierte el 'stem' a un slug ASCII estable (sin espacios/acentos), útil como fallback.
    p.ej. "Ley_de_Salud" -> "ley_de_salud"
    """
    import unicodedata
    stem = path.stem
    norm = unicodedata.normalize("NFD", stem)
    ascii_only = "".join(c for c in norm if unicodedata.category(c) != "Mn")
    ascii_only = re.sub(r"[^A-Za-z0-9._-]+", "_", ascii_only)
    ascii_only = re.sub(r"_+", "_", ascii_only).strip("_").lower()
    return ascii_only or "doc"

def extract_file_base(path: Path) -> str:
    """
    Devuelve un identificador base:
      - Primera coincidencia de 3–6 dígitos en el stem (p.ej. 'clean_0008' -> '0008').
      - Si no hay dígitos, usa slugify_stem(path).
    """
    m = _BASE_ID_RE.search(path.stem)
    if m:
        return m.group(1)
    return slugify_stem(path)

# ──────────────────────────────────────────────────────────────────────────────
# 2) Lector de texto con fallback de codificaciones
#    - Intenta UTF-8 y variantes, luego ISO-8859-1/Windows-1252.
#    - Devuelve (texto, encoding_usado).
# ──────────────────────────────────────────────────────────────────────────────

_DEF_ENCODINGS = ("utf-8", "utf-8-sig", "cp1252", "latin-1")

def load_text(path: Path, encodings: Iterable[str] = _DEF_ENCODINGS) -> Tuple[str, str]:
    """
    Lee el archivo de texto probando múltiples codificaciones.
    Levanta FileNotFoundError si no existe. Si todas fallan, vuelve a lanzar la última excepción.
    """
    last_exc: Optional[Exception] = None
    for enc in encodings:
        try:
            txt = path.read_text(encoding=enc)
            # Normalización menor de saltos de línea (\r\n -> \n) sin tocar contenido
            txt = txt.replace("\r\n", "\n").replace("\r", "\n")
            # Log mínimo (si hay logger)
            try:
                log_info(f"Leído con encoding={enc}", base=extract_file_base(path))
            except Exception:
                pass
            return txt, enc
        except Exception as e:
            last_exc = e
            continue
    # Si llegamos aquí, todas fallaron
    try:
        log_error(extract_file_base(path), f"Fallo de lectura con {encodings}: {last_exc}")
    except Exception:
        pass
    raise last_exc  # re-lanza el último error para que el pipeline lo maneje

# ──────────────────────────────────────────────────────────────────────────────
# 3) Descubrimiento de archivos de entrada
#    - Busca por patrón (default: "clean_*.txt") en rutas típicas:
#      [cwd, BASE_DIR, CLEAN_DIR, /mnt/data].
#    - Devuelve lista de Paths únicos y existentes (ordenada).
# ──────────────────────────────────────────────────────────────────────────────

def _existing_dir(p: Optional[Path]) -> Optional[Path]:
    try:
        return p if p and p.exists() and p.is_dir() else None
    except Exception:
        return None

def resolve_input(pattern: str = "clean_*.txt") -> List[Path]:
    """
    Localiza archivos que coincidan con 'pattern' en rutas comunes.
    Evita duplicados por path absoluto y devuelve lista ordenada.
    """
    roots: List[Path] = []
    # Preferencias: cwd (notebook actual), CLEAN_DIR (si existe), BASE_DIR, /mnt/data (si existe)
    roots.append(Path.cwd())
    try:
        roots.append(CLEAN_DIR)  # definido en Paso 1
    except NameError:
        pass
    try:
        roots.append(BASE_DIR)   # definido en Paso 1
    except NameError:
        pass
    mnt_data = Path("/mnt/data")
    if _existing_dir(mnt_data):
        roots.append(mnt_data)

    seen: Set[str] = set()
    out: List[Path] = []
    for root in roots:
        if not _existing_dir(root):
            continue
        for p in sorted(root.glob(pattern)):
            ap = str(p.resolve())
            if ap not in seen and p.is_file():
                seen.add(ap)
                out.append(p)
    # Log de conteo
    try:
        log_info(f"resolve_input: {len(out)} archivos encontrados para patrón='{pattern}'", base=None)
    except Exception:
        pass
    return out

# ──────────────────────────────────────────────────────────────────────────────
# 4) Validación rápida de contenido limpio
#    - Evita pasar archivos vacíos o triviales al parser.
# ──────────────────────────────────────────────────────────────────────────────

def validate_clean_text(text: str, min_chars: int = 50) -> Tuple[bool, str]:
    """
    Regresa (ok, motivo). ok=False si:
      - texto vacío o None
      - longitud < min_chars (por default 50)
    """
    if text is None:
        return False, "texto=None"
    if len(text.strip()) == 0:
        return False, "texto_vacio"
    if len(text) < min_chars:
        return False, f"texto_tamano_insuficiente(<{min_chars})"
    return True, "ok"

# ──────────────────────────────────────────────────────────────────────────────
# 5) Mini smoke test (puedes comentar después)
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    files = resolve_input("clean_*.txt")
    print(f"[Paso 6] Encontrados: {len(files)} archivos")
    for p in files[:3]:
        base = extract_file_base(p)
        try:
            txt, enc = load_text(p)
            ok, why = validate_clean_text(txt)
            print(f"  - {p.name}  base={base}  enc={enc}  valido={ok} ({why})")
        except Exception as e:
            print(f"  - {p.name}  base={base}  ERROR: {e}")


[2025-09-08 09:21:11] INFO resolve_input: 301 archivos encontrados para patrón='clean_*.txt'
[Paso 6] Encontrados: 301 archivos
[2025-09-08 09:21:11] INFO [0001] Leído con encoding=utf-8
  - clean_0001.txt  base=0001  enc=utf-8  valido=True (ok)
[2025-09-08 09:21:11] INFO [0002] Leído con encoding=utf-8
  - clean_0002.txt  base=0002  enc=utf-8  valido=True (ok)
[2025-09-08 09:21:11] INFO [0003] Leído con encoding=utf-8
  - clean_0003.txt  base=0003  enc=utf-8  valido=True (ok)


## Paso 7

In [8]:
# === Paso 7: Split en Decreto / Parte Normativa / Transitorios (robusto y documentado) ===
from typing import Optional, Dict, Tuple, List
import re

def _find_first_normative_idx(lines: List[str], prefer_strict: bool = True) -> int:
    """
    Devuelve el índice de la primera línea que parece el inicio de la parte normativa.
    Heurística:
      1) Prefiere encabezados jerárquicos "formales" (Sección/Capítulo/Título/Libro + ordinal),
         o una línea que sea encabezado de Artículo.
      2) Si prefer_strict=False y no se encontró, cae a un match más laxo (HIER base).
    """
    # 1) Strict: encabezado jerárquico formal OR encabezado de artículo
    for i, ln in enumerate(lines):
        if is_hier_candidate(ln):           # usa HIER_RE + HIER_HEADER_RE (Paso 2/4)
            return i
        if ARTICLE_RE.match(ln):            # Paso 5
            return i

    # 2) Fallback laxo (si se permite): términos canarios al inicio (norm_lower)
    if not prefer_strict:
        for i, ln in enumerate(lines):
            try:
                if HIER_RE.match(norm_lower(ln)):   # Paso 2
                    return i
            except NameError:
                break
    return -1


def _find_first_trans_idx(lines: List[str], start: int = 0) -> int:
    """
    Devuelve el índice de la primera línea que parece encabezado de Transitorios
    a partir de 'start'. Si no hay, devuelve -1.
    """
    for i in range(max(0, start), len(lines)):
        if is_trans_header(lines[i]):       # Paso 4 (usa TRANS_RE)
            return i
    return -1


def split_decreto_norma_trans(
    raw_text: str,
    *,
    prefer_strict: bool = True,
    log_base: Optional[str] = None,
    return_spans: bool = False
) -> Dict[str, object]:
    """
    Separa el documento en tres bloques:
      - 'decreto': todo antes del primer encabezado normativo real
      - 'norma'  : desde el encabezado normativo hasta antes de 'Transitorios' (si existen)
      - 'trans'  : desde el encabezado de 'Transitorios' hasta el final (si existe)

    Parámetros:
      - prefer_strict: si True, la detección del inicio normativo exige encabezado formal o artículo;
                       si False, permite caer a HIER base como último recurso.
      - log_base     : id opcional para el logger (p.ej. "0008")
      - return_spans : si True, además devuelve índices (start,end) por sección (útil para debugging)

    Notas:
      - Conservamos la línea de encabezado de 'TRANSITORIOS' dentro del bloque 'trans',
        porque el parser de transitorios (Paso 12) la puede remover con seguridad.
      - No reflow: no modificamos saltos de línea del bloque; sólo hacemos una limpieza ligera previa.
    """
    # 0) Limpieza MUY ligera por línea (NBSP -> space, colapsar espacios internos, strip)
    text = clean_text_light(raw_text)
    lines = text.split("\n")
    n = len(lines)

    # 1) Localizar inicio de parte normativa
    nidx = _find_first_normative_idx(lines, prefer_strict=prefer_strict)

    # 2) Localizar 'Transitorios' a partir de nidx (si hay parte normativa)
    tidx = _find_first_trans_idx(lines, start=max(0, nidx)) if nidx != -1 else -1

    # 3) Construir secciones
    if nidx == -1:
        # No se detectó parte normativa → todo es decreto, no hay norma/trans
        decreto = text.strip()
        norma   = ""
        trans   = ""
        spans = {"decreto": (0, n), "norma": None, "trans": None}
    else:
        # Decreto = [0, nidx)
        decreto = "\n".join(lines[:nidx]).strip()
        if tidx == -1:
            # No hay transitorios → norma = [nidx, end)
            norma = "\n".join(lines[nidx:]).strip()
            trans = ""
            spans = {"decreto": (0, nidx), "norma": (nidx, n), "trans": None}
        else:
            # Hay transitorios → norma = [nidx, tidx) ; trans = [tidx, end)
            norma = "\n".join(lines[nidx:tidx]).strip()
            trans = "\n".join(lines[tidx:]).strip()
            spans = {"decreto": (0, nidx), "norma": (nidx, tidx), "trans": (tidx, n)}

    # 4) Logs mínimos (si tienes el logger cargado)
    try:
        if log_base:
            # Resumen de líneas por bloque
            dl = decreto.count("\n") + (1 if decreto else 0)
            nl = norma.count("\n") + (1 if norma else 0)
            tl = trans.count("\n") + (1 if trans else 0)
            log_info(f"split: decreto={dl} líneas, norma={nl} líneas, trans={tl} líneas", base=log_base)
            if nidx == -1:
                log_warn("No se detectó inicio normativo", base=log_base)
            if nidx != -1 and tidx == -1:
                log_info("Documento sin Transitorios", base=log_base)
    except Exception:
        pass

    out = {"decreto": decreto, "norma": norma, "trans": trans}
    if return_spans:
        out["spans"] = spans
    return out


# ──────────────────────────────────────────────────────────────────────────────
# PRUEBAS RÁPIDAS (puedes comentar una vez integrado)
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    demo = (
        "PODER EJECUTIVO DEL ESTADO\n"
        "Periódico Oficial, 2012\n"
        "---------------------------------\n"
        "CAPÍTULO IV De los Efectos de la Declaración de Ausencia\n"
        "Artículo 1. Del objeto\n"
        "I. Primera fracción\n"
        "II) Segunda fracción\n"
        "Sección décima segunda Del servicio y atención al público\n"
        "Artículo 2.- Otro artículo\n"
        "T R A N S I T O R I O S\n"
        "Único. El presente Decreto entrará en vigor...\n"
    )
    res = split_decreto_norma_trans(demo, prefer_strict=True, log_base=None, return_spans=True)
    print("— Decreto —")
    print(res["decreto"][:200], "...\n")
    print("— Norma —")
    print(res["norma"][:300], "...\n")
    print("— Transitorios —")
    print(res["trans"][:200], "...\n")
    print("Spans:", res.get("spans"))


— Decreto —
PODER EJECUTIVO DEL ESTADO
Periódico Oficial, 2012
--------------------------------- ...

— Norma —
CAPÍTULO IV De los Efectos de la Declaración de Ausencia
Artículo 1. Del objeto
I. Primera fracción
II) Segunda fracción
Sección décima segunda Del servicio y atención al público
Artículo 2.- Otro artículo ...

— Transitorios —
T R A N S I T O R I O S
Único. El presente Decreto entrará en vigor... ...

Spans: {'decreto': (0, 3), 'norma': (3, 9), 'trans': (9, 12)}


In [9]:
# === Paso 8: Heurísticas de Título y Año_publicación (documentado y robusto) ===
from typing import Optional, Tuple, List, Dict
import re
from datetime import datetime

# -----------------------------------------------------------------------------
# Patrones básicos para reconocer líneas "tipo título"
#   - Coinciden al inicio de línea con palabras clave típicas de encabezado oficial
#   - Evitamos confundir con "Capítulo/Sección/Título/Artículo" gracias a filtros abajo
# -----------------------------------------------------------------------------
TITLE_HEAD_RE = re.compile(
    r"^\s*(ley|c[oó]digo|reglamento|estatuto|bando|constituci[oó]n|decreto)\b",
    re.IGNORECASE
)

# Meses en español para fechas tipo "12 de mayo de 2015"
MESES_ES = r"(enero|febrero|marzo|abril|mayo|junio|julio|agosto|septiembre|setiembre|octubre|noviembre|diciembre)"
# Años plausibles (1850–2100)
YEAR_RE = re.compile(r"\b(18|19|20)\d{2}\b")
# Frases de contexto que suelen acompañar a la fecha de publicación/vigencia
YEAR_CONTEXT_RE = re.compile(
    rf"(diario\s+oficial|per[ií]odico\s+oficial|publicad[oa]|se\s+publica|expedid[aoa]|promulgad[oa]|entra\s+en\s+vigor|vigencia|fecha)",
    re.IGNORECASE
)
# Fecha completa: "12 de mayo de 2015" (captura el año al final)
FECHA_COMPLETA_RE = re.compile(rf"\b\d{{1,2}}\s+de\s+{MESES_ES}\s+de\s+(?P<anio>(18|19|20)\d{{2}})\b", re.IGNORECASE)

def _uppercase_ratio(s: str) -> float:
    """Porcentaje aproximado de letras A–Z en mayúsculas (ignora no letras)."""
    letters = [c for c in s if c.isalpha()]
    if not letters:
        return 0.0
    upp = sum(1 for c in letters if c.isupper())
    return upp / len(letters)

def _is_heading_like(line: str) -> bool:
    """
    True si la línea parece encabezado jerárquico (Capítulo/Sección/Título/Libro/Artículo).
    Usa funciones/patrones de pasos previos si existen; si no, aplica filtros básicos.
    """
    try:
        # Paso 2/4 (robusto): combina base + formales
        if 'is_hier_header' in globals() and is_hier_header(line):
            return True
    except Exception:
        pass
    # Fallback: evita falsos positivos obvios
    if re.match(r"^\s*(cap[íi]tulo|secc?i[óo]n|t[íi]tulo)\b", line, re.IGNORECASE):
        return True
    try:
        if 'ARTICLE_RE' in globals() and ARTICLE_RE.match(line):
            return True
    except Exception:
        pass
    return False

def _score_title_candidate(line: str, idx: int, nidx: int) -> float:
    """
    Asigna un puntaje a una línea candidata a Título:
     - Palabra clave al inicio (LEY/REGLAMENTO/CÓDIGO/...) → +10
     - Relación longitud razonable (10–220) → +3 (si cumple)
     - Ratio mayúsculas alto (≥ 0.65) → +4 (títulos suelen ir versales)
     - Proximidad a inicio de normativa (<= 20 líneas) → +3
     - Presencia de conectores 'de', 'del', 'de la', 'para' → +2
     - Penalizaciones si parece un encabezado jerárquico → -100 (descarta)
    """
    if _is_heading_like(line):
        return -100.0
    score = 0.0
    if TITLE_HEAD_RE.match(line):
        score += 10.0
    L = len(line.strip())
    if 10 <= L <= 220:
        score += 3.0
    if _uppercase_ratio(line) >= 0.65:
        score += 4.0
    if nidx >= 0 and abs(idx - nidx) <= 20:
        score += 3.0
    if re.search(r"\b(de|del|de la|para)\b", line, re.IGNORECASE):
        score += 2.0
    return score

def _find_first_normative_idx_for_title(lines: List[str]) -> int:
    """
    Clon ligero del buscador de inicio normativo (Paso 7), para no depender
    estrictamente del orden de ejecución si alguien llama esto antes.
    Prefiere encabezados jerárquicos o artículos.
    """
    # Estricto
    for i, ln in enumerate(lines):
        if _is_heading_like(ln):
            return i
    try:
        if 'HIER_RE' in globals():
            for i, ln in enumerate(lines):
                if HIER_RE.match(norm_lower(ln)):  # Paso 2
                    return i
    except Exception:
        pass
    return -1

def guess_title_and_year(
    raw_text: str,
    *,
    context_window: int = 50,
    log_base: Optional[str] = None
) -> Tuple[str, Optional[int]]:
    """
    Detecta:
      - Título (línea que mejor parece encabezado oficial del documento)
      - Año_publicación (entero) a partir de fechas/indicios cercanos al encabezado o al inicio normativo.

    Estrategia:
      1) Limpieza ligera y split por líneas.
      2) Ubicar (mejor estimación de) inicio normativo (nidx).
      3) Buscar candidatos a Título:
         - ventanas: primeras `context_window` líneas y ±20 alrededor de nidx.
         - filtrar encabezados jerárquicos/Artículos.
         - puntuar con _score_title_candidate y elegir el mejor.
      4) Buscar años:
         - escanear primeras 200 líneas y ±40 alrededor de nidx.
         - favorecer líneas con contexto YEAR_CONTEXT_RE y fechas completas.
         - elegir el año con mayor peso; en empate, el mayor ≤ año_actual.
    """
    text = clean_text_light(raw_text)
    lines = [ln for ln in text.split("\n")]

    # 1) Hallar un índice razonable de inicio normativo
    #    (usa Paso 7 si existe; si no, el clon interno)
    try:
        nidx = _find_first_normative_idx(lines=True and lines, prefer_strict=True)  # Paso 7
    except Exception:
        nidx = _find_first_normative_idx_for_title(lines)

    # 2) Candidatos a Título: primeras N líneas + ventana alrededor de nidx
    cand_idxs: List[int] = list(range(0, min(len(lines), context_window)))
    if nidx >= 0:
        lo = max(0, nidx - 20)
        hi = min(len(lines), nidx + 21)
        cand_idxs.extend(range(lo, hi))
    cand_idxs = sorted(set(cand_idxs))

    candidates: List[Tuple[float, int, str]] = []
    for i in cand_idxs:
        ln = lines[i].strip()
        if not ln:
            continue
        if _is_heading_like(ln):
            continue
        if TITLE_HEAD_RE.match(ln):
            score = _score_title_candidate(ln, i, nidx)
            if score > 0:
                candidates.append((score, i, ln))
        else:
            # Acepta candidatos "fuertes" aunque no empiecen por palabra clave,
            # si están muy cerca del inicio normativo y son todo mayúsculas.
            if nidx >= 0 and abs(i - nidx) <= 5 and _uppercase_ratio(ln) >= 0.85 and 5 <= len(ln) <= 180:
                score = _score_title_candidate(ln, i, nidx) - 2.0  # pequeña penalización por no tener keyword
                candidates.append((score, i, ln))

    # 3) Elegir mejor Título
    title = ""
    if candidates:
        # Orden: score desc, cercanía a nidx asc, índice asc
        candidates.sort(key=lambda t: (-t[0], abs(t[1] - (nidx if nidx >= 0 else t[1])), t[1]))
        title = candidates[0][2].strip()

    # 4) Detección de Año_publicación
    #    Construimos un “mapa de pesos” por año, según contexto y cercanía.
    year_weights: Dict[int, float] = {}

    def _bump(y: int, w: float):
        if 1850 <= y <= 2100:
            year_weights[y] = year_weights.get(y, 0.0) + w

    # Ventanas para años: primeras 200 líneas y ±40 alrededor de nidx
    year_idxs: List[int] = list(range(0, min(len(lines), 200)))
    if nidx >= 0:
        lo = max(0, nidx - 40)
        hi = min(len(lines), nidx + 41)
        year_idxs.extend(range(lo, hi))
    year_idxs = sorted(set(year_idxs))

    for i in year_idxs:
        ln = lines[i]
        # Fechas completas — más confiables → +3 (y +2 extra si DOF/Periódico)
        for m in FECHA_COMPLETA_RE.finditer(ln):
            y = int(m.group("anio"))
            w = 3.0 + (2.0 if YEAR_CONTEXT_RE.search(ln) else 0.0)
            # Más peso si cerca del nidx (<= 20 líneas)
            if nidx >= 0 and abs(i - nidx) <= 20:
                w += 1.0
            _bump(y, w)
        # Años sueltos con contexto
        if YEAR_CONTEXT_RE.search(ln):
            for m in YEAR_RE.finditer(ln):
                y = int(m.group(0))
                w = 2.0
                if nidx >= 0 and abs(i - nidx) <= 20:
                    w += 1.0
                _bump(y, w)
        # Años sueltos sin contexto (peso bajo)
        else:
            for m in YEAR_RE.finditer(ln):
                y = int(m.group(0))
                _bump(y, 0.5)

    # Elegimos el año con mayor peso; si empata, el más reciente <= año actual.
    year: Optional[int] = None
    if year_weights:
        items = sorted(year_weights.items(), key=lambda kv: (-kv[1], -kv[0]))
        # No permitir “años futuros” (por si hay fechas de reforma 2099 etc.)
        current_year = datetime.now().year
        for y, _w in items:
            if y <= current_year:
                year = y
                break
        if year is None:  # si todos fueran > current_year (muy raro), toma el top
            year = items[0][0]

    # 5) Logs opcionales
    try:
        if log_base:
            if title:
                log_info(f"Título detectado: {title}", base=log_base)
            else:
                log_warn("No se detectó Título", base=log_base)
            if year:
                log_info(f"Año_publicación detectado: {year}", base=log_base)
            else:
                log_info("Sin Año_publicación inferible", base=log_base)
    except Exception:
        pass

    return title, year


# ──────────────────────────────────────────────────────────────────────────────
# PRUEBA RÁPIDA (puedes comentar una vez integrado)
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    demo = (
        "PODER EJECUTIVO DEL ESTADO\n"
        "Periódico Oficial, jueves 14 de marzo de 2013\n"
        "LEY DE ATENCIÓN A LAS PERSONAS\n"
        "CAPÍTULO I Disposiciones Generales\n"
        "Artículo 1. Del objeto\n"
        "T R A N S I T O R I O S\n"
        "Único. El presente Decreto entrará en vigor el 15 de marzo de 2013.\n"
    )
    t, y = guess_title_and_year(demo, context_window=40, log_base=None)
    print(f"[Paso 8] Título='{t}' | Año={y}")


[Paso 8] Título='LEY DE ATENCIÓN A LAS PERSONAS' | Año=2013


## Paso 9

In [10]:
# === Paso 9: Split de Artículos y extracción de Fracciones (robusto y documentado) ===
from typing import List, Dict, Tuple, Optional
import re

# ---------------------------------------------------------------------------
# 9.0 Ajuste importante: fracciones con romano NO vacío
#     - Reemplazamos el patrón de fracciones para exigir al menos un símbolo romano.
#     - Esto evita falsos positivos cuando una línea indentada se confundía con "fracción vacía".
# ---------------------------------------------------------------------------

# Requiere al menos un símbolo romano (lookahead) y una forma válida a continuación
_ROMAN_NZ = r"(?=[MDCLXVI])M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})"
# Delimitadores tras el romano (puntuación típica o espacios largos en “fracciones en línea”)
_FRAC_DELIM = r"(?:[\.\-–—:)\]]\s*|\s{2,})"

# OVERRIDE de FRACCION_RE (si ya existía de Paso 5, lo sustituimos aquí)
FRACCION_RE = re.compile(
    rf"^\s*(?:fracci[oó]n\s+)?(?P<roman>{_ROMAN_NZ})\s*{_FRAC_DELIM}",
    re.IGNORECASE
)

# ---------------------------------------------------------------------------
# 9.1 Partir la parte normativa en BLOQUES de artículo
#     - Detecta encabezados con ARTICLE_RE (Paso 5).
#     - Conserva cualquier texto que venga "pegado" al encabezado (después del punto o guion)
#       como parte del cuerpo del artículo.
#     - Devuelve una lista de dicts con número, posible sufijo (p.ej. 'Bis', 'A'), y texto.
# ---------------------------------------------------------------------------

def _extract_article_num_suffix(line: str) -> Optional[Tuple[int, Optional[str], str]]:
    """
    Extrae (numero, sufijo, trailing) del encabezado:
      - numero: int
      - sufijo: 'Bis'/'Ter'/'A'... o None
      - trailing: texto que viene en la MISMA línea después del encabezado (cuerpo inicial)
    """
    m = ARTICLE_RE.match(line)
    if not m:
        return None
    num = int(m.group("num"))
    suf = m.group("suf")
    if suf:
        suf = suf.strip()
        suf = suf[0].upper() + suf[1:].lower() if len(suf) > 1 else suf.upper()
    trailing = line[m.end():].strip()
    return num, suf, trailing

def split_articles_blocks(norma_text: str) -> List[Dict[str, object]]:
    """
    Divide un texto normativo (de un capítulo o sub-bloque) en artículos.
    Retorna lista de objetos:
      {
        "num": int,               # número de artículo
        "sufijo": str|None,       # 'Bis', 'A', 'Ter'..., si existe
        "header": str,            # línea de encabezado completa
        "texto": str              # cuerpo del artículo (sin el encabezado), listo para fracciones
      }
    """
    lines = norma_text.split("\n")
    out: List[Dict[str, object]] = []
    current: Dict[str, object] = {}
    body_lines: List[str] = []

    def _push_current():
        if current:
            current["texto"] = "\n".join(body_lines).strip()
            out.append(current.copy())

    for ln in lines:
        if ARTICLE_RE.match(ln):
            # Comienza un nuevo artículo → empuja el anterior si existía
            _push_current()
            body_lines = []

            num, suf, trailing = _extract_article_num_suffix(ln)
            current = {
                "num": num,
                "sufijo": suf,           # Útil para deduplicación fina (no va al JSON final)
                "header": ln.strip(),
                "texto": ""              # se rellena al cerrar
            }
            if trailing:
                body_lines.append(trailing)
        else:
            # Línea perteneciente al cuerpo del artículo actual (si hay)
            if current:
                body_lines.append(ln)

    _push_current()
    return out

# ---------------------------------------------------------------------------
# 9.2 Extraer FRACCIONES de un cuerpo de artículo
#     - Soporta fracciones en su propia línea: "I.", "II)", "III.-", "Fracción IV.—"
#     - También fracciones "en línea" (varias en una sola línea separadas por espacios largos).
#     - Devuelve (texto_sin_fracciones, lista_de_fracciones[{Fracción, Texto}]).
# ---------------------------------------------------------------------------

# Delimitador interno para "romper" fracciones en línea:
# Insertamos saltos antes de un posible encabezado de fracción cuando haya al menos
# dos espacios previos (caso típico en textos justificados).
_INLINE_FRAC_BREAK = re.compile(
    r"\s{2,}(?=(?:Fracci[oó]n\s+)?"
    + FRACCION_RE.pattern.replace(r"^\s*", "")  # reutilizamos patrón sin anclaje
    + ")", re.IGNORECASE
)

def _prebreak_inline_fractions(text: str) -> str:
    """
    Inserta saltos de línea antes de encabezados de fracción "en línea" para
    que FRACCION_RE pueda detectarlas por inicio de línea.
    """
    lines = text.split("\n")
    out_lines = []
    for ln in lines:
        # Ojo: no colapsamos todo; sólo prepartimos fracciones en línea
        ln2 = _INLINE_FRAC_BREAK.sub("\n", ln)
        out_lines.append(ln2)
    return "\n".join(out_lines)

def parse_fractions_from_body(article_body: str) -> Tuple[str, List[Dict[str, str]]]:
    """
    Dado el cuerpo de un artículo, devuelve:
      - main_text: el texto previo a la primera fracción (sin encabezados de fracción)
      - fracciones: lista de objetos {"Fracción": "I", "Texto": "..."} en orden de aparición
    Reglas:
      - Si no hay fracciones detectadas, todo el artículo se devuelve como main_text.
      - Si hay fracciones repetidas (p. ej., la 'I' separada por cortes), se fusionan.
      - Defensa extra: si por cualquier razón el regex devolviera un romano vacío (no debería),
        NO se trata como fracción.
    """
    if not article_body.strip():
        return "", []

    # 1) Pre-ruptura de fracciones en línea para facilitar detección por inicio de renglón
    prepped = _prebreak_inline_fractions(article_body)

    # 2) Recorremos líneas y construimos fracciones
    main_lines: List[str] = []
    fracciones: List[Dict[str, str]] = []
    curr_rom: Optional[str] = None
    curr_buf: List[str] = []
    seen_order: List[str] = []  # conserva orden de aparición para merges

    def _flush_curr():
        nonlocal curr_rom, curr_buf
        if curr_rom is not None:
            text = "\n".join(curr_buf).strip()
            if text:
                fracciones.append({"Fracción": curr_rom, "Texto": text})
            curr_rom, curr_buf = None, []

    for raw_ln in prepped.split("\n"):
        ln = raw_ln.rstrip()
        m = FRACCION_RE.match(ln)
        if m:
            # Defensa extra: romano no vacío (cinturón y tirantes)
            rom = m.group("roman")
            if not rom:
                # No tratar como fracción; sigue como parte del texto normal
                if curr_rom is None:
                    main_lines.append(ln)
                else:
                    curr_buf.append(ln)
                continue

            # Empieza una fracción nueva → empuja la anterior
            _flush_curr()
            rom = rom.upper()
            rest = ln[m.end():].strip()  # texto tras el encabezado en la misma línea
            curr_rom = rom
            curr_buf = [rest] if rest else []
            if rom not in seen_order:
                seen_order.append(rom)
            continue

        # No es encabezado de fracción
        if curr_rom is None:
            main_lines.append(ln)
        else:
            curr_buf.append(ln)

    _flush_curr()

    # 3) Fusionar fracciones con el mismo romano (si cortes del PDF las repiten)
    merged: Dict[str, List[str]] = {r: [] for r in seen_order}
    for fr in fracciones:
        merged[fr["Fracción"]].append(fr["Texto"])
    fracciones_ordenadas: List[Dict[str, str]] = [
        {"Fracción": r, "Texto": "\n\n".join(chunks).strip()} for r, chunks in merged.items()
    ]

    # 4) Texto principal antes de fracciones
    main_text = "\n".join(main_lines).strip()

    return main_text, fracciones_ordenadas

# ---------------------------------------------------------------------------
# 9.3 Función de conveniencia: procesa artículos completos
#     - Toma un bloque normativo, lo parte en artículos y separa fracciones.
#     - Devuelve lista de artículos listos para armar el JSON (sin capítulos).
# ---------------------------------------------------------------------------

def parse_articles_with_fractions(norma_text: str) -> List[Dict[str, object]]:
    """
    Procesa un bloque normativo y devuelve:
      [
        {"Artículo": int, "Texto": str, "Fracciones": [{"Fracción": str, "Texto": str}, ...]},
        ...
      ]
    Nota: el sufijo capturado en split_articles_blocks NO se incluye aquí;
          se usa después para deduplicación si fuera necesario.
    """
    arts = []
    for blk in split_articles_blocks(norma_text):
        main_text, frs = parse_fractions_from_body(blk["texto"])
        arts.append({
            "Artículo": int(blk["num"]),
            "Texto": main_text,
            "Fracciones": frs
        })
    return arts

# ---------------------------------------------------------------------------
# PRUEBAS RÁPIDAS (puedes comentar tras integrar)
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo = (
        "Artículo 1.- Disposiciones generales\n"
        "Este artículo tiene un párrafo inicial.\n"
        "I.- Primera fracción\n"
        "Texto adicional de la fracción I.\n"
        "II) Segunda fracción\n"
        "Artículo 2 Bis. De las definiciones\n"
        "Párrafo introductorio.\n"
        " I. Uno   II. Dos   III. Tres\n"
        "Artículo 3 A — Procedimiento\n"
        "Fracción IV.— Cuatro\n"
        "V — Cinco\n"
        "\n"
        # Caso problemático que resolvemos: no debe crear Fracción vacía
        "Corresponde al Presidente Municipal o a quien haga sus veces, la aplicación de las leyes...\n"
    )
    print("— split_articles_blocks —")
    blocks = split_articles_blocks(demo)
    for b in blocks:
        print({"num": b["num"], "sufijo": b["sufijo"], "header": b["header"]})
    print("\n— parse_articles_with_fractions —")
    parsed = parse_articles_with_fractions(demo)
    for a in parsed:
        print(a["Artículo"], "=>", len(a["Fracciones"]), "fracciones;", "Texto_len:", len(a["Texto"]))


— split_articles_blocks —
{'num': 1, 'sufijo': None, 'header': 'Artículo 1.- Disposiciones generales'}
{'num': 2, 'sufijo': 'Bis', 'header': 'Artículo 2 Bis. De las definiciones'}
{'num': 3, 'sufijo': 'A', 'header': 'Artículo 3 A — Procedimiento'}

— parse_articles_with_fractions —
1 => 2 fracciones; Texto_len: 65
2 => 3 fracciones; Texto_len: 42
3 => 2 fracciones; Texto_len: 13


In [11]:
# === Paso 10/11 (versión adaptada): NIVEL → Capítulos/Artículos/Fracciones ===
from typing import List, Dict, Optional, Tuple
import re

# Requisitos previos:
# - Paso 2: HIER_HEADER_RE (patrón robusto de NIVEL; NO incluye "Artículo")
# - Paso 4: clean_text_light
# - Paso 5: ARTICLE_RE, FRACCION_RE
# - Paso 9: split_articles_blocks, parse_fractions_from_body, parse_articles_with_fractions

# ──────────────────────────────────────────────────────────────────────────────
# 10) Split de la parte normativa en CHUNKS por encabezado de NIVEL
#     - SOLO considera LIBRO / TÍTULO / CAPÍTULO / SECCIÓN (con ordinal en romano, dígito o palabras)
#     - EXCLUYE "Artículo". Así evitamos cortar capítulos cada vez que aparece un artículo.
# ──────────────────────────────────────────────────────────────────────────────

def is_level_header(line: str) -> bool:
    """
    True si 'line' es encabezado formal de NIVEL:
      LIBRO / TÍTULO / CAPÍTULO / SECCIÓN + (romano|dígito|ordinal en palabras).
    - NO considera 'Artículo'.
    - Intenta usar HIER_HEADER_RE (Paso 2). Si no está definido, usa un fallback acento-insensible.
    """
    try:
        return bool(HIER_HEADER_RE.match(line))  # patrón robusto del Paso 2
    except NameError:
        # Fallback sin dependencia: normaliza acentos y matchea en ASCII
        from unicodedata import normalize, category
        s = ''.join(c for c in normalize("NFD", line) if category(c) != "Mn").lower().strip()
        return bool(re.match(
            r"^\s*(libro|titulo|capitulo|seccion)\s+"
            r"((?:[ivxlcdm]+)|(?:\d{1,3})|(?:unico|unica|primer\w+|segund\w+|tercer\w+|cuart\w+|quint\w+|sext\w+|septim\w+|octav\w+|noven\w+|decim\w+(?:\s+\w+)?|vigesim\w+(?:\s+\w+)?))\b",
            s
        ))

def split_norma_into_heading_chunks(norma_text: str) -> List[str]:
    """
    Divide 'norma_text' en chunks iniciados por encabezados de NIVEL (Libro/Título/Capítulo/Sección).
    - Limpia el texto con clean_text_light para neutralizar NBSP y espaciados raros.
    - Si no hay encabezados → devuelve [] (Paso 11 hará 'Capítulo Único' si corresponde).
    """
    if not norma_text or not norma_text.strip():
        return []

    text  = clean_text_light(norma_text)
    lines = text.split("\n")

    # 1) recolectar índices SOLO de encabezados de nivel (no "Artículo")
    header_idxs: List[int] = [i for i, ln in enumerate(lines) if is_level_header(ln)]

    if not header_idxs:
        return []

    # 2) cortar por rangos [header_i, header_{i+1})
    chunks: List[str] = []
    for j, start in enumerate(header_idxs):
        end = header_idxs[j + 1] if j + 1 < len(header_idxs) else len(lines)
        chunk = "\n".join(lines[start:end]).strip()
        if chunk:
            chunks.append(chunk)
    return chunks

def debug_find_headers(norma_text: str) -> List[Tuple[int, str]]:
    """
    Devuelve [(idx, 'línea encabezado NIVEL')] — EXCLUYE 'Artículo ...'.
    Útil para auditar qué detecta el splitter de capítulos.
    """
    txt = clean_text_light(norma_text or "")
    out = []
    for i, ln in enumerate(txt.split("\n")):
        if is_level_header(ln):
            out.append((i, ln.strip()))
    return out

def chapter_label_from_chunk(chunk: str) -> str:
    """
    La etiqueta de capítulo es EXACTAMENTE la primera línea no vacía del chunk.
    (Se conserva tal cual, con acentos y capitalización original.)
    """
    for ln in chunk.split("\n"):
        if ln.strip():
            return ln.strip()
    return ""


# ──────────────────────────────────────────────────────────────────────────────
# 11) Parser de capítulos → artículos → fracciones (con deduplicación interna)
#     - Deduplicación por (número, sufijo) dentro del MISMO chunk (evita cortes del PDF).
#     - Mantiene el orden de aparición (primer visto, primer servido).
#     - Si NO hay encabezados en 'norma', cae a "Capítulo Único".
# ──────────────────────────────────────────────────────────────────────────────

def _merge_articles_keep_order(blocks: List[Dict[str, object]]) -> List[Dict[str, object]]:
    """
    Fusiona bloques de artículos que comparten la misma clave (número, sufijo) DENTRO del chunk.
    - Concatena los textos con una línea en blanco.
    - Conserva el primer 'header' visto.
    - Devuelve lista en el orden de primera aparición de la clave.
    Espera bloques con llaves: {"num": int, "sufijo": str|None, "header": str, "texto": str}
    """
    order: List[Tuple[int, Optional[str]]] = []
    grouped: Dict[Tuple[int, Optional[str]], Dict[str, object]] = {}

    for blk in blocks:
        key = (int(blk["num"]), blk.get("sufijo"))
        if key not in grouped:
            grouped[key] = {"num": key[0], "sufijo": key[1], "header": blk["header"], "texto": blk["texto"]}
            order.append(key)
        else:
            prev = grouped[key]["texto"]
            newt = blk["texto"]
            if prev and newt:
                grouped[key]["texto"] = prev.rstrip() + "\n\n" + newt.lstrip()
            elif newt:
                grouped[key]["texto"] = newt
            # 'header' se mantiene (el primero). Si quisieras el más largo, podrías compararlos aquí.

    return [grouped[k] for k in order]

# === Paso 10/11: mejoras de filtrado y auditoría ===
from typing import List, Dict, Optional, Tuple
import re

def _parse_chapter_chunk(chunk: str) -> Dict[str, object]:
    """
    (sin cambios de estructura) Convierte un chunk en:
      {"Capítulo": <label_exacta>, "Artículos": [...]}
    """
    label = chapter_label_from_chunk(chunk)
    body  = "\n".join(chunk.split("\n")[1:])
    blocks = split_articles_blocks(body)
    if not blocks:
        return {"Capítulo": label, "Artículos": []}

    blocks = _merge_articles_keep_order(blocks)
    articulos: List[Dict[str, object]] = []
    for blk in blocks:
        main_text, frs = parse_fractions_from_body(blk["texto"])
        articulos.append({"Artículo": int(blk["num"]), "Texto": main_text, "Fracciones": frs})
    return {"Capítulo": label, "Artículos": articulos}

def list_empty_heading_labels(norma_text: str) -> List[str]:
    """
    Devuelve las ETIQUETAS de encabezados de NIVEL cuyo chunk no contiene artículos.
    Útil para QA antes de aplicar el filtrado.
    """
    empties: List[str] = []
    for ch in split_norma_into_heading_chunks(norma_text or ""):
        parsed = _parse_chapter_chunk(ch)
        if len(parsed["Artículos"]) == 0:
            empties.append(parsed["Capítulo"])
    return empties

def parse_normative_to_chapters(norma_text: str, *, drop_empty_chapters: bool = True) -> List[Dict[str, object]]:
    """
    Entrada: parte normativa (sin 'Decreto' ni 'Transitorios').
    Salida: capítulos con artículos y fracciones.

    - Si NO hay encabezados → 'Capítulo Único' con todos los artículos.
    - Si hay encabezados:
        * Por defecto DESCARTA los que no tengan artículos (drop_empty_chapters=True).
        * Mantiene TÍTULOS/SECCIONES/… que SÍ tengan artículos.
    """
    norma_text = norma_text or ""
    chunks = split_norma_into_heading_chunks(norma_text)

    # Caso: sin encabezados → Capítulo Único (si hay artículos)
    if not chunks:
        arts = parse_articles_with_fractions(norma_text)
        return [{"Capítulo": "Capítulo Único", "Artículos": arts}] if arts else []

    chapters: List[Dict[str, object]] = []
    for ch in chunks:
        parsed = _parse_chapter_chunk(ch)
        if drop_empty_chapters and len(parsed["Artículos"]) == 0:
            # descarta encabezados de NIVEL sin artículos
            continue
        chapters.append(parsed)

    # Si, tras descartar vacíos, no quedó nada, cae a Capítulo Único
    if not chapters:
        arts = parse_articles_with_fractions(norma_text)
        return [{"Capítulo": "Capítulo Único", "Artículos": arts}] if arts else []

    return chapters



# ──────────────────────────────────────────────────────────────────────────────
# PRUEBA RÁPIDA (puedes comentar una vez integrado)
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    demo = (
        "CAPÍTULO IV De los Efectos de la Declaración de Ausencia\n"
        "Artículo 1. Del objeto\n"
        "I.- Primera fracción\n"
        "II) Segunda fracción\n"
        "Sección décima segunda Del servicio y atención al público\n"
        "Artículo 2 Bis. De las definiciones\n"
        " I. Uno   II. Dos   III. Tres\n"
        "TÍTULO IX Disposiciones finales\n"
        "Artículo 3 A — Procedimiento\n"
        "Fracción IV.— Cuatro\n"
        "V — Cinco\n"
    )

    print("Encabezados de NIVEL detectados:", debug_find_headers(demo))
    chapters = parse_normative_to_chapters(demo)
    print("[Paso 10/11] Capítulos detectados:", len(chapters))
    for c in chapters:
        print(" •", c["Capítulo"], "→", len(c["Artículos"]), "artículos")


Encabezados de NIVEL detectados: [(0, 'CAPÍTULO IV De los Efectos de la Declaración de Ausencia'), (4, 'Sección décima segunda Del servicio y atención al público'), (7, 'TÍTULO IX Disposiciones finales')]
[Paso 10/11] Capítulos detectados: 3
 • CAPÍTULO IV De los Efectos de la Declaración de Ausencia → 1 artículos
 • Sección décima segunda Del servicio y atención al público → 1 artículos
 • TÍTULO IX Disposiciones finales → 1 artículos


In [12]:
# === Paso 12: Parser de Transitorios (Capítulo Único → Artículos con ordinal) ===
from typing import List, Dict, Tuple, Optional
import re

# -----------------------------------------------------------------------------
# 12.1 Remover encabezados "TRANSITORIOS" (y variantes) del bloque transitorio
# -----------------------------------------------------------------------------

def strip_trans_header(trans_text: str) -> str:
    """
    Quita líneas iniciales que sean encabezados de 'Transitorios' (incluye variantes).
    Conserva todo lo demás intacto.
    """
    if not trans_text or not trans_text.strip():
        return ""
    lines = clean_text_light(trans_text).split("\n")
    i = 0
    # Elimina todas las primeras líneas que macheen TRANS_RE (y blancos siguientes)
    while i < len(lines):
        ln = lines[i]
        if is_trans_header(ln):
            i += 1
            # salta blancos consecutivos
            while i < len(lines) and not lines[i].strip():
                i += 1
            # si otro encabezado transitorios aparece inmediatamente después, sigue
            continue
        break
    return "\n".join(lines[i:]).strip()

# -----------------------------------------------------------------------------
# 12.2 Partir por ordinales textuales (con/sin prefijo "Artículo")
#      - Usa ORD_WITH_PREFIX_RE del Paso 5 (o extract_transitory_ordinal).
#      - Captura el texto que quede en la MISMA línea tras el ordinal como parte del cuerpo.
# -----------------------------------------------------------------------------

# Puntuación/ruido opcional justo después del ordinal (., -, —, :, ), ] ...)
_TRAIL_PUNCT = re.compile(r"^[\s\.\-–—:)\]]+\s*")

def _match_trans_header(line: str) -> Optional[Tuple[str, int]]:
    """
    Intenta reconocer un encabezado de transitorio en 'line'.
    Devuelve (ordinal_normalizado, end_pos_del_match) o None.
    """
    m = ORD_WITH_PREFIX_RE.match(line) if 'ORD_WITH_PREFIX_RE' in globals() else None
    if not m:
        # fallback con extract_transitory_ordinal + re-match parcial
        ord_norm = extract_transitory_ordinal(line) if 'extract_transitory_ordinal' in globals() else None
        if not ord_norm:
            return None
        # Re-halla posición aproximada consumiendo la misma superficie con un regex simple:
        # (esto es best-effort y suficiente para obtener trailing)
        m2 = re.match(r"^\s*(?:art[íi]?culo\s+)?" + re.escape(ord_norm.split()[0]), line, re.IGNORECASE)
        end_pos = m2.end() if m2 else 0
        return ord_norm, end_pos
    # Normaliza el ordinal con la función de Paso 5 (capitaliza y respeta acentos)
    ord_norm = extract_transitory_ordinal(line) if 'extract_transitory_ordinal' in globals() else m.group("ord").title()
    return ord_norm, m.end()

def split_transitorios_into_blocks(trans_text: str) -> List[Dict[str, str]]:
    """
    Devuelve bloques de transitorios:
      [
        {"ordinal": "Único"|"Primero"|..., "header": <línea encabezado cruda>, "texto": <cuerpo>},
        ...
      ]
    Ignora cualquier preámbulo entre el encabezado 'TRANSITORIOS' y el primer ordinal.
    """
    body = strip_trans_header(trans_text)
    if not body:
        return []

    lines = body.split("\n")
    out: List[Dict[str, str]] = []
    curr: Dict[str, str] = {}
    buf: List[str] = []

    def _push():
        if curr:
            curr["texto"] = "\n".join(buf).strip()
            out.append(curr.copy())

    for ln in lines:
        m = _match_trans_header(ln)
        if m:
            # nuevo transitorio → empuja el previo
            _push()
            buf = []
            ord_norm, end_pos = m
            trailing = _TRAIL_PUNCT.sub("", ln[end_pos:].strip())
            curr = {"ordinal": ord_norm, "header": ln.strip(), "texto": ""}
            if trailing:
                buf.append(trailing)
        else:
            if curr:
                buf.append(ln)
            else:
                # Texto pre-ordinal: suele ser ruido; lo ignoramos.
                # Si prefieres conservarlo, podrías guardarlo y anexarlo al primer transitorio.
                pass

    _push()
    return out

# -----------------------------------------------------------------------------
# 12.3 Fusión de duplicados (mismo ordinal) y extracción de fracciones
# -----------------------------------------------------------------------------

def _merge_transitory_blocks(blocks: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Funde bloques que comparten el mismo 'ordinal' (por cortes de PDF o reimpresiones).
    - Concatena cuerpos con salto en blanco.
    - Conserva el primer header visto.
    """
    order: List[str] = []
    grouped: Dict[str, Dict[str, str]] = {}
    for blk in blocks:
        k = blk["ordinal"]
        if k not in grouped:
            grouped[k] = {"ordinal": k, "header": blk["header"], "texto": blk["texto"]}
            order.append(k)
        else:
            prev = grouped[k]["texto"]
            newt = blk["texto"]
            if prev and newt:
                grouped[k]["texto"] = prev.rstrip() + "\n\n" + newt.lstrip()
            elif newt:
                grouped[k]["texto"] = newt
    return [grouped[k] for k in order]

def parse_transitorios(trans_text: str) -> List[Dict[str, object]]:
    """
    Parser principal de Transitorios.
    Devuelve SIEMPRE una lista con UN capítulo (por convención):
      [
        {
          "Capítulo": "Capítulo Único",
          "Artículos": [
            {"Artículo": "Único"|"Primero"|..., "Texto": str, "Fracciones": [...]},
            ...
          ]
        }
      ]
    Si no se detecta ningún ordinal, devuelve [].
    """
    if not trans_text or not trans_text.strip():
        return []

    # 1) Partición por ordinales
    blocks = split_transitorios_into_blocks(trans_text)
    if not blocks:
        return []

    # 2) Fusión de duplicados
    blocks = _merge_transitory_blocks(blocks)

    # 3) Extracción de fracciones (si hubiera dentro del transitorio)
    articulos: List[Dict[str, object]] = []
    for blk in blocks:
        main_text, frs = parse_fractions_from_body(blk["texto"])
        articulos.append({
            "Artículo": blk["ordinal"],
            "Texto": main_text,
            "Fracciones": frs
        })

    return [{"Capítulo": "Capítulo Único", "Artículos": articulos}]

# -----------------------------------------------------------------------------
# 12.4 Pruebas rápidas (puedes comentar al integrar)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    demo_trans = (
        "T R A N S I T O R I O S\n"
        "Artículo Único. El presente Decreto entrará en vigor al día siguiente.\n"
        "I.- El Ejecutivo emitirá el reglamento en 90 días.\n"
        "II) Se derogan disposiciones que se opongan.\n"
        "\n"
        "Artículo Primero: Durante el primer año...\n"
        "I. Primera medida   II. Segunda medida   III. Tercera medida\n"
    )
    print("— strip_trans_header —")
    print(strip_trans_header(demo_trans), "\n")
    print("— split_transitorios_into_blocks —")
    blks = split_transitorios_into_blocks(demo_trans)
    for b in blks:
        print(b["ordinal"], "=> header:", b["header"])
    print("\n— parse_transitorios —")
    parsed = parse_transitorios(demo_trans)
    for ch in parsed:
        print(ch["Capítulo"], "→", len(ch["Artículos"]), "artículos")
        for a in ch["Artículos"]:
            print(" •", a["Artículo"], "| fracciones:", len(a["Fracciones"]))


— strip_trans_header —
Artículo Único. El presente Decreto entrará en vigor al día siguiente.
I.- El Ejecutivo emitirá el reglamento en 90 días.
II) Se derogan disposiciones que se opongan.

Artículo Primero: Durante el primer año...
I. Primera medida II. Segunda medida III. Tercera medida 

— split_transitorios_into_blocks —
Único => header: Artículo Único. El presente Decreto entrará en vigor al día siguiente.
Primero => header: Artículo Primero: Durante el primer año...

— parse_transitorios —
Capítulo Único → 2 artículos
 • Único | fracciones: 2
 • Primero | fracciones: 1


In [13]:
# === Paso 13: Ensamble del JSON final (Decreto/Título/Año + Capítulos + Transitorios) ===
from typing import Dict, Any, Optional, Tuple
import json

def assemble_doc_from_text(
    clean_text: str,
    *,
    file_base: Optional[str] = None,
    prefer_strict_norm_start: bool = True,
    context_window_title: int = 60,
    drop_empty_chapters: bool = True
) -> Dict[str, Any]:
    """
    Orquesta todo el pipeline para un documento:
      1) Split en decreto/norma/trans (Paso 7)
      2) Título y Año_publicación (Paso 8)
      3) Capítulos/Artículos/Fracciones de la parte normativa (Paso 10/11)
      4) Transitorios (Paso 12)
      5) Regresa el objeto JSON listo para validación/escritura

    Retorna:
      {
        "Decreto": str,
        "Año_publicación": int|None,
        "Título": str,
        "Capítulos": [ { "Capítulo": str, "Artículos": [ ... ] }, ... ],
        "Transitorios": [ { "Capítulo": "Capítulo Único", "Artículos": [ ... ] } ] | []
      }
    """
    base = file_base or None
    try:
        if base:
            log_start(base, "Assemble: inicio")
    except Exception:
        pass

    # 1) Partición en secciones principales
    with log_step("Split decreto/norma/trans", base=base) if 'log_step' in globals() else nullcontext():
        split = split_decreto_norma_trans(clean_text, prefer_strict=prefer_strict_norm_start, log_base=base)
        decreto_txt = split["decreto"]
        norma_txt   = split["norma"]
        trans_txt   = split["trans"]

    # 2) Título y año
    with log_step("Detección de Título y Año", base=base) if 'log_step' in globals() else nullcontext():
        titulo, anio = guess_title_and_year(clean_text, context_window=context_window_title, log_base=base)

    # 3) Parte normativa → capítulos
    with log_step("Parseo de capítulos/ artículos", base=base) if 'log_step' in globals() else nullcontext():
        capitulos = parse_normative_to_chapters(norma_txt, drop_empty_chapters=drop_empty_chapters)

    # 4) Transitorios
    with log_step("Parseo de transitorios", base=base) if 'log_step' in globals() else nullcontext():
        transitorios = parse_transitorios(trans_txt) if trans_txt.strip() else []

    # 5) Ensamble
    final_doc = {
        "Decreto": decreto_txt or "",
        "Año_publicación": anio if isinstance(anio, int) else None,
        "Título": titulo or "",
        "Capítulos": capitulos or [],
        "Transitorios": transitorios or [],
    }

    try:
        if base:
            # Diagnóstico rápido
            tot_perm = sum(len(c["Artículos"]) for c in final_doc["Capítulos"])
            tot_tran = sum(len(c["Artículos"]) for c in final_doc["Transitorios"]) if final_doc["Transitorios"] else 0
            log_info(f"Assemble OK: {tot_perm} artículos permanentes, {tot_tran} transitorios", base=base)
    except Exception:
        pass

    return final_doc


# Utilidad de escritura (con envoltura raíz "<base>.JSON")
from pathlib import Path

def write_json_output(doc: Dict[str, Any], base_id: str, wrap_root: bool = True) -> Path:
    """
    Escribe el JSON final en JSON_DIR / "<base>.json".
    - Si wrap_root=True, envuelve como { "<base>.JSON": <doc> }, siguiendo tu convención.
    Regresa la ruta escrita.
    """
    JSON_DIR.mkdir(parents=True, exist_ok=True)
    payload = {f"{base_id}.JSON": doc} if wrap_root else doc
    out_path = JSON_DIR / f"{base_id}.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)
    try:
        log_success(base_id, f"JSON escrito → {out_path.name}")
    except Exception:
        print(f"[assemble] JSON escrito → {out_path}")
    return out_path


# nullcontext para usar 'with' aunque no tengas el logger cargado
from contextlib import contextmanager
@contextmanager
def nullcontext():
    yield


# ───────────────────────────
# Smoke test (puedes comentar)
# ───────────────────────────
if __name__ == "__main__":
    demo = (
        "PODER EJECUTIVO DEL ESTADO\n"
        "Periódico Oficial, jueves 14 de marzo de 2013\n"
        "CAPÍTULO IV De los Efectos de la Declaración de Ausencia\n"
        "Artículo 1. Del objeto\n"
        "I.- Primera fracción\n"
        "II) Segunda fracción\n"
        "Sección décima segunda Del servicio y atención al público\n"
        "Artículo 2 Bis. De las definiciones\n"
        " I. Uno   II. Dos   III. Tres\n"
        "TÍTULO IX Disposiciones finales\n"
        "Artículo 3 A — Procedimiento\n"
        "Fracción IV.— Cuatro\n"
        "V — Cinco\n"
        "T R A N S I T O R I O S\n"
        "Artículo Único. Entrará en vigor al día siguiente.\n"
    )
    base_id = "demo"
    assembled = assemble_doc_from_text(demo, file_base=base_id)
    print("[Paso 13] Keys:", list(assembled.keys()))
    print("  Capítulos:", len(assembled["Capítulos"]), " | Transitorios:", len(assembled["Transitorios"]))


[2025-09-08 09:21:12] INFO [demo] split: decreto=2 líneas, norma=11 líneas, trans=2 líneas
[2025-09-08 09:21:12] INFO [demo] Título detectado: PODER EJECUTIVO DEL ESTADO
[2025-09-08 09:21:12] INFO [demo] Año_publicación detectado: 2013
[2025-09-08 09:21:12] INFO [demo] Assemble OK: 3 artículos permanentes, 1 transitorios
[Paso 13] Keys: ['Decreto', 'Año_publicación', 'Título', 'Capítulos', 'Transitorios']
  Capítulos: 3  | Transitorios: 1


In [14]:
# === Paso 14: Validación del esquema + diagnósticos + guardado ===
from typing import List
from pydantic import BaseModel, Field, ValidationError
from pathlib import Path
import json

# ──────────────────────────────────────────────────────────────────────────────
# Esquema con Pydantic (igual al de tu parse_law.py)
# ──────────────────────────────────────────────────────────────────────────────
class Fraccion(BaseModel):
    Fracción: str
    Texto: str

class ArticuloPermanente(BaseModel):
    Artículo: int
    Texto: str
    Fracciones: List[Fraccion] = Field(default_factory=list)

class ArticuloTransitorio(BaseModel):
    Artículo: str   # "Único", "Primero", "Segundo", ...
    Texto: str
    Fracciones: List[Fraccion] = Field(default_factory=list)

class CapituloPermanente(BaseModel):
    Capítulo: str
    Artículos: List[ArticuloPermanente] = Field(default_factory=list)

class CapituloTransitorio(BaseModel):
    Capítulo: str
    Artículos: List[ArticuloTransitorio] = Field(default_factory=list)

class LeyDoc(BaseModel):
    Decreto: str
    Año_publicación: Optional[int] = None
    Título: str
    Capítulos: List[CapituloPermanente] = Field(default_factory=list)
    Transitorios: List[CapituloTransitorio] = Field(default_factory=list)


# ──────────────────────────────────────────────────────────────────────────────
# Validación + diagnósticos + escritura
# ──────────────────────────────────────────────────────────────────────────────
# === Paso 14: utilidades y diagnóstico enriquecido ===
def _heading_kind(label: str) -> str:
    """
    Clasifica la etiqueta: 'LIBRO' | 'TÍTULO' | 'CAPÍTULO' | 'SECCIÓN' | 'OTRO'
    (best-effort, acento-insensible)
    """
    try:
        s = norm_lower(label)
    except Exception:
        from unicodedata import normalize, category
        s = ''.join(c for c in normalize("NFD", label) if category(c) != "Mn").lower()
    if re.match(r"^\s*libro\b", s): return "LIBRO"
    if re.match(r"^\s*t[ií]tulo\b", s): return "TÍTULO"
    if re.match(r"^\s*cap[ií]tulo\b", s): return "CAPÍTULO"
    if re.match(r"^\s*secc?i[óo]n\b", s): return "SECCIÓN"
    return "OTRO"

def summarize_diagnostics(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Produce métricas útiles para QA y monitoreo (enriquecidas)."""
    chapters = doc.get("Capítulos", []) or []
    trans    = doc.get("Transitorios", []) or []

    n_chapters = len(chapters)
    n_perm_articles = sum(len(c.get("Artículos", [])) for c in chapters)
    n_perm_fracs = sum(sum(len(a.get("Fracciones", [])) for a in c.get("Artículos", [])) for c in chapters)

    n_trans_caps = len(trans)
    n_trans_articles = sum(len(c.get("Artículos", [])) for c in trans)
    n_trans_fracs = sum(sum(len(a.get("Fracciones", [])) for a in c.get("Artículos", [])) for c in trans)

    chapter_labels = [c.get("Capítulo", "") for c in chapters]
    kinds = [_heading_kind(lbl) for lbl in chapter_labels]
    kind_counts: Dict[str, int] = {}
    for k in kinds:
        kind_counts[k] = kind_counts.get(k, 0) + 1

    return {
        "n_chapters": n_chapters,
        "n_perm_articles": n_perm_articles,
        "n_perm_fracciones": n_perm_fracs,
        "n_trans_caps": n_trans_caps,
        "n_trans_articles": n_trans_articles,
        "n_trans_fracciones": n_trans_fracs,
        "chapter_labels": chapter_labels,
        "chapter_kind_breakdown": kind_counts,   # ← desglose por tipo
    }


def write_diagnostics(base_id: str, diag: Dict[str, Any]) -> Path:
    """Guarda diagnósticos en ERRORES_DIR/<base>_diag.json (aunque no haya errores)."""
    ERRORES_DIR.mkdir(parents=True, exist_ok=True)
    path = ERRORES_DIR / f"{base_id}_diag.json"
    with open(path, "w", encoding="utf-8") as f:
        json.dump(diag, f, indent=2, ensure_ascii=False)
    return path

def write_validation_error(base_id: str, err: str, payload: Optional[Dict[str, Any]] = None) -> None:
    """Guarda un archivo de error y opcionalmente el payload bruto para inspección."""
    ERRORES_DIR.mkdir(parents=True, exist_ok=True)
    (ERRORES_DIR / f"{base_id}_validation_error.txt").write_text(err, encoding="utf-8")
    if payload is not None:
        with open(ERRORES_DIR / f"{base_id}_invalid_payload.json", "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)

def validate_and_save(doc: Dict[str, Any], base_id: str, wrap_root: bool = True) -> Optional[Path]:
    """
    Valida contra LeyDoc (Pydantic) y reglas adicionales:
      - Debe haber al menos UN artículo permanente (Capítulos[].Artículos[])
      - Título puede estar vacío (se loguea), Transitorios pueden ser [].
    Si es válido, escribe JSON; si no, guarda error y devuelve None.
    """
    try:
        # Validación estructural
        ley = LeyDoc.model_validate(doc)
    except ValidationError as e:
        err = f"Validación Pydantic falló: {e}"
        try: log_error(base_id, err)
        except Exception: print("[validate] ERROR:", err)
        write_validation_error(base_id, str(e), payload=doc)
        return None

    # Reglas de contenido mínimas
    total_perm = sum(len(c.Artículos) for c in ley.Capítulos)
    if total_perm == 0:
        msg = "Regla: sin artículos permanentes (Capítulos[].Artículos[] = 0)"
        try: log_error(base_id, msg)
        except Exception: print("[validate] ERROR:", msg)
        write_validation_error(base_id, msg, payload=doc)
        return None

    # Avisos no fatales (log INFO/WARN)
    if not ley.Título.strip():
        try: log_warn("Título vacío", base=base_id)
        except Exception: pass
    if ley.Año_publicación is not None and not (1850 <= ley.Año_publicación <= datetime.now().year):
        try: log_warn(f"Año_publicación fuera de rango: {ley.Año_publicación}", base=base_id)
        except Exception: pass

    # Diagnósticos
    diag = summarize_diagnostics(doc)
    try:
        log_info(f"Diag: {diag['n_chapters']} capítulos, {diag['n_perm_articles']} artículos permanentes", base=base_id)
    except Exception:
        pass
    write_diagnostics(base_id, diag)

    # Escritura final
    path = write_json_output(ley.model_dump(), base_id, wrap_root=wrap_root)
    return path


# ──────────────────────────────────────────────────────────────────────────────
# Conveniencia: procesa un archivo clean_*.txt de inicio a fin (lee → arma → valida → guarda)
# ──────────────────────────────────────────────────────────────────────────────
def process_clean_file(path: Path, wrap_root: bool = True) -> Optional[Path]:
    """
    Pipeline completo para un archivo:
      - Lee con load_text (Paso 6)
      - assemble_doc_from_text (Paso 13)
      - validate_and_save (Paso 14)
    Devuelve la ruta del JSON si todo va bien; si no, None.
    """
    base = extract_file_base(path)
    log_start(base, f"Procesando {path.name}")
    try:
        txt, enc = load_text(path)
        ok, why = validate_clean_text(txt)
        if not ok:
            raise ValueError(f"Entrada inválida: {why}")
        doc = assemble_doc_from_text(txt, file_base=base)
        out = validate_and_save(doc, base, wrap_root=wrap_root)
        if out:
            log_success(base, f"OK -> {out.name}")
        return out
    except Exception as e:
        log_exception(base, e, msg_prefix="process_clean_file falló")
        return None


# ───────────────────────────
# Smoke test (puedes comentar)
# ───────────────────────────
if __name__ == "__main__":
    # Intento con alguno de tus archivos si existen en /mnt/data o CLEAN_DIR
    candidates = resolve_input("clean_*.txt")
    if candidates:
        print(f"[Paso 14] Probar con: {candidates[0].name}")
        process_clean_file(candidates[0])
    else:
        print("[Paso 14] No se encontraron clean_*.txt para prueba rápida.")


[2025-09-08 09:21:12] INFO resolve_input: 301 archivos encontrados para patrón='clean_*.txt'
[Paso 14] Probar con: clean_0001.txt
[2025-09-08 09:21:12] INFO [0001] Leído con encoding=utf-8
[2025-09-08 09:21:12] INFO [0001] split: decreto=4 líneas, norma=1035 líneas, trans=437 líneas
[2025-09-08 09:21:12] INFO [0001] Título detectado: CONSTITUCIÓN POLÍTICA DEL ESTADO DE JALISCO
[2025-09-08 09:21:12] INFO [0001] Año_publicación detectado: 1994
[2025-09-08 09:21:12] INFO [0001] Assemble OK: 127 artículos permanentes, 16 transitorios
[2025-09-08 09:21:12] INFO [0001] Diag: 30 capítulos, 127 artículos permanentes
[2025-09-08 09:21:12] SUCCESS [0001] SUCCESS JSON escrito → 0001.json
[2025-09-08 09:21:12] SUCCESS [0001] SUCCESS OK -> 0001.json


In [15]:
# === Paso 15: Procesamiento por lotes + manifest (JSONL/CSV) ===
from typing import List, Dict, Any, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import json, time, traceback

# Opcional: pandas para CSV
try:
    import pandas as pd
    _HAS_PANDAS = True
except Exception:
    _HAS_PANDAS = False

MANIFEST_JSONL = OUTPUT_DIR / "manifest.jsonl"
MANIFEST_CSV   = OUTPUT_DIR / "manifest.csv"

def _json_already_exists(base: str) -> bool:
    return (JSON_DIR / f"{base}.json").exists()

def _safe_log(level: str, base: Optional[str], msg: str):
    try:
        if level == "err" and 'log_error' in globals(): log_error(base or "", msg)
        elif level == "warn" and 'log_warn' in globals(): log_warn(msg, base=base or "")
        elif level == "info" and 'log_info' in globals(): log_info(base, msg) if callable(log_info) else log_info(base, msg)
        else: print(f"[{level.upper()}] {base or ''} {msg}")
    except Exception:
        print(f"[{level.upper()}] {base or ''} {msg}")

def _process_one_file(path: Path, skip_existing: bool = True) -> Dict[str, Any]:
    base = extract_file_base(path)
    t0 = time.time()
    try:
        if skip_existing and _json_already_exists(base):
            # Relee diagnósticos si existen, o calcula rápido desde el JSON
            diag_path = ERRORES_DIR / f"{base}_diag.json"
            diag = {}
            if diag_path.exists():
                diag = json.loads(diag_path.read_text(encoding="utf-8"))
            else:
                # reconstruye métricas desde JSON ya escrito
                payload = json.loads((JSON_DIR / f"{base}.json").read_text(encoding="utf-8"))
                doc = payload.get(f"{base}.JSON") or payload
                diag = summarize_diagnostics(doc)
            return {
                "base": base, "file": str(path), "status": "skipped",
                "json_path": str(JSON_DIR / f"{base}.json"),
                "diag": diag, "dt": time.time()-t0
            }

        out_path = process_clean_file(path)  # usa Paso 6 + 13 + 14
        status = "ok" if out_path else "error"
        diag_path = ERRORES_DIR / f"{base}_diag.json"
        diag = {}
        if diag_path.exists():
            diag = json.loads(diag_path.read_text(encoding="utf-8"))
        return {
            "base": base, "file": str(path), "status": status,
            "json_path": str(out_path) if out_path else None,
            "diag": diag, "dt": time.time()-t0
        }
    except Exception as e:
        _safe_log("err", base, f"batch fallo: {e}")
        return {
            "base": base, "file": str(path), "status": "error",
            "json_path": None, "diag": {"error": str(e), "trace": traceback.format_exc()},
            "dt": time.time()-t0
        }

def batch_process_clean(
    pattern: str = "clean_*.txt",
    max_workers: int = 4,
    skip_existing: bool = True
) -> List[Dict[str, Any]]:
    """
    Ejecuta el pipeline sobre todos los archivos que coincidan con 'pattern'
    encontrados por resolve_input(). Genera OUTPUT_DIR/manifest.jsonl (+CSV si hay pandas).
    """
    files = resolve_input(pattern)
    if not files:
        print(f"[Paso 15] No se encontraron archivos con patrón: {pattern}")
        return []

    print(f"[Paso 15] Procesando {len(files)} archivos (skip_existing={skip_existing}, workers={max_workers})")
    results: List[Dict[str, Any]] = []

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs = {ex.submit(_process_one_file, p, skip_existing): p for p in files}
        for fut in as_completed(futs):
            res = fut.result()
            results.append(res)
            base = res["base"]
            print(f"  - {base}: {res['status']} ({res['dt']:.2f}s)")

    # Escribir manifest.jsonl
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    with open(MANIFEST_JSONL, "w", encoding="utf-8") as f:
        for r in results:
            row = {
                "base": r["base"],
                "file": r["file"],
                "status": r["status"],
                "json_path": r["json_path"],
                **{f"diag_{k}": v for k, v in (r.get("diag") or {}).items()}
            }
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

    # Opcional: manifest.csv si hay pandas
    if _HAS_PANDAS:
        df = pd.DataFrame([
            {
                "base": r["base"],
                "file": r["file"],
                "status": r["status"],
                "json_path": r["json_path"],
                **(r.get("diag") or {})
            } for r in results
        ])
        df.to_csv(MANIFEST_CSV, index=False, encoding="utf-8")
        print(f"✓ manifest.jsonl → {MANIFEST_JSONL.name} | CSV → {MANIFEST_CSV.name}")
    else:
        print(f"✓ manifest.jsonl → {MANIFEST_JSONL.name} (pandas no disponible para CSV)")

    # Resumen
    n_ok = sum(1 for r in results if r["status"] == "ok")
    n_skip = sum(1 for r in results if r["status"] == "skipped")
    n_err = sum(1 for r in results if r["status"] == "error")
    print(f"[Resumen] ok={n_ok}  skipped={n_skip}  error={n_err}")

    return results

# ───────────────────────────
# Smoke test (puedes comentar)
# ───────────────────────────
if __name__ == "__main__":
    # Corre lote en /mnt/data y CLEAN_DIR con 4 hilos
    batch_process_clean("clean_*.txt", max_workers=4, skip_existing=True)


[2025-09-08 09:21:13] INFO resolve_input: 301 archivos encontrados para patrón='clean_*.txt'
[Paso 15] Procesando 301 archivos (skip_existing=True, workers=4)
[2025-09-08 09:21:13] INFO [0002] Leído con encoding=utf-8
  - 0001: skipped (0.01s)
[2025-09-08 09:21:13] INFO [0003] Leído con encoding=utf-8
[2025-09-08 09:21:13] INFO [0003] split: decreto=4 líneas, norma=598 líneas, trans=49 líneas
[2025-09-08 09:21:13] INFO [0002] split: decreto=6 líneas, norma=5663 líneas, trans=349 líneas
[2025-09-08 09:21:13] INFO [0004] Leído con encoding=utf-8
[2025-09-08 09:21:13] INFO [0003] Título detectado: CÓDIGO DE ASISTENCIA SOCIAL DEL ESTADO DE JALISCO
[2025-09-08 09:21:13] INFO [0002] Título detectado: CODIGO CIVIL DEL ESTADO DE JALISCO
[2025-09-08 09:21:13] INFO [0005] Leído con encoding=utf-8
[2025-09-08 09:21:13] INFO [0004] split: decreto=2 líneas, norma=281 líneas, trans=16 líneas
[2025-09-08 09:21:13] INFO [0003] Año_publicación detectado: 2019
[2025-09-08 09:21:13] INFO [0002] Sin Año_p

## Correrlo

In [17]:
batch_process_clean("clean_*.txt", max_workers=4, skip_existing=False)


[2025-09-08 09:24:35] INFO resolve_input: 301 archivos encontrados para patrón='clean_*.txt'
[Paso 15] Procesando 301 archivos (skip_existing=False, workers=4)
[2025-09-08 09:24:35] INFO [0001] Leído con encoding=utf-8
[2025-09-08 09:24:35] INFO [0001] split: decreto=4 líneas, norma=1035 líneas, trans=437 líneas
[2025-09-08 09:24:35] INFO [0002] Leído con encoding=utf-8
[2025-09-08 09:24:35] INFO [0004] Leído con encoding=utf-8
[2025-09-08 09:24:35] INFO [0001] Título detectado: CONSTITUCIÓN POLÍTICA DEL ESTADO DE JALISCO
[2025-09-08 09:24:35] INFO [0003] Leído con encoding=utf-8
[2025-09-08 09:24:35] INFO [0001] Año_publicación detectado: 1994
[2025-09-08 09:24:35] INFO [0004] split: decreto=2 líneas, norma=281 líneas, trans=16 líneas
[2025-09-08 09:24:35] INFO [0002] split: decreto=6 líneas, norma=5663 líneas, trans=349 líneas
[2025-09-08 09:24:35] INFO [0003] split: decreto=4 líneas, norma=598 líneas, trans=49 líneas
[2025-09-08 09:24:35] INFO [0001] Assemble OK: 127 artículos perma

[{'base': '0004',
  'file': 'c:\\Users\\braul\\Documents\\_ITAMLaptop\\Datalab\\DataMakers\\Leyes\\14\\temp\\clean\\clean_0004.txt',
  'status': 'ok',
  'json_path': 'c:\\Users\\braul\\Documents\\_ITAMLaptop\\Datalab\\DataMakers\\Leyes\\14\\Refined\\json\\0004.json',
  'diag': {'n_chapters': 19,
   'n_perm_articles': 31,
   'n_perm_fracciones': 128,
   'n_trans_caps': 1,
   'n_trans_articles': 3,
   'n_trans_fracciones': 0,
   'chapter_labels': ['Capítulo Primero Disposiciones Generales',
    'Capítulo Segundo De los principios rectores',
    'Capítulo Tercero De las directrices aplicables a los principios rectores',
    'Capítulo Cuarto De los valores fundamentales en el comportamiento y desempeño de los servidores públicos del Congreso del Estado',
    'Capítulo Quinto De las reglas de integridad',
    'Sección primera De la actuación pública',
    'Sección segunda De la administración de bienes muebles e inmuebles',
    'Sección tercera Del comportamiento digno',
    'Sección cuarta