# Agentic RAG with AutoGen using Azure AI Services

This notebook implements Retrieval-Augmented Generation (RAG) with an AutoGen agent. It uses Azure AI Search for document retrieval and a small evaluator class to score each response.

In [1]:
# Import the OS module to access environment variables like API keys
import os
# Import time module to measure processing and evaluation times
import time
# Import asyncio for asynchronous programming with async/await patterns
import asyncio
# Import typing hints for better code documentation and IDE support
from typing import List, Dict

# Import the AssistantAgent class from autogen_agentchat - this is our main AI agent
from autogen_agentchat.agents import AssistantAgent
# Import CancellationToken to handle operation cancellation in async operations
from autogen_core import CancellationToken
# Import TextMessage to create message objects for agent communication
from autogen_agentchat.messages import TextMessage
# Import Azure credential handling for secure authentication
from azure.core.credentials import AzureKeyCredential
# Import the Azure AI Chat Completion Client for OpenAI model interactions
from autogen_ext.models.azure import AzureAIChatCompletionClient

# Import SearchClient to perform search operations on Azure AI Search index
from azure.search.documents import SearchClient
# Import SearchIndexClient to create and manage search indexes
from azure.search.documents.indexes import SearchIndexClient
# Import model classes to define the structure of our search index
from azure.search.documents.indexes.models import SearchIndex, SimpleField, SearchFieldDataType, SearchableField

# Import dotenv to load environment variables from .env file
from dotenv import load_dotenv

# Load environment variables from .env file into the current environment
load_dotenv()

True

In [2]:
# Import the entire azure search models module for debugging purposes
import azure.search.documents.indexes.models as azure_module
# Print the type of SearchableField; it is a helper function that builds a SearchField, so the type is 'function'
print(type(azure_module.SearchableField))



<class 'function'>


## Create the Client 

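First, we initialize the Azure AI Chat Completion Client. This client sends chat completion requests to a model served through the Azure AI model inference endpoint (here a GitHub Models endpoint, authenticated with a GitHub token) to generate responses to user queries.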

In [3]:
# import autogen_ext.models.azure as M

#help(M.AzureAIChatCompletionClient)

#print(M.AzureAIChatCompletionClient.__doc__)

#  |  Args:
#  |      endpoint (str): The endpoint to use. **Required.**
#  |      credential (union, AzureKeyCredential, AsyncTokenCredential): The credentials to use. **Required**
#  |      model_info (ModelInfo): The model family and capabilities of the model. **Required.**
#  |      model (str): The name of the model. **Required if model is hosted on GitHub Models.**
#  |      frequency_penalty: (optional,float)
#  |      presence_penalty: (optional,float)
#  |      temperature: (optional,float)
#  |      top_p: (optional,float)
#  |      max_tokens: (optional,int)
#  |      response_format: (optional, literal["text", "json_object"])
#  |      stop: (optional,List[str])
#  |      tools: (optional,List[ChatCompletionsToolDefinition])
#  |      tool_choice: (optional,Union[str, ChatCompletionsToolChoicePreset, ChatCompletionsNamedToolChoice]])
#  |      seed: (optional,int)

In [5]:
# Create an Azure AI Chat Completion Client to interact with the GPT model
client = AzureAIChatCompletionClient(
    # Specify the model to use - gpt-4o-mini is a smaller, lower-cost variant of GPT-4o
    model="gpt-4o-mini",
    # Set the endpoint URL for Azure's model inference service
    endpoint="https://models.inference.ai.azure.com",
    # Use GitHub token for authentication (stored in environment variable)
    credential=AzureKeyCredential(os.getenv("GITHUB_TOKEN")),
    # Define model capabilities for the client to understand what features are available
    model_info={
        # Model can output structured JSON responses
        "json_output": True,
        # Model supports function calling capabilities
        "function_calling": True,
        # Model can process and understand images
        "vision": True,
        # Set family as unknown since this is a custom configuration
        "family": "unknown",
    },
)
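
The cell above passes `os.getenv("GITHUB_TOKEN")` straight into the credential. `os.getenv` returns `None` when the variable is missing, so a forgotten `.env` entry tends to surface as a less obvious credential error. A minimal sketch of a fail-fast lookup follows; the helper name `require_env` is an assumption for illustration and is not part of the notebook or of any Azure library.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise a clear error.

    Hypothetical helper: not part of the notebook or the Azure SDK.
    """
    value = os.getenv(name)
    if not value:
        # Covers both a missing variable (None) and an empty string
        raise RuntimeError(
            f"Environment variable {name!r} is not set; add it to your .env file."
        )
    return value
```

With this helper, the credential line could read `AzureKeyCredential(require_env("GITHUB_TOKEN"))`, and the same lookup would apply to the Azure AI Search endpoint and key.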

## Vector Database Initialization

We create an Azure AI Search index and upload sample documents to it. The index defined here has only a key field and a searchable text field, so retrieval uses full-text (keyword) search rather than vector similarity. The retrieved documents provide the context for generating grounded responses.

In [None]:
# Initialize Azure AI Search with persistent storage
# Get the Azure Search service endpoint URL from environment variables
search_service_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")
# Get the API key for Azure Search authentication from environment variables
search_api_key = os.getenv("AZURE_SEARCH_API_KEY")
# Define the name of our search index that will store travel documents
index_name = "travel-documents"

# Create a SearchClient to perform search operations on our index
search_client = SearchClient(
    endpoint=search_service_endpoint,
    index_name=index_name,
    credential=AzureKeyCredential(search_api_key)
)

# Create a SearchIndexClient to manage the search index structure
index_client = SearchIndexClient(
    endpoint=search_service_endpoint,
    credential=AzureKeyCredential(search_api_key)
)

# Define the index schema - this determines what fields our documents will have
fields = [
    # Define 'id' field as a simple string that serves as the primary key
    SimpleField(name="id", type=SearchFieldDataType.String, key=True),
    # Define 'content' field as searchable text that can be queried
    SearchableField(name="content", type=SearchFieldDataType.String)
]

# Create a SearchIndex object with our defined schema
index = SearchIndex(name=index_name, fields=fields)

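# Create the index in the Azure Search service, or update it if it already exists,
# so that re-running this cell does not fail on an existing index
index_client.create_or_update_index(index)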

# Enhanced sample documents - these represent our knowledge base
documents = [
    # Document about Contoso's luxury travel services
    {"id": "1", "content": "Contoso Travel offers luxury vacation packages to exotic destinations worldwide."},
    # Document about premium travel features
    {"id": "2", "content": "Our premium travel services include personalized itinerary planning and 24/7 concierge support."},
    # Document about travel insurance coverage
    {"id": "3", "content": "Contoso's travel insurance covers medical emergencies, trip cancellations, and lost baggage."},
    # Document listing popular travel destinations
    {"id": "4", "content": "Popular destinations include the Maldives, Swiss Alps, and African safaris."},
    # Document about exclusive travel experiences
    {"id": "5", "content": "Contoso Travel provides exclusive access to boutique hotels and private guided tours."}
]

# Upload all documents to the search index for retrieval
search_client.upload_documents(documents)


In [8]:
def get_retrieval_context(query: str) -> str:
    """
    Runs a full-text (keyword) search on the Azure AI Search index to find relevant documents.
    Args:
        query: The user's search query
    Returns:
        Formatted string containing all relevant document content
    """
    # Execute search query against the Azure Search index
    results = search_client.search(query)
    # Initialize list to store formatted results
    context_strings = []
    # Iterate through search results and format each document
    for result in results:
        context_strings.append(f"Document: {result['content']}")
    # Join all results with double newlines, or return "No results found" if empty
    return "\n\n".join(context_strings) if context_strings else "No results found"

def get_weather_data(location: str) -> str:
    """
    Simulates retrieving weather data for a given location.
    In a real-world scenario, this would call a weather API.
    Args:
        location: The city/location to get weather for
    Returns:
        Formatted weather information string
    """
    # Simulated weather database - in production this would be an API call
    weather_database = {
        # Weather data for New York with temperature, condition, humidity, and wind
        "new york": {"temperature": 72, "condition": "Partly Cloudy", "humidity": 65, "wind": "10 mph"},
        # Weather data for London with rainy conditions
        "london": {"temperature": 60, "condition": "Rainy", "humidity": 80, "wind": "15 mph"},
        # Weather data for Tokyo with sunny conditions
        "tokyo": {"temperature": 75, "condition": "Sunny", "humidity": 50, "wind": "5 mph"},
        # Weather data for Sydney with clear skies
        "sydney": {"temperature": 80, "condition": "Clear", "humidity": 45, "wind": "12 mph"},
        # Weather data for Paris with cloudy conditions
        "paris": {"temperature": 68, "condition": "Cloudy", "humidity": 70, "wind": "8 mph"},
    }
    
    # Convert location to lowercase for consistent lookup
    location_key = location.lower()
    
    # Check if we have weather data for the requested location
    if location_key in weather_database:
        # Extract weather data for the location
        data = weather_database[location_key]
        # Return formatted weather information string
        return f"Weather for {location.title()}:\n" \
               f"Temperature: {data['temperature']}°F\n" \
               f"Condition: {data['condition']}\n" \
               f"Humidity: {data['humidity']}%\n" \
               f"Wind: {data['wind']}"
    else:
        # Return message if no data available for the location
        return f"No weather data available for {location}."
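
`get_retrieval_context` formats every hit the search returns, and the sample run at the end of the notebook shows four of the five documents coming back for a single query. A minimal sketch of the same formatting with a cap on the number of documents follows. The names `format_context` and `max_docs` are hypothetical, and the input is any iterable of dictionaries with a `content` key rather than a live search result.

```python
from typing import Dict, Iterable, List


def format_context(results: Iterable[Dict], max_docs: int = 3) -> str:
    """Format up to max_docs hits in the same style as get_retrieval_context.

    Hypothetical helper: works on any iterable of dicts with a 'content' key.
    """
    context_strings: List[str] = []
    for result in results:
        # Stop once the cap is reached so the prompt stays focused
        if len(context_strings) >= max_docs:
            break
        context_strings.append(f"Document: {result['content']}")
    return "\n\n".join(context_strings) if context_strings else "No results found"


# Stand-in hits; in the notebook these would come from search_client.search(query)
hits = [
    {"content": "Contoso's travel insurance covers medical emergencies."},
    {"content": "Popular destinations include the Maldives."},
]
print(format_context(hits, max_docs=1))
```

The service can also limit results on its side through the `top` keyword of `SearchClient.search`; capping in Python, as sketched here, keeps the helper testable without a search service.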

## Agent Configuration

We configure the assistant agent. Retrieval is handled by the `get_retrieval_context` function defined above rather than by a separate agent; the assistant generates its response solely from the context it receives in the prompt.

In [14]:
# Create agents with enhanced capabilities
# Initialize the main assistant agent that will generate responses
assistant = AssistantAgent(
    # Give the agent a descriptive name for identification
    name="assistant",
    # Connect the agent to our Azure AI Chat Completion client
    model_client=client,
    # Define the system message that instructs the agent's behavior
    system_message=(
        "You are a helpful AI assistant that provides answers using ONLY the provided context. "
        "Do NOT include any external information. Base your answer entirely on the context given below."
    ),
)

## RAGEvaluator Class

We define the `RAGEvaluator` class to score each response on four metrics: response length, source citations, evaluation time, and context relevance.

In [15]:
class RAGEvaluator:
    """
    A class to evaluate the quality and performance of RAG (Retrieval-Augmented Generation) responses.
    Tracks various metrics to assess how well the system is performing.
    """
    def __init__(self):
        # Initialize empty list to store all response evaluations for analysis
        self.responses: List[Dict] = []

    def evaluate_response(self, query: str, response: str, context: List[Dict]) -> Dict:
        """
        Evaluates a RAG response against multiple quality metrics.
        Args:
            query: The original user query
            response: The generated response from the AI
            context: List of documents that were used as context
        Returns:
            Dictionary containing various evaluation metrics
        """
        # Record start time to measure how long the metric calculations take
        start_time = time.time()
        # Count source documents whose full text appears verbatim in the response
        source_citations = sum(1 for doc in context if doc["content"] in response)
        # Calculate relevance score between query and available context
        context_relevance = self._calculate_relevance(query, context)
        # Assemble the metrics; evaluation_time covers the calculations above
        metrics = {
            # Measure response length (in characters) to assess verbosity
            'response_length': len(response),
            'source_citations': source_citations,
            # Time spent computing the metrics, in seconds
            'evaluation_time': time.time() - start_time,
            'context_relevance': context_relevance
        }
        # Store the evaluation data for later analysis
        self.responses.append({
            'query': query,
            'response': response,
            'metrics': metrics
        })
        # Return the calculated metrics
        return metrics

    def _calculate_relevance(self, query: str, context: List[Dict]) -> float:
        """
        Calculates a simple relevance score between the query and context documents.
        Args:
            query: The user's query
            context: List of context documents
        Returns:
            Float between 0 and 1 representing relevance score
        """
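        # Guard against an empty context to avoid division by zero
        if not context:
            return 0.0
        # Count documents that contain the entire query string (case-insensitive),
        # then divide by the number of documents to get a fraction between 0 and 1
        return sum(1 for c in context if query.lower() in c["content"].lower()) / len(context)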
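
`_calculate_relevance` tests whether the whole query string occurs inside each document, which a natural-language question almost never does. This is why the sample run later in the notebook reports a `context_relevance` of 0.0. A minimal sketch of a word-overlap alternative follows. The function name and the four-letter cutoff are assumptions chosen for illustration; this is not the notebook's metric.

```python
import re
from typing import Dict, List


def token_overlap_relevance(query: str, context: List[Dict]) -> float:
    """Fraction of documents sharing at least one word of 4+ letters with the query.

    Hypothetical alternative to RAGEvaluator._calculate_relevance.
    """
    if not context:
        return 0.0
    # Keep only longer words so short function words do not count as matches
    query_terms = {w for w in re.findall(r"[a-z]+", query.lower()) if len(w) >= 4}
    if not query_terms:
        return 0.0
    matches = 0
    for doc in context:
        doc_terms = set(re.findall(r"[a-z]+", doc["content"].lower()))
        # A document counts as relevant if it shares any term with the query
        if query_terms & doc_terms:
            matches += 1
    return matches / len(context)


docs = [
    {"id": "3", "content": "Contoso's travel insurance covers medical emergencies, trip cancellations, and lost baggage."},
    {"id": "4", "content": "Popular destinations include the Maldives, Swiss Alps, and African safaris."},
]
score = token_overlap_relevance("Can you explain Contoso's travel insurance coverage?", docs)
print(score)
```

Word overlap is still a crude proxy. It ignores synonyms and word forms ("coverage" does not match "covers"), so it is best read as a sanity check rather than a quality score.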

## Query Processing with RAG

We define the `ask_unified_rag` function to build an augmented prompt from the retrieved documents (and optional weather data), send it to the assistant, and evaluate the response. The function times the assistant call and uses the evaluator to score the result.

In [16]:
async def ask_unified_rag(query: str, evaluator: RAGEvaluator, location: str = None):
    """
    A unified RAG function that combines both document retrieval and weather data
    based on the query and optional location parameter.
    
    Args:
        query: The user's question
        evaluator: The RAG evaluator to measure response quality
        location: Optional location for weather queries
    Returns:
        Dictionary containing response, metrics, and processing information
    """
    try:
        # Get context from both sources
        # Retrieve relevant documents from Azure Search based on the query
        retrieval_context = get_retrieval_context(query)
        
        # If location is provided, add weather data
        weather_context = ""
        if location:
            # Get current weather data for the specified location
            weather_context = get_weather_data(location)
            # Create a header for weather information section
            weather_intro = f"\nWeather Information for {location}:\n"
        else:
            # No weather intro if no location specified
            weather_intro = ""
        
        # Augment the query with both contexts if available
        # Combine retrieved documents and weather data into a comprehensive context
        augmented_query = (
            f"Retrieved Context:\n{retrieval_context}\n\n"
            f"{weather_intro}{weather_context}\n\n"
            f"User Query: {query}\n\n"
            "Based ONLY on the above context, please provide the answer."
        )

        # Send the augmented query as a user message
        # Record start time to measure response generation time
        start_time = time.time()
        # Send the augmented query to the assistant agent
        response = await assistant.on_messages(
            [TextMessage(content=augmented_query, source="user")],
            cancellation_token=CancellationToken(),
        )
        # Calculate how long it took to generate the response
        processing_time = time.time() - start_time

        # Create combined context for evaluation
        # Start with the existing travel documents
        combined_context = documents.copy()
        
        # Add weather as a document if it exists
        if location and weather_context:
            # Add weather data as a pseudo-document for evaluation purposes
            combined_context.append({"id": f"weather-{location}", "content": weather_context})
        
        # Evaluate the response using our RAG evaluator
        metrics = evaluator.evaluate_response(
            query=query,
            response=response.chat_message.content,
            context=combined_context
        )
        
        # Package the results into a comprehensive response object
        result = {
            'response': response.chat_message.content,
            'processing_time': processing_time,
            'metrics': metrics,
        }
        
        # Add location to result if provided for tracking purposes
        if location:
            result['location'] = location
            
        # Return the complete result package
        return result
    except Exception as e:
        # Handle any errors that occur during processing
        print(f"Error processing unified query: {e}")
        return None
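
In `ask_unified_rag`, the prompt is assembled inline, and when no location is given the weather line is left empty, which adds extra blank lines to the prompt. A minimal sketch of the same assembly as a standalone function follows, so it can be checked without calling the model. The name `build_augmented_query` and its parameters are hypothetical.

```python
def build_augmented_query(query: str, retrieval_context: str,
                          weather_context: str = "", location: str = "") -> str:
    """Assemble the prompt, adding a weather section only when weather data exists.

    Hypothetical helper mirroring the prompt layout used in ask_unified_rag.
    """
    sections = [f"Retrieved Context:\n{retrieval_context}"]
    if weather_context:
        # Include the weather section only when there is something to show
        sections.append(f"Weather Information for {location}:\n{weather_context}")
    sections.append(f"User Query: {query}")
    sections.append("Based ONLY on the above context, please provide the answer.")
    # Separate sections with one blank line each
    return "\n\n".join(sections)


prompt = build_augmented_query(
    "What's the current weather condition in London?",
    "Document: Popular destinations include the Maldives, Swiss Alps, and African safaris.",
    weather_context="Condition: Rainy",
    location="london",
)
print(prompt)
```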

## Example Usage

We initialize the evaluator and define the queries that we want to process and evaluate.

In [17]:
async def main():
    """
    Main function that demonstrates the unified RAG system with different types of queries.
    Tests travel-only, weather-only, and combined queries to show system versatility.
    """
    # Initialize the RAG evaluator to track response quality
    evaluator = RAGEvaluator()
    
    # Define user queries similar to the Semantic Kernel example
    # Test different types of queries to demonstrate system capabilities
    user_inputs = [
        # Travel-only queries - tests document retrieval functionality
        {"query": "Can you explain Contoso's travel insurance coverage?"},
        
        # Weather-only queries - tests weather data integration
        {"query": "What's the current weather condition in London?", "location": "london"},
        
        # Combined queries - tests integration of both data sources
        {"query": "What is a good cold destination offered by Contoso and what is its temperature?", "location": "london"},
    ]
    
    # Print header for the demo
    print("Processing Queries:")
    # Process each query in the test set
    for query_data in user_inputs:
        # Extract the query text from the query data
        query = query_data["query"]
        # Extract location if specified (None if not present)
        location = query_data.get("location")
        
        # Print different headers based on whether location is specified
        if location:
            print(f"\nProcessing Query for {location}: {query}")
        else:
            print(f"\nProcessing Query: {query}")
        
        # Get the RAG context for printing (similar to the Semantic Kernel example)
        # Retrieve documents relevant to the query for transparency
        retrieval_context = get_retrieval_context(query)
        # Get weather data if location is specified
        weather_context = get_weather_data(location) if location else ""
        
        # Print the RAG context for transparency - show user what data is being used
        print("\n--- RAG Context ---")
        print(retrieval_context)
        if weather_context:
            print(f"\n--- Weather Context for {location} ---")
            print(weather_context)
        print("-------------------\n")
            
        # Process the query through our unified RAG system
        result = await ask_unified_rag(query, evaluator, location)
        # Display results if processing was successful
        if result:
            print("Response:", result['response'])
            print("\nMetrics:", result['metrics'])
        # Print separator between queries for readability
        print("\n" + "="*60 + "\n")

## Run the Script

We check whether an event loop is already running, as it is in Jupyter, and either await `main()` directly or start a new loop with `asyncio.run`.
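
Top-level `await` is only valid in IPython and Jupyter; in a plain `.py` file it is a syntax error, so the cell below runs as written only in a notebook. A minimal sketch of the script-side pattern follows. The coroutine `demo` is a hypothetical stand-in for `main()`, and `in_running_loop` is an illustrative helper name.

```python
import asyncio


async def demo() -> str:
    """Hypothetical stand-in for the notebook's main() coroutine."""
    await asyncio.sleep(0)
    return "done"


def in_running_loop() -> bool:
    """Return True when called from inside a running event loop (e.g. Jupyter)."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        # get_running_loop raises RuntimeError when no loop is running
        return False


if not in_running_loop():
    # Plain script: no loop exists yet, so asyncio.run creates and closes one
    result = asyncio.run(demo())
    print(result)
# In a notebook cell, in_running_loop() is True and `await demo()` is used instead
```

Keeping the `await` out of the script path means the same file can be imported or executed with `python` without a syntax error.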

In [18]:
# Note: top-level `await` is only valid in IPython/Jupyter. In a plain .py file this
# cell is a syntax error, so a standalone script should call asyncio.run(main()) directly.
# Check if this code is being run directly (not imported as a module)
if __name__ == "__main__":
    # Check if we're already in an async event loop (like in Jupyter notebooks)
    if asyncio.get_event_loop().is_running():
        # If in an existing loop, await the main function directly
        await main()
    else:
        # If not in a loop, create a new event loop and run main function
        asyncio.run(main())

Processing Queries:

Processing Query: Can you explain Contoso's travel insurance coverage?

--- RAG Context ---
Document: Contoso's travel insurance covers medical emergencies, trip cancellations, and lost baggage.

Document: Contoso Travel offers luxury vacation packages to exotic destinations worldwide.

Document: Our premium travel services include personalized itinerary planning and 24/7 concierge support.

Document: Contoso Travel provides exclusive access to boutique hotels and private guided tours.
-------------------

Response: Contoso's travel insurance covers medical emergencies, trip cancellations, and lost baggage.

Metrics: {'response_length': 92, 'source_citations': 1, 'evaluation_time': 8.106231689453125e-06, 'context_relevance': 0.0}



Processing Query for london: What's the current weather condition in London?

--- RAG Context ---
Document: Popular destinations include the Maldives, Swiss Alps, and African safaris.

--- Weather Context for london ---
Weather for Lond