# 4. Preparar Documentos
En este notebook se desarrolla el preprocesamiento necesario para la generación de embeddings a partir de los textos normativos, con el objetivo de indexar la información en una base Redis para su posterior consulta semántica y recuperación eficiente. El proceso abarca la preparación de los textos, el uso de modelos de lenguaje para obtener representaciones vectoriales (embeddings) y la estructuración de la información para su integración en el pipeline de gestión normativa.


In [115]:
# library
import pandas as pd
from typing import List, Union
import tiktoken
import redis as RedisClient

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores.redis.base import Redis as RedisVectorStore

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from collections import Counter


import numpy as np
from redis.commands.search.query import Query
from IPython.display import display, HTML



In [None]:
# -------------------------------
# CONSTANTS
# -------------------------------
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_USERNAME = "default"
REDIS_PASSWORD = ""
REDIS_INDEX = "normativas_sii"
GPT_KEY = "sk-proj--....."  # ← Reemplázalo por tu clave real
REDIS_URL = f"redis://{REDIS_USERNAME}:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
DATA_DIR ="../raw_data"
CSV_PATH = "classified_regulations.csv"
MODEL_NAME = "text-embedding-3-large"  # Modelo de OpenAI para embeddings
# Tokenizador para OpenAI embeddings
tokenizer = tiktoken.encoding_for_model(MODEL_NAME)
MAX_TOKENS = 30000
SAFE_TOKENS = 25000  # margen de seguridad

## 4.1 Carga de Datos
En esta sección se realiza la carga de los datos normativos previamente preprocesados. Los datos contienen los textos completos y la metadata relevante asociada a cada normativa. Esta información será utilizada como insumo para la generación de los embeddings.

Se aplica un preprocesamiento básico a los textos normativos para optimizar la calidad de los embeddings generados. Esto puede incluir normalización de caracteres, eliminación de símbolos innecesarios y, en algunos casos, reducción de ruido textual. El objetivo es asegurar que la representación vectorial capture el contenido semántico relevante de cada normativa.

Utilizando un modelo de lenguaje preentrenado, se obtienen los embeddings para cada texto normativo. Estos vectores numéricos permiten comparar y recuperar información normativa de manera eficiente, habilitando funcionalidades avanzadas de búsqueda y clasificación semántica.




In [117]:
# -------------------------------
# FUNCTIONS
# -------------------------------
def contar_tokens(texto: str) -> int:
    """
    Counts the number of tokens in a given text using the default tokenizer.

    Parameters:
    - texto (str): The input text to tokenize and count.

    Returns:
    - int: The number of tokens in the provided text.
    """
    return len(tokenizer.encode(texto))


def resumen_chunks_por_documento(documentos):
    """
    Imprime un resumen del número de chunks generados por cada documento.

    Para cada documento en la lista, cuenta cuántos chunks (fragmentos) le corresponden
    y muestra el total de chunks junto a un resumen por documento.

    Parámetros:
    - documentos (list): Lista de objetos documento, cada uno debe tener un atributo 'metadata' 
      con al menos la clave 'name'.

    Retorna:
    - None. Solo imprime el resumen en consola.
    """
    conteo = Counter(doc.metadata.get("name", "Sin nombre") for doc in documentos)
    print(f"Total de chunks generados: {len(documentos)}")
    print("Resumen por documento:")
    for nombre, cantidad in conteo.items():
        print(f"- {nombre}: {cantidad} chunk(s)")



def preparar_documentos(path_csv: str,
                        n: int = None,
                        anios: Union[int, List[int]] = None,
                        chunk_size: int = None,
                        chunk_overlap: int = None) -> List[Document]:
    """
    Carga y prepara documentos desde un archivo CSV para procesamiento de NLP o indexación.

    - Lee el archivo CSV y filtra filas con valor nulo en la columna "corpus".
    - Permite filtrar documentos por uno o varios años especificados en 'anios'.
    - Puede limitar la cantidad de documentos a los primeros 'n' del DataFrame.
    - Si se indican 'chunk_size' y 'chunk_overlap', fragmenta los textos largos en chunks seguros.
    - Solo incluye documentos o fragmentos que no superen el límite seguro de tokens.
    - Los metadatos relevantes se asocian a cada documento.

    Parámetros:
    - path_csv (str): Ruta al archivo CSV de entrada.
    - n (int, opcional): Número máximo de documentos a procesar (None = todos).
    - anios (int o List[int], opcional): Año(s) a filtrar en la columna 'nombre'.
    - chunk_size (int, opcional): Tamaño máximo de chunk (en tokens) para fragmentar los textos largos.
    - chunk_overlap (int, opcional): Cantidad de tokens de solapamiento entre chunks.

    Retorna:
    - List[Document]: Lista de objetos Document listos para procesamiento o indexación.

    Notas:
    - Usa la variable SAFE_TOKENS para controlar el límite máximo de tokens por documento o chunk.
    - Si un documento supera el límite y no se especifica chunking, se omite y se imprime advertencia.
    - Requiere la función contar_tokens() y la clase Document definida/importada.
    """
    
    df = pd.read_csv(path_csv).dropna(subset=["corpus"])
    
    # Filtro por año
    if anios:
        if isinstance(anios, int):
            anios = [anios]
        # Convierte a int si tus años están como string
        df = df[df["anno"].isin(anios)]


    if n:
        df = df.head(n)
    print(f"📄 Cargando {len(df)} documentos desde '{path_csv}' (filtrados por año: {anios})...")
    documentos = []
    splitter = None

    if chunk_size and chunk_overlap:
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ".", " "]
        )

    for _, row in df.iterrows():
        metadatos = {
            "name": row.get("nombre", ""),
            "description": row.get("descripcion", ""),
            "fuente": row.get("fuente", ""),
            "url": row.get("url", ""),
            "tipo_documento": row.get("tipo_documento", ""),
            "relevancia": row.get("relevancia", ""),
            "explicacion": row.get("explicacion", ""),
            "anno": safe_int(row.get("anno", "")),
            "n_tokens": safe_int(contar_tokens(str(row["corpus"]).strip()))
        }
        cuerpo = str(row["corpus"]).strip()
        total_tokens = contar_tokens(cuerpo)

        if total_tokens <= SAFE_TOKENS:
            documentos.append(Document(page_content=cuerpo, metadata=metadatos))
        elif splitter:
            partes = splitter.split_text(cuerpo)
            for parte in partes:
                if contar_tokens(parte) <= SAFE_TOKENS:
                    documentos.append(Document(page_content=parte, metadata=metadatos))
                else:
                    print(f"⚠️ Chunk excede el máximo seguro de tokens, se omitió parcialmente.")
        else:
            print(f"⚠️ Documento omitido por superar límite de tokens y no hay splitter definido.")

    return documentos

def indexar_en_redis(documentos, redis_url, redis_index, gpt_key, eliminar_anteriores=False):
    """
    Indexa una lista de documentos en una base vectorial Redis a través de LangChain.

    - Permite eliminar el índice previo en Redis antes de indexar, si se solicita.
    - Controla el límite de tokens por documento y por lote, omitiendo o fragmentando cuando es necesario.
    - Indexa los documentos en lotes para evitar superar el límite seguro de tokens por operación.
    - Registra los documentos que superan los límites de tokens o generan errores durante la indexación.
    - Guarda un CSV con los errores de indexación, si existen.

    Parámetros:
    - documentos (list): Lista de objetos Document a indexar.
    - redis_url (str): URL de conexión a la base de datos Redis.
    - redis_index (str): Nombre del índice vectorial en Redis.
    - gpt_key (str): API key de OpenAI para embeddings.
    - eliminar_anteriores (bool, opcional): Si True, elimina el índice previo antes de indexar (por defecto False).

    Retorna:
    - int: Número total de documentos efectivamente indexados.

    Notas:
    - Usa la variable SAFE_TOKENS como límite máximo de tokens por documento/lote.
    - Requiere las funciones contar_tokens(), la clase Document, y las clases de embeddings e integración Redis de LangChain.
    - Los errores durante la indexación quedan registrados en 'errores_indexacion.csv'.
    """
    # ... tu código ...

    embeddings = OpenAIEmbeddings(
        model=MODEL_NAME,
        openai_api_key=GPT_KEY
    )

    if eliminar_anteriores:
        print("🧹 Eliminando índice anterior...")
        try:
            r = RedisClient.from_url(redis_url)
            r.ft(redis_index).dropindex(delete_documents=True)
            print(f"🧹 Índice '{redis_index}' eliminado correctamente.")
        except Exception as e:
            print(f"⚠️ No se pudo eliminar el índice '{redis_index}' (puede que no exista): {e}")

    index_schema = {
        "text": [
            {"name": "name"},
            {"name": "description"},
            {"name": "fuente"},
            {"name": "url"},
            {"name": "tipo_documento"},
            {"name": "relevancia"},
            {"name": "explicacion"},
        ],
        "numeric": [
            {"name": "anno"},
            {"name": "n_tokens"},
        

        ],
        "tag": []
    }

    print(f"🚀 Iniciando indexación de {len(documentos)} documentos (con control de tokens)...")

    errores = []
    indexados = []
    lote_actual = []
    tokens_lote = 0

    for i, doc in enumerate(documentos):
        texto = doc.page_content
        tokens_doc = contar_tokens(texto)
        if tokens_doc > SAFE_TOKENS:
            errores.append({
                "indice": i + 1,
                "name": doc.metadata.get('name', f'doc_{i}'),
                "tokens": tokens_doc,
                "error": "Supera SAFE_TOKENS individualmente"
            })
            continue
        if tokens_lote + tokens_doc > SAFE_TOKENS:
            # Indexar lote actual
            RedisVectorStore.from_documents(
                documents=lote_actual,
                embedding=embeddings,
                redis_url=redis_url,
                index_name=redis_index,
                index_schema=index_schema
            )
            indexados.extend(lote_actual)
            print(f"✅ Lote de {len(lote_actual)} documentos indexado.")
            # Resetear lote
            lote_actual = []
            tokens_lote = 0

        lote_actual.append(doc)
        tokens_lote += tokens_doc

    # Indexar último lote si quedó algo pendiente
    if lote_actual:
        try:
            RedisVectorStore.from_documents(
                documents =lote_actual,
                embedding=embeddings,
                redis_url=redis_url,
                index_name=redis_index,
                index_schema=index_schema
            )
            indexados.extend(lote_actual)
            print(f"✅ Último lote de {len(lote_actual)} documentos indexado.")
        except Exception as e:
            print(f"❌ Error en último lote: {str(e)}")
            
            for j, d in enumerate(lote_actual):
                errores.append({
                    "indice": len(indexados) + j + 1,
                    "name": d.metadata.get('name', f'doc_{j}'),
                    "tokens": contar_tokens(d.page_content),
                    "error": f"Error en lote final: {str(e)}"
                })

    print(f"📦 Total indexados: {len(indexados)} / {len(documentos)}")

    if errores:
        df_err = pd.DataFrame(errores)
        df_err.to_csv("errores_indexacion.csv", index=False, encoding="utf-8-sig")
        print("📄 Errores registrados en 'errores_indexacion.csv'")

    return len(indexados)

def buscar_knn_normativas(query: str, k: int = 5, min_score: float = 0.0):
    """
    Realiza una búsqueda KNN (k-Nearest Neighbors) en Redis para encontrar normativas similares a una consulta.

    - Obtiene el embedding de la consulta usando el modelo de OpenAI.
    - Ejecuta la búsqueda vectorial KNN sobre el índice 'normativas_sii' en Redis.
    - Procesa los resultados filtrando por un puntaje mínimo (min_score).
    - Presenta los resultados en una tabla HTML interactiva, donde el nombre enlaza a la URL de la normativa (si está disponible).
    - Devuelve los resultados como un DataFrame de pandas.

    Parámetros:
    - query (str): Consulta de texto a buscar.
    - k (int, opcional): Número de vecinos más cercanos a recuperar (default = 5).
    - min_score (float, opcional): Puntaje mínimo requerido para incluir un resultado (default = 0.0).

    Retorna:
    - pd.DataFrame: DataFrame con los resultados filtrados, incluyendo columnas 'name', 'relevancia', 'explicacion' y 'vector_score'.

    Notas:
    - Requiere variables globales para la conexión Redis: REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_USERNAME, REDIS_PASSWORD, y GPT_KEY.
    - Utiliza el modelo 'text-embedding-3-large' de OpenAI para embeddings.
    - El enlace en la columna 'name' solo aparece si la URL está disponible.
    - Muestra la tabla en HTML directamente en el notebook.

 
    """
    
    # Obtener embedding
    embeddings = OpenAIEmbeddings(
        model=MODEL_NAME,  # Dimensión fija de 6144
        openai_api_key=GPT_KEY
    )
    query_vector = embeddings.embed_query(query)

    # Conexión Redis
    r = RedisClient.Redis(
        host=REDIS_HOST,
        port=int(REDIS_PORT),
        db=REDIS_DB,
        username=REDIS_USERNAME,
        password=REDIS_PASSWORD,
        decode_responses=True
    )

    # Búsqueda
    base_query = f"*=>[KNN {k} @content_vector $vec as vector_score]"
    q = (
        Query(base_query)
        .return_fields("name", "relevancia", "explicacion", "url", "vector_score")
        .sort_by("vector_score")
        .dialect(2)
    )

    results = r.ft("normativas_sii").search(
        q, query_params={"vec": np.array(query_vector, dtype=np.float32).tobytes()}
    )

    # Procesar resultados
    filas = []
    for doc in results.docs:
        score = float(doc.vector_score)
        if score >= min_score:
            nombre = getattr(doc, "name", "")
            url = getattr(doc, "url", "")
            nombre_link = f'<a href="{url}" target="_blank">{nombre}</a>' if url else nombre

            filas.append({
                "name": nombre_link,
                "relevancia": getattr(doc, "relevancia", ""),
                "explicacion": getattr(doc, "explicacion", ""),
                "vector_score": round(score, 4)
            })

    # Mostrar en HTML
    df = pd.DataFrame(filas)
    display(HTML(df.to_html(escape=False, index=False)))

    return df

def safe_int(val):
    try:
        return int(val)
    except (ValueError, TypeError):
        return None

## 4.2 Preprocesar Documentos
Se estructura la información para su integración en Redis, asignando claves únicas a cada normativa y asociando sus respectivos embeddings. Esta organización es fundamental para facilitar la recuperación rápida y precisa de normativas relevantes mediante consultas vectoriales.



In [118]:
# Mostrar muestra de documentos cargados
documentos = preparar_documentos(
    path_csv=f"{DATA_DIR}/{CSV_PATH}",
    n=None,
    anios=[2020,2021,2022,2023,2024],
    chunk_size=1000,
    chunk_overlap=200
)
print(f"Total de documentos preparados: {len(documentos)}")


📄 Cargando 1032 documentos desde '../raw_data/classified_regulations.csv' (filtrados por año: [2020, 2021, 2022, 2023, 2024])...
Total de documentos preparados: 3039


In [119]:
resumen_chunks_por_documento(documentos)

Total de chunks generados: 3039
Resumen por documento:
- Resolución Exenta SII N° 143 del 23 de Noviembre del 2020: 1 chunk(s)
- Resolución Exenta SII N° 86 del 30 de Julio del 2020: 1 chunk(s)
- Resolución Exenta SII N° 25 del 04 de Marzo del 2022: 1 chunk(s)
- Circular N° 28 del 09 de Julio del 2024: 1 chunk(s)
- Resolución Exenta SII N° 64 del 07 de Junio del 2024: 1 chunk(s)
- Resolución Exenta SII N° 121 del 19 de Diciembre del 2024: 1 chunk(s)
- Circular N° 2 del 09 de Enero del 2020: 1 chunk(s)
- Resolución Exenta SII N° 95 del 31 de Agosto del 2023: 1 chunk(s)
- Resolución Exenta SII N° 117 del 12 de Diciembre del 2024: 1 chunk(s)
- Resolución Exenta SII N° 31 del 11 de Abril del 2022: 1 chunk(s)
- Circular N° 59 del 10 de Septiembre del 2020: 1 chunk(s)
- Resolución Exenta SII N° 03 del 12 de Enero del 2021: 1 chunk(s)
- Resolución Exenta SII N° 146 del 21 de Diciembre del 2023: 1 chunk(s)
- Resolución Exenta SII N° 37 del 21 de Marzo del 2023: 1 chunk(s)
- Circular N° 29 del 

# 4.3 Preparación para indexación en Redis
Se estructura la información para su integración en Redis, asignando claves únicas a cada normativa y asociando sus respectivos embeddings. Esta organización es fundamental para facilitar la recuperación rápida y precisa de normativas relevantes mediante consultas vectoriales.

Se realiza la conexión con la instancia de Redis y se procede a la carga de los embeddings generados junto con la metadata correspondiente. Este paso habilita la infraestructura para consultas semánticas, permitiendo la búsqueda y recomendación de normativas en función de la similitud entre textos.


In [120]:
indexar_en_redis(documentos, REDIS_URL, REDIS_INDEX, GPT_KEY, eliminar_anteriores=True)

🧹 Eliminando índice anterior...
⚠️ No se pudo eliminar el índice 'normativas_sii' (puede que no exista): normativas_sii: no such index
🚀 Iniciando indexación de 3039 documentos (con control de tokens)...
✅ Lote de 11 documentos indexado.
✅ Lote de 9 documentos indexado.
✅ Lote de 12 documentos indexado.
✅ Lote de 10 documentos indexado.
✅ Lote de 7 documentos indexado.
✅ Lote de 10 documentos indexado.
✅ Lote de 9 documentos indexado.
✅ Lote de 16 documentos indexado.
✅ Lote de 12 documentos indexado.
✅ Lote de 10 documentos indexado.
✅ Lote de 16 documentos indexado.
✅ Lote de 10 documentos indexado.
✅ Lote de 13 documentos indexado.
✅ Lote de 16 documentos indexado.
✅ Lote de 6 documentos indexado.
✅ Lote de 4 documentos indexado.
✅ Lote de 9 documentos indexado.
✅ Lote de 4 documentos indexado.
✅ Lote de 9 documentos indexado.
✅ Lote de 2 documentos indexado.
✅ Lote de 13 documentos indexado.
✅ Lote de 11 documentos indexado.
✅ Lote de 11 documentos indexado.
✅ Lote de 16 documentos

3039

## 4.4 Validación del proceso
Se efectúan pruebas de validación sobre la carga y consulta de embeddings en Redis para asegurar la correcta integración del flujo. Se verifican tanto la recuperación de los vectores como la consistencia de la metadata asociada a cada normativa.


In [121]:
df = buscar_knn_normativas("¿Qué normativas regulan las boletas electrónicas?", k=5)



name,relevancia,explicacion,vector_score
Resolución Exenta SII N° 108 del 08 de Septiembre del 2020,Relevante,Clave encontrada: Contiene 'boleta' | Clave encontrada: Contiene 'puntos de venta' | Clave encontrada: Contiene 'operadores',0.3554
Resolución Exenta SII N° 53 del 09 de Junio del 2022,Relevante,Clave encontrada: Contiene 'cumplimiento tributario',0.3595
Resolución Exenta SII N° 104 del 02 de Septiembre del 2020,Relevante,Clave encontrada: Contiene 'boleta',0.3648
Resolución Exenta SII N° 152 del 09 de Diciembre del 2020,Relevante,Clave encontrada: Contiene 'boleta',0.3715
Resolución Exenta SII N° 163 del 18 de Diciembre del 2020,Relevante,Clave encontrada: Contiene 'boleta' | Clave encontrada: Contiene 'cumplimiento tributario',0.3736


In [122]:
df = buscar_knn_normativas("Encontrar medios de pago", k=5)


name,relevancia,explicacion,vector_score
Resolución Exenta SII N° 106 del 06 de Septiembre del 2023,Relevante,Clave encontrada: Contiene 'medio de pago' | Clave encontrada: Contiene 'cumplimiento tributario' | Clave encontrada: Contiene 'POS' | Clave encontrada: Contiene 'no presencial' | Clave encontrada: Contiene 'pagos electrónicos',0.6062
Resolución Exenta SII N° 91 del 21 de Agosto del 2020,Relevante,Clave encontrada: Contiene 'POS',0.6222
Resolución Exenta SII N° 167 del 28 de Diciembre del 2020,Relevante,Clave encontrada: Contiene 'POS',0.6248
Resolución Exenta SII N° 76 del 06 de Julio del 2021,Relevante,Clave encontrada: Contiene 'boleta' | Clave encontrada: Contiene 'medio de pago' | Clave encontrada: Contiene 'medios de pago electrónicos' | Clave encontrada: Contiene 'operadores' | Clave encontrada: Contiene 'psp' | Clave encontrada: Contiene 'proveedores de servicios para procesamiento de pagos' | Clave encontrada: Contiene 'no presencial' | Clave encontrada: Contiene 'pagos electrónicos',0.6466
Resolución Exenta SII N° 176 del 31 de Diciembre del 2020,Relevante,Clave encontrada: Contiene 'boleta' | Clave encontrada: Contiene 'medio de pago' | Clave encontrada: Contiene 'medios de pago electrónicos' | Clave encontrada: Contiene 'POS' | Clave encontrada: Contiene 'pagos electrónicos',0.649


## 4.5 Conclusiones

En esta etapa se desarrolló el preprocesamiento y la preparación de los textos normativos para la generación de embeddings vectoriales, con el propósito de habilitar la indexación y recuperación semántica eficiente en una base de datos Redis. El flujo de trabajo implementado incluyó la carga de datos preprocesados, la normalización y depuración adicional de los textos, y la obtención de representaciones vectoriales mediante modelos de lenguaje avanzados.

Posteriormente, los documentos y sus embeddings fueron organizados y estructurados para su integración en Redis, asignando claves únicas y asociando la metadata relevante. Se estableció un pipeline robusto para la indexación, que contempla la gestión de límites de tokens, la fragmentación inteligente de textos largos (chunking), y el control de errores en la carga masiva, asegurando la integridad y trazabilidad de los datos.

Este proceso permite habilitar funcionalidades de búsqueda y recuperación semántica avanzada, donde consultas en lenguaje natural pueden ser eficientemente mapeadas y comparadas contra el corpus normativo. La infraestructura construida, basada en Redis y modelos de embeddings de última generación, sienta las bases para el desarrollo de sistemas de recomendación, análisis automatizado y consulta inteligente de normativas legales, contribuyendo a la modernización y eficiencia en la gestión documental y normativa.

La metodología implementada garantiza la reproducibilidad, escalabilidad y calidad del sistema, posicionando la solución para aplicaciones futuras en análisis jurídico, cumplimiento normativo asistido por IA y automatización avanzada de flujos legales.
