# Agentic RAG con Autogen utilizando Azure AI Services


In [2]:
import os
import time
import asyncio
from typing import List, Dict

from autogen_agentchat.agents import AssistantAgent
from autogen_core import CancellationToken
from autogen_agentchat.messages import TextMessage
from azure.core.credentials import AzureKeyCredential
from autogen_ext.models.azure import AzureAIChatCompletionClient

from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import SearchIndex, SimpleField, SearchFieldDataType, SearchableField

from dotenv import load_dotenv

load_dotenv()

True

## Creando el cliente

Inicializamos el cliente Azure AI Chat Completion, después lo usaremos para interactuar con el servicio de Azure OpenAI para generar respuestas a las queries del usuario.

In [3]:
client = AzureAIChatCompletionClient(
    model="gpt-4o-mini",
    endpoint="https://models.inference.ai.azure.com",
    credential=AzureKeyCredential(os.getenv("GITHUB_TOKEN")),
    model_info={
        "json_output": True,
        "function_calling": True,
        "vision": True,
        "family": "unknown",
    },
)

## Vector Database 

Inicializamos Azure AI Search con memoria persistente y varios ejemplos de documentos. Azure AI Search sera utilizado para guardar y obtener los documentos que después serán el contexto para generar respuestas congruentes. 

In [None]:
# Inicializamos AzureAI
search_service_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")
search_api_key = os.getenv("AZURE_SEARCH_API_KEY")
index_name = "travel-documents"

search_client = SearchClient(
    endpoint=search_service_endpoint,
    index_name=index_name,
    credential=AzureKeyCredential(search_api_key)
)

index_client = SearchIndexClient(
    endpoint=search_service_endpoint,
    credential=AzureKeyCredential(search_api_key)
)

# Index schema
fields = [
    SimpleField(name="id", type=SearchFieldDataType.String, key=True),
    SearchableField(name="content", type=SearchFieldDataType.String)
]

index = SearchIndex(name=index_name, fields=fields)

# Creamos el index
index_client.create_index(index)


documents = [
    {"id": "1", "content": "Paraguay Travel ofrece paquetes de vacaciones de lujo a destinos exóticos en todo el mundo."},
    {"id": "2", "content": "Nuestros servicios de viaje premium incluyen planificación personalizada de itinerarios y asistencia de conserjería 24/7."},
    {"id": "3", "content": "El seguro de viaje de Paraguay cubre emergencias médicas, cancelaciones de viaje y pérdida de equipaje."},
    {"id": "4", "content": "Los destinos populares incluyen Cerro Paraguari, los Alpes Suizos y safaris africanos."},
    {"id": "5", "content": "Paraguay Travel ofrece acceso exclusivo a hoteles boutique y tours privados con guía."}
]


# Añadimos documentos al index
search_client.upload_documents(documents)


[<azure.search.documents._generated.models._models_py3.IndexingResult at 0x2299c1b8200>,
 <azure.search.documents._generated.models._models_py3.IndexingResult at 0x2299c1b9d90>,
 <azure.search.documents._generated.models._models_py3.IndexingResult at 0x2299c1b9bb0>,
 <azure.search.documents._generated.models._models_py3.IndexingResult at 0x2299c1b9d00>,
 <azure.search.documents._generated.models._models_py3.IndexingResult at 0x2299c1b9c70>]

In [None]:
def get_retrieval_context(query: str) -> str:
    results = search_client.search(query)
    context_strings = []
    for result in results:
        context_strings.append(f"Documento: {result['content']}")
    return "\n\n".join(context_strings) if context_strings else "No hay resultados"

def get_weather_data(location: str) -> str:
    """
    Acá simulamos la obtención de datos del clima, podemos conectarnos también a una API del clima para esto
    """
    weather_database = {
        "new york": {"temperature": 72, "condition": "Parcialmente nublado", "humidity": 65, "wind": "10 mph"},
        "london": {"temperature": 60, "condition": "Lluvioso", "humidity": 80, "wind": "15 mph"},
        "tokyo": {"temperature": 75, "condition": "Soleado", "humidity": 50, "wind": "5 mph"},
        "sydney": {"temperature": 80, "condition": "Despejado", "humidity": 45, "wind": "12 mph"},
        "paris": {"temperature": 68, "condition": "Nublado", "humidity": 70, "wind": "8 mph"},
    }
    
    # Normalizar la cadena de ubicación
    location_key = location.lower()
    
    # Verificar si tenemos datos para esta ubicación
    if location_key in weather_database:
        data = weather_database[location_key]
        return f"Clima para {location.title()}:\n" \
               f"Temperatura: {data['temperature']}°F\n" \
               f"Condición: {data['condition']}\n" \
               f"Humedad: {data['humidity']}%\n" \
               f"Viento: {data['wind']}"
    else:
        return f"No hay datos meteorológicos disponibles para {location}."


## Configuración del agente

Configuramos el agente de retrieval y el asistente. El retrieval va a ser un agente especializado en encontrar información relevante utilizando búsqueda semántica, mientras que el asistente generará respuestas detalladas basadas en la información del retrieval. 

In [None]:
assistant = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message=(
        "Eres un asistente de IA útil que proporciona respuestas utilizando ÚNICAMENTE el contexto proporcionado. "
        "NO incluyas información externa. Basa tu respuesta completamente en el contexto dado a continuación."
    ),
)


## RAGEvaluator Class

La clase RAGEvaluation va a evaluar las respuestas basadas en metricas como el largor, las fuentes de información citadas, el tiempo de respuesta y la relevancia del contexto

In [None]:
class RAGEvaluator:
    def __init__(self):
        self.responses: List[Dict] = []

    def evaluate_response(self, query: str, response: str, context: List[Dict]) -> Dict:
        # Las métricas: 
        start_time = time.time()
        metrics = {
            'response_length': len(response),
            'source_citations': sum(1 for doc in context if doc["content"] in response),
            'evaluation_time': time.time() - start_time,
            'context_relevance': self._calculate_relevance(query, context)
        }
        self.responses.append({
            'query': query,
            'response': response,
            'metrics': metrics
        })
        return metrics

    def _calculate_relevance(self, query: str, context: List[Dict]) -> float:
        # Generamos un score de las métricas
        return sum(1 for c in context if query.lower() in c["content"].lower()) / len(context)

## Procesamiento del Query con RAG

Definimos una función ask_rag que va a mandar el query (del backend) al asistente, procesa la respuesta y luego la evalúa. Esta función maneja la interacción con el asistente y luego usa el evaluador para medir la calidad de las respuestas.

In [None]:
async def ask_unified_rag(query: str, evaluator: RAGEvaluator, location: str = None):
    """
    Una función RAG unificada que combina tanto la recuperación de documentos
    como datos meteorológicos según la consulta y un parámetro opcional de ubicación.
    
    Args:
        query: La pregunta del usuario
        evaluator: El evaluador RAG para medir la calidad de la respuesta
        location: Ubicación opcional para consultas sobre el clima
    """
    try:
        # Obtener contexto de ambas fuentes
        retrieval_context = get_retrieval_context(query)
        
        # Si se proporciona ubicación, agregar datos del clima
        weather_context = ""
        if location:
            weather_context = get_weather_data(location)
            weather_intro = f"\nInformación del clima para {location}:\n"
        else:
            weather_intro = ""
        
        # Ampliar la consulta con ambos contextos si están disponibles
        augmented_query = (
            f"Contexto recuperado:\n{retrieval_context}\n\n"
            f"{weather_intro}{weather_context}\n\n"
            f"Consulta del usuario: {query}\n\n"
            "Basándote ÚNICAMENTE en el contexto anterior, por favor proporciona la respuesta."
        )

        # Enviar la consulta ampliada como mensaje del usuario
        start_time = time.time()
        response = await assistant.on_messages(
            [TextMessage(content=augmented_query, source="user")],
            cancellation_token=CancellationToken(),
        )
        processing_time = time.time() - start_time

        # Crear contexto combinado para la evaluación
        combined_context = documents.copy()  # Comenzar con los documentos de viaje
        
        # Agregar el clima como documento si existe
        if location and weather_context:
            combined_context.append({"id": f"weather-{location}", "content": weather_context})
        
        # Evaluar la respuesta
        metrics = evaluator.evaluate_response(
            query=query,
            response=response.chat_message.content,
            context=combined_context
        )
        
        result = {
            'response': response.chat_message.content,
            'processing_time': processing_time,
            'metrics': metrics,
        }
        
        # Agregar ubicación al resultado si se proporciona
        if location:
            result['location'] = location
            
        return result
    except Exception as e:
        print(f"Error al procesar la consulta unificada: {e}")
        return None


# Ejemplo

In [None]:
async def main():
    evaluator = RAGEvaluator()
    
    # Definir consultas del usuario similares al ejemplo de Semantic Kernel
    user_inputs = [
        # Consultas solo de viaje
        {"query": "¿Puedes explicar la cobertura del seguro de viaje de Paraguay Travel?"},
        
        # Consultas solo de clima 
        {"query": "¿Cuál es la condición climática actual en London?", "location": "london"},
        
        # Consultas combinadas
        {"query": "¿Cuál es un destino frío ofrecido por Paraguay Travel y cuál es su temperatura?", "location": "london"},
    ]
    
    print("Procesando consultas:")
    for query_data in user_inputs:
        query = query_data["query"]
        location = query_data.get("location")
        
        if location:
            print(f"\nProcesando consulta para {location}: {query}")
        else:
            print(f"\nProcesando consulta: {query}")
        
        # Obtener el contexto RAG para mostrarlo (similar al ejemplo de Semantic Kernel)
        retrieval_context = get_retrieval_context(query)
        weather_context = get_weather_data(location) if location else ""
        
        # Mostrar el contexto RAG para mayor transparencia
        print("\n--- Contexto RAG ---")
        print(retrieval_context)
        if weather_context:
            print(f"\n--- Contexto del clima para {location} ---")
            print(weather_context)
        print("-------------------\n")
            
        result = await ask_unified_rag(query, evaluator, location)
        if result:
            print("Respuesta:", result['response'])
            print("\nMétricas:", result['metrics'])
        print("\n" + "="*60 + "\n")


## Para hacer correr el script

In [None]:
if __name__ == "__main__":
    if asyncio.get_event_loop().is_running():
        await main()
    else:
        asyncio.run(main())