# Semantic Embedding of NHTSA *Complaints* with **intfloat/multilingual-e5-large-instruct**

## 1. Objective

**Semantic embeddings** were computed for the NHTSA *Complaints* corpus to enable semantic search, analytical deduplication, and later linking into a **knowledge graph**. The pipeline was designed to be **reproducible**, **efficient** in memory and time, and robust to interruptions.

---

## 2. Data and scope

* **Source** (previously normalized):

  * Parquet: `/content/drive/MyDrive/NHTSA/processed/complaints.parquet`
  * JSONL (alternative): `/content/drive/MyDrive/NHTSA/processed/complaints_corpus.jsonl`
* **Base text**: field **`CDESCR`** (complaint narrative).
* **Metadata**: `CMPLID`/`ODINO` (ID), `MAKETXT`, `MODELTXT`, `YEARTXT`, `COMPDESC`, etc.

**Methodological note.** At this stage **no chunking** is applied: each row (complaint) is represented by **a single embedding** of its narrative text, which favors case-level coherence. Narratives longer than `max_seq_length` tokens are truncated (see Section 9). The chunking cell further down is exploratory, and its output is not fed to the embedding run. Chunking remains an option for tasks where *recall* depends on short passages.

---

## 3. Model and configuration

* **Model**: `intfloat/multilingual-e5-large-instruct` (1024 dimensions).
* **Input format**: each text is prefixed with **`passage: `**, the document-side prefix of the E5 family. The model card for the instruct variant formats only queries with an instruction and leaves documents unprefixed, so queries must be formatted with this choice in mind.
* **Normalization**: vectors are **L2-normalized**, so cosine similarity reduces to a dot product.
* **Device**: `cuda` when available (with **TF32** enabled); CPU *fallback*.
* **Performance parameters**:

  * `max_seq_length = 256` (less *padding* ⇒ higher throughput).
  * `batch_size = 512` in the run shown, halved down to a floor of 64 on out-of-memory errors (adjust to available VRAM).
  * **Streaming** writes to disk (memmap) in resumable **shards**.
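
The normalization bullet can be checked numerically. The following is a minimal sketch, assuming random vectors as a stand-in for model output; `l2_normalize` and `cosine` are illustrative helpers, not part of the pipeline. It shows that the inner product of unit vectors matches the cosine similarity of the raw vectors.

```python
import numpy as np

def l2_normalize(vectors: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Scale each row to unit L2 norm (eps guards against division by zero)."""
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors / np.maximum(norms, eps)

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Reference cosine similarity computed from unnormalized vectors."""
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

rng = np.random.default_rng(0)
raw = rng.normal(size=(3, 1024)).astype(np.float32)  # stand-in for model output
unit = l2_normalize(raw)

# On unit vectors, the inner product is the cosine similarity.
dot = float(unit[0] @ unit[1])
print(abs(dot - cosine(raw[0], raw[1])) < 1e-5)
```

This is why an inner-product index can serve cosine queries once the stored vectors are normalized.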

---

## 4. Embedding pipeline (technical summary)

1. **Source selection**: Parquet (preferred) or JSONL.
2. ***Streaming* read** in batches (≈200k rows per pull) to **avoid materializing** the full corpus in RAM.
3. **Text preparation**:

   * Normalization to the minimal columns (`ID`, `TEXT`).
   * `passage: ` prefix applied per read batch, never to the whole corpus at once.
4. **(Optional) per-batch deduplication** via *hash* (`hash_pandas_object`).

   * For *Complaints* it was left **disabled** (`DEDUP_TEXTS = False`), given the lower redundancy observed and to preserve per-incident nuance.
5. **Embedding by shards** (e.g., 25,000 rows/shard):

   * *Batching* on GPU/CPU.
   * Direct writes to a **memmapped `.npy`** (the shard's vectors are never accumulated in RAM).
   * **Aligned metadata** saved alongside (`meta_shard_XXXX.parquet`).
6. **Run manifest** (`manifest.json`):

   * Records shards, rows, dimension, *flags* (normalize, fp16), and status, making the run **resumable** after interruptions.
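
Steps 2 and 5 interact: a read batch of about 200k rows holds several 25,000-row shards. The sketch below, using only the standard library, shows one way to regroup variable-size read batches into fixed-size shards. The function name `reshard` and the toy data are assumptions made for illustration; the notebook itself works on pandas DataFrames.

```python
from typing import Iterable, Iterator, List

def reshard(batches: Iterable[List[str]], shard_rows: int) -> Iterator[List[str]]:
    """Regroup variable-size read batches into shards of exactly shard_rows rows.

    The last shard may be shorter. Only rows not yet emitted are buffered.
    """
    if shard_rows <= 0:
        raise ValueError("shard_rows must be positive")
    buffer: List[str] = []
    for batch in batches:
        buffer.extend(batch)
        # One read batch can contain several shards, so drain in a loop.
        while len(buffer) >= shard_rows:
            yield buffer[:shard_rows]
            buffer = buffer[shard_rows:]
    if buffer:
        yield buffer  # final partial shard

# Toy run: two read batches of 5 rows each, shards of 3 rows.
reads = [[f"row{i}" for i in range(5)], [f"row{i}" for i in range(5, 10)]]
shards = list(reshard(reads, shard_rows=3))
shard_sizes = [len(s) for s in shards]
print(shard_sizes)
```

Draining with `while` rather than a single `if` keeps the buffer bounded by one read batch plus one shard, whatever the ratio between the two sizes.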

---

## 5. Resource management and robustness

* **Host RAM kept bounded:** batched reads plus memmapped output.
* **VRAM put to use:** high `batch_size` and reduced `max_seq_length` (less *padding*).
* **Resumption after interruptions:** each shard is written to a `.tmp` file, committed with an atomic rename, and recorded in `manifest.json`.
* **Optional FP16 on disk**: `USE_FP16_STORAGE=True` halves the size of the `.npy` files, with an expected marginal loss in *recall*.
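
To put the FP16 option in numbers, here is a back-of-the-envelope sketch. It assumes dense storage and ignores the small `.npy` header; the row count is the corpus size printed by the loading cell, and `embedding_bytes` is an illustrative helper.

```python
def embedding_bytes(rows: int, dim: int, bytes_per_value: int) -> int:
    """Payload size of a dense rows x dim matrix (excludes the .npy header)."""
    return rows * dim * bytes_per_value

ROWS_TOTAL = 2_137_711  # corpus size reported by the loading cell
DIM = 1024
SHARD_ROWS = 25_000

fp32_total = embedding_bytes(ROWS_TOTAL, DIM, 4)
fp16_total = embedding_bytes(ROWS_TOTAL, DIM, 2)
fp32_shard = embedding_bytes(SHARD_ROWS, DIM, 4)

print(f"float32 corpus: {fp32_total / 1e9:.2f} GB")
print(f"float16 corpus: {fp16_total / 1e9:.2f} GB")
print(f"float32 shard : {fp32_shard / 1e6:.1f} MB")
```

Under these assumptions the full corpus takes about 8.76 GB in float32 versus 4.38 GB in float16, and each full float32 shard is 102.4 MB.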

---

## 6. Generated artifacts

Output directory:
`/content/drive/MyDrive/NHTSA/embeddings/complaints_e5_mlg_instruct/`

* `embeddings_shard_0000.npy`, `embeddings_shard_0001.npy`, … (1024-D vectors).
* `meta_shard_0000.parquet`, `meta_shard_0001.parquet`, … (aligned ID and metadata).
* `manifest.json` (full description for resuming and indexing).
* `checkpoint.json` (rows done and next shard index, used when resuming).

---

## 7. Quality controls

Automatic checks were applied per shard:

* **Dimension**: consistent (= 1024).
* **Cardinality**: embedding rows == metadata rows.
* **Normalization**: L2 norm ≈ 1.0 (sampled).
* **Integrity**: non-null `id`; proportion of empty texts ≈ 0 % (after cleaning).
* **Traceability**: each shard is recorded in the *manifest* with `complete: true`.
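
The first three checks can be sketched as a single function. This is a hedged illustration rather than the notebook's own validation code: `check_shard`, its tolerance, and the synthetic shard are assumptions, and the metadata row count is passed as a plain integer to keep the example free of pandas.

```python
import tempfile
from pathlib import Path
import numpy as np

def check_shard(npy_path: Path, meta_rows: int, expected_dim: int = 1024,
                sample: int = 1000, tol: float = 1e-3) -> dict:
    """Run dimension, cardinality, and unit-norm checks on one shard."""
    arr = np.load(npy_path, mmap_mode="r", allow_pickle=False)  # no full read
    rows, dim = arr.shape
    # Evenly spaced sample of row indices for the norm check
    idx = np.linspace(0, rows - 1, num=min(sample, rows), dtype=np.int64)
    norms = np.linalg.norm(np.asarray(arr[idx], dtype=np.float32), axis=1)
    return {
        "dim_ok": dim == expected_dim,
        "rows_ok": rows == meta_rows,
        "norm_ok": bool(np.all(np.abs(norms - 1.0) < tol)),
    }

# Synthetic shard standing in for embeddings_shard_XXXX.npy
with tempfile.TemporaryDirectory() as tmp:
    vecs = np.random.default_rng(0).normal(size=(50, 1024)).astype(np.float32)
    vecs /= np.linalg.norm(vecs, axis=1, keepdims=True)
    path = Path(tmp) / "embeddings_shard_demo.npy"
    np.save(path, vecs)
    report = check_shard(path, meta_rows=50)

print(report)
```

Loading with `mmap_mode="r"` means only the sampled rows are read from disk, which matters when the check runs over every shard.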

---

## 8. Methodological justification

* **No chunking**: preserves incident context and lowers total inference cost; appropriate when operational questions concern the **whole case** (e.g., co-occurrences between narrative and metadata).
* **E5-instruct**: suited to multilingual semantic retrieval, with a good **quality/latency/storage** balance observed in earlier evaluations.
* **Streaming + shards**: a practical requirement given the volume (≈2.1 million records) and the limits on RAM and session time.

---

## 9. Limitations and mitigations

* **Very long texts**: truncated at `max_seq_length=256` tokens; if queries sensitive to distant context emerge, raise it to 384/512 in a **targeted re-encoding**.
* **Deduplication disabled**: preserves granularity; if thematic redundancy is detected, it can be enabled per batch, or **similarity-based aggregation** (clustering) can be applied afterwards.
* **Language variability**: E5 is multilingual, but for mixed EN/ES/technical domains coverage is adequate rather than perfect. Mitigate with **re-ranking** of relevant passages if complex queries are integrated.
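
If deduplication is switched on later, exact duplicates can be found by hashing. The sketch below is a variant, not the pipeline's method: the notebook hashes raw text per read batch with `hash_pandas_object`, while this illustration normalizes whitespace and case first and keeps one global set of seen keys. The names `text_key` and `first_occurrences` are hypothetical.

```python
import hashlib
from typing import Iterable, List

def text_key(text: str) -> str:
    """Hash of a whitespace- and case-normalized narrative."""
    canonical = " ".join(text.split()).upper()
    return hashlib.blake2b(canonical.encode("utf-8"), digest_size=8).hexdigest()

def first_occurrences(texts: Iterable[str]) -> List[int]:
    """Indices of texts whose normalized form has not been seen before."""
    seen = set()
    keep = []
    for i, t in enumerate(texts):
        k = text_key(t)
        if k not in seen:
            seen.add(k)
            keep.append(i)
    return keep

sample = ["FUEL TANK LEAKS", "fuel  tank leaks ", "SEAT FRAME BROKE"]
kept = first_occurrences(sample)
print(kept)
```

Dropping rows this way changes row positions relative to the source, so a resume scheme that counts source rows would need to record the kept indices as well.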

---

## 10. Next steps

1. **Indexing** (FAISS/HNSW) per shard ⇒ global index (cosine/IP).
2. **R2R/Q2R evaluation** on *Complaints* (metrics: Recall@k, nDCG@k) with canonical *queries*.
3. **Integration** into the **graph**: unification by `VIN`, `MMY`, `COMPONENT`, *campaign*, and *investigations*.
4. **RAG/Agent**: hybrid *retrieval* (BM25 + vectors) plus a lightweight *re-ranker* for explainable answers.
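
The metrics named in step 2 can be stated precisely. The following sketch assumes binary relevance judgments; the complaint ids and the relevant set are made up for illustration, and graded relevance would need a different gain term.

```python
import math
from typing import Sequence, Set

def recall_at_k(ranked_ids: Sequence[str], relevant: Set[str], k: int) -> float:
    """Fraction of the relevant ids that appear in the top-k results."""
    if not relevant:
        return 0.0
    hits = sum(1 for doc_id in ranked_ids[:k] if doc_id in relevant)
    return hits / len(relevant)

def ndcg_at_k(ranked_ids: Sequence[str], relevant: Set[str], k: int) -> float:
    """Binary-relevance nDCG: DCG of the ranking divided by the ideal DCG."""
    dcg = sum(1.0 / math.log2(rank + 2)
              for rank, doc_id in enumerate(ranked_ids[:k]) if doc_id in relevant)
    ideal_hits = min(len(relevant), k)
    idcg = sum(1.0 / math.log2(rank + 2) for rank in range(ideal_hits))
    return dcg / idcg if idcg > 0 else 0.0

ranked = ["c7", "c2", "c9", "c4"]  # hypothetical complaint ids, best first
relevant = {"c2", "c4", "c5"}      # hypothetical relevance judgments
r = recall_at_k(ranked, relevant, k=3)
n = ndcg_at_k(ranked, relevant, k=3)
print(f"Recall@3={r:.3f}  nDCG@3={n:.3f}")
```

Recall@k ignores the position of a hit within the top k, while nDCG@k discounts later hits logarithmically, so the two are usually reported together.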

---

## 11. Reproducibility (parameter summary)

* `model = "intfloat/multilingual-e5-large-instruct"`
* `max_seq_length = 256`, `batch_size = 512`, `normalize = True`
* `STREAM_READ_ROWS = 200_000`, `SHARD_ROWS = 25_000`
* `USE_FP16_STORAGE = False` (optionally `True`)
* Output: `…/complaints_e5_mlg_instruct/{embeddings_shard_*.npy, meta_shard_*.parquet, manifest.json}`

---


In [None]:
# ── Mount Google Drive ────────────────────────────────────────────
from google.colab import drive
drive.mount('/content/drive')

# ── Dependencies ──────────────────────────────────────────────────────────────
!pip -q install sentence-transformers==3.0.1

# ── Libraries ─────────────────────────────────────────────────────────────────
import os, re, json, math, hashlib, textwrap, gc, time
from pathlib import Path
import numpy as np
import pandas as pd

# ── Device detection ─────────────────────────────────────────────────────────
import torch
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    try:
        DEVICE = "mps" if torch.backends.mps.is_available() else "cpu"
    except Exception:
        DEVICE = "cpu"

MODEL_NAME = "intfloat/multilingual-e5-large-instruct"   # same model we selected
NORMALIZE  = True

# Suggested batch sizes (adjust for your hardware: T4/V100/A100/CPU)
BATCH_SIZE = 64 if DEVICE in ("cuda","mps") else 16

# ── I/O paths ─────────────────────────────────────────────────────────────────
BASE_IN   = Path("/content/drive/MyDrive/NHTSA/processed")
PARQ_IN   = BASE_IN / "complaints.parquet"
JSONL_IN  = BASE_IN / "complaints_corpus.jsonl"  # alternative source

BASE_OUT  = Path("/content/drive/MyDrive/NHTSA/embeddings/complaints_e5_mlg_instruct")
BASE_OUT.mkdir(parents=True, exist_ok=True)

VEC_NPY   = BASE_OUT / "complaints_embeddings.npy"
META_OUT  = BASE_OUT / "complaints_chunks_meta.parquet"
MAP_CSV   = BASE_OUT / "id_map.csv"
CONF_JSON = BASE_OUT / "config.json"

print(f"Device: {DEVICE} | Batch size: {BATCH_SIZE}")
print("Entradas posibles:")
print(" -", PARQ_IN,  "| existe:", PARQ_IN.exists())
print(" -", JSONL_IN, "| existe:", JSONL_IN.exists())
print("Salidas ->", BASE_OUT)


Mounted at /content/drive
Device: cuda | Batch size: 64
Entradas posibles:
 - /content/drive/MyDrive/NHTSA/processed/complaints.parquet | existe: True
 - /content/drive/MyDrive/NHTSA/processed/complaints_corpus.jsonl | existe: True
Salidas -> /content/drive/MyDrive/NHTSA/embeddings/complaints_e5_mlg_instruct


In [None]:
# ──────────────────────────────────────────────────────────────────────────────
# CELL 1 · Load the Complaints corpus and generate chunks (≤1200 characters in this run)
# ──────────────────────────────────────────────────────────────────────────────
import pandas as pd, numpy as np, json, textwrap
from pathlib import Path
from tqdm import tqdm

PARQ_IN  = Path("/content/drive/MyDrive/NHTSA/processed/complaints.parquet")
JSONL_IN = Path("/content/drive/MyDrive/NHTSA/processed/complaints_corpus.jsonl")

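def normalize_year(x):
    """Return the model year as an int if it falls in 1949..2035, else NaN."""
    try:
        # Parse via float so values such as "1998.0" are accepted too
        v = int(float(str(x)))
        if 1949 <= v <= 2035:
            return v
    except (ValueError, TypeError, OverflowError):
        pass
    return np.nan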

def first_nonnull(d, keys, default=None):
    for k in keys:
        if k in d and pd.notna(d[k]) and str(d[k]).strip() != "":
            return d[k]
    return default

def hard_wrap(text: str, width: int = 900) -> list[str]:
    """Split text into fragments of at most `width` characters, breaking only at whitespace."""
    if not isinstance(text, str) or not text.strip():
        return []
    return textwrap.wrap(text.strip(), width=width, break_long_words=False, break_on_hyphens=False)

def load_complaints_corpus(parquet_path: Path, jsonl_path: Path) -> pd.DataFrame:
    """Return a DataFrame with the minimal columns id, text (plus metadata when present)."""
    if parquet_path.exists():
        df = pd.read_parquet(parquet_path)
        cols = [c.upper() for c in df.columns]
        df.columns = cols

        id_col   = "CMPLID" if "CMPLID" in cols else "ODINO" if "ODINO" in cols else None
        text_col = "CDESCR" if "CDESCR" in cols else None
        assert id_col and text_col, "No se encontraron columnas mínimas CMPLID/ODINO y CDESCR"

        base = pd.DataFrame({
            "id": df[id_col].astype(str),
            # Fill nulls first so missing narratives stay empty instead of becoming "nan"
            "text": df[text_col].fillna("").astype(str)
        })
        if "MAKETXT" in cols:   base["make"]      = df["MAKETXT"].astype(str)
        if "MODELTXT" in cols:  base["model"]     = df["MODELTXT"].astype(str)
        if "YEARTXT" in cols:   base["year"]      = df["YEARTXT"].map(normalize_year)
        if "COMPDESC" in cols:  base["component"] = df["COMPDESC"].astype(str)
        return base

    elif jsonl_path.exists():
        rows = []
        with open(jsonl_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                _id  = obj.get("id")
                _txt = obj.get("text")
                meta = obj.get("metadata", {}) or {}
                if not _id or not _txt:
                    continue
                rows.append({
                    "id": str(_id),
                    "text": str(_txt),
                    "make": first_nonnull(meta, ["MAKETXT","make"]),
                    "model": first_nonnull(meta, ["MODELTXT","model"]),
                    "year": normalize_year(first_nonnull(meta, ["YEARTXT","year"])),
                    "component": first_nonnull(meta, ["COMPDESC","component"])
                })
        return pd.DataFrame(rows)
    else:
        raise FileNotFoundError("No se encontraron complaints.parquet ni complaints_corpus.jsonl")

def make_chunks(df_base: pd.DataFrame, wrap_width: int = 900) -> pd.DataFrame:
    """Build text chunks of at most wrap_width characters, carrying basic metadata."""
    records = []
    for _, r in tqdm(df_base.iterrows(), total=len(df_base), desc="Chunking"):
        rid = str(r["id"])
        text = str(r["text"]) if pd.notna(r["text"]) else ""
        chunks = hard_wrap(text, width=wrap_width)
        if not chunks:
            continue
        for j, ch in enumerate(chunks):
            records.append({
                "chunk_id": f"{rid}::ch{j}",
                "id": rid,
                "make": r.get("make"),
                "model": r.get("model"),
                "year": r.get("year"),
                "component": r.get("component"),
                "chunk_idx": j,
                "text": ch
            })
    return pd.DataFrame.from_records(records)

# Run loading and chunking
corpusC = load_complaints_corpus(PARQ_IN, JSONL_IN)
print("Corpus base complaints:", corpusC.shape)
display(corpusC.head(3))

complaints_ch = make_chunks(corpusC, wrap_width=1200)
print("Chunks generados:", complaints_ch.shape)
display(complaints_ch.head(5))

# Minimal sanity checks
assert {"id","text","chunk_id"}.issubset(complaints_ch.columns), "Faltan columnas mínimas"
assert len(complaints_ch) > 0, "No hay chunks generados"


Corpus base complaints: (2137711, 6)


| | id | text | make | model | year | component |
|---|---|---|---|---|---|---|
| 0 | 1 | RADIATOR FAILED @ HIGHWAY SPEED OBSTRUCTING DR... | VOLVO | 760 | | ENGINE AND ENGINE COOLING:COOLING SYSTEM:RADIA... |
| 1 | 2 | FUEL LEAKED FROM FUEL TANK AREA, EMITTING STRO... | FORD | THUNDERBIRD | | FUEL SYSTEM, GASOLINE:DELIVERY |
| 2 | 3 | SHIFTED INTO REVERSE VEHICLE JERKED VIOLENTLY.... | KIA | SEPHIA | | POWER TRAIN:AUTOMATIC TRANSMISSION |


Chunking: 100%|██████████| 2137711/2137711 [06:08<00:00, 5795.62it/s]


Chunks generados: (2299879, 8)


| | chunk_id | id | make | model | year | component | chunk_idx | text |
|---|---|---|---|---|---|---|---|---|
| 0 | 1::ch0 | 1 | VOLVO | 760 | | ENGINE AND ENGINE COOLING:COOLING SYSTEM:RADIA... | 0 | RADIATOR FAILED @ HIGHWAY SPEED OBSTRUCTING DR... |
| 1 | 2::ch0 | 2 | FORD | THUNDERBIRD | | FUEL SYSTEM, GASOLINE:DELIVERY | 0 | FUEL LEAKED FROM FUEL TANK AREA, EMITTING STRO... |
| 2 | 3::ch0 | 3 | KIA | SEPHIA | | POWER TRAIN:AUTOMATIC TRANSMISSION | 0 | SHIFTED INTO REVERSE VEHICLE JERKED VIOLENTLY.... |
| 3 | 4::ch0 | 4 | DODGE | 600 | | FUEL SYSTEM, GASOLINE:STORAGE:TANK ASSEMBLY | 0 | FUEL TANK ; LEAKS BECAUSE OF RUST GAS LEAK BY ... |
| 4 | 5::ch0 | 5 | DODGE | CARAVAN | | SEATS | 0 | DRIVER SIDE SEAT FRAME BROKE IN TWO, CAUSING S... |


In [None]:
import os
# Reduces VRAM fragmentation (PyTorch 2.0+)
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
os.environ["TOKENIZERS_PARALLELISM"] = "false"


In [None]:
# ===================== Complaints → e5-multilingual-large-instruct (RESUME + REPAIR) =====================
import os, json, time, gc, re, math
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm import tqdm
import pyarrow.parquet as pq
import torch
from sentence_transformers import SentenceTransformer
from contextlib import nullcontext

# -------------------- Config --------------------
MODEL_NAME        = "intfloat/multilingual-e5-large-instruct"
OUT_DIR           = Path("/content/drive/MyDrive/NHTSA/embeddings/complaints_e5_mlg_instruct")
PARQ_IN           = Path("/content/drive/MyDrive/NHTSA/processed/complaints.parquet")
JSONL_IN          = Path("/content/drive/MyDrive/NHTSA/processed/complaints_corpus.jsonl")

DEVICE            = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE        = 512
INIT_BATCH        = 512
MIN_BATCH         = 64
SHARD_ROWS        = 25_000
STREAM_READ_ROWS  = 200_000
MAX_SEQ_LEN       = 256
NORMALIZE         = True

# Earlier shards were stored as float32 (USE_FP16_STORAGE=False), so keep that setting:
USE_FP16_STORAGE  = False
DEDUP_TEXTS       = False

SAFE_MODE_NO_DELETE = True  # never delete anything

os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
OUT_DIR.mkdir(parents=True, exist_ok=True)

manifest_path   = OUT_DIR / "manifest.json"
checkpoint_path = OUT_DIR / "checkpoint.json"

# -------------------- Utilities --------------------
def load_manifest(path: Path):
    if path.exists():
        with open(path, "r") as f:
            return json.load(f)
    return {"dataset":"complaints","model":MODEL_NAME,"device":DEVICE,"dim":None,
            "batch_size":BATCH_SIZE,"max_seq_len":MAX_SEQ_LEN,"normalize":NORMALIZE,
            "fp16_storage":USE_FP16_STORAGE,"shard_rows":SHARD_ROWS,"shards":[]}

def save_manifest(obj, path: Path):
    tmp = path.with_suffix(".tmp")
    with open(tmp, "w") as f: json.dump(obj, f, indent=2)
    os.replace(tmp, path)

def load_checkpoint(path: Path):
    if path.exists():
        with open(path, "r") as f:
            return json.load(f)
    return {"rows_done": 0, "next_shard_idx": 0}

def save_checkpoint(rows_done: int, next_idx: int):
    tmp = checkpoint_path.with_suffix(".tmp")
    with open(tmp, "w") as f:
        json.dump({"rows_done": int(rows_done), "next_shard_idx": int(next_idx)}, f)
    os.replace(tmp, checkpoint_path)

def pick_source():
    if PARQ_IN.exists(): return "parquet"
    if JSONL_IN.exists(): return "jsonl"
    raise FileNotFoundError("No se encontró complaints.parquet ni complaints_corpus.jsonl")

def format_e5_passage_col(series: pd.Series) -> pd.Series:
    return "passage: " + series.astype("string").str.strip()

def hash64(s: pd.Series) -> pd.Series:
    return pd.util.hash_pandas_object(s.astype("string"), index=False).astype("int64")

def stream_parquet_batches(parquet_path: Path, cols=None, batch_rows=STREAM_READ_ROWS):
    pf = pq.ParquetFile(str(parquet_path))
    read_cols = cols or pf.schema.names
    for batch in pf.iter_batches(batch_size=batch_rows, columns=read_cols):
        yield batch.to_pandas(types_mapper=None)

def stream_jsonl_batches(jsonl_path: Path, batch_rows=STREAM_READ_ROWS):
    buf = {"ID":[], "TEXT":[], "MAKETXT":[], "MODELTXT":[], "YEARTXT":[], "COMPDESC":[]}
    n = 0
    def flush():
        nonlocal buf
        if buf["ID"]:
            yield pd.DataFrame(buf)
        buf = {"ID":[], "TEXT":[], "MAKETXT":[], "MODELTXT":[], "YEARTXT":[], "COMPDESC":[]}
    import json as _json
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            try: obj = _json.loads(line)
            except _json.JSONDecodeError: continue
            _id = obj.get("id"); _txt = obj.get("text"); meta = obj.get("metadata") or {}
            if not _id or not _txt: continue
            buf["ID"].append(str(_id)); buf["TEXT"].append(str(_txt))
            buf["MAKETXT"].append(meta.get("MAKETXT")); buf["MODELTXT"].append(meta.get("MODELTXT"))
            buf["YEARTXT"].append(meta.get("YEARTXT")); buf["COMPDESC"].append(meta.get("COMPDESC"))
            n += 1
            if n % batch_rows == 0:
                yield from flush()
    yield from flush()

# ---------- REPAIR: shards written as raw binary -> convert to valid .npy ----------
def _idx_from_name(p: Path):
    m = re.search(r'(\d{4})', p.stem)
    return int(m.group(1)) if m else None

def _try_npy_shape(path: Path):
    # Try loading as a regular .npy
    arr = np.load(path, mmap_mode="r", allow_pickle=False)
    return int(arr.shape[0]), int(arr.shape[1])

def _repair_headerless_npy(p_npy: Path, p_meta: Path):
    """
    If the file is a raw binary dump (no .npy header), rewrite it as a valid .npy using:
      - rows  = len(meta)
      - dim   = file_size / (rows * bytes_per_elem)
      - dtype = float32 by default (float16 only if float32 does not fit)
    """
    rows_meta = len(pd.read_parquet(p_meta))
    size_bytes = p_npy.stat().st_size
    if rows_meta <= 0:
        raise ValueError(f"Metadata has no rows; cannot infer shape for {p_npy.name}")

    # Candidate dimensions per element size (exact integer division only)
    dim32, rem32 = divmod(size_bytes, rows_meta * 4)
    dim16, rem16 = divmod(size_bytes, rows_meta * 2)
    ok32 = rem32 == 0 and dim32 > 0
    ok16 = rem16 == 0 and dim16 > 0

    if not ok32 and not ok16:
        raise ValueError(f"File size does not match metadata rows. size={size_bytes}, rows={rows_meta}, dim32={dim32}, dim16={dim16}")

    # Prefer float32 (matches the original storage config)
    if ok32:
        dtype, dim = np.float32, int(dim32)
    else:
        dtype, dim = np.float16, int(dim16)

    # Open the raw binary as a memmap and repack it into a .npy with a header
    mm = np.memmap(str(p_npy), mode="r", dtype=dtype, shape=(rows_meta, dim))
    tmp = p_npy.with_suffix(".npy.tmp")  # same name + .tmp

    # open_memmap creates a valid .npy file, header included
    from numpy.lib.format import open_memmap
    fp = open_memmap(str(tmp), mode="w+", dtype=dtype, shape=(rows_meta, dim))
    fp[:] = mm[:]
    fp.flush()
    del fp, mm
    os.replace(tmp, p_npy)  # atomic commit

    return rows_meta, dim, dtype

def repair_and_read_shape(p_npy: Path, p_meta: Path):
    try:
        return _try_npy_shape(p_npy)
    except Exception:
        # Not a valid .npy: try to repair it, assuming a raw binary dump
        rows, dim, dtype = _repair_headerless_npy(p_npy, p_meta)
        # Verify that it now loads as a regular .npy
        r2, d2 = _try_npy_shape(p_npy)
        assert (rows, dim) == (r2, d2), f"Repaired but shape is inconsistent: {(rows,dim)} vs {(r2,d2)}"
        return rows, dim

# -------------------- HARD RESUME with REPAIR --------------------
def hard_resume_from_disk_with_repair(out_dir: Path, manifest_path: Path, checkpoint_path: Path):
    npys  = { _idx_from_name(p): p for p in out_dir.glob("embeddings_shard_*.npy") if _idx_from_name(p) is not None and not str(p).endswith(".tmp") }
    metas = { _idx_from_name(p): p for p in out_dir.glob("meta_shard_*.parquet")  if _idx_from_name(p) is not None and not str(p).endswith(".tmp") }

    complete = []
    warnings = []
    dim_detected = None

    for idx in sorted(set(npys) & set(metas)):
        p_npy, p_meta = npys[idx], metas[idx]
        try:
            rows_npy, dim = repair_and_read_shape(p_npy, p_meta)  # repairs the file if needed
            rows_meta = len(pd.read_parquet(p_meta))
            if rows_npy == rows_meta and rows_npy > 0:
                complete.append((idx, rows_npy, p_npy, p_meta, dim))
                dim_detected = dim
            else:
                warnings.append(f"[skip] {idx:04d} filas_npy={rows_npy} != filas_meta={rows_meta}")
        except Exception as e:
            warnings.append(f"[skip] {idx:04d} error al leer/reparar: {e}")

    if warnings:
        print("\n".join(warnings))

    rows_done = sum(r for (i, r, *_ ) in complete)
    next_idx = (max([i for (i, *_ ) in complete], default=-1) + 1)

    man = {
        "dataset": "complaints",
        "model": MODEL_NAME,
        "device": DEVICE,
        "dim": dim_detected,
        "batch_size": BATCH_SIZE,
        "max_seq_len": MAX_SEQ_LEN,
        "normalize": NORMALIZE,
        "fp16_storage": USE_FP16_STORAGE,
        "shard_rows": SHARD_ROWS,
        "shards": [
            {"shard_idx": int(i), "rows": int(r), "embeddings": str(pn), "meta": str(pm), "complete": True}
            for (i, r, pn, pm, _d) in sorted(complete, key=lambda x: x[0])
        ]
    }
    save_manifest(man, manifest_path)
    save_checkpoint(int(rows_done), int(next_idx))

    print(f"[HARD RESUME + REPAIR] shards_ok={len(complete)} | rows_done={rows_done:,} | next_shard_idx={next_idx}")
    return rows_done, next_idx, dim_detected

# -------------------- Model --------------------
t0 = time.time()
model = SentenceTransformer(MODEL_NAME, device=DEVICE)
try:
    model.max_seq_length = MAX_SEQ_LEN
except Exception:
    pass
if DEVICE == "cuda":
    torch.backends.cuda.matmul.allow_tf32 = True
    try: torch.set_float32_matmul_precision("high")
    except Exception: pass
print(f"Device: {DEVICE} | Modelo cargado en {time.time()-t0:.2f}s (max_len={MAX_SEQ_LEN}, bs={BATCH_SIZE})")

# -------------------- State reconstruction (with REPAIR) --------------------
rows_done, shard_idx, _ = hard_resume_from_disk_with_repair(OUT_DIR, manifest_path, checkpoint_path)
ck = load_checkpoint(checkpoint_path)
rows_done = max(rows_done, ck.get("rows_done", 0))
shard_idx = max(shard_idx, ck.get("next_shard_idx", 0))
print(f"▶︎ Reanudación efectiva: rows_done={rows_done:,} | next_shard_idx={shard_idx}")

# -------------------- Encode + atomic commit (open_memmap yields a valid .npy) --------------------
def encode_and_flush_shard_streaming(cur_sub: pd.DataFrame, shard_idx: int):
    shard_emb_final  = OUT_DIR / f"embeddings_shard_{shard_idx:04d}.npy"
    shard_meta_final = OUT_DIR / f"meta_shard_{shard_idx:04d}.parquet"
    shard_emb_tmp    = OUT_DIR / f"embeddings_shard_{shard_idx:04d}.npy.tmp"
    shard_meta_tmp   = OUT_DIR / f"meta_shard_{shard_idx:04d}.parquet.tmp"

    # Skip if the shard already exists and is consistent
    if shard_emb_final.exists() and shard_meta_final.exists():
        try:
            rows_expected = len(pd.read_parquet(shard_meta_final))
            arr = np.load(shard_emb_final, mmap_mode="r", allow_pickle=False)
            rows_npy, dim = int(arr.shape[0]), int(arr.shape[1])
            if rows_npy == rows_expected:
                man = load_manifest(manifest_path)
                man["dim"] = int(dim) if man.get("dim") is None else man["dim"]
                man["shards"] = [s for s in man["shards"] if int(s["shard_idx"]) != shard_idx]
                man["shards"].append({"shard_idx": shard_idx, "rows": rows_npy,
                                      "embeddings": str(shard_emb_final), "meta": str(shard_meta_final),
                                      "complete": True})
                man["shards"] = sorted(man["shards"], key=lambda s: s["shard_idx"])
                save_manifest(man, manifest_path)
                save_checkpoint(sum(s["rows"] for s in man["shards"]), shard_idx+1)
                print(f"↪︎ Shard {shard_idx:04d} ya existía y es consistente. SKIP.")
                return rows_npy, dim
        except Exception as e:
            print(f"⚠️ Shard {shard_idx:04d} existente pero no comprobable ({e}). Reescribiendo de forma segura…")

    # Prepare texts
    texts = cur_sub["__model_text"].astype("string").tolist()
    rows  = len(texts)
    if rows == 0:
        man = load_manifest(manifest_path)
        man["shards"] = [s for s in man["shards"] if int(s["shard_idx"]) != shard_idx]
        save_manifest(man, manifest_path)
        save_checkpoint(sum(s["rows"] for s in man["shards"]), shard_idx+1)
        print(f"↪︎ Shard {shard_idx:04d} vacío. SKIP.")
        return 0, man.get("dim")

    cur_bs = min(BATCH_SIZE, rows)
    dtype = np.float16 if USE_FP16_STORAGE else np.float32
    use_autocast = (DEVICE == "cuda")
    # torch.amp.autocast is the recommended API (torch.cuda.amp.autocast emits a deprecation warning)
    amp_ctx = (lambda: torch.amp.autocast("cuda", dtype=torch.float16)) if use_autocast else nullcontext

    # 1) Embeddings → open_memmap *.tmp (creates a valid .npy with header)
    from numpy.lib.format import open_memmap
    fp = None
    dim = None
    pbar = tqdm(total=rows, desc=f"Shard {shard_idx:04d}", unit="row", leave=False)
    rows_written = 0
    try:
        i = 0
        while i < rows:
            bs = min(cur_bs, rows - i)
            try:
                batch = texts[i:i+bs]
                with amp_ctx():
                    with torch.no_grad():
                        vec = model.encode(batch, batch_size=bs, convert_to_numpy=True,
                                           normalize_embeddings=NORMALIZE, show_progress_bar=False
                                           ).astype("float32", copy=False)
                if dim is None:
                    dim = vec.shape[1]
                    fp  = open_memmap(str(shard_emb_tmp), mode="w+", dtype=dtype, shape=(rows, dim))
                n_now = vec.shape[0]
                fp[rows_written:rows_written+n_now, :] = vec.astype(dtype, copy=False)
                rows_written += n_now; i += n_now; pbar.update(n_now)
                del batch, vec; gc.collect()
                if torch.cuda.is_available(): torch.cuda.empty_cache()
                if DEVICE == "cuda":
                    free, total = torch.cuda.mem_get_info()
                    if free/total > 0.50 and cur_bs < INIT_BATCH:
                        cur_bs = min(INIT_BATCH, cur_bs*2)
            except RuntimeError as e:
                msg = str(e).lower()
                if "out of memory" in msg or "cuda" in msg:
                    del batch; gc.collect()
                    if torch.cuda.is_available(): torch.cuda.empty_cache()
                    new_bs = max(MIN_BATCH, bs//2)
                    if new_bs < bs:
                        cur_bs = new_bs
                        continue
                    else:
                        raise
                else:
                    raise
    finally:
        pbar.close()
        if fp is not None:
            fp.flush()  # push pending pages to disk before releasing the memmap
            del fp
        gc.collect()

    # 2) Metadata → parquet *.tmp
    meta_cols = [c for c in ["id","make","model","year","component","_h"] if c in cur_sub.columns]
    cur_sub[meta_cols].to_parquet(shard_meta_tmp, index=False)

    # 3) Atomic commit
    os.replace(shard_emb_tmp, shard_emb_final)
    os.replace(shard_meta_tmp, shard_meta_final)

    # 4) Update manifest and checkpoint
    man = load_manifest(manifest_path)
    man["dim"] = int(dim) if man.get("dim") is None else man["dim"]
    man["shards"] = [s for s in man["shards"] if int(s["shard_idx"]) != shard_idx]
    man["shards"].append({"shard_idx": shard_idx, "rows": rows_written,
                          "embeddings": str(shard_emb_final), "meta": str(shard_meta_final),
                          "complete": True})
    man["shards"] = sorted(man["shards"], key=lambda s: s["shard_idx"])
    save_manifest(man, manifest_path)
    save_checkpoint(sum(s["rows"] for s in man["shards"]), shard_idx+1)

    print(f"→ Shard {shard_idx:04d} COMPLETO | filas={rows_written:,} | dim={dim} | {shard_emb_final.name}")
    return rows_written, dim

# -------------------- Main loop (honors checkpoint/existing shards) --------------------
SOURCE = pick_source()
print(f"Origen de datos: {SOURCE}")

source_iter = (stream_parquet_batches(PARQ_IN, cols=None, batch_rows=STREAM_READ_ROWS)
               if SOURCE=="parquet" else
               stream_jsonl_batches(JSONL_IN, batch_rows=STREAM_READ_ROWS))

ck = load_checkpoint(checkpoint_path)
skip_rows = ck.get("rows_done", 0)           # skip rows already embedded
next_idx  = ck.get("next_shard_idx", 0)
shard_idx = max(shard_idx, next_idx)

acc_parts, acc_rows = [], 0
total_rows_processed = skip_rows

for df in source_iter:
    df.columns = [c.upper() for c in df.columns]

    if "TEXT" not in df.columns:
        if "CDESCR" in df.columns:
            # Fill nulls first so missing narratives stay empty instead of becoming "nan"
            df["TEXT"] = df["CDESCR"].fillna("").astype(str)
        else:
            raise AssertionError("No se encontró columna de texto (TEXT o CDESCR).")
    if "ID" not in df.columns:
        cid = "CMPLID" if "CMPLID" in df.columns else ("ODINO" if "ODINO" in df.columns else None)
        assert cid is not None, "No se encontró ID (CMPLID/ODINO)."
        df["ID"] = df[cid].astype(str)

    # Skip according to the checkpoint (drops whole or partial read batches)
    if skip_rows > 0:
        if len(df) <= skip_rows:
            skip_rows -= len(df)
            continue
        else:
            df = df.iloc[skip_rows:].copy()
            skip_rows = 0

    df["_h"] = hash64(df["TEXT"])
    if DEDUP_TEXTS:
        df = df.drop_duplicates("_h")
    df["__model_text"] = format_e5_passage_col(df["TEXT"])

    keep = ["ID","__model_text","_h"]
    for c in ["MAKETXT","MODELTXT","YEARTXT","COMPDESC"]:
        if c in df.columns: keep.append(c)

    acc_parts.append(df[keep]); acc_rows += len(df)
    del df; gc.collect()

    # Flush every full shard currently buffered. One read batch can hold several
    # shards, so this loops instead of flushing a single shard per read.
    while acc_rows >= SHARD_ROWS:
        cur = pd.concat(acc_parts, ignore_index=True)
        cur_sub = cur.iloc[:SHARD_ROWS].copy()
        rest    = cur.iloc[SHARD_ROWS:].copy()
        acc_parts = [rest] if len(rest) else []
        acc_rows  = len(rest)

        cur_sub = cur_sub.rename(columns={
            "ID":"id", "MAKETXT":"make", "MODELTXT":"model",
            "YEARTXT":"year", "COMPDESC":"component"
        })
        n_written, dim = encode_and_flush_shard_streaming(cur_sub, shard_idx)
        shard_idx += 1
        total_rows_processed += n_written
        print(f"Total filas procesadas: {total_rows_processed:,}")

# final flush
if acc_parts:
    cur = pd.concat(acc_parts, ignore_index=True)
    cur = cur.rename(columns={
        "ID":"id", "MAKETXT":"make", "MODELTXT":"model",
        "YEARTXT":"year", "COMPDESC":"component"
    })
    n_written, dim = encode_and_flush_shard_streaming(cur, shard_idx)
    shard_idx += 1
    total_rows_processed += n_written
    print(f"Total filas procesadas: {total_rows_processed:,}")

print("Manifest:", manifest_path)
print("Checkpoint:", checkpoint_path)


Device: cuda | Modelo cargado en 4.81s (max_len=256, bs=512)
[HARD RESUME + REPAIR] shards_ok=11 | rows_done=275,000 | next_shard_idx=11
▶︎ Reanudación efectiva: rows_done=275,000 | next_shard_idx=11
Origen de datos: parquet




→ Shard 0011 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0011.npy
Total filas procesadas: 300,000




→ Shard 0012 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0012.npy
Total filas procesadas: 325,000




→ Shard 0013 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0013.npy
Total filas procesadas: 350,000




→ Shard 0014 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0014.npy
Total filas procesadas: 375,000




→ Shard 0015 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0015.npy
Total filas procesadas: 400,000




→ Shard 0016 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0016.npy
Total filas procesadas: 425,000




→ Shard 0017 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0017.npy
Total filas procesadas: 450,000




→ Shard 0018 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0018.npy
Total filas procesadas: 475,000




→ Shard 0019 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0019.npy
Total filas procesadas: 500,000




→ Shard 0020 COMPLETO | filas=25,000 | dim=1024 | embeddings_shard_0020.npy
Total filas procesadas: 525,000




KeyboardInterrupt: 