# V4 — 01 RAG Ingest (Pinecone)
This notebook is the **ASCII-safe** version for loading your RAG data into Pinecone.

Tip: work from the `natubot_v4/` folder and point `INPUT_PATH` at your files.


# Natubot — Transform + Load RAG → Pinecone (Gemini Embeddings) — v3 (ASCII-safe IDs)
This notebook:
1) Normalizes your RAG data to the `rag_contract_v1` contract (flat metadata, no nulls, valid types).
2) Generates embeddings with Gemini (`gemini-embedding-001`, 768 dims).
3) Upserts them to Pinecone.
4) Includes extra cells to **list/create/verify** your Pinecone index and get its **host**.
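My reading of Google's Gemini embeddings guide is that only the default 3072-dimension output comes back normalized. Reduced sizes, such as the 768 dims used here, are not normalized. With `PINECONE_METRIC = "cosine"` this makes no difference, because cosine similarity ignores vector length. If you switch the index to `dotproduct`, however, you would want unit-length vectors. Below is a minimal L2-normalization sketch. `l2_normalize` is a hypothetical helper and is not part of either SDK:

```python
import math
from typing import List

def l2_normalize(vec: List[float]) -> List[float]:
    """Scale a vector to unit length (L2 norm = 1); a zero vector is returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0.0:
        return list(vec)
    return [x / norm for x in vec]

# Hypothetical use inside embed_texts():
#   return [l2_normalize(e.values) for e in res.embeddings]
print(l2_normalize([3.0, 4.0]))  # [0.6, 0.8]
```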

## Requirements
- Python 3.9+
- `pip install -U google-genai "pinecone[grpc]" pydantic pydantic-settings`

## Environment variables (recommended)
- `GEMINI_API_KEY`
- `PINECONE_API_KEY`
- `PINECONE_INDEX_NAME` (index name)
- (optional) `PINECONE_INDEX_HOST` (if you already have it)
- (optional) `PINECONE_NAMESPACE`
- If you will **create** the index from this notebook (optional):
  - `PINECONE_CLOUD` (e.g. aws)
  - `PINECONE_REGION` (e.g. us-east-1)
  - `PINECONE_METRIC` (default: cosine)

Open this `.ipynb` in VS Code and run it cell by cell.

## 0) Install dependencies
If your venv already has the dependencies installed, you can skip this cell.

In [None]:
# Run this in your terminal (rather than inside the notebook, if you prefer):
# pip install -U google-genai "pinecone[grpc]" pydantic pydantic-settings


## 1) Config (set your keys / index name / namespace here)

In [None]:
import os
from datetime import datetime, timezone
import json
import hashlib
from typing import Any, Dict, Iterable, List, Optional, Tuple

# ===============
# INPUTS
# ===============
# In VS Code, use local paths:
INPUT_PATH = "./rag_documents.jsonl"        # or "./all_products_merged.json"
OUTPUT_JSONL_PATH = "./rag_contract_v1.jsonl"  # normalized output (optional)

# ===============
# GEMINI
# ===============
# Recommended: use env vars. To paste the key here for testing, replace the default value.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
EMBED_MODEL = "gemini-embedding-001"
EMBED_DIM = 768  # recommended for Pinecone (lower cost/storage)

# ===============
# PINECONE
# ===============
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "")
# Index name (the "database", in your terms). This is the most important setting.
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "natubot-index")
# Index host (if you don't have it, this notebook can try to resolve it from the name)
PINECONE_INDEX_HOST = os.getenv("PINECONE_INDEX_HOST", "")

# Namespace: logical partition inside the index (useful to separate datasets/versions)
PINECONE_NAMESPACE = os.getenv("PINECONE_NAMESPACE", "natubot")

# Only needed if you want to create the index from the notebook (optional)
PINECONE_CLOUD = os.getenv("PINECONE_CLOUD", "aws")
PINECONE_REGION = os.getenv("PINECONE_REGION", "us-east-1")
PINECONE_METRIC = os.getenv("PINECONE_METRIC", "cosine")

# ===============
# Contract / ops
# ===============
SCHEMA_VERSION = "rag_contract_v1"
DATA_VERSION = "mns_2019_v1"
RECORD_TYPE = "product_info"
CONTENT_STATUS_DEFAULT = "complete"

# ===============
# Batch sizes
# ===============
EMBED_BATCH_SIZE = 32
UPSERT_BATCH_SIZE = 200

# ===============
# Safety caps
# ===============
# Pinecone: metadata per record must be <= 40KB.
MAX_TEXT_CHARS = 12000
MAX_METADATA_BYTES = 38000  # safety margin below 40KB

print("INPUT_PATH:", INPUT_PATH)
print("PINECONE_INDEX_NAME:", PINECONE_INDEX_NAME)
print("PINECONE_NAMESPACE:", PINECONE_NAMESPACE)
print("PINECONE_INDEX_HOST (if you already have it):", PINECONE_INDEX_HOST or "(empty)")
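The two caps work together because they measure different things. `MAX_TEXT_CHARS` counts characters, whereas Pinecone's 40 KB limit (and `_metadata_bytes` in section 4) counts UTF-8 bytes. Accented Spanish letters take two bytes each, so 12,000 characters could occupy up to about 24,000 bytes before any other field is counted. The following illustration uses the same `json.dumps(..., ensure_ascii=False)` measurement as the notebook. The sample strings are made up:

```python
import json

def metadata_bytes(md: dict) -> int:
    # Same measurement as _metadata_bytes() in section 4: UTF-8 size of the JSON text
    return len(json.dumps(md, ensure_ascii=False).encode("utf-8"))

plain = "a" * 100          # 100 ASCII characters
accented = "\u00e1" * 100  # 100 x precomposed "á", 2 bytes each in UTF-8

print(len(plain), len(plain.encode("utf-8")))        # 100 100
print(len(accented), len(accented.encode("utf-8")))  # 100 200
# {"text": "..."} adds 12 bytes of JSON syntax around the value
print(metadata_bytes({"text": plain}), metadata_bytes({"text": accented}))  # 112 212
```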


## 2) Quick configuration check

In [None]:
missing = []
if not GEMINI_API_KEY:
    missing.append("GEMINI_API_KEY")
if not PINECONE_API_KEY:
    missing.append("PINECONE_API_KEY")
if not PINECONE_INDEX_NAME:
    missing.append("PINECONE_INDEX_NAME")

if missing:
    print("Missing variables:", ", ".join(missing))
    print("Recommended: set them as env vars before running.")
else:
    print("OK: minimum variables present")


## 3) Pinecone: list indexes / create index / resolve host
If you already have the **host** (`PINECONE_INDEX_HOST`), you can skip this section.

Notes:
- This block uses the control-plane client (`pinecone`) to list/create/describe indexes.
- The upsert/query block uses the gRPC client (`pinecone[grpc]`) and requires `host`.

In [None]:
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key=PINECONE_API_KEY)

# 1) List available indexes
try:
    idx_list = pc.list_indexes()
    # Compatibility: some SDK versions return an object with .names()
    names = idx_list.names() if hasattr(idx_list, "names") else idx_list
    print("Indexes in your account:", names)
except Exception as e:
    print("Could not list indexes:", repr(e))


In [None]:
# 2) Create the index if it does NOT exist (optional)
# Only run this cell if you want the notebook to create the index.
# Must match EMBED_DIM (768) and the metric (PINECONE_METRIC, cosine by default).

from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key=PINECONE_API_KEY)

def index_exists(name: str) -> bool:
    try:
        idx_list = pc.list_indexes()
        names = idx_list.names() if hasattr(idx_list, "names") else idx_list
        return name in list(names)
    except Exception:
        return False

if index_exists(PINECONE_INDEX_NAME):
    print("Index already exists:", PINECONE_INDEX_NAME)
else:
    print("Creating index:", PINECONE_INDEX_NAME)
    pc.create_index(
        name=PINECONE_INDEX_NAME,
        dimension=EMBED_DIM,
        metric=PINECONE_METRIC,
        spec=ServerlessSpec(cloud=PINECONE_CLOUD, region=PINECONE_REGION),
    )
    print("Creation request sent. (It may take a moment to become ready.)")


In [None]:
# 3) Describe the index and resolve the HOST (if PINECONE_INDEX_HOST is empty)
from pinecone import Pinecone

pc = Pinecone(api_key=PINECONE_API_KEY)

desc = pc.describe_index(PINECONE_INDEX_NAME)
# Compatibility: desc may be a dict or an object
host = None
if isinstance(desc, dict):
    host = desc.get("host") or desc.get("status", {}).get("host")
else:
    host = getattr(desc, "host", None) or getattr(getattr(desc, "status", None), "host", None)

if host:
    print("Index HOST:", host)
    if not PINECONE_INDEX_HOST:
        PINECONE_INDEX_HOST = host
        print("PINECONE_INDEX_HOST updated in memory (this notebook session only).")
else:
    print("Could not resolve the host. Check the Pinecone console and paste PINECONE_INDEX_HOST manually.")


## 4) Helpers: reading, metadata normalization (Pinecone), hashing and IDs

In [None]:
import json
import hashlib
import re
import unicodedata
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

def to_ascii_id(s: str, max_len: int = 512) -> str:
    """Convert a string to an ASCII-safe Pinecone vector ID:
    - strips accents/diacritics (Caléndula -> Calendula)
    - replaces disallowed characters with '_' (allowed: A-Z a-z 0-9 . _ : -)
    - truncates to max_len; returns "unknown" if nothing is left
    """
    s = str(s or "")
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = s.strip()
    s = re.sub(r"[^A-Za-z0-9._:-]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return (s[:max_len] if s else "unknown")

def _sha256(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def _utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def _is_flat_value(v: Any) -> bool:
    # Valid Pinecone metadata values: string/number/bool/list[str]; no dicts, no nulls.
    if v is None:
        return False
    if isinstance(v, (str, int, float, bool)):
        return True
    if isinstance(v, list) and all(isinstance(x, str) for x in v):
        return True
    return False

def _to_list_str(v: Any) -> Optional[List[str]]:
    if v is None:
        return None
    if isinstance(v, list):
        out = []
        for x in v:
            if x is None:
                continue
            s = str(x).strip()
            if s:
                out.append(s)
        return out or None
    if isinstance(v, str):
        parts = [p.strip() for p in v.split(",")]
        parts = [p for p in parts if p]
        return parts or None
    s = str(v).strip()
    return [s] if s else None

def _clean_metadata(md: Dict[str, Any]) -> Dict[str, Any]:
    """Enforce Pinecone constraints:
      - flat JSON, no nested objects
      - no null values (omit keys)
      - values only: str/number/bool/list[str]
      - keys must not start with '$'
    """
    clean: Dict[str, Any] = {}
    for k, v in (md or {}).items():
        if v is None:
            continue
        k = str(k)
        if not k or k.startswith("$"):
            continue

        # Normalize lists
        if k in {"health_goal_tags", "ingredient_tags", "source_pages"}:
            v = _to_list_str(v)
            if not v:
                continue

        # Pinecone metadata no soporta dicts
        if isinstance(v, dict):
            continue

        # If list but not list[str] -> coerce
        if isinstance(v, list) and not all(isinstance(x, str) for x in v):
            v = _to_list_str(v)
            if not v:
                continue

        # Final type check
        if not _is_flat_value(v):
            continue

        # Drop empty strings / empty lists
        if isinstance(v, str) and not v.strip():
            continue
        if isinstance(v, list) and len(v) == 0:
            continue

        clean[k] = v

    return clean

def _metadata_bytes(md: Dict[str, Any]) -> int:
    return len(json.dumps(md, ensure_ascii=False).encode("utf-8"))

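def _truncate_text_to_fit(md: Dict[str, Any], text_key: str = "text") -> Dict[str, Any]:
    """Truncate `text` so the metadata stays <= MAX_METADATA_BYTES.

    The text is never cut below ~2000 chars; if the other fields alone are too
    large, a warning is printed and Pinecone may still reject the record.
    """
    md = dict(md)
    text = md.get(text_key, "") or ""
    if not isinstance(text, str):
        text = str(text)

    # basic cap (characters)
    if len(text) > MAX_TEXT_CHARS:
        text = text[:MAX_TEXT_CHARS].rstrip()
        md[text_key] = text

    # byte cap: shrink by 10% per step (chars != bytes for accented UTF-8 text)
    while _metadata_bytes(md) > MAX_METADATA_BYTES and len(text) > 2000:
        text = text[: int(len(text) * 0.9)].rstrip()
        md[text_key] = text

    if _metadata_bytes(md) > MAX_METADATA_BYTES:
        print("WARNING: metadata still exceeds MAX_METADATA_BYTES:", md.get("product_id"), _metadata_bytes(md))

    return md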

def _build_chunk_id(product_id: str, section: str, chunk_index: int, content_hash: str) -> str:
    # Pinecone requires ASCII vector IDs, so product_id/section are sanitized.
    pid = to_ascii_id(product_id)
    sec = to_ascii_id(section)
    return f"{pid}::{sec}::{int(chunk_index)}::{content_hash[:8]}"
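`_clean_metadata` drops or coerces anything Pinecone would reject (nulls, nested dicts, `$`-prefixed keys, non-string lists, empty values) without reporting it. That can hide a typo or a schema change in the source data. The sketch below lists what would be dropped or coerced, so you can inspect a sample before upserting. `explain_metadata_issues` is a hypothetical helper that re-checks the same rules. It does not replace `_clean_metadata`:

```python
from typing import Any, Dict, List

def explain_metadata_issues(md: Dict[str, Any]) -> List[str]:
    """Return one message per key that breaks the flat-metadata rules used above."""
    issues: List[str] = []
    for k, v in (md or {}).items():
        key = str(k)
        if not key or key.startswith("$"):
            issues.append(f"{key!r}: invalid key (empty or starts with '$')")
        elif v is None:
            issues.append(f"{key!r}: null value (key will be omitted)")
        elif isinstance(v, dict):
            issues.append(f"{key!r}: nested object (not supported)")
        elif isinstance(v, list) and not all(isinstance(x, str) for x in v):
            issues.append(f"{key!r}: list with non-string items (will be coerced to str)")
        elif isinstance(v, list) and not v:
            issues.append(f"{key!r}: empty list (key will be omitted)")
        elif isinstance(v, str) and not v.strip():
            issues.append(f"{key!r}: empty string (key will be omitted)")
        elif not isinstance(v, (str, int, float, bool, list)):
            issues.append(f"{key!r}: unsupported type {type(v).__name__}")
    return issues

sample = {"product_name": "Calendula", "source_pages": [3, 4], "extra": {"a": 1}, "notes": None}
for msg in explain_metadata_issues(sample):
    print(msg)
```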


## 5) Load input (JSONL chunks or JSON products)

In [None]:
def load_jsonl(path: str) -> List[Dict[str, Any]]:
    rows = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))
    return rows

def load_json(path: str) -> Any:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

raw = None
if INPUT_PATH.lower().endswith(".jsonl"):
    raw = load_jsonl(INPUT_PATH)
    input_mode = "jsonl_chunks"
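elif INPUT_PATH.lower().endswith(".json"):
    raw = load_json(INPUT_PATH)
    input_mode = "json_products"
    # normalize_chunks_from_products() expects a top-level list of product objects
    if not isinstance(raw, list):
        raise ValueError(f"Expected a JSON list of products, got {type(raw).__name__}")
else:
    raise ValueError("INPUT_PATH must be .jsonl or .json")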

print("input_mode =", input_mode)
print("records =", len(raw) if isinstance(raw, list) else type(raw))
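`load_jsonl` stops at the first malformed line. The resulting `json.JSONDecodeError` only reports the position within that line, not which line of the file failed, and that line number is what you need when ingesting a large file. The sketch below is a variant that reports the 1-based line number. `load_jsonl_strict` is a hypothetical name:

```python
import json
from typing import Any, Dict, List

def load_jsonl_strict(path: str) -> List[Dict[str, Any]]:
    """Like load_jsonl(), but a parse error names the offending line number."""
    rows: List[Dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue  # blank lines are skipped, as in load_jsonl()
            try:
                rows.append(json.loads(line))
            except json.JSONDecodeError as e:
                raise ValueError(f"{path}:{lineno}: invalid JSON ({e.msg})") from e
    return rows
```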


## 6) Transform to `rag_contract_v1`
Adjust `SECTION_FIELDS` if your product JSON uses different keys.
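To decide how to adjust it, you need to see which product keys no section currently reads. The sketch below counts them. It assumes that the products are a list of dicts, as in `json_products` mode. `unmapped_keys` is a hypothetical helper. The `ignore` set is where you put keys that the transform stores as metadata rather than section text:

```python
from collections import Counter
from typing import AbstractSet, Any, Dict, Iterable, List, Tuple

def unmapped_keys(products: Iterable[Dict[str, Any]],
                  section_fields: Dict[str, List[str]],
                  ignore: AbstractSet[str] = frozenset()) -> List[Tuple[str, int]]:
    """Count product keys that no section in section_fields reads, most frequent first."""
    mapped = {k for keys in section_fields.values() for k in keys}
    counts: Counter = Counter()
    for p in products:
        for k in p:
            if k not in mapped and k not in ignore:
                counts[k] += 1
    return counts.most_common()

# Usage once the next cell has defined SECTION_FIELDS:
# meta_keys = {"product_id", "product_name", "language", "health_goal_tags", "ingredient_tags",
#              "product_type", "dosage_form", "registration_number", "source_pdf",
#              "source_pages", "content_status"}
# print(unmapped_keys(raw, SECTION_FIELDS, ignore=meta_keys))
```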

In [None]:
PRODUCT_ID_FIELD = "product_id"
PRODUCT_NAME_FIELD = "product_name"

SECTION_FIELDS = {
    "overview": ["description", "benefits", "indications", "how_it_works"],
    "ingredients": ["ingredients", "active_ingredients", "composition"],
    "usage_safety": ["dosage", "usage", "warnings", "contraindications", "storage"],
}

def build_section_text(product: Dict[str, Any], section: str) -> str:
    parts = []
    for key in SECTION_FIELDS.get(section, []):
        val = product.get(key)
        if val is None:
            continue
        if isinstance(val, list):
            val = ", ".join([str(x) for x in val if x is not None])
        val = str(val).strip()
        if not val:
            continue
        parts.append(f"{key}: {val}")
    return "\n".join(parts).strip()

def normalize_chunks_from_products(products: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    chunks: List[Dict[str, Any]] = []
    for p in products:
        # `or ""` so a missing/null value falls back instead of becoming the string "None"
        product_id = str(p.get(PRODUCT_ID_FIELD) or "").strip() or "unknown_product"
        product_name = str(p.get(PRODUCT_NAME_FIELD) or "").strip() or "unknown_product_name"

        section_texts: List[Tuple[str, str]] = []
        for section in SECTION_FIELDS.keys():
            txt = build_section_text(p, section)
            if txt:
                section_texts.append((section, txt))

        chunk_total = max(1, len(section_texts))
        if not section_texts:
            section_texts = [("overview", json.dumps(p, ensure_ascii=False)[:MAX_TEXT_CHARS])]

        for idx, (section, text) in enumerate(section_texts):
            text = text[:MAX_TEXT_CHARS].strip()
            content_hash = _sha256(text)

            md = {
                "text": text,
                "product_id": product_id,
                "product_name": product_name,
                "section": section,
                "language": p.get("language", "es"),
                "health_goal_tags": p.get("health_goal_tags"),
                "ingredient_tags": p.get("ingredient_tags"),
                "product_type": p.get("product_type"),
                "dosage_form": p.get("dosage_form"),
                "registration_number": p.get("registration_number"),
                "source_pdf": p.get("source_pdf"),
                "source_pages": p.get("source_pages"),
                "content_status": p.get("content_status", CONTENT_STATUS_DEFAULT),
                "record_type": RECORD_TYPE,
                "content_hash": content_hash,
                "chunk_index": idx,
                "chunk_total": chunk_total,
                "data_version": DATA_VERSION,
                "schema_version": SCHEMA_VERSION,
                "ingested_at": _utc_iso(),
            }

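            md = _clean_metadata(md)
            md = _truncate_text_to_fit(md, "text")
            # Re-hash in case truncation shortened the text, so content_hash matches the stored text
            md["content_hash"] = _sha256(md["text"])
            chunk_id = _build_chunk_id(product_id, section, idx, md["content_hash"])
            chunks.append({"id": chunk_id, "text": md["text"], "metadata": md})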
    return chunks

def normalize_chunks_from_jsonl(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    chunks: List[Dict[str, Any]] = []
    for r in rows:
        # Skip rows that are not JSON objects; tolerate a missing or null "metadata"
        if not isinstance(r, dict):
            continue
        md = r.get("metadata")
        md = md if isinstance(md, dict) else {}
        text = r.get("text") or md.get("text") or ""
        text = str(text).strip()
        if not text:
            continue

        # `or` fallbacks so a JSON null never becomes the literal string "None"
        product_id = str(md.get("product_id") or r.get("product_id") or "unknown_product").strip()
        product_name = str(md.get("product_name") or r.get("product_name") or "unknown_product_name").strip()
        section = str(md.get("section") or r.get("section") or "overview").strip() or "overview"
        language = str(md.get("language") or r.get("language") or "es").strip() or "es"

        text = text[:MAX_TEXT_CHARS].strip()
        content_hash = _sha256(text)

        md_final = dict(md)
        md_final.update({
            "text": text,
            "product_id": product_id,
            "product_name": product_name,
            "section": section,
            "language": language,
            "record_type": md.get("record_type", RECORD_TYPE),
            "content_hash": md.get("content_hash", content_hash),
            "schema_version": md.get("schema_version", SCHEMA_VERSION),
            "data_version": md.get("data_version", DATA_VERSION),
            "ingested_at": md.get("ingested_at", _utc_iso()),
        })

        md_final.setdefault("content_status", CONTENT_STATUS_DEFAULT)
        # Force int: setdefault() would leave an existing string/float value untouched
        md_final["chunk_index"] = int(md_final.get("chunk_index") or 0)
        md_final["chunk_total"] = int(md_final.get("chunk_total") or 1)

        md_final = _clean_metadata(md_final)
        md_final = _truncate_text_to_fit(md_final, "text")

        chunk_id = r.get("id")
        if chunk_id:
            chunk_id = to_ascii_id(chunk_id)
        else:
            chunk_id = _build_chunk_id(product_id, section, md_final["chunk_index"], md_final["content_hash"])
        chunks.append({"id": chunk_id, "text": md_final["text"], "metadata": md_final})

    return chunks

if input_mode == "json_products":
    chunks = normalize_chunks_from_products(raw)
else:
    chunks = normalize_chunks_from_jsonl(raw)

print("normalized chunks:", len(chunks))
if chunks:
    print("sample:", json.dumps(chunks[0], ensure_ascii=False)[:900], "...")
else:
    print("No chunks produced: check INPUT_PATH and SECTION_FIELDS.")
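Two chunks can end up with the same ID. One reason is that `to_ascii_id` is lossy: `Caléndula` and `Calendula` both become `Calendula`. Another is that JSONL rows may carry their own `id`, which is only passed through `to_ascii_id`. Pinecone upserts overwrite by ID, so the later chunk would silently replace the earlier one. The sketch below is a pre-upsert check. `find_duplicate_ids` is hypothetical and expects the `{"id", "text", "metadata"}` chunk shape built above:

```python
from collections import Counter
from typing import Any, Dict, List

def find_duplicate_ids(chunks: List[Dict[str, Any]]) -> Dict[str, int]:
    """Return {id: count} for every ID that appears more than once."""
    counts = Counter(c["id"] for c in chunks)
    return {cid: n for cid, n in counts.items() if n > 1}

# Usage in the notebook:
# dups = find_duplicate_ids(chunks)
# print("duplicate ids:", len(dups))
# assert not dups, list(dups)[:5]
```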


## 7) (Optional) Save normalized JSONL

In [None]:
with open(OUTPUT_JSONL_PATH, "w", encoding="utf-8") as f:
    for c in chunks:
        f.write(json.dumps(c, ensure_ascii=False) + "\n")

print("Wrote:", OUTPUT_JSONL_PATH)


## 8) Gemini embeddings (batched) + Pinecone upsert (gRPC)

In [None]:
from google import genai
from google.genai import types
from pinecone.grpc import PineconeGRPC

if not GEMINI_API_KEY:
    raise ValueError("Missing GEMINI_API_KEY")
if not PINECONE_API_KEY:
    raise ValueError("Missing PINECONE_API_KEY")
if not PINECONE_INDEX_HOST:
    raise ValueError("Missing PINECONE_INDEX_HOST. Run the host-resolution cell or paste it manually.")

gclient = genai.Client(api_key=GEMINI_API_KEY)

pc_grpc = PineconeGRPC(api_key=PINECONE_API_KEY)
index = pc_grpc.Index(host=PINECONE_INDEX_HOST)

def batched(lst: List[Any], n: int) -> Iterable[List[Any]]:
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def embed_texts(texts: List[str]) -> List[List[float]]:
    res = gclient.models.embed_content(
        model=EMBED_MODEL,
        contents=texts,
        config=types.EmbedContentConfig(
            task_type="RETRIEVAL_DOCUMENT",
            output_dimensionality=EMBED_DIM,
        ),
    )
    return [e.values for e in res.embeddings]

upserted = 0

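for batch_no, batch_docs in enumerate(batched(chunks, EMBED_BATCH_SIZE), start=1):
    texts = [d["metadata"]["text"] for d in batch_docs]
    vectors = embed_texts(texts)
    # zip() below would silently drop records if fewer embeddings came back
    if len(vectors) != len(batch_docs):
        raise RuntimeError(f"Expected {len(batch_docs)} embeddings, got {len(vectors)}")

    pinecone_vectors = []
    for d, v in zip(batch_docs, vectors):
        pinecone_vectors.append({
            "id": d["id"],
            "values": v,
            "metadata": d["metadata"],
        })

    # One upsert per embedding batch (EMBED_BATCH_SIZE vectors); UPSERT_BATCH_SIZE is not used here
    index.upsert(vectors=pinecone_vectors, namespace=PINECONE_NAMESPACE)
    upserted += len(pinecone_vectors)

    # Progress report every 10 batches
    if batch_no % 10 == 0:
        print("Upserted so far:", upserted)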

print("DONE. Total upserted:", upserted)
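A long ingest run can hit transient failures from either API, such as rate limits (HTTP 429) or timeouts, and the loop above stops at the first exception. One option is to wrap `embed_texts(...)` and `index.upsert(...)` in a retry with exponential backoff. Because chunk IDs are deterministic, re-upserting the same batch overwrites rather than duplicates. The sketch below is generic. `with_retries` is a hypothetical helper. Retrying on every `Exception` is a simplification; in practice you would retry only on errors you know are transient:

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(fn: Callable[[], T], attempts: int = 5,
                 base_delay: float = 1.0, max_delay: float = 30.0) -> T:
    """Call fn(); on failure sleep base_delay * 2**i (capped, plus up to 10% jitter) and retry."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for i in range(attempts):
        try:
            return fn()
        except Exception as e:  # simplification: every error is treated as retryable
            if i == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** i)) * (1 + 0.1 * random.random())
            print(f"attempt {i + 1} failed ({e!r}); retrying in {delay:.1f}s")
            time.sleep(delay)
    raise AssertionError("unreachable")

# Hypothetical use inside the upsert loop:
# vectors = with_retries(lambda: embed_texts(texts))
# with_retries(lambda: index.upsert(vectors=pinecone_vectors, namespace=PINECONE_NAMESPACE))
```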


## 9) Index stats (namespaces, counts)

In [None]:
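# describe_index_stats() reports every namespace; pick ours out of the summary
stats = index.describe_index_stats()
print(stats)
ns_stats = (getattr(stats, "namespaces", None) or {}).get(PINECONE_NAMESPACE)
print(f"Namespace '{PINECONE_NAMESPACE}':", ns_stats)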
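A vector count that grows across re-runs has a likely cause: chunk IDs embed `content_hash[:8]`. When a product's text changes, the new chunk gets a new ID, and the old vector stays in the namespace, because upserts only overwrite matching IDs. The sketch below computes which IDs to delete. It assumes you kept the ID list from the previous run, for example by copying the section 7 JSONL before it is overwritten. `stale_ids`, `read_ids_from_jsonl`, and that file-based workflow are assumptions and are not part of the original pipeline:

```python
import json
from typing import Iterable, List

def read_ids_from_jsonl(path: str) -> List[str]:
    """Collect the `id` field from a normalized JSONL file (one chunk per line)."""
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line)["id"] for line in f if line.strip()]

def stale_ids(previous_ids: Iterable[str], current_ids: Iterable[str]) -> List[str]:
    """IDs that existed in the previous run but are no longer produced (sorted)."""
    return sorted(set(previous_ids) - set(current_ids))

# Hypothetical usage (previous run's file saved under another name beforehand):
# old = read_ids_from_jsonl("./rag_contract_v1.prev.jsonl")
# to_delete = stale_ids(old, [c["id"] for c in chunks])
# for i in range(0, len(to_delete), 1000):  # 1000 per call: assumed per-request limit
#     index.delete(ids=to_delete[i:i + 1000], namespace=PINECONE_NAMESPACE)
```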


## 10) Smoke test: top_k query (Gemini RETRIEVAL_QUERY + Pinecone query)

In [None]:
def embed_query(text: str) -> List[float]:
    res = gclient.models.embed_content(
        model=EMBED_MODEL,
        contents=text,
        config=types.EmbedContentConfig(
            task_type="RETRIEVAL_QUERY",
            output_dimensionality=EMBED_DIM,
        ),
    )
    return res.embeddings[0].values

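# Spanish on purpose, since the indexed content is Spanish ("What is this product for and how is it used?")
query = "¿Para qué sirve este producto y cómo se usa?"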
qvec = embed_query(query)

res = index.query(
    namespace=PINECONE_NAMESPACE,
    vector=qvec,
    top_k=5,
    include_metadata=True,
    include_values=False,
)

matches = getattr(res, "matches", []) or []
print("matches:", len(matches))

for i, m in enumerate(matches, start=1):
    md = m.metadata or {}
    print(f"\n#{i} score={m.score:.4f} id={m.id}")
    print("product:", md.get("product_name"), "| section:", md.get("section"))
    snippet = md.get("text","")
    print("text_snippet:", (snippet[:220] + "...") if snippet else "")
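Every chunk carries flat metadata (`product_id`, `section`, `language`, ...), so the same query can be narrowed with a Pinecone metadata filter. The filter uses the `$eq`, `$in` and `$and` operators from Pinecone's filter syntax. The sketch below builds such a filter. `build_filter` is hypothetical, and the example values are made up:

```python
from typing import Any, Dict, List, Optional

def build_filter(product_id: Optional[str] = None,
                 sections: Optional[List[str]] = None,
                 language: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Build a Pinecone metadata filter from optional constraints (None means no filter)."""
    clauses: List[Dict[str, Any]] = []
    if product_id:
        clauses.append({"product_id": {"$eq": product_id}})
    if sections:
        clauses.append({"section": {"$in": list(sections)}})
    if language:
        clauses.append({"language": {"$eq": language}})
    if not clauses:
        return None
    return clauses[0] if len(clauses) == 1 else {"$and": clauses}

# Hypothetical usage with the query above:
# res = index.query(namespace=PINECONE_NAMESPACE, vector=qvec, top_k=5,
#                   include_metadata=True, filter=build_filter(sections=["usage_safety"]))
```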
