In [None]:
from typing import Annotated, List, Optional

import nest_asyncio
from pydantic import BaseModel

import autogen
from autogen import ConversableAgent
from autogen.agentchat.user_proxy_agent import UserProxyAgent
from autogen.agents.experimental import WebSurferAgent
from autogen.tools.dependency_injection import BaseContext, Depends

# Patch asyncio so that event loops can be nested.
# NOTE(review): presumably required because the notebook kernel already runs
# an event loop and the web-surfing tool used below is async — confirm.
nest_asyncio.apply()

In [2]:
class Subtask(BaseModel):
    """One sub-question derived from the original question, plus its answer."""

    # Text of the sub-question to be researched.
    question: str
    # Answer recorded for the sub-question; stays None until it is answered.
    answer: Optional[str] = None


class Task(BaseContext, BaseModel):
    """Shared state of one research run: the subtasks and the current progress.

    An instance of this class is injected into the tool functions of this
    notebook through ``Depends``.
    """

    # Subtask that has been handed out and is still waiting for an answer, if any.
    in_progress_subtask: Optional[Subtask] = None
    # Every subtask produced by the decomposition step, in insertion order.
    subtasks: List[Subtask]
    # Slot for a final answer; no code visible in this notebook writes to it.
    answer: Optional[str] = None


# Single module-level context instance. The tool functions below receive this
# very object via Depends(task_context) and mutate it in place, so it must be
# reset in place (not rebound) if a clean state is needed.
task_context: Task = Task(subtasks=[])

In [3]:
# Load the model endpoint definitions from "OAI_CONFIG_LIST" (environment
# variable or file) and keep only the entries tagged "gpt-4o-mini".
config_list = autogen.config_list_from_json("OAI_CONFIG_LIST", filter_dict={"tags": ["gpt-4o-mini"]})

# LLM settings shared by every LLM-backed agent defined below.
llm_config = dict(config_list=config_list)

In [None]:
# Agent that splits the user's question into subquestions. Its system prompt
# forbids answering the question or forwarding it unchanged; the subquestions
# themselves are recorded through the `decompose_question` tool registered on
# this agent.
initial_analyser = ConversableAgent(
    name="InitialAnalysisAgent",
    system_message=(
        "You are an expert at breaking down complex questions into smaller, focused subquestions.\n"
        "Your task is to take any question provided and divide it into clear, actionable subquestions that can be individually answered.\n"
        "Ensure the subquestions are logical, non-redundant, and cover all key aspects of the original question.\n"
        "Avoid providing answers or interpretations—focus solely on decomposition.\n"
        "Do NOT forward the initial task without breaking it down into subquestions!!!\n"
    ),
    llm_config=llm_config,
)


@initial_analyser.register_for_llm(
    description="Decompose the question into subtasks with the maximum of two subtasks. Web surfing will be done based on each subtask independently."
)
def decompose_question(
    subtasks: Annotated[
        list[str], "Each subtask should be related to the original question and aim to achieve partial answering of it"
    ],
    task_context: Annotated[Task, Depends(task_context)],
) -> None:
    """Append the proposed subquestions to the shared task context.

    Args:
        subtasks: Subquestion texts proposed by the analysis agent.
        task_context: Injected shared ``Task``; its ``subtasks`` list is
            extended in place with one unanswered ``Subtask`` per entry.
    """
    # Build every Subtask first so that nothing is added if one fails to construct.
    created = []
    for text in subtasks:
        created.append(Subtask(question=text))

    task_context.subtasks.extend(created)

In [None]:
# Agent that researches the subquestions on the web. The system prompt spells
# out the tool loop it has to follow: fetch a subquestion, browse, then record
# the answer, until no subquestion is left.
websurfer = WebSurferAgent(
    llm_config=llm_config,
    name="WebSurferAgent",
    system_message=(
        "You are a web surfer agent responsible for gathering information from the web to answer the subquestions provided.\n"
        # Fixed: the original sentence was garbled ("If you get First get the subquestion ...").
        "Always perform searches one subquestion at a time. First get the subquestion, then perform the search and then call the answer function.\n"
        "The process is to repeat the following steps until all subquestions are answered:\n"
        "1. Get the next subquestion - 'get_next_subtask'\n"
        "2. Search the web for the answer - 'browser_use'\n"
        "3. Answer the subquestion - 'answer_subtask'\n"
    ),
)


# Sentinel tool result returned by `get_next_subtask` once no unanswered
# subtask is left. `state_transition` matches on it with `startswith` to hand
# the turn to the summary agent, so the two must stay in sync.
NO_MORE_SUBTASKS = "No more subtasks, proceed to the next agent"


@websurfer.register_for_llm(description="Get next subtask")
def get_next_subtask(task_context: Annotated[Task, Depends(task_context)]) -> str:
    """Hand out the next unanswered subtask and mark it as in progress.

    Args:
        task_context: Injected shared ``Task`` holding the subtasks.

    Returns:
        The question of the subtask that was just put in progress; a reminder
        naming the pending question if a subtask is already in progress; or
        ``NO_MORE_SUBTASKS`` when every subtask has an answer.
    """
    current = task_context.in_progress_subtask
    if current:
        return f"Subtask already in progress, first answer previous subtask: {current.question}"

    # First subtask (in insertion order) that has no answer yet, if any.
    upcoming = next((st for st in task_context.subtasks if st.answer is None), None)
    if upcoming is None:
        return NO_MORE_SUBTASKS

    task_context.in_progress_subtask = upcoming
    return upcoming.question


@websurfer.register_for_llm(description="Answer the subtask")
def answer_subtask(answer: str, task_context: Annotated[Task, Depends(task_context)]) -> str:
    """Store the answer on the in-progress subtask and release it.

    Args:
        answer: Answer text for the subtask currently in progress.
        task_context: Injected shared ``Task``.

    Returns:
        A confirmation message, or a hint to fetch a subtask first when none
        is in progress.
    """
    active = task_context.in_progress_subtask
    if active:
        active.answer = answer
        # Clearing the slot lets `get_next_subtask` hand out the next one.
        task_context.in_progress_subtask = None
        return "Subtask answered successfully"

    return "No subtask in progress, first get a subtask"

In [6]:
# Agent that condenses the answers gathered for the subquestions into one
# coherent summary covering the original question.
summary_agent = ConversableAgent(
    name="SummaryAgent",
    system_message=(
        # Each sentence ends with "\n" (as in the other prompts of this notebook);
        # without a separator the adjacent literals fuse into "...provided.Your task...".
        "You are a summarizer agent responsible for summarizing the answers to the subquestions provided.\n"
        "Your task is to provide a concise summary of the answers to the subquestions.\n"
        "Ensure the summary is coherent, logically structured, and covers all key aspects of the original question."
    ),
    llm_config=llm_config,
)


@summary_agent.register_for_llm(description="Request more information")
def request_more_information(request_explanation: str) -> str:
    """Echo the summary agent's explanation of what information is missing.

    The text is returned unchanged as the tool result. After this tool has
    been executed, `state_transition` hands the turn back to the initial
    analysis agent.

    Args:
        request_explanation: Description of the additional information needed.

    Returns:
        ``request_explanation`` unchanged.
    """
    return request_explanation

In [7]:
# Proxy agent that executes the tool calls proposed by the other agents. It
# never asks for human input and has code execution disabled; its only job is
# to look tools up in `function_map` and run them.
user_proxy = UserProxyAgent(
    name="user_proxy",
    system_message="A proxy to excute code",
    human_input_mode="NEVER",
    code_execution_config=False,
    # A message whose (non-empty) content ends with "TERMINATE" stops the chat.
    is_termination_msg=lambda msg: msg.get("content", "") and msg.get("content", "").rstrip().endswith("TERMINATE"),
    function_map={
        # Tools registered in this notebook, keyed by their tool name.
        **{
            tool.name: tool
            for tool in (answer_subtask, get_next_subtask, decompose_question, request_more_information)
        },
        # The web surfer's built-in browsing tool.
        websurfer.tool.name: websurfer.tool.func,
    },
)

In [8]:
def state_transition(last_speaker, groupchat):
    """Select the next speaker of the group chat.

    Routing rules, checked in this order:

    1. Only the opening message exists -> initial analysis agent.
    2. The last message carries tool calls -> user proxy (executes them).
    3. The user proxy just returned a tool result:
       - tool was called by the initial analyser -> web surfer;
       - tool was called by the summary agent -> initial analyser;
       - result starts with ``NO_MORE_SUBTASKS`` -> summary agent;
       - otherwise -> back to whoever called the tool.
    4. Initial analyser spoke (no tool call) -> web surfer.
    5. Web surfer spoke (no tool call) -> summary agent.

    Args:
        last_speaker: Agent that produced the most recent message.
        groupchat: Group chat whose ``messages`` history is inspected.

    Returns:
        The agent that should speak next, or ``None`` when no rule applies
        (e.g. the summary agent replied without a tool call). Per AG2's
        custom speaker-selection contract, ``None`` ends the conversation.
    """
    messages = groupchat.messages

    # Only the user's opening message is present: start with the decomposition.
    if len(messages) <= 1:
        return initial_analyser

    last_message = messages[-1]

    # Pending tool calls are always executed by the user proxy.
    if "tool_calls" in last_message:
        return user_proxy

    if last_speaker is user_proxy:
        # The agent that issued the tool call which was just executed.
        penultimate_speaker = groupchat.agent_by_name(name=messages[-2].get("name", ""))

        if penultimate_speaker is initial_analyser:
            return websurfer

        if penultimate_speaker is summary_agent:
            return initial_analyser

        # "content" can be present but None (or not a string); only string
        # content can carry the NO_MORE_SUBTASKS sentinel.
        content = last_message.get("content")
        if isinstance(content, str) and content.startswith(NO_MORE_SUBTASKS):
            return summary_agent

        # Let the tool caller continue with the tool result.
        return penultimate_speaker

    if last_speaker is initial_analyser:
        return websurfer

    if last_speaker is websurfer:
        return summary_agent

    # No rule matched: explicitly signal the end of the conversation.
    return None

In [None]:
# Reset the shared context so that re-running this cell starts from a clean
# state. The reset is done IN PLACE: the tool functions captured this exact
# `task_context` instance through Depends(...), so rebinding the name to a new
# Task would not be seen by them. Without the reset, a previously interrupted
# run leaves a stale in-progress subtask and leftover unanswered subtasks.
task_context.subtasks.clear()
task_context.in_progress_subtask = None
task_context.answer = None

# Group chat whose turn order is fully driven by `state_transition`.
groupchat = autogen.GroupChat(
    agents=[initial_analyser, websurfer, summary_agent, user_proxy],
    messages=[],
    speaker_selection_method=state_transition,
    # Generous upper bound on the number of rounds in the chat.
    max_round=1000,
)
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

# Kick off the run with the user's question.
chat_result = user_proxy.initiate_chat(manager, message="Who are the founders of AG2")