In [None]:
import os

assert os.path.exists("/content/requirements.txt"), "requirements.txt not found! Please upload it."
assert os.path.exists("/content/sb-iadaia-cap-dev-e91efbc5b66e.json"), "Credentials file not found! Please upload it."

print("All files found, good to go!")

All files found, good to go!


In [None]:
# Instalar las librer√≠as necesarias para el workshop
!pip install -r requirements.txt -q

In [None]:
import json
from google.oauth2.service_account import Credentials
from langchain_google_vertexai import VertexAIEmbeddings
import numpy as np
import logging
from google.cloud import firestore
from google import genai
from google.cloud.exceptions import GoogleCloudError
from typing import Optional, Union
import hashlib
from google.genai.types import EmbedContentConfig
from google.cloud.firestore_v1.vector import Vector
from datetime import datetime
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from google.cloud.firestore_v1.base_query import FieldFilter
import pandas as pd
from pydantic import BaseModel, Field
from langchain_google_vertexai import ChatVertexAI
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

  from google.cloud.aiplatform.utils import gcs_utils


In [None]:
def load_credentials() -> dict:
    """
    Reads a JSON file and returns its content as a Python dictionary.
    """

    CREDENTIALS_FILE_PATH = '/content/sb-iadaia-cap-dev-e91efbc5b66e.json'

    with open(CREDENTIALS_FILE_PATH) as f:
        creds_dict = json.load(f)

    credentials = Credentials.from_service_account_info(
        creds_dict,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    return credentials

# Embeddings

Un **embedding** es una forma de representar texto (palabras, frases o documentos) como un vector num√©rico ‚Äî es decir, una lista de n√∫meros. La idea clave es que ese vector captura el *significado* del texto, de manera que textos con significados similares tendr√°n vectores similares (cercanos en el espacio matem√°tico).

Por ejemplo, si representamos las palabras "rey" y "reina" como vectores, esperar√≠amos que estuvieran mucho m√°s cerca entre s√≠ que "rey" y "avi√≥n".

### ¬øPara qu√© sirven?

Los embeddings son la base de muchas aplicaciones modernas de IA:

- **B√∫squeda sem√°ntica**: encontrar documentos relevantes por significado, no solo por palabras clave exactas.
- **Recomendaciones**: sugerir contenido similar al que un usuario ya consumi√≥.
- **Clasificaci√≥n de texto**: detectar sentimientos, categorizar documentos, etc.
- **Memoria en aplicaciones de LLMs**: almacenar y recuperar informaci√≥n relevante para un modelo de lenguaje.

### ¬øC√≥mo se ven?

Un embedding es simplemente una lista de n√∫meros, por ejemplo:
```python
"Madre" ‚Üí [0.023, -0.147, 0.891, 0.004, ..., -0.312]  # cientos o miles de dimensiones
```

Cada n√∫mero representa una dimensi√≥n en un espacio de alta dimensionalidad. Nosotros no podemos visualizar ese espacio directamente, pero las matem√°ticas s√≠ pueden operar sobre √©l ‚Äî y eso es lo que aprovechan los modelos de IA.

In [None]:
def get_embedding(text: str, credentials) -> list[float]:
    """
    Takes a text and returns its embedding as a Python list using Google's Gemini embedding model via Vertex AI.
    """

    embeddings_model = VertexAIEmbeddings(
        model_name="gemini-embedding-001",
        project="sb-iadaia-cap-dev",
        credentials=credentials,
    )
    return embeddings_model.embed_query(text)

def cosine_similarity(embedding_a: list[float], embedding_b: list[float]) -> float:
    a = np.array(embedding_a)
    b = np.array(embedding_b)
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

In [None]:
creds = load_credentials()

emb_one = get_embedding("Mi madre me quiere mucho", creds)
emb_two = get_embedding("Mam√° me ama", creds)
emb_three = get_embedding("La tierra es plana", creds)

cosine_1 = cosine_similarity(emb_one, emb_two)
cosine_2 = cosine_similarity(emb_one, emb_three)


print(f'\n\n\nEl tama√±o de los embeddings es de {len(emb_one)}')
print(f'Muestra del primero: {emb_one[:5]}')
print(f'\nSimilaridad entre las primeras dos frases: {round(float(cosine_1), 2)}')
print(f'Similaridad entre las primera y la tercera: {round(float(cosine_2), 2)}')

  embeddings_model = VertexAIEmbeddings(
  embeddings_model = VertexAIEmbeddings(





El tama√±o de los embeddings es de 3072
Muestra del primero: [-0.007152873557060957, 0.03021971881389618, -0.01309516653418541, -0.08073106408119202, -0.00037088210228830576]

Similaridad entre las primeras dos frases: 0.84
Similaridad entre las primera y la tercera: 0.59


# Creaci√≥n de nuestra base de conocimiento

Una **base de conocimiento** es un repositorio de informaci√≥n estructurada que un sistema de IA puede consultar para responder preguntas o realizar tareas. A diferencia de un modelo de lenguaje que solo usa lo que aprendi√≥ durante su entrenamiento, una base de conocimiento le permite al modelo acceder a **informaci√≥n espec√≠fica, actualizada y relevante** para un contexto particular.

### ¬øC√≥mo funciona con embeddings?

Aqu√≠ es donde los embeddings del paso anterior entran en juego. El proceso general es:

1. **Indexaci√≥n**: cada documento o fragmento de texto de la base de conocimiento se convierte en un embedding y se almacena.
2. **Consulta**: cuando el usuario hace una pregunta, esa pregunta tambi√©n se convierte en un embedding.
3. **B√∫squeda**: se buscan los documentos cuyos embeddings sean m√°s cercanos al de la pregunta ‚Äî es decir, los m√°s relevantes sem√°nticamente.
4. **Respuesta**: esos documentos se le entregan al modelo de lenguaje como contexto para que genere una respuesta informada.

Este patr√≥n se conoce como **RAG** (*Retrieval-Augmented Generation*) y es uno de los enfoques m√°s usados hoy en d√≠a para construir aplicaciones de IA sobre documentos propios.

### ¬øPor qu√© es √∫til?

Sin una base de conocimiento, un LLM solo puede responder con lo que sabe de forma general. Con una, puede responder preguntas sobre **tus documentos, tus datos, tu empresa** ‚Äî de forma precisa y sin necesidad de re-entrenar el modelo.

In [None]:
# --- Pydantic models ---
class InsuranceProduct(BaseModel):
    id: int = Field(description="Unique identifier for the product")
    name: str = Field(description="Name of the insurance product")
    short_description: str = Field(description="A brief one-line description")
    complete_description: str = Field(description="A detailed multi-sentence description")


class InsuranceProductList(BaseModel):
    products: list[InsuranceProduct] = Field(description="List of 10 insurance products")


# --- Parser and prompt ---
parser = JsonOutputParser(pydantic_object=InsuranceProductList)

prompt = PromptTemplate(
    template=(
        "You are a creative insurance product designer. "
        "Generate 10 completely imaginary and unreal insurance products. "
        "They should be fun, creative, and obviously fictional. "
        "All of the names and descriptions must be in spanish. "
        "Each product must have: id, name, short_description, and complete_description.\n\n"
        "{format_instructions}\n"
    ),
    partial_variables={"format_instructions": parser.get_format_instructions()},
)

# --- LLM call ---
credentials = load_credentials()

llm = ChatVertexAI(
    model_name="gemini-2.5-flash",
    credentials=credentials,
)

chain = prompt | llm | parser

result = chain.invoke({})

# --- Build DataFrame ---
df = pd.DataFrame(result["products"])
df.to_pickle("/content/insurance_products.pkl")


random_index = 4
print(f'\n\n\nNombre: {df["name"].tolist()[random_index]}\nDescripci√≥n corta: {df["short_description"].tolist()[random_index]}\nDescripci√≥n completa: {df["complete_description"].tolist()[random_index]}\n\n')

df

  llm = ChatVertexAI(
  llm = ChatVertexAI(





Vamos a usar el siguiente ejemplo de ahora en adelante:

Nombre: S√∫per Mascotas: P√≥liza de Poderes
Descripci√≥n corta: Protege a tu mascota si desarrolla habilidades sobrehumanas inesperadas.
Descripci√≥n completa: ¬øTu gato empieza a volar o tu perro puede leer la mente? La p√≥liza S√∫per Mascotas cubre los gastos derivados de las nuevas habilidades extraordinarias de tu compa√±ero animal. Incluye desde la adaptaci√≥n del hogar para una mascota con supervelocidad hasta clases de control de poderes y un fondo para da√±os a la propiedad causados por el uso accidental de visi√≥n de rayos X.




Unnamed: 0,id,name,short_description,complete_description
0,1,P√≥liza Calcet√≠n Perdido,¬°Nunca m√°s te preocupes por el calcet√≠n solita...,Esta p√≥liza cubre la inexplicable desaparici√≥n...
1,2,Escudo Capilar Antimal D√≠a,"Garantiza un d√≠a de cabello perfecto, sin impo...",¬øCansado de los d√≠as de cabello rebelde y sin ...
2,3,Cobertura Apocalipsis Zombi Plus,Protecci√≥n total ante el fin del mundo... o al...,Esta es tu garant√≠a de supervivencia definitiv...
3,4,Sue√±o Hecho Realidad (o Pesadilla Evitada),Asegura que tus mejores sue√±os se cumplan y tu...,¬øTienes un sue√±o recurrente de volar o de gana...
4,5,S√∫per Mascotas: P√≥liza de Poderes,Protege a tu mascota si desarrolla habilidades...,¬øTu gato empieza a volar o tu perro puede leer...
5,6,Anti-Tostada Explosiva,Fin a la combusti√≥n espont√°nea y misteriosa de...,¬øTu tostadora tiene vida propia y decide carbo...
6,7,Aguacate Perfecto Garantizado,"¬°Nunca m√°s un aguacate duro, marr√≥n o en mal e...",¬øCansado de la loter√≠a del aguacate? Con esta ...
7,8,Blindaje Anti-Spoiler Total,Protege tu experiencia de entretenimiento de r...,Esta p√≥liza es tu defensa definitiva contra lo...
8,9,Int√©rprete Canino y Felino Universal,Entiende a tus mascotas y lo que *realmente* q...,¬øAlguna vez deseaste saber qu√© piensa tu perro...
9,10,Cl√≥nico Rob√≥tico: Reemplazo Preventivo,Seguro contra ser reemplazado por un clon rob√≥...,"En un futuro no muy lejano, ¬øser√°s suplantado ..."


# Vector Store

Un **Vector Store** (o almac√©n de vectores) es una base de datos dise√±ada espec√≠ficamente para guardar y buscar embeddings de forma eficiente. Es el componente que hace posible la base de conocimiento que vimos antes ‚Äî sin √©l, no tendr√≠amos d√≥nde almacenar los vectores ni c√≥mo buscar entre ellos r√°pidamente.

### ¬øPor qu√© no una base de datos normal?

Una base de datos tradicional es muy buena buscando coincidencias exactas ‚Äî por ejemplo, encontrar todos los registros donde `ciudad = "Bogot√°"`. Pero los embeddings requieren un tipo de b√∫squeda diferente: encontrar los vectores **m√°s cercanos** a uno dado, en un espacio de miles de dimensiones. Esto se conoce como **b√∫squeda por similitud** y las bases de datos tradicionales no est√°n optimizadas para eso.

Un Vector Store s√≠ lo est√°.

### Firestore como Vector Store

En este workshop usaremos **Firestore**, la base de datos de Google Cloud, que desde hace relativamente poco soporta b√∫squeda por similitud de vectores de forma nativa. Esto nos da varias ventajas:

- **Integraci√≥n natural con GCP**: si ya est√°s usando servicios de Google Cloud, Firestore encaja sin fricciones.
- **Escalabilidad**: Firestore est√° dise√±ado para manejar grandes vol√∫menes de datos sin configuraci√≥n adicional.
- **Sin infraestructura extra**: no necesitas levantar un servidor de b√∫squeda separado ‚Äî Firestore hace todo.

### El flujo completo

Uniendo todo lo que hemos visto hasta ahora:
```
Documentos ‚Üí Embeddings ‚Üí Vector Store (Firestore) ‚Üí B√∫squeda sem√°ntica ‚Üí LLM ‚Üí Respuesta
```

En las siguientes secciones veremos c√≥mo poblar ese Vector Store con nuestra base de conocimiento y c√≥mo consultarlo.

## Clase para usar Firestore como VS

In [None]:
# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FirestoreVectorStore:
    # Constante para el campo de embedding (fijo)
    EMBEDDING_KEY = "embedding"
    MAX_DIMENSION = 2048


    def __init__(self, project: str, database: str, collection: str, location: str = "us-east1"):
        """
        Inicializa el vector store de Firestore.

        Args:
            project: ID del proyecto de GCP
            database: Nombre de la base de datos de Firestore
            collection: Nombre de la colecci√≥n donde se almacenar√°n los vectores
            location: Regi√≥n de Google Cloud (default: "us-east1")
        """

        self.project = project
        self.collection = collection
        self.location = location

        # Configurar variables de entorno (con warnings si se sobrescriben)
        existing_project = os.environ.get("GOOGLE_CLOUD_PROJECT")
        if existing_project and existing_project != project:
            logger.warning(f"GOOGLE_CLOUD_PROJECT ya establecido como '{existing_project}', sobrescribiendo con '{project}'")

        existing_location = os.environ.get("GOOGLE_CLOUD_LOCATION")
        if existing_location and existing_location != location:
            logger.warning(f"GOOGLE_CLOUD_LOCATION ya establecido como '{existing_location}', sobrescribiendo con '{location}'")

        os.environ["GOOGLE_CLOUD_PROJECT"] = project
        os.environ["GOOGLE_CLOUD_LOCATION"] = location
        os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"

        try:
            credentials = load_credentials()
            self.db = firestore.Client(project=project, database=database, credentials=credentials)
            self.genai_client = genai.Client(project=project, vertexai=True, credentials=credentials)
            logger.info(f"FirestoreVectorStore inicializado: proyecto={project}, database={database}, colecci√≥n={collection}, location={location}")
        except GoogleCloudError as e:
            logger.error(f"Error al inicializar Firestore: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al inicializar: {e}")
            raise

    def _validate_dimension(self, dimension: Optional[int]) -> None:
        """
        Valida que la dimensi√≥n no exceda el m√°ximo permitido.

        Args:
            dimension: Dimensi√≥n a validar

        Raises:
            ValueError: Si la dimensi√≥n excede el m√°ximo permitido
        """
        if dimension is not None and dimension > self.MAX_DIMENSION:
            error_msg = f"La dimensi√≥n {dimension} excede el m√°ximo permitido de {self.MAX_DIMENSION}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    def _generate_document_id(self, doc: dict, text_key: str) -> str:
        """
        Genera un ID √∫nico (hash) para un documento basado en su contenido.

        Args:
            doc: Documento del cual generar el ID
            text_key: Clave del campo de texto principal

        Returns:
            Hash SHA256 del contenido del documento (16 primeros caracteres)
        """
        # Usar el texto principal para generar el hash
        content = str(doc.get(text_key, ""))
        # Agregar otros campos relevantes para mayor unicidad
        for key, value in sorted(doc.items()):
            if key not in [self.EMBEDDING_KEY, 'id', 'created_at', 'updated_at']:
                content += f"{key}:{value}"

        # Generar hash SHA256 y tomar los primeros 16 caracteres
        hash_object = hashlib.sha256(content.encode('utf-8'))
        return hash_object.hexdigest()[:16]

    def embed_texts(self, texts: list[str], embedding_model: str, dimension: Optional[int] = None):
        """
        Genera embeddings para una lista de textos.
        Args:
            texts: Lista de textos a embedear
            embedding_model: Modelo de embedding a usar
            dimension: Dimensi√≥n del embedding (opcional, m√°ximo 2048)
        Returns:
            Lista de objetos Vector con los embeddings
        Raises:
            ValueError: Si la dimensi√≥n excede el m√°ximo permitido
        """
        try:
            # Validar dimensi√≥n
            self._validate_dimension(dimension)
            # L√≠mite de batch para la API de embeddings (m√°ximo 250)
            MAX_BATCH_SIZE = 250
            logger.info(f"Generando embeddings para {len(texts)} textos con modelo {embedding_model}")
            all_vectors = []
            total_batches = (len(texts) + MAX_BATCH_SIZE - 1) // MAX_BATCH_SIZE
            # Procesar en lotes de m√°ximo 250
            for i in range(0, len(texts), MAX_BATCH_SIZE):
                batch_texts = texts[i:i + MAX_BATCH_SIZE]
                batch_num = (i // MAX_BATCH_SIZE) + 1
                logger.info(f"Procesando lote {batch_num}/{total_batches} ({len(batch_texts)} textos)")
                embeddings = self.genai_client.models.embed_content(
                    model=embedding_model,
                    contents=batch_texts,
                    config=EmbedContentConfig(
                        task_type="RETRIEVAL_DOCUMENT",
                        output_dimensionality=dimension
                    )
                )
                # Validar dimensi√≥n resultante
                if embeddings.embeddings:
                    actual_dimension = len(embeddings.embeddings[0].values)
                    if actual_dimension > self.MAX_DIMENSION:
                        error_msg = f"La dimensi√≥n resultante {actual_dimension} excede el m√°ximo permitido de {self.MAX_DIMENSION}"
                        logger.error(error_msg)
                        raise ValueError(error_msg)
                # Pasar a clase vector y agregar a la lista total
                batch_vectors = [Vector(value=embedding.values) for embedding in embeddings.embeddings]
                all_vectors.extend(batch_vectors)
                logger.info(f"Lote {batch_num}/{total_batches} completado: {len(batch_vectors)} vectores generados")
            logger.info(f"Embeddings generados exitosamente: {len(all_vectors)} vectores en total")
            return all_vectors
        except ValueError:
            raise
        except Exception as e:
            logger.error(f"Error al generar embeddings: {e}")
            raise

    def embed_documents(self, documents: list[dict], embedding_model: str = "gemini-embedding-001", dimension: int = 2048, text_key: str = "text"):
        """
        Genera embeddings para una lista de documentos y los agrega al campo fijo 'embedding'.

        Args:
            documents: Lista de diccionarios con los documentos
            embedding_model: Modelo de embedding a usar (default: "gemini-embedding-001")
            dimension: Dimensi√≥n del embedding (default: 2048, m√°ximo 2048)
            text_key: Clave del diccionario que contiene el texto a embedear (default: "text")

        Returns:
            Lista de documentos con el campo 'embedding' agregado

        Raises:
            ValueError: Si alg√∫n documento no tiene la clave de texto o si la dimensi√≥n excede el m√°ximo
        """
        try:
            # Validar que todos los documentos tengan la clave de texto
            for i, doc in enumerate(documents):
                if text_key not in doc:
                    error_msg = f"El documento en √≠ndice {i} no tiene la clave '{text_key}' requerida"
                    logger.error(error_msg)
                    raise ValueError(error_msg)

            # Generar embeddings
            texts = [doc[text_key] for doc in documents]
            embeddings = self.embed_texts(
                texts=texts,
                embedding_model=embedding_model,
                dimension=dimension
            )

            # Agregar embeddings a los documentos (campo fijo "embedding")
            for i, doc in enumerate(documents):
                doc[self.EMBEDDING_KEY] = embeddings[i]
                logger.debug(f"Embedding agregado al documento {i}")

            logger.info(f"Embeddings agregados a {len(documents)} documentos")
            return documents
        except ValueError:
            raise
        except Exception as e:
            logger.error(f"Error al embedear documentos: {e}")
            raise

    def add_documents(self, documents: list[dict], embedding_model: str = "gemini-embedding-001", dimension: int = 2048, text_key: str = "text"):
        """
        Agrega documentos con embeddings a la colecci√≥n de Firestore.

        Args:
            documents: Lista de diccionarios con los documentos
            embedding_model: Modelo de embedding a usar (default: "gemini-embedding-001")
            dimension: Dimensi√≥n del embedding (default: 2048, m√°ximo 2048)
            text_key: Clave del diccionario que contiene el texto a embedear (default: "text")

        Raises:
            ValueError: Si alg√∫n documento no tiene el campo 'embedding' o si hay errores de validaci√≥n
            GoogleCloudError: Si hay errores al escribir en Firestore
        """
        try:
            # Generar embeddings si no existen
            documents_with_embeddings = self.embed_documents(
                documents=documents,
                embedding_model=embedding_model,
                dimension=dimension,
                text_key=text_key
            )

            # Verificar que todos tengan el campo embedding
            for i, doc in enumerate(documents_with_embeddings):
                if self.EMBEDDING_KEY not in doc:
                    error_msg = f"El documento en √≠ndice {i} no tiene el campo '{self.EMBEDDING_KEY}'"
                    logger.error(error_msg)
                    raise ValueError(error_msg)

            # Agregar documentos a la colecci√≥n usando batch (m√°ximo 500 por batch)
            BATCH_SIZE = 300
            total_docs = len(documents_with_embeddings)
            added_count = 0

            # Dividir en lotes de 300 documentos
            for i in range(0, total_docs, BATCH_SIZE):
                batch = self.db.batch()
                batch_docs = documents_with_embeddings[i:i + BATCH_SIZE]

                for doc in batch_docs:
                    # Agregar metadatos obligatorios
                    doc_id = self._generate_document_id(doc, text_key)
                    doc['id'] = doc_id
                    doc['created_at'] = datetime.utcnow().isoformat()

                    # Usar el hash como document ID en Firestore
                    doc_ref = self.db.collection(self.collection).document(doc_id)
                    batch.set(doc_ref, doc)

                batch.commit()
                added_count += len(batch_docs)
                logger.info(f"Lote agregado: {len(batch_docs)} documentos. Total: {added_count}/{total_docs}")

            logger.info(f"{total_docs} documentos agregados exitosamente a la colecci√≥n '{self.collection}'")

        except ValueError:
            raise
        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al agregar documentos: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al agregar documentos: {e}")
            raise

    def count_documents(self):
        """
        Cuenta los documentos en la colecci√≥n.

        Returns:
            N√∫mero de documentos en la colecci√≥n

        Raises:
            GoogleCloudError: Si hay errores al consultar Firestore
            ValueError: Si el resultado tiene formato inesperado
        """
        try:
            count_query = self.db.collection(self.collection).count()
            count_result = count_query.get()

            # QueryResultsList es iterable - extraer el primer resultado de la agregaci√≥n
            for aggregation_result in count_result:
                # Cada resultado de agregaci√≥n es una lista con el valor del count
                if isinstance(aggregation_result, list) and len(aggregation_result) > 0:
                    count = aggregation_result[0].value
                    logger.info(f"Cantidad de documentos en '{self.collection}': {count}")
                    return count

            # Si llegamos aqu√≠, formato inesperado
            error_msg = f"Formato inesperado del resultado de count: {type(count_result)}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al contar documentos: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al contar documentos: {e}")
            raise

    def get_documents(self):
        """
        Obtiene todos los documentos de la colecci√≥n.

        Returns:
            Lista de diccionarios con los documentos

        Raises:
            GoogleCloudError: Si hay errores al consultar Firestore
        """
        try:
            docs = [doc.to_dict() for doc in self.db.collection(self.collection).stream()]
            logger.info(f"Obtenidos {len(docs)} documentos de la colecci√≥n '{self.collection}'")
            return docs
        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al obtener documentos: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al obtener documentos: {e}")
            raise

    def get_document_by_key(self, key: str, value: Union[str, list, dict]):
        """
        Busca documentos por una llave y valor espec√≠ficos.

        Args:
            key: Nombre del campo a buscar
            value: Valor a buscar

        Returns:
            Lista de diccionarios con los documentos encontrados

        Raises:
            GoogleCloudError: Si hay errores al consultar Firestore
        """
        try:
            docs = self.db.collection(self.collection).where(key, "==", value).get()
            result = [doc.to_dict() for doc in docs]
            logger.info(f"Encontrados {len(result)} documentos con {key}={value}")
            return result
        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al buscar documentos por llave: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al buscar documentos: {e}")
            raise

    def get_document_by_id(self, document_id: str):
        """
        Obtiene un documento espec√≠fico por su ID.

        Args:
            document_id: ID √∫nico del documento (hash)

        Returns:
            Diccionario con el documento encontrado o None si no existe

        Raises:
            GoogleCloudError: Si hay errores al consultar Firestore
        """
        try:
            doc_ref = self.db.collection(self.collection).document(document_id)
            doc_snapshot = doc_ref.get()

            if doc_snapshot.exists:
                result = doc_snapshot.to_dict()
                logger.info(f"Documento encontrado con ID: {document_id}")
                return result
            else:
                logger.warning(f"No se encontr√≥ documento con ID: {document_id}")
                return None

        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al buscar documento por ID: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al buscar documento: {e}")
            raise

    def delete_document_by_key(self, key: str, value: Union[str, list, dict], batch_size: int = 500):
        """
        Elimina documentos por llave y valor espec√≠ficos.

        ADVERTENCIA: Esta operaci√≥n es irreversible.

        Args:
            key: Nombre del campo a buscar
            value: Valor a buscar
            batch_size: N√∫mero de documentos a eliminar por lote (m√°ximo 500)

        Returns:
            N√∫mero de documentos eliminados

        Raises:
            ValueError: Si batch_size excede 500 o no se encuentran documentos
            GoogleCloudError: Si hay errores al eliminar en Firestore
        """
        try:
            if batch_size > 500:
                error_msg = "batch_size no puede exceder 500 (l√≠mite de Firestore)"
                logger.error(error_msg)
                raise ValueError(error_msg)

            # Buscar documentos que coincidan
            doc_refs = self.db.collection(self.collection).where(key, "==", value).stream()
            doc_refs_list = list(doc_refs)

            if not doc_refs_list:
                logger.warning(f"No se encontraron documentos con {key}={value}")
                return 0

            logger.warning(f"Eliminando {len(doc_refs_list)} documentos con {key}={value}")

            # Eliminar en batches
            deleted_count = 0
            total_docs = len(doc_refs_list)

            for i in range(0, total_docs, batch_size):
                batch = self.db.batch()
                batch_refs = doc_refs_list[i:i + batch_size]

                for doc_ref in batch_refs:
                    batch.delete(doc_ref.reference)

                batch.commit()
                deleted_count += len(batch_refs)
                logger.info(f"Eliminados {len(batch_refs)} documentos. Total: {deleted_count}/{total_docs}")

            logger.info(f"Eliminaci√≥n completada: {deleted_count} documentos con {key}={value}")
            return deleted_count

        except ValueError:
            raise
        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al eliminar documentos por llave: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al eliminar documentos: {e}")
            raise

    def delete_document_by_id(self, document_id: str):
        """
        Elimina un documento espec√≠fico por su ID.

        ADVERTENCIA: Esta operaci√≥n es irreversible.

        Args:
            document_id: ID √∫nico del documento (hash)

        Returns:
            True si se elimin√≥, False si no se encontr√≥

        Raises:
            GoogleCloudError: Si hay errores al eliminar en Firestore
        """
        try:
            doc_ref = self.db.collection(self.collection).document(document_id)
            doc_snapshot = doc_ref.get()

            if not doc_snapshot.exists:
                logger.warning(f"No se encontr√≥ documento con ID: {document_id}")
                return False

            logger.warning(f"Eliminando documento con ID: {document_id}")
            doc_ref.delete()
            logger.info(f"Documento {document_id} eliminado exitosamente")
            return True

        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al eliminar documento por ID: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al eliminar documento: {e}")
            raise

    def update_document_by_key(self, key: str, value: Union[str, list, dict],
                               embedding_model: str = "gemini-embedding-001", dimension: int = 2048,
                               text_key: str = "text", update_fields: Optional[dict] = None):
        """
        Actualiza documentos por llave, regenerando embeddings y opcionalmente otros campos.

        Args:
            key: Nombre del campo a buscar
            value: Valor a buscar
            embedding_model: Modelo de embedding a usar (default: "gemini-embedding-001")
            dimension: Dimensi√≥n del embedding (default: 2048, m√°ximo 2048)
            text_key: Clave del diccionario que contiene el texto a embedear (default: "text")
            update_fields: Diccionario opcional con campos adicionales a actualizar

        Returns:
            N√∫mero de documentos actualizados

        Raises:
            ValueError: Si no se encuentran documentos o si hay errores de validaci√≥n
            GoogleCloudError: Si hay errores al actualizar en Firestore
        """
        try:
            # Buscar documentos por llave
            doc_refs = self.db.collection(self.collection).where(key, "==", value).get()

            if not doc_refs:
                error_msg = f"No se encontraron documentos con la llave '{key}' y valor '{value}'"
                logger.warning(error_msg)
                raise ValueError(error_msg)

            logger.info(f"Encontrados {len(doc_refs)} documentos para actualizar")

            # Convertir a diccionarios para procesar
            docs = [doc_ref.to_dict() for doc_ref in doc_refs]

            # Validar que todos tengan la clave de texto
            for i, doc in enumerate(docs):
                if text_key not in doc:
                    error_msg = f"El documento en √≠ndice {i} no tiene la clave '{text_key}' requerida"
                    logger.error(error_msg)
                    raise ValueError(error_msg)

            # Generar nuevos embeddings
            texts = [doc[text_key] for doc in docs]
            embeddings = self.embed_texts(
                texts=texts,
                embedding_model=embedding_model,
                dimension=dimension
            )

            # Actualizar documentos en Firestore
            batch = self.db.batch()
            updated_count = 0

            for i, doc_ref in enumerate(doc_refs):
                # Actualizar el campo embedding (fijo)
                update_data = {
                    self.EMBEDDING_KEY: embeddings[i].values,
                    'updated_at': datetime.utcnow().isoformat()
                }

                # Agregar campos adicionales si se proporcionan
                if update_fields:
                    update_data.update(update_fields)

                batch.update(doc_ref, update_data)
                updated_count += 1

            batch.commit()
            logger.info(f"{updated_count} documentos actualizados exitosamente")
            return updated_count

        except ValueError:
            raise
        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al actualizar documentos: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al actualizar documentos: {e}")
            raise

    def update_document_by_id(self, document_id: str, updated_data: dict,
                              embedding_model: str = "gemini-embedding-001", dimension: int = 2048,
                              text_key: str = "text"):
        """
        Actualiza un documento espec√≠fico por su ID, regenerando el embedding.

        Args:
            document_id: ID √∫nico del documento (hash)
            updated_data: Diccionario con los campos a actualizar
            embedding_model: Modelo de embedding a usar (default: "gemini-embedding-001")
            dimension: Dimensi√≥n del embedding (default: 2048, m√°ximo 2048)
            text_key: Clave del diccionario que contiene el texto a embedear (default: "text")

        Raises:
            ValueError: Si el documento no existe o no tiene el campo de texto
            GoogleCloudError: Si hay errores al actualizar en Firestore
        """
        try:
            # Obtener referencia del documento por ID
            doc_ref = self.db.collection(self.collection).document(document_id)
            doc_snapshot = doc_ref.get()

            if not doc_snapshot.exists:
                error_msg = f"No se encontr√≥ documento con ID '{document_id}'"
                logger.error(error_msg)
                raise ValueError(error_msg)

            logger.info(f"Actualizando documento con ID: {document_id}")

            # Obtener datos actuales y mezclar con los nuevos
            current_data = doc_snapshot.to_dict()
            current_data.update(updated_data)

            # Validar que tenga el campo de texto
            if text_key not in current_data:
                error_msg = f"El documento no tiene la clave '{text_key}' requerida"
                logger.error(error_msg)
                raise ValueError(error_msg)

            # Generar nuevo embedding
            embeddings = self.embed_texts(
                texts=[current_data[text_key]],
                embedding_model=embedding_model,
                dimension=dimension
            )

            # Preparar datos de actualizaci√≥n
            update_data = {
                self.EMBEDDING_KEY: embeddings[0],
                'updated_at': datetime.utcnow().isoformat()
            }

            # Agregar los campos actualizados
            update_data.update(updated_data)

            # Actualizar el documento
            doc_ref.update(update_data)
            logger.info(f"Documento {document_id} actualizado exitosamente")

        except ValueError:
            raise
        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al actualizar documento: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al actualizar documento: {e}")
            raise

    def delete_collection(self, batch_size: int = 500):
        """
        Elimina todos los documentos de la colecci√≥n.

        ADVERTENCIA: Esta operaci√≥n es irreversible y eliminar√° todos los documentos.

        Args:
            batch_size: N√∫mero de documentos a eliminar por lote (m√°ximo 500)

        Returns:
            N√∫mero total de documentos eliminados

        Raises:
            ValueError: Si batch_size excede 500
            GoogleCloudError: Si hay errores al eliminar en Firestore
        """
        try:
            if batch_size > 500:
                error_msg = "batch_size no puede exceder 500 (l√≠mite de Firestore)"
                logger.error(error_msg)
                raise ValueError(error_msg)

            logger.warning(f"Iniciando eliminaci√≥n de todos los documentos en la colecci√≥n '{self.collection}'")

            deleted_count = 0
            collection_ref = self.db.collection(self.collection)

            while True:
                # Obtener un lote de documentos
                docs = list(collection_ref.limit(batch_size).stream())

                if not docs:
                    break  # No hay m√°s documentos

                # Eliminar el lote usando batch
                batch = self.db.batch()
                for doc in docs:
                    batch.delete(doc.reference)

                batch.commit()
                deleted_count += len(docs)
                logger.info(f"Eliminados {len(docs)} documentos. Total: {deleted_count}")

            logger.info(f"Colecci√≥n '{self.collection}' limpiada exitosamente. Total eliminados: {deleted_count}")
            return deleted_count

        except ValueError:
            raise
        except GoogleCloudError as e:
            logger.error(f"Error de Firestore al eliminar colecci√≥n: {e}")
            raise
        except Exception as e:
            logger.error(f"Error inesperado al eliminar colecci√≥n: {e}")
            raise

    def as_retriever(self, query: str, k: int = 5, model: str = "gemini-embedding-001",
                    dimension: int = 2048, distance_measure: DistanceMeasure = DistanceMeasure.COSINE,
                    filters: dict = None):
        """
        B√∫squeda de similaridad vectorial optimizada con filtros opcionales.

        Args:
            query: Texto de b√∫squeda
            k: N√∫mero de resultados a retornar
            model: Modelo de embeddings a usar
            dimension: Dimensi√≥n de los embeddings
            distance_measure: Medida de distancia (COSINE, EUCLIDEAN, DOT_PRODUCT)
            filters: Diccionario de filtros para aplicar condiciones.
                    Soporta dos formatos:
                    - Igualdad simple: {"campo": valor}
                    - Operador IN: {"campo": ("in", [valor1, valor2, ...])}

                    Ejemplos:
                    - filters={"categoria": "cardiologia", "activo": True}
                    - filters={"categoria": ("in", ["cardiologia", "neurologia"]), "activo": True}

                    NOTA: Si una lista 'in' tiene m√°s de 30 valores, se divide autom√°ticamente
                    en m√∫ltiples consultas y se combinan los resultados.

        Returns:
            Lista de documentos similares con sus scores

        Raises:
            ValueError: Si hay errores en el formato de filtros
        """

        try:
            # Generar embedding del query
            vector_list = self.embed_texts(texts=[query], embedding_model=model, dimension=dimension)
            query_vector = vector_list[0]

            # Identificar si hay filtros IN con m√°s de 30 valores
            has_large_in_filter = False
            large_in_field = None
            large_in_values = None
            other_filters = {}
            has_other_filters = False

            if filters:
                for field, value in filters.items():
                    if isinstance(value, tuple) and len(value) == 2 and value[0] == "in":
                        operator, values_list = value

                        # Validaciones b√°sicas
                        if not isinstance(values_list, list):
                            error_msg = f"El operador 'in' requiere una lista de valores, recibido: {type(values_list)}"
                            logger.error(error_msg)
                            raise ValueError(error_msg)

                        if len(values_list) == 0:
                            error_msg = "El operador 'in' requiere al menos un valor en la lista"
                            logger.error(error_msg)
                            raise ValueError(error_msg)

                        # Si tiene m√°s de 30 valores, lo manejamos especialmente
                        if len(values_list) > 30:
                            if has_large_in_filter:
                                error_msg = "Solo se puede tener un filtro 'in' con m√°s de 30 valores por consulta"
                                logger.error(error_msg)
                                raise ValueError(error_msg)

                            has_large_in_filter = True
                            large_in_field = field
                            large_in_values = values_list
                            logger.info(f"Filtro IN grande detectado: {field} con {len(values_list)} valores. Se dividir√° en chunks.")
                        else:
                            other_filters[field] = value
                            has_other_filters = True
                    else:
                        other_filters[field] = value
                        has_other_filters = True

            # Determinar el chunk_size √≥ptimo basado en si hay otros filtros
            # Si hay otros filtros + filtro IN grande, reducimos el chunk_size para evitar el l√≠mite de 30 disjunciones
            if has_large_in_filter and has_other_filters:
                # Con otros filtros, usamos chunks m√°s peque√±os para evitar el l√≠mite de disjunciones
                chunk_size = 10
                logger.info(f"Usando chunk_size={chunk_size} debido a filtros adicionales (evitar l√≠mite de 30 disjunciones)")
            else:
                chunk_size = 30

            # Funci√≥n auxiliar para ejecutar una b√∫squeda con filtros espec√≠ficos
            def _execute_search(search_filters, limit_override=None):
                collection_ref = self.db.collection(self.collection)

                # Aplicar filtros
                for field, value in search_filters.items():
                    if isinstance(value, tuple) and len(value) == 2 and value[0] == "in":
                        operator, values_list = value
                        collection_ref = collection_ref.where(filter=FieldFilter(field, "in", values_list))
                        logger.debug(f"Filtro IN aplicado: {field} in {values_list[:3]}... ({len(values_list)} valores)")
                    else:
                        collection_ref = collection_ref.where(filter=FieldFilter(field, "==", value))
                        logger.debug(f"Filtro de igualdad aplicado: {field} == {value}")

                # B√∫squeda vectorial con l√≠mite ajustado
                search_limit = limit_override if limit_override else k
                response = collection_ref.find_nearest(
                    vector_field=self.EMBEDDING_KEY,
                    query_vector=query_vector,
                    distance_measure=distance_measure,
                    limit=search_limit
                )

                return response.get()

            # Si NO hay filtro IN grande, ejecutar b√∫squeda normal
            if not has_large_in_filter:
                results = _execute_search(other_filters if filters else {})

                result_list = [
                    {
                        **{k: v for k, v in doc.to_dict().items()
                        if k not in [self.EMBEDDING_KEY, "vector_distance"]},
                        "score": 1 - doc.to_dict().get("vector_distance", 0)
                    }
                    for doc in results
                ]

                logger.info(f"B√∫squeda completada: {len(result_list)} resultados encontrados")
                return result_list

            # Si HAY filtro IN grande, dividir en chunks y combinar resultados
            logger.info(f"Ejecutando b√∫squeda en m√∫ltiples chunks para {large_in_field}")

            all_results = []
            num_chunks = (len(large_in_values) - 1) // chunk_size + 1

            # Solicitar m√°s resultados por chunk para compensar
            # Pedimos k * (n√∫mero de chunks) para asegurar que tenemos suficientes
            results_per_chunk = max(k * 2, k * num_chunks // 2)

            # Dividir la lista en chunks
            for i in range(0, len(large_in_values), chunk_size):
                chunk = large_in_values[i:i + chunk_size]

                # Crear filtros para este chunk
                chunk_filters = other_filters.copy()
                chunk_filters[large_in_field] = ("in", chunk)

                # Ejecutar b√∫squeda para este chunk con l√≠mite aumentado
                logger.info(f"Ejecutando chunk {i//chunk_size + 1}/{num_chunks} con {len(chunk)} valores (solicitando {results_per_chunk} resultados)")

                try:
                    chunk_results = _execute_search(chunk_filters, limit_override=results_per_chunk)

                    # Agregar resultados con sus scores
                    for doc in chunk_results:
                        doc_dict = doc.to_dict()
                        all_results.append({
                            **{k: v for k, v in doc_dict.items()
                            if k not in [self.EMBEDDING_KEY, "vector_distance"]},
                            "score": 1 - doc_dict.get("vector_distance", 0),
                            "_doc_id": doc.id  # Para deduplicar
                        })
                except Exception as chunk_error:
                    logger.warning(f"Error en chunk {i//chunk_size + 1}: {chunk_error}. Continuando con otros chunks...")
                    continue

            if not all_results:
                logger.warning("No se obtuvieron resultados de ning√∫n chunk")
                return []

            # Deduplicar resultados (por si un documento aparece en m√∫ltiples chunks)
            unique_results = {}
            for result in all_results:
                doc_id = result.pop("_doc_id")
                if doc_id not in unique_results or result["score"] > unique_results[doc_id]["score"]:
                    unique_results[doc_id] = result

            # Ordenar por score descendente y tomar los top k
            final_results = sorted(unique_results.values(), key=lambda x: x["score"], reverse=True)[:k]

            logger.info(f"B√∫squeda con chunks completada: {len(final_results)} resultados finales de {len(all_results)} totales (despu√©s de deduplicaci√≥n)")
            return final_results

        except ValueError:
            raise
        except Exception as e:
            logger.error(f"Error en b√∫squeda vectorial: {e}")
            raise

## Ingestando informaci√≥n

In [None]:
documentos = df.to_dict(orient="records")

# Veamos c√≥mo lucen
documentos[0]

{'id': 1,
 'name': 'P√≥liza Calcet√≠n Perdido',
 'short_description': '¬°Nunca m√°s te preocupes por el calcet√≠n solitario!',
 'complete_description': 'Esta p√≥liza cubre la inexplicable desaparici√≥n de un calcet√≠n de un par perfectamente combinado durante el ciclo de lavado. Recibir√°s una indemnizaci√≥n equivalente al valor de un par de calcetines nuevos o, en casos extremos, un servicio de detective de calcetines para reunirte con tu media perdida. ¬°Adi√≥s a la tristeza del caj√≥n de calcetines hu√©rfanos!'}

In [None]:
vector_store = FirestoreVectorStore(
    project="sb-iadaia-cap-dev",
    database=f"vs-rag-workshop",
    collection="t_seguros_fake_gemini"
)

#! AS√ç ES COMO SE INGESTAR√çA LA INFORMACI√ìN PARA USAR FIRESTORE COMO VS
# vector_store.add_documents(documentos, "gemini-embedding-001", 2048, "short_description")

## Retriever

Un **retriever** es el componente encargado de consultar el Vector Store y traer los documentos m√°s relevantes dado una pregunta o consulta. Es esencialmente el puente entre el usuario y la base de conocimiento.

Cuando el usuario hace una pregunta, el retriever la convierte en un embedding y busca en Firestore los fragmentos de texto m√°s cercanos sem√°nticamente ‚Äî esos fragmentos son los que luego se le pasan al modelo de lenguaje como contexto para generar la respuesta.

En t√©rminos simples: el retriever **sabe d√≥nde buscar y qu√© traer**.

In [None]:
vector_store.as_retriever(query="Quiero un seguro para cuando derramo algo sobre mi teclado :(", k=1)

[{'created_at': '2026-02-18T21:38:38.252255',
  'complete_description': 'Sabemos que la vida con caf√© es mejor, pero a veces tambi√©n m√°s... h√∫meda. Si tu preciado teclado sucumbe a un derrame accidental de caf√©, t√© o cualquier bebida energ√©tica matutina, nuestra cobertura te proporciona un teclado de reemplazo y un kit de limpieza de emergencia para tu estaci√≥n de trabajo. ¬°Tu productividad est√° a salvo!',
  'short_description': 'Protecci√≥n para tu teclado contra desastres l√≠quidos matutinos.',
  'id': '66af4a87b681c888',
  'name': 'Cobertura de Caf√© Derramado sobre Teclado',
  'score': 1}]

# RAG

**RAG** (*Retrieval-Augmented Generation*) es el patr√≥n que une todos los componentes que hemos visto hasta ahora. El nombre lo dice todo: es generaci√≥n de texto (*Generation*) enriquecida (*Augmented*) con informaci√≥n recuperada (*Retrieval*) de una base de conocimiento.

El flujo es simple:

1. El usuario hace una pregunta.
2. El **retriever** busca en el **Vector Store** los documentos m√°s relevantes.
3. Esos documentos se le pasan al **LLM** como contexto adicional.
4. El LLM genera una respuesta informada, basada tanto en su conocimiento general como en los documentos recuperados.

### ¬øPor qu√© importa?

RAG resuelve uno de los problemas m√°s comunes al trabajar con LLMs en contextos empresariales: el modelo no conoce tu informaci√≥n interna. Con RAG, no necesitas reentrenar el modelo ‚Äî simplemente le das acceso a tus documentos en el momento en que los necesita.

In [None]:
def rag(user_input: str) -> str:
    """
    Receives a user question, retrieves relevant documents from Firestore,
    and returns the LLM response augmented with that context.
    """

    # --- Retriever ---
    vector_store = FirestoreVectorStore(
        project="sb-iadaia-cap-dev",
        database="vs-rag-workshop",
        collection="t_seguros_fake_gemini",
    )

    results = vector_store.as_retriever(query=user_input, k=3)

    # Build context from retrieved documents
    context = "\n\n".join(
        f"- {doc.get('name', '')}: {doc.get('complete_description', doc.get('short_description', ''))}"
        for doc in results
    )

    # --- LLM ---
    credentials = load_credentials()
    llm = ChatVertexAI(
        model_name="gemini-2.5-flash",
        credentials=credentials,
    )

    prompt = ChatPromptTemplate.from_messages([
        ("system",
         "You are a helpful insurance assistant. Use ONLY the following context "
         "to answer the user's question. If the context does not contain enough "
         "information, say so. Always answer in Spanish.\n\n"
         "Context:\n{context}"),
        ("human", "{question}"),
    ])

    chain = prompt | llm | StrOutputParser()

    return chain.invoke({"context": context, "question": user_input})

In [None]:
print(f'\n\n\nRespuesta del RAG:\n\n{rag(user_input="Quiero un seguro para cuando derramo algo sobre mi teclado :(")}')

  llm = ChatVertexAI(





Respuesta del RAG:

¬°Entiendo perfectamente! Tenemos una cobertura perfecta para eso. Nuestra "Cobertura de Caf√© Derramado sobre Teclado" te protege si derramas caf√©, t√© o cualquier otra bebida accidentalmente sobre tu teclado. Te proporcionamos un teclado de reemplazo y un kit de limpieza de emergencia para tu estaci√≥n de trabajo. ¬°As√≠ tu productividad no se ver√° afectada!


# Agente de Langgraph

Un **agente de IA** es un sistema que, dado un objetivo o una pregunta, es capaz de **razonar, tomar decisiones y ejecutar acciones** de forma aut√≥noma para llegar a una respuesta. A diferencia de un LLM al que simplemente le haces una pregunta y te responde, un agente puede decidir qu√© pasos seguir, qu√© herramientas usar y c√≥mo combinar resultados ‚Äî iterando hasta completar la tarea.

En este workshop construiremos nuestro agente usando **LangGraph**, una librer√≠a dise√±ada para crear agentes como grafos de flujo de trabajo, lo que nos da control preciso sobre c√≥mo razona y act√∫a el agente.

### Componentes del agente

#### üõ†Ô∏è Tools (Herramientas)
Las tools son funciones que el agente puede decidir invocar cuando las necesita. Por ejemplo, una tool puede consultar el Vector Store, hacer un c√°lculo, o llamar a una API externa. El agente no las ejecuta todas siempre ‚Äî decide cu√°les usar seg√∫n el contexto de la conversaci√≥n. En nuestro caso, el retriever sobre Firestore ser√° una de las tools disponibles.

#### üß† LLM
El modelo de lenguaje es el "cerebro" del agente. Es quien lee la conversaci√≥n, decide qu√© tool invocar (si es que necesita alguna), interpreta los resultados y finalmente genera la respuesta. El LLM no act√∫a solo ‚Äî est√° guiado por el system prompt y limitado a las tools que le damos.

#### üìã System Prompt
El system prompt es el conjunto de instrucciones que le define al LLM **qui√©n es y c√≥mo debe comportarse**. Es donde le decimos su rol, su tono, sus limitaciones y cualquier regla de negocio relevante. Un buen system prompt es clave para que el agente se comporte de forma coherente y predecible.

#### üíæ Memoria (Firestore)
La memoria le permite al agente recordar conversaciones anteriores. Sin ella, cada mensaje ser√≠a tratado como una conversaci√≥n nueva y el agente perder√≠a todo el contexto previo. En este workshop usaremos **Firestore** tambi√©n como almac√©n de memoria, lo que nos permite persistir el historial de conversaci√≥n de forma escalable y sin infraestructura adicional ‚Äî aprovechando el mismo servicio que ya usamos para el Vector Store.

#### üîÄ El Grafo (LangGraph)
LangGraph modela el comportamiento del agente como un **grafo de nodos y conexiones**. Cada nodo representa una acci√≥n o decisi√≥n (como invocar el LLM, ejecutar una tool, o verificar una condici√≥n), y las conexiones determinan el flujo entre ellos. Esto nos da una ventaja importante sobre otros enfoques: el comportamiento del agente es **expl√≠cito, trazable y modificable**, en lugar de ser una caja negra.

El grafo de nuestro agente se ver√° algo as√≠:
```
Entrada del usuario ‚Üí LLM ‚Üí ¬øNecesita una tool?
                              ‚îú‚îÄ‚îÄ S√≠ ‚Üí Ejecutar tool ‚Üí LLM ‚Üí Respuesta
                              ‚îî‚îÄ‚îÄ No ‚Üí Respuesta
```

### El agente completo

Uniendo todo:

| Componente | Rol |
|---|---|
| System Prompt | Define el comportamiento del agente |
| LLM | Razona y genera respuestas |
| Tools | Acciones que el agente puede ejecutar |
| Memoria (Firestore) | Recuerda conversaciones anteriores |
| Grafo (LangGraph) | Orquesta el flujo completo |

In [None]:
from langchain_core.tools import tool
from langgraph.prebuilt import InjectedState
from langchain_core.tools.base import InjectedToolCallId
from typing_extensions import Annotated
from langgraph.types import Command
from langchain_core.messages import ToolMessage

from typing import Any, AsyncIterator, Dict, Iterator, Optional, Sequence, Tuple
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, CheckpointTuple, ChannelVersions
import pickle
from datetime import datetime
import pytz

from langchain_core.messages import SystemMessage
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.prebuilt import ToolNode

from langchain_core.messages import HumanMessage

## Tools

In [None]:
class InputRAGTool(BaseModel):
    tool_call_id: Annotated[str, InjectedToolCallId]
    state: Annotated[dict, InjectedState]
    pregunta: str = Field(None, description='La pregunta que tiene el usuario ed seguros o coberturas.')


@tool(args_schema=InputRAGTool)
def call_rag_productos_y_coberturas(pregunta: str, state: Annotated[dict, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId]) -> dict:
    """
    Esta tool hace una b√∫squeda sem√°ntica de coberturas o seguros para un usuario, dada una pregunta espec√≠fica.
    """

    try:

        rag_response = rag(user_input=pregunta)

        update_dict = {
            "messages": [
                ToolMessage(
                    content=f"Tool ejecutada correctamente. Respuesta de la tool: {rag_response}",
                    tool_call_id=tool_call_id
                )
            ],
            "ultima_pregunta": pregunta,
            "ultima_respuesta": rag_response
        }

        return Command(update=update_dict)

    except Exception as e:
        print(f"Error inesperado en call_rag_productos_y_coberturas: {e}")
        update_dict = {
            "messages": [
                ToolMessage(
                    content="Error inesperado al procesar la solicitud del usuario.",
                    tool_call_id=tool_call_id
                )
            ],
            "ultima_pregunta": pregunta,
            "ultima_respuesta": "Lo siento gfe, fall√© :("
        }

        return Command(update=update_dict)

## LLM

In [None]:
tools = [call_rag_productos_y_coberturas]
llm_with_tools = llm.bind_tools(tools)

## System Prompt

In [None]:
AGENT_SYSTEM_PROMPT = """
Eres un asistente virtual llamado "Segur√≠n", un experto asesor que trabaja para una empresa aseguradora.
Tu trabajo consiste en ayudar al cliente cuando tenga dudas de qu√© producto o coberturas sirven para suplir sus necesidades.

### REGLA CR√çTICA:
Si no tienes la suficiente informaci√≥n completa para llamar una tool, debes pedirle al usuario que te proporcione la informaci√≥n faltante.

### HERRAMIENTAS O TOOLS DISPONIBLES:
- call_rag_productos_y_coberturas: Esta herramienta te permite obtener la informaci√≥n m√°s relevante de productos o coberturas que tiene la aseguradora. S√≥lo utiliza esta herramienta cuando el usuario tenga una pregunta expl√≠cita de un seguro o cobertura.
    Par√°metros:
        - pregunta: Es la pregunta que el usuario tiene aserca de seguros o coberturas.

### PROTOCOLO GENERAL
- Al inicio de la conversaci√≥n, aseg√∫rate de saludar al usuario present√°ndote con nombre propio, y manteniendo un lenguaje formal pero amable.
- No respondas preguntas que no est√©n relacionadas con la aseguradora.
- Conversa con el usuario hasta asegurarte de tener clara cu√°l es la pregunta que tiene.
- Una vez tengas clara la pregunta, llama la tool 'call_rag_productos_y_coberturas' y responde al usuario dada la informaci√≥n que te responde la tool
- La aseguradora cubre algunas cosas no tradicionales, por lo tanto no descartes la preguntas a la ligera. Si el usuario est√° preguntando por un seguro o cobertura, por loco que suene, utiliza la tool.
- Puedes llamar m√∫ltiples veces la tool en una misma conversaci√≥n, si el usuario tiene m√∫ltiples preguntas.
- Cada vez que respondas una pregunta del usuario, pregunta si necesita m√°s informaci√≥n o si ya est√° resuelta su duda.
- Si al llamar a la herramienta algo sale mal, response al usuario que algo sali√≥ mal, pero que en unos minutos puede volver a intentarlo.
- Si el usuario indica que est√° conforme con la informaci√≥n que le brindaste, desp√≠dete usando vocabulario formal pero amable.

### ESTILO:
- Trato formal ("usted", no "t√∫")
- Breve y directo
- Profesional
- No saludar con Buenos d√≠as o buenas tardes
"""

## Memoria (Firestore)

In [None]:
class JsonPlusSerializerCompat(JsonPlusSerializer):
    #Clase para serializar-deserializar el checkpointer, hereda m√©todos de JsonPlusSerializer
    def loads(self, data: bytes) -> Any:
        if data.startswith(b"\x80") and data.endswith(b"."):
            return pickle.loads(data)
        return super().loads(data)

class FirestoreSaver(BaseCheckpointSaver):
    """
    Clase para implementar memoria de Langgraph en Firestore, debe especificarse
    la base de datos (database) el nombre de coleccion de los checkpoints (collection_name)
    y el nombre de la coleccion de pasos intermedios (pw_collection_name).
    """
    serde = JsonPlusSerializerCompat()

    def __init__(self, database = "(default)", collection_name: str = "checkpoints", pw_collection_name: str = "checkpoint_writes", serde: Optional[Any] = None) -> None:
        super().__init__(serde=serde)
        self.db: firestore.Client = firestore.Client(database=database, credentials=creds)
        self.async_db: firestore.AsyncClient = firestore.AsyncClient(database=database, credentials=creds)
        self.collection_name: str = collection_name
        self.pw_collection_name: str = pw_collection_name

    # M√©todo para traer memoria asociada a un thread_id
    def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        thread_id: str = config["configurable"]["thread_id"]
        thread_ts: Optional[str] = config["configurable"].get("thread_ts")

        doc_ref: firestore.DocumentReference = self.db.collection(self.collection_name).document(thread_id)
        doc: firestore.DocumentSnapshot = doc_ref.get()
        #Trae todo lo asociado al thead_id

        if not doc.exists:
            return None

        data: Dict[str, Any] = doc.to_dict()
        return self._process_checkpoint_data_common(data)

    # M√©todo asincronico para traer memmoria
    async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        thread_id: str = config["configurable"]["thread_id"]
        thread_ts: Optional[str] = config["configurable"].get("thread_ts")

        doc_ref: firestore.AsyncDocumentReference = self.async_db.collection(self.collection_name).document(thread_id)
        doc: firestore.DocumentSnapshot = await doc_ref.get()

        data: Dict[str, Any] = doc.to_dict()
        return await self._process_checkpoint_data_common(data)

    # Para listar checkpoints (Para listar checkpoints basados en un criterio)
    def list(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> Iterator[CheckpointTuple]:
        thread_id: Optional[str] = config["configurable"]["thread_id"] if config else None
        if filter:
            raise NotImplementedError("No se cuenta con la funcionalidad de filtrado")

        # Obtiene una referencia a la colecci√≥n de checkpoints
        col_ref: firestore.CollectionReference = self.db.collection(self.collection_name)

        # Si se proporcion√≥ un thread_id, filtra por ese thread_id
        if thread_id:
            col_ref = col_ref.where("thread_id", "==", thread_id)

        docs: firestore.QuerySnapshot = col_ref.order_by("timestamp", direction=firestore.Query.DESCENDING).limit(limit or 100).get()

        for doc in docs:
            yield self._process_checkpoint_data_common(doc.to_dict())

    # M√©todo asincronico  para listar checkpoints (Para listar checkpoints basados en un criterio)
    async def alist(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> AsyncIterator[CheckpointTuple]:
        thread_id: Optional[str] = config["configurable"]["thread_id"] if config else None
        if filter:
            raise NotImplementedError("Filtering is not implemented for FirestoreSaver")

        # Obtiene una referencia a la colecci√≥n de checkpoints
        col_ref: firestore.AsyncCollectionReference = self.async_db.collection(self.collection_name)

        # Si se proporcion√≥ un thread_id, filtra por ese thread_id
        if thread_id:
            col_ref = col_ref.where("thread_id", "==", thread_id)

        docs: firestore.QuerySnapshot = await col_ref.order_by("timestamp", direction=firestore.Query.DESCENDING).limit(limit or 100).get()

        async for doc in docs:
            yield self._process_checkpoint_data_common(doc.to_dict())

    # Para guardar un checkpoint
    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        thread_id: str = config["configurable"]["thread_id"]
        timestamp: str = datetime.now(pytz.timezone('America/Bogota')).strftime('%Y-%m-%d %H:%M:%S')
        ts: str = checkpoint["id"]

        doc_ref: firestore.DocumentReference = self.db.collection(self.collection_name).document(thread_id)
        doc_ref.set({
            "checkpoint": self.serde.dumps(checkpoint),
            "metadata": self.serde.dumps(metadata),
            "thread_id": thread_id,
            "timestamp": timestamp
        })

        return {
            "configurable": {
                "thread_id": thread_id,
                "thread_ts": ts,
            },
        }

    # M√©todo asincronico para put
    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        thread_id: str = config["configurable"]["thread_id"]
        timestamp: str = datetime.now(pytz.timezone('America/Bogota')).strftime('%Y-%m-%d %H:%M:%S')
        ts: str = checkpoint["id"]

        doc_ref: firestore.AsyncDocumentReference = self.async_db.collection(self.collection_name).document(f"{thread_id}_{timestamp}")
        await doc_ref.set({
            "checkpoint": self.serde.dumps(checkpoint),
            "metadata": self.serde.dumps(metadata),
            "thread_id": thread_id,
            "timestamp": timestamp
        })

        return {
            "configurable": {
                "thread_id": thread_id,
                "thread_ts": ts,
            },
        }

    def put_writes(
        self,
        config: dict,
        writes: Sequence[Tuple[str, Any]],
        task_id: str,
    ) -> None:
        """
        Guarda escrituras intermedias vinculados a un checkpoint.


        Args:
            config (dict): Configuraci√≥n del checkpoint.
            writes (Sequence[Tuple[str, Any]]): Lista de escrituras intermedias, cada uno como una pareja (channel, value).
            task_id (str): Identificador de la tarea creando las escrituras intermedias.
        """
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"]["checkpoint_ns"]
        checkpoint_id = config["configurable"]["checkpoint_id"]

        for idx, (channel, value) in enumerate(writes):
            doc_id = f"{thread_id}"  # Documento para esta base
            type_, serialized_value = self.serde.dumps_typed(value)

            write_data = {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
                "task_id": task_id,
                "channel": channel,
                "type": type_,
                "value": serialized_value,
            }

            # Guardado de documento
            self.db.collection(self.pw_collection_name).document(doc_id).set(write_data, merge=True)

    def _process_checkpoint_data_common(self, data: Dict[str, Any]) -> CheckpointTuple:
        checkpoint: Checkpoint = self.serde.loads(data["checkpoint"])
        metadata: CheckpointMetadata = self.serde.loads(data["metadata"])
        thread_id: str = data["thread_id"]
        thread_ts: str = data["timestamp"]

        config: RunnableConfig = {"configurable": {"thread_id": thread_id, "thread_ts": thread_ts}}
        return CheckpointTuple(config=config, checkpoint=checkpoint, metadata=metadata, parent_config=None)

## Grafo

LangGraph modela el comportamiento del agente como un **grafo de nodos y conexiones**. Cada nodo representa una acci√≥n o decisi√≥n (como invocar el LLM, ejecutar una tool, o verificar una condici√≥n), y las conexiones determinan el flujo entre ellos. Esto nos da una ventaja importante sobre otros enfoques: el comportamiento del agente es **expl√≠cito, trazable y modificable**, en lugar de ser una caja negra.

El patr√≥n que usaremos es **ReAct** (*Reason + Act*) ‚Äî el agente razona sobre qu√© hacer, act√∫a ejecutando una tool si es necesario, observa el resultado, y vuelve a razonar hasta tener una respuesta final:
```mermaid
graph TD
    A([üßë Usuario]) --> B[LLM]
    B --> C{¬øNecesita tool?}
    C -->|S√≠| D[Ejecutar Tool]
    D -->|Resultado| B
    C -->|No| E([üí¨ Respuesta final])
```

In [None]:
class AgentState(MessagesState):
    thread_id: str
    solicitud: str
    ultima_pregunta: str
    ultima_respuesta: str




def model_call(state: AgentState) -> AgentState:

    system_prompt = SystemMessage(content=AGENT_SYSTEM_PROMPT)

    print(f"System prompt: {system_prompt}")
    print(f"Messages: {state['messages']}")

    response = llm_with_tools.invoke([system_prompt] + state['messages'])

    return {'messages': response}

def should_continue(state: AgentState) -> str:

    messages = state['messages']
    last_message = messages[-1]

    if not last_message.tool_calls:
        return "end"
    else:
        return "continue"


graph = StateGraph(AgentState)

graph.add_node('agent', model_call)
tool_node = ToolNode(tools=tools)
graph.add_node('tools', tool_node)

graph.add_edge(START, 'agent')
graph.add_conditional_edges(
    'agent',
    should_continue,
    {
        "continue": "tools",
        "end": END
    }
)
graph.add_edge('tools', 'agent')

memory = FirestoreSaver(database="workshop-agent-memory", collection_name="workshop-agent-memory", pw_collection_name="workshop-agent-memory-pw")


app = graph.compile(checkpointer=memory)

## Llamando al agente

In [None]:
def call_agente_workshop(thread_id: str, user_message: str):

    input_message = HumanMessage(content=user_message)

    configurable = {
        "metadata": {"thread_id": thread_id},
        "configurable": {"thread_id": thread_id}
    }

    initial_state = {
        "messages": [input_message],
        "thread_id": thread_id
    }

    result = app.invoke(input=initial_state, config=configurable)

    last_message = result.get("messages", [])[-1]
    ultima_pregunta = result.get("ultima_pregunta", '')
    ultima_respuesta = result.get("ultima_respuesta", '')

    return {
        "output_message": last_message.content,
        "thread_id": thread_id,
        "ultima_pregunta": ultima_pregunta,
        "ultima_respuesta": ultima_respuesta

    }

In [None]:
mi_nombre = "andres_silva"
intento = "1"

thread_id = f'{mi_nombre}_{intento}'

texto_conversaci√≥n = "Hola"


output_agente = call_agente_especializado_pac(thread_id=thread_id, user_message=texto_conversaci√≥n)

print(f'RESPUESTA AGENTE:\n\n{output_agente.get("output_message")[-1]}')

System prompt: content='\nEres un asistente virtual llamado "Segur√≠n", un experto asesor que trabaja para una empresa aseguradora.\nTu trabajo consiste en ayudar al cliente cuando tenga dudas de qu√© producto o coberturas sirven para suplir sus necesidades.\n\n### REGLA CR√çTICA:\nSi no tienes la suficiente informaci√≥n completa para llamar una tool, debes pedirle al usuario que te proporcione la informaci√≥n faltante.\n\n### HERRAMIENTAS O TOOLS DISPONIBLES:\n- call_rag_productos_y_coberturas: Esta herramienta te permite obtener la informaci√≥n m√°s relevante de productos o coberturas que tiene la aseguradora. S√≥lo utiliza esta herramienta cuando el usuario tenga una pregunta expl√≠cita de un seguro o cobertura.\n    Par√°metros:\n        - pregunta: Es la pregunta que el usuario tiene aserca de seguros o coberturas.\n\n### PROTOCOLO GENERAL\n- Al inicio de la conversaci√≥n, aseg√∫rate de saludar al usuario present√°ndote con nombre propio, y manteniendo un lenguaje formal pero amab

  llm = ChatVertexAI(


System prompt: content='\nEres un asistente virtual llamado "Segur√≠n", un experto asesor que trabaja para una empresa aseguradora.\nTu trabajo consiste en ayudar al cliente cuando tenga dudas de qu√© producto o coberturas sirven para suplir sus necesidades.\n\n### REGLA CR√çTICA:\nSi no tienes la suficiente informaci√≥n completa para llamar una tool, debes pedirle al usuario que te proporcione la informaci√≥n faltante.\n\n### HERRAMIENTAS O TOOLS DISPONIBLES:\n- call_rag_productos_y_coberturas: Esta herramienta te permite obtener la informaci√≥n m√°s relevante de productos o coberturas que tiene la aseguradora. S√≥lo utiliza esta herramienta cuando el usuario tenga una pregunta expl√≠cita de un seguro o cobertura.\n    Par√°metros:\n        - pregunta: Es la pregunta que el usuario tiene aserca de seguros o coberturas.\n\n### PROTOCOLO GENERAL\n- Al inicio de la conversaci√≥n, aseg√∫rate de saludar al usuario present√°ndote con nombre propio, y manteniendo un lenguaje formal pero amab