# Parte 2:  Sistema RAG - Recuperación y Generación (Inferencia)
*   **Autor:** Carolina Torres Zapata
*   **Fecha:** 2025-11-24
*   **Contexto:** Este es el componente final del sistema ("The App"). Aquí integramos la **Base de Conocimiento** construida previamente con un **LLM Generativo** (Llama 3 o similar) para responder preguntas de usuario.
*   **Objetivos del Notebook:**
     1.  **Recuperación (Retrieval):** Implementar un motor de búsqueda vectorial en memoria (rápido y eficiente) usando similitud de coseno.
     2.  **Generación (Generation):** Conectar con un LLM mediante `Databricks Serving Endpoints`.
     3.  **Grounding (Seguridad):** Diseñar un *System Prompt* estricto que obligue al modelo a responder **solo** con la información suministrada, mitigando alucinaciones.



## 0. Configuración del Entorno y Dependencias

Antes de iniciar el flujo de inferencia, aseguramos que el cluster tenga las herramientas necesarias para la orquestación del RAG.

*   **`sentence-transformers`**: Motor local para vectorizar la pregunta del usuario (debe coincidir con la versión usada en la ingesta).
*   **`databricks-sdk[openai]`**: Cliente oficial para interactuar con los **Serving Endpoints** (LLMs) de Databricks de forma segura.

**Nota Operativa:** Se ejecuta `dbutils.library.restartPython()` para reiniciar el proceso de Python y forzar la carga de las nuevas librerías instaladas sin necesidad de reiniciar todo el cluster.

In [0]:
# INSTALACIÓN DE LIBRERÍAS
# sentence-transformers: Para vectorizar la pregunta.
# databricks-sdk[openai]: Cliente necesario para hablar con Llama 3.
# mlflow: Para registro de experimentos.
%pip install -U -q sentence-transformers "databricks-sdk[openai]" mlflow databricks-agents

# REINICIO DEL KERNEL
# Obligatorio para aplicar cambios. Al terminar esta celda, la memoria se limpia.
dbutils.library.restartPython()

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


## 1. Importar Librerías

In [0]:
import numpy as np
import pandas as pd
import mlflow
from sentence_transformers import SentenceTransformer
from databricks.sdk import WorkspaceClient

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:440)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:470)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:768)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:510)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:616)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:643)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:80)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:348)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:59)
	at com.databricks.logging.AttributionContext$.withValue(Attr

## 2. Conexión con el LLM
En un entorno operativo, los endpoints pueden cambiar de nombre o estar inactivos.
Implementamos una lógica de **"Health Check"**: iteramos sobre una lista de modelos aprobados (priorizando Llama 3-70B) y nos conectamos al primero que responda exitosamente. Esto evita que el pipeline falle por un error de configuración estática.

In [0]:
# CONFIGURACIÓN DEL MODELO LLM
LLM_ENDPOINT_NAME = None

def is_endpoint_available(endpoint_name):
    """Verifica si un endpoint responde."""
    try:
        client = WorkspaceClient().serving_endpoints.get_open_ai_client()
        client.chat.completions.create(
            model=endpoint_name, 
            messages=[{"role": "user", "content": "Test"}]
        )
        return True
    except Exception:
        return False

print("🔄 Buscando endpoint activo...")

# Lista de candidatos (Llama 3 es la prioridad)
candidates = [
    "databricks-meta-llama-3-3-70b-instruct", 
    "databricks-meta-llama-3-1-70b-instruct",
    "databricks-claude-3-7-sonnet"
]

for candidate in candidates:
    if is_endpoint_available(candidate):
        LLM_ENDPOINT_NAME = candidate
        break

# Validación estricta: Si no hay modelo, detenemos el notebook
assert LLM_ENDPOINT_NAME is not None, "❌ No se encontró ningún modelo activo."

print(f"🚀 Conectado exitosamente a: {LLM_ENDPOINT_NAME}")

🔄 Buscando endpoint activo...
🚀 Conectado exitosamente a: databricks-meta-llama-3-3-70b-instruct


## 3. Carga de Recursos (Base de Conocimiento)
Inicialización del Motor de Búsqueda (In-Memory)
Cargamos los dos componentes críticos para la fase de recuperación:
1.  **Base de Conocimiento (Vectores):** La tabla Silver `rag_embeddings` convertida a matrices de NumPy para cálculos matemáticos ultrarrápidos.
2.  **Encoder (Modelo de Embeddings):** El mismo modelo `all-MiniLM-L6-v2` usado en la ingesta. *Nota: Es vital usar exactamente el mismo modelo para que los vectores sean comparables.*

In [0]:
#  CARGA DE MOTOR DE BÚSQUEDA

print("⏳ Cargando recursos en memoria...")

# A. Cargar Tabla Silver (Knowledge Base)
TABLA_VECTORES = "dev.silver.rag_embeddings" 

try:
    df_kb = spark.read.table(TABLA_VECTORES).toPandas()
    
    # Convertir a matriz NumPy para velocidad
    kb_matrix = np.stack(df_kb["embedding"].values)
    kb_texts = df_kb["chunk_text"].values
    
    print(f"   ✅ Base de datos cargada: {len(kb_texts)} documentos.")

    # B. Cargar Modelo de Embeddings (Local)
    embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
    
    print("   ✅ Modelo de vectorización (all-MiniLM-L6-v2) listo.")

except Exception as e:
    print(f"❌ Error cargando recursos: {e}")

⏳ Cargando recursos en memoria...
   ✅ Base de datos cargada: 12 documentos.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

   ✅ Modelo de vectorización (all-MiniLM-L6-v2) listo.


##  4. Lógica del Sistema RAG (Funciones)
Implementamos la búsqueda semántica mediante **Producto Punto (Similitud de Coseno)**.
El flujo es:
1.  El usuario hace una pregunta -> Se convierte en vector.
2.  Comparamos ese vector contra los 12 vectores de nuestra base de datos.
3.  Seleccionamos los `k=3` fragmentos más similares (con mayor puntaje).

Aplicamos técnicas de **Prompt Engineering** para soporte operativo:
*   **Rol:** "Asistente Técnico experto en Databricks".
*   **Restricción Negativa:** Si el contexto no tiene la respuesta, el modelo debe admitirlo explícitamente (*"La información disponible no menciona..."*). Esto es crucial para evitar engañar al usuario.

In [0]:
# DEFINICIÓN DE FUNCIONES (CORE)

def recuperar_contexto(pregunta, k=3):
    """Vectoriza la pregunta y busca los 3 fragmentos más similares."""
    # 1. Vectorizar
    query_vector = embedding_model.encode(pregunta)
    
    # 2. Similitud (Producto Punto)
    scores = np.dot(kb_matrix, query_vector)
    
    # 3. Ranking
    top_indices = np.argsort(scores)[-k:][::-1]
    
    return [kb_texts[i] for i in top_indices]


def sistema_rag(pregunta):
    """Orquestador: Pregunta -> Contexto -> LLM -> Respuesta"""
    print(f"🔎 Analizando: '{pregunta}'")
    
    # PASO 1: RETRIEVAL
    chunks = recuperar_contexto(pregunta, k=3)
    contexto_str = "\n\n".join(chunks)
    
    print(f"   📄 Contexto encontrado: {len(chunks)} fragmentos.")
    
    # PASO 2: GENERATION
    try:
        w = WorkspaceClient()
        client = w.serving_endpoints.get_open_ai_client()
        
        # Prompt del Sistema (Reglas para el LLM)
        system_instructions = f"""
        Eres un Asistente Técnico experto en Databricks.
        Responde a la pregunta del usuario basándote ÚNICAMENTE en el contexto proporcionado abajo.
        
        Reglas:
        1. Si la respuesta está en el contexto, explícala claramente en español.
        2. Si la respuesta NO está en el contexto, di textualmente: "La información disponible no menciona este tema".
        3. No inventes información.
        
        CONTEXTO:
        {contexto_str}
        """
        
        response = client.chat.completions.create(
            model=LLM_ENDPOINT_NAME,
            messages=[
                {"role": "system", "content": system_instructions},
                {"role": "user", "content": pregunta}
            ],
            temperature=0.1, 
            max_tokens=500
        )
        
        respuesta_final = response.choices[0].message.content
        
        # Salida visual
        print("\n" + "="*60)
        print("🤖 RESPUESTA GENERADA:")
        print("="*60)
        print(respuesta_final)
        print("-" * 60)
        
    except Exception as e:
        print(f"❌ Error en la generación: {e}")

##5. Pruebas (Interacción)
Ejecutamos escenarios de prueba para validar el comportamiento del sistema:
1.  **Caso Positivo:** Pregunta sobre "Unity Catalog" (información presente en el documento). Se espera una respuesta técnica y precisa.
2.  **Caso Negativo (Control):** Pregunta sobre "DBUs" (concepto de facturación no presente en el texto introductorio). Se espera que el sistema active la cláusula de seguridad y **no** invente una respuesta.

In [0]:
# Pregunta 1: Sobre Gobernanza
sistema_rag("¿Para qué sirve Unity Catalog?")

# Pregunta 2: Sobre Infraestructura
sistema_rag("¿Qué son las DBUs?")

sistema_rag("¿Qué servicios ofrece Azure Databricks?")

🔎 Analizando: '¿Para qué sirve Unity Catalog?'
   📄 Contexto encontrado: 3 fragmentos.

🤖 RESPUESTA GENERADA:
El catálogo de Unity proporciona un modelo unificado de gobernanza de datos para el almacén de lago de datos. Sirve para facilitar la ejecución de análisis seguros en la nube, proporcionar una división de responsabilidades y limitar la capacidad de acceso a los datos. También permite a los administradores de bases de datos proteger el acceso a los datos sin necesidad de escalar en la administración de acceso a identidades nativas de la nube (IAM) y las redes. Además, incluye una versión administrada de Delta Sharing para el uso compartido de datos fuera del entorno seguro.
------------------------------------------------------------
🔎 Analizando: '¿Qué son las DBUs?'
   📄 Contexto encontrado: 3 fragmentos.

🤖 RESPUESTA GENERADA:
La información disponible no menciona este tema.
------------------------------------------------------------
🔎 Analizando: '¿Qué servicios ofrece Azur