# RAG Retrieval-score Check and Web search with [AutoGen](https://github.com/microsoft/autogen)

This sample demonstrates how to evaluate the retrieval score of a RAG application and how to use the AutoGen library to search the web for relevant information.

> ✨ **_Note_** <br>
> For an overview of the Azure AI Agent Service capabilities, see the [quickstart page](https://learn.microsoft.com/en-us/azure/ai-services/agents/quickstart?pivots=programming-language-python-azure).

## Prerequisites

Configure a Python virtual environment (Python 3.10 or later) in Visual Studio Code:

1.  Open the Command Palette (Ctrl+Shift+P).
1.  Search for **Python: Create Environment**.
1.  Select Venv or Conda and choose where to create the new environment.
1.  Select the Python interpreter version (3.10 or later).

## Set up your environment

Clone the repository to your local machine.


```bash
git clone https://github.com/Azure/rag-innovator-lab
```

Create a virtual environment and install the required packages.
```bash
python3 -m venv your_env_name
source your_env_name/bin/activate
pip install -r requirements.txt
```

Create a `.env` file based on the `.env-sample` file. Copy the new `.env` file to the folder containing your notebook and update the variables.
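
Before running the notebook, it can help to confirm that the environment defines the variables the code reads. The sketch below is a hypothetical helper (`missing_env_vars` is not part of the repository); the variable names are the ones this notebook reads with `os.getenv`, and which of them are truly mandatory is an assumption.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Names taken from the os.getenv calls in this notebook.
# Treating all of them as required is an assumption of this sketch.
REQUIRED_VARS = (
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME",
    "BING_CONNECTION_NAME",
    "AZURE_AI_PROJECT_CONNECTION_STR",
)


def missing_env_vars(
    required: Iterable[str] = REQUIRED_VARS,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]
```

Calling `missing_env_vars()` after `load_dotenv(override=True)` returns an empty list when every variable is set.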

## 🔨 Current Support and Limitations (as of 2025-01-12)
- Check the available models for Grounding with Bing Search. It only works with the following Azure OpenAI models: `gpt-3.5-turbo-0125`, `gpt-4-0125-preview`, `gpt-4-turbo-2024-04-09`, `gpt-4o-0513`. See the [Grounding with Bing Search documentation](https://learn.microsoft.com/en-us/azure/ai-services/agents/how-to/tools/bing-grounding?tabs=python&pivots=overview).
- Check the region support for the Azure AI Evaluation SDK. See the [region support table](https://learn.microsoft.com/en-us/azure/ai-studio/concepts/evaluation-metrics-built-in?tabs=warning#region-support).
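
The model restriction can be checked in code before creating an agent. This is a minimal sketch: `supports_bing_grounding` is a hypothetical helper, the set is copied from the list in this section (as of 2025-01-12) and may be out of date, and the function expects a model name with its version, which is not necessarily the same as your deployment name.

```python
# Model list copied from this section (as of 2025-01-12); it may be out of date.
BING_GROUNDING_MODELS = frozenset(
    {
        "gpt-3.5-turbo-0125",
        "gpt-4-0125-preview",
        "gpt-4-turbo-2024-04-09",
        "gpt-4o-0513",
    }
)


def supports_bing_grounding(model_name: str) -> bool:
    """Return True if the model name appears in the documented list."""
    return model_name.strip().lower() in BING_GROUNDING_MODELS
```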

In [None]:
# Get packages
import os
import asyncio
import json
import requests
import logging
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.core.credentials import AzureKeyCredential
from openai import AzureOpenAI
from dotenv import load_dotenv
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
from autogen_core import DefaultTopicId, default_subscription, type_subscription
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import BingGroundingTool
from dataclasses import dataclass

from autogen_core import TopicId, TypeSubscription
from autogen_core.models import LLMMessage, ChatCompletionClient, SystemMessage, UserMessage, AssistantMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core import CancellationToken
from autogen_core.tools import FunctionTool, Tool, ToolSchema
from typing_extensions import Annotated
from typing import List
from autogen_core.tool_agent import ToolAgent, tool_agent_caller_loop

load_dotenv(override=True)

aoai_api_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
aoai_api_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_openai_api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-06-01")
aoai_chat_deployment_name = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")
aoai_embedding_deployment_name = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME")
azure_openai_embedding_dimensions = int(os.getenv("AZURE_OPENAI_EMBEDDING_DIMENSIONS", 1536))
bing_connnection_name = os.getenv("BING_CONNECTION_NAME")
azure_ai_pjt_connection_str = os.getenv("AZURE_AI_PROJECT_CONNECTION_STR")

In [2]:
# Query rewrite - LangChain-based implementation
from langchain_openai import AzureChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

llm = AzureChatOpenAI(
    deployment_name=aoai_chat_deployment_name,
    azure_endpoint=aoai_api_endpoint,
    api_version=azure_openai_api_version,
    openai_api_key=aoai_api_key,
)

rewriter_prompt_template = """
Rewrite the user question as a search keyword that is more specific, detailed, and likely to retrieve relevant information,
allowing for a more accurate response through web search.
Don't include the additional context from the user question.

User question: {user_question}
Revised web search query:
"""

rewriter_prompt = ChatPromptTemplate.from_template(rewriter_prompt_template)
rewriter_chain = rewriter_prompt | llm | StrOutputParser()

user_question = "Tell me some fun things I can do in Cornwall"

search_query = rewriter_chain.invoke({"user_question": user_question})
print(search_query)

"Fun activities and attractions in Cornwall for tourists"

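The sample output above comes back wrapped in double quotes. If the query is passed on to a search tool, it may be worth normalizing it first. The helper below is a hypothetical sketch (`clean_search_query` is not part of the notebook) that trims whitespace and one pair of matching surrounding quotes.

```python
def clean_search_query(raw: str) -> str:
    """Trim whitespace and one pair of matching surrounding quotes."""
    query = raw.strip()
    if len(query) >= 2 and query[0] == query[-1] and query[0] in "\"'":
        query = query[1:-1].strip()
    return query
```

For example, `clean_search_query(rewriter_chain.invoke({"user_question": user_question}))` would yield the query text without the enclosing quotes.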

In [3]:
@dataclass
class Message:
    content: str
    source: str

@dataclass
class ClassifiedMessage:
    intent: str
    content: str
    source: str

default_topic_type = "default"
general_type = "GeneralAgent"
websearch_type = "WebSearchAgent"
user_topic_type = "User"
retrieve_topic_type = "RetrieveAgent"
groundness_topic_type = "EvalAgent"


In [4]:
# Initialize the Azure OpenAI chat completion client
aoai_client = AzureOpenAIChatCompletionClient(
    azure_endpoint=aoai_api_endpoint,
    model=aoai_chat_deployment_name,
    api_version=azure_openai_api_version,
    api_key=aoai_api_key,
)

from pydantic import BaseModel
class IntentType(BaseModel):
    type: str

# Step 1: Create a client object from the project connection string
project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(),
    # conn_str='Your Azure AI Foundry connection string',
    # copied from your Azure AI Foundry project.
    conn_str=azure_ai_pjt_connection_str
)

In [6]:
@type_subscription(topic_type=default_topic_type)
class IntentRouterAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("An intent classification agent.")
        self._system_message = SystemMessage(
            content=(
                """
                    You are an expert at routing a user question to the LLM, the vectorstore, or websearch.
                    The LLM covers casual topics such as greetings and small talk.
                    Use the LLM for questions on casual topics.
                    The vectorstore contains documents related to hotel information in New York.
                    Use the vectorstore for questions on hotel-related topics. For everything else, use websearch.
                    Respond with an intent type: LLM, vectorstore, or websearch.
                """
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_user_question(self, message: UserMessage, ctx: MessageContext) -> None:
        print(f"{'-'*80}\n{self.id.type}:\n{message.content}")
        prompt = f"User question: {message.content}"
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            extra_create_args={"response_format": IntentType},
            cancellation_token=ctx.cancellation_token
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\n{self.id.type}:\n{response}")
        
        await self.publish_message(ClassifiedMessage(intent=json.loads(response)["type"], content=prompt, source=self.id.key), topic_id=TopicId(type=retrieve_topic_type, source="default"))
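
The router above reads the intent with `json.loads(response)["type"]`, which raises if the model returns malformed JSON or an unexpected label. A more defensive variant could look like the sketch below. `parse_intent` and `ALLOWED_INTENTS` are hypothetical names, and falling back to `websearch` is an assumption that mirrors the system prompt's rule of using web search for everything else.

```python
import json

# Labels named in the router's system prompt.
ALLOWED_INTENTS = ("LLM", "vectorstore", "websearch")


def parse_intent(response: str, default: str = "websearch") -> str:
    """Extract the intent label from a JSON reply, or return the default."""
    try:
        payload = json.loads(response)
    except json.JSONDecodeError:
        return default
    intent = payload.get("type") if isinstance(payload, dict) else None
    if not isinstance(intent, str):
        return default
    # Match case-insensitively and return the canonical spelling.
    for allowed in ALLOWED_INTENTS:
        if intent.strip().lower() == allowed.lower():
            return allowed
    return default
```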

In [7]:
@type_subscription(topic_type=retrieve_topic_type)
class GeneralAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A general agent.")
        self._system_message = SystemMessage(
            content=(
                """
                    You are a helper agent that can answer general questions.
                """
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_user_question(self, message: ClassifiedMessage, ctx: MessageContext) -> None:
        print(message)
        if message.intent == "LLM":
            prompt = message.content
            llm_result = await self._model_client.create(
                messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
                cancellation_token=ctx.cancellation_token,
            )
            response = llm_result.content
            assert isinstance(response, str)
            print(f"{'-'*80}\n{self.id.type}:\n{response}")
            
            await self.publish_message(AssistantMessage(content=response, source=self.id.key), topic_id=TopicId(type=user_topic_type, source=self.id.key))

## Create and connect a new Grounding with Bing Search resource
> ✨ **_Note_** <br>
> Your usage of Grounding with Bing Search may incur costs. See the [pricing page](https://www.microsoft.com/en-us/bing/apis/grounding-pricing) for details.

1. Create a new Grounding with Bing Search resource in the Azure portal and fill in the fields in the creation form. Make sure you create this resource in the same resource group as your Azure AI Agent, AI Project, and other resources.
   ![bing grounding](https://learn.microsoft.com/en-us/azure/ai-services/agents/media/tools/bing/resource-azure-portal.png#lightbox)
2. Select the Grounding with Bing Search resource you created and copy one of its API keys.
   ![copy keys](https://learn.microsoft.com/en-us/azure/ai-services/agents/media/tools/bing/key-resource-azure-portal.png#lightbox)
3. In Azure AI Foundry, go to Management center > Create Connection > API key and enter the following values:
   - Endpoint: https://api.bing.microsoft.com/
   - Key: YOUR_API_KEY
   - Connection name: YOUR_CONNECTION_NAME. The notebook uses this name to connect to the Bing Search resource.
   - Access: choose either this project only or shared to all projects.
4. Copy your connection name and paste it into the .env file as `BING_CONNECTION_NAME`.
5. Copy your project connection string and paste it into the .env file as `AZURE_AI_PROJECT_CONNECTION_STR`.
   ![copy project string](../images/portal-connection-string.png)
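
A malformed connection string is an easy mistake to make when copying from the portal. The sketch below checks its shape before it is handed to `AIProjectClient.from_connection_string`. It assumes the string consists of four semicolon-separated fields (host name, subscription ID, resource group, project name); verify that against your SDK version. The function name and dictionary keys are hypothetical.

```python
from typing import Dict


def split_project_connection_string(conn_str: str) -> Dict[str, str]:
    """Split a project connection string into its four assumed fields."""
    parts = [part.strip() for part in conn_str.split(";")]
    if len(parts) != 4 or not all(parts):
        raise ValueError("Expected 4 non-empty, semicolon-separated fields")
    # Field order is an assumption of this sketch.
    keys = ("host_name", "subscription_id", "resource_group", "project_name")
    return dict(zip(keys, parts))
```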


In [8]:
# Step 2: Enable the Grounding with Bing Search tool
# Note: this call may raise an error if the project has many Azure OpenAI connections.
# https://github.com/Azure/azure-sdk-for-python/issues/38921


bing_connection = project_client.connections.get(
    # connection_name='Your Bing Connection Name'
    connection_name=bing_connnection_name
)

conn_id = bing_connection.id


In [9]:
# Test the Bing grounding connection.
# After testing the connection, restart the kernel before using the connection in the other code cells.

bing = BingGroundingTool(connection_id=conn_id)

agent = project_client.agents.create_agent(
    model=aoai_chat_deployment_name,
    name="my-assistant",
    instructions="""
        web search
    """,
    tools=bing.definitions,
    headers={"x-ms-enable-preview": "true"}
)
print(f"Created agent, ID: {agent.id}")

# Create thread for communication
thread = project_client.agents.create_thread()
print(f"Created thread, ID: {thread.id}")

# Create message to thread
message = project_client.agents.create_message(
    thread_id=thread.id,
    role="user",
    content="삼성전자 CES 2025 모니터 신모델",  # "Samsung Electronics CES 2025 new monitor models"
)

print(message)

In [10]:
from autogen_core.tools import FunctionTool, ToolSchema

# Step 3: Create agent using project client with the bing tool and process assistant run
async def bing_search_tool(query: str) -> str:
    print("This is Bing for Azure AI Agent Service .......")
    bing = BingGroundingTool(connection_id=conn_id)
    print(f"query ID: {query}")
    print(project_client)
        
    
    agent = project_client.agents.create_agent(
        model=aoai_chat_deployment_name,
        name="bing-search-assistant",
        instructions="""
            Your only tool is websearch_tool - use it to find information.
            You make only one search call at a time.
            Once you have the results, you never do calculations based on them.
        """,
        tools=bing.definitions,
        headers={"x-ms-enable-preview": "true"}
    )
    print(f"Created agent, ID: {agent.id}")

    # Create thread for communication
    thread = project_client.agents.create_thread()
    print(f"Created thread, ID: {thread.id}")

    # Create message to thread
    message = project_client.agents.create_message(
        thread_id=thread.id,
        role="user",
        content=query,
    )
    print(f"SMS: {message}")
    # Create and process agent run in thread with tools
    run = project_client.agents.create_and_process_run(thread_id=thread.id, assistant_id=agent.id)
    print(f"Run finished with status: {run.status}")

    if run.status == "failed":
        print(f"Run failed: {run.last_error}")

    # Delete the assistant when done
    project_client.agents.delete_agent(agent.id)
    print("Deleted agent")

    # Fetch the thread messages and return the text of the first listed one (the agent's reply)
    messages = project_client.agents.list_messages(thread_id=thread.id)
    answer = messages["data"][0]["content"][0]["text"]["value"]
    print("Messages:" + answer)
    return answer
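
The chained indexing into the message list raises if the run failed and produced no text reply. A more tolerant extraction could look like the following sketch. `first_text_value` is a hypothetical helper, and it assumes the response behaves like nested dictionaries, as the indexing in `bing_search_tool` and the printed message in this notebook suggest.

```python
from typing import Any, Mapping


def first_text_value(messages: Mapping[str, Any], default: str = "") -> str:
    """Return the first text value of the first listed message, or the default."""
    data = messages.get("data") or []
    if not data:
        return default
    # Scan the content parts of the first message for a text part.
    for part in data[0].get("content", []):
        if part.get("type") == "text":
            return part.get("text", {}).get("value", default)
    return default
```

With this helper, the tool could return `first_text_value(messages, default="No result")` instead of indexing directly.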

In [11]:
@type_subscription(topic_type=retrieve_topic_type)
class WebSearchUseAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient, tool_schema: List[ToolSchema], tool_agent_type: str) -> None:
        super().__init__("Use tools to solve tasks.")
        self._model_client = model_client
        self._system_message = SystemMessage(
            content=(
                """
                You are a tool usage agent who can delegate your task to the ToolAgent.
                """
            )
        )
        self._tool_schema = tool_schema
        self._tool_agent_id = AgentId(tool_agent_type, self.id.key)

    @message_handler
    async def handle_user_question(self, message: ClassifiedMessage, ctx: MessageContext) -> None:
        if message.intent == "websearch":
            # Create a session of messages.
            session: List[LLMMessage] = [UserMessage(content=message.content, source="user")]
            # Run the caller loop to handle tool calls.
            messages = await tool_agent_caller_loop(
                self,
                tool_agent_id=self._tool_agent_id,
                model_client=self._model_client,
                input_messages=session,
                tool_schema=self._tool_schema,
                cancellation_token=ctx.cancellation_token,
            )

            # Return the final response.
            assert isinstance(messages[-1].content, str)
            await self.publish_message(AssistantMessage(content=messages[-1].content, source=self.id.key), topic_id=TopicId(type=user_topic_type, source=self.id.key))            
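
Both `GeneralAgent` and `WebSearchUseAgent` subscribe to the same topic and decide inside their handlers whether a message is theirs. The pattern can be illustrated without AutoGen. The sketch below is a simplified stand-in built on plain `asyncio`, not the AutoGen runtime; the handler functions and `publish` are hypothetical, and `ClassifiedMessage` mirrors the dataclass defined earlier in this notebook.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional


@dataclass
class ClassifiedMessage:
    intent: str
    content: str
    source: str


Handler = Callable[[ClassifiedMessage], Awaitable[Optional[str]]]


async def general_handler(message: ClassifiedMessage) -> Optional[str]:
    # Ignore messages meant for other agents.
    if message.intent != "LLM":
        return None
    return f"general: {message.content}"


async def websearch_handler(message: ClassifiedMessage) -> Optional[str]:
    if message.intent != "websearch":
        return None
    return f"websearch: {message.content}"


async def publish(message: ClassifiedMessage, subscribers: List[Handler]) -> List[str]:
    """Deliver the message to every subscriber and keep the non-empty replies."""
    replies = await asyncio.gather(*(handler(message) for handler in subscribers))
    return [reply for reply in replies if reply is not None]
```

In this sketch a message with the `vectorstore` intent produces no reply, because none of the subscribers handles that intent.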

In [12]:
@type_subscription(topic_type=user_topic_type)
class UserAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("A user agent that outputs the final copy to the user.")

    @message_handler
    async def handle_final_copy(self, message: AssistantMessage, ctx: MessageContext) -> None:
        print(f"\n{'-'*80}\n{self.id.type} received final copy:\n{message.content}")

In [13]:
runtime = SingleThreadedAgentRuntime()

tools: List[Tool] = [FunctionTool(bing_search_tool, description="web search tool")]

await IntentRouterAgent.register(runtime, type=default_topic_type, factory=lambda: IntentRouterAgent(model_client=aoai_client))

await GeneralAgent.register(runtime, type=general_type, factory=lambda: GeneralAgent(model_client=aoai_client))

await ToolAgent.register(runtime, "tool_executor_agent", lambda: ToolAgent("tool_executor_agent", tools))

await WebSearchUseAgent.register(runtime, type=websearch_type, factory=lambda: WebSearchUseAgent(aoai_client, [tool.schema for tool in tools], "tool_executor_agent"))

await UserAgent.register(runtime, type=user_topic_type, factory=lambda: UserAgent())

AgentType(type='User')

In [17]:
runtime.start()

# Query: "Samsung Electronics CES 2025 new monitor models"
await runtime.publish_message(UserMessage(content="삼성전자 CES 2025 모니터 신모델", source="User"), topic_id=TopicId(type=default_topic_type, source="default"))

await runtime.stop_when_idle()

--------------------------------------------------------------------------------
default:
삼성전자 CES 2025 모니터 신모델


--------------------------------------------------------------------------------
default:
{"type":"websearch"}
ClassifiedMessage(intent='websearch', content='User question: 삼성전자 CES 2025 모니터 신모델', source='default')
This is Bing for Azure AI Agent Service .......
query ID: 삼성전자 CES 2025 모니터 신모델
<azure.ai.projects._patch.AIProjectClient object at 0x7f3fc39f3e50>
Created agent, ID: asst_idILSFOvwT3GVA1sobIV5KEY
Created thread, ID: thread_dQtDTtIwGPWjgOhCBiGSgEr1
SMS: {'id': 'msg_4dguem1NYgsOwWjNVyZS6TsP', 'object': 'thread.message', 'created_at': 1736556043, 'assistant_id': None, 'thread_id': 'thread_dQtDTtIwGPWjgOhCBiGSgEr1', 'run_id': None, 'role': 'user', 'content': [{'type': 'text', 'text': {'value': '삼성전자 CES 2025 모니터 신모델', 'annotations': []}}], 'attachments': [], 'metadata': {}}
Run finished with status: completed
Deleted agent
Messages:삼성전자는 CES 2025에서 총 5종의 새로운 모니터를 공개했습니다. 새로운 모니터 모델에는 다음과 같은 제품들이 포함되었습니다:

1. **스마트 모니터 M9**: AI 기능이 대폭 향상된 32형 스마트 모니터.
2. **오디세이 OLED G8**: 업계 최초로