# Autogen (handoff)

### Setup

In [8]:
import datetime
import os
import sqlite3
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, TypedDict, cast

from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
from azure.core.credentials_async import AsyncTokenCredential
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.search.documents.aio import AsyncSearchItemPaged, SearchClient
from azure.search.documents.models import (
    QueryCaptionResult,
    QueryType,
    VectorizedQuery,
    VectorQuery,
)
from dotenv import load_dotenv
from openai import AsyncAzureOpenAI
from openai.types.chat import ChatCompletion
from openai_messages_token_helper import build_messages, get_token_limit

In [1]:
import json
import uuid

from autogen_core import (
    FunctionCall,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from autogen_core.tools import FunctionTool, Tool
from pydantic import BaseModel

### openai connection

In [2]:
load_dotenv(dotenv_path=r"..\.env")

OPENAI_HOST = os.getenv("OPENAI_HOST", "azure")
AZURE_OPENAI_CHATGPT_DEPLOYMENT = os.getenv("AZURE_OPENAI_CHATGPT_DEPLOYMENT")
AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION")
AZURE_OPENAI_CHATGPT_MODEL = os.getenv("AZURE_OPENAI_CHATGPT_MODEL")
AZURE_OPENAI_SERVICE = os.getenv("AZURE_OPENAI_SERVICE")

OPENAI_EMB_MODEL = os.getenv("AZURE_OPENAI_EMB_MODEL_NAME", "text-embedding-ada-002")
OPENAI_EMB_DIMENSIONS = int(os.getenv("AZURE_OPENAI_EMB_DIMENSIONS", 1536))
AZURE_OPENAI_EMB_DEPLOYMENT = (
    os.getenv("AZURE_OPENAI_EMB_DEPLOYMENT")
    if OPENAI_HOST.startswith("azure")
    else None
)

AZURE_SEARCH_SERVICE = os.environ["AZURE_SEARCH_SERVICE"]
AZURE_SEARCH_INDEX = os.environ["AZURE_SEARCH_INDEX"]
AZURE_SEARCH_ENDPOINT = f"https://{AZURE_SEARCH_SERVICE}.search.windows.net"
AZURE_SEARCH_QUERY_LANGUAGE = os.getenv("AZURE_SEARCH_QUERY_LANGUAGE", "en-us")
AZURE_SEARCH_QUERY_SPELLER = os.getenv("AZURE_SEARCH_QUERY_SPELLER", "lexicon")

CHATGPT_TOKEN_LIMIT = get_token_limit(AZURE_OPENAI_CHATGPT_MODEL)

NameError: name 'load_dotenv' is not defined

In [1]:
azure_credential = DefaultAzureCredential(exclude_shared_token_cache_credential=True)

token_provider = get_bearer_token_provider(
    azure_credential, "https://cognitiveservices.azure.com/.default"
)

autogen_openai_client = AzureOpenAIChatCompletionClient(
    azure_deployment=AZURE_OPENAI_CHATGPT_DEPLOYMENT,
    model=AZURE_OPENAI_CHATGPT_MODEL,
    api_version=AZURE_OPENAI_API_VERSION,
    azure_endpoint=f"https://{AZURE_OPENAI_SERVICE}.openai.azure.com",
    azure_ad_token_provider=token_provider,
)

openai_client = AsyncAzureOpenAI(
    api_version=AZURE_OPENAI_API_VERSION,
    azure_endpoint=f"https://{AZURE_OPENAI_SERVICE}.openai.azure.com",
    azure_ad_token_provider=token_provider,
)

NameError: name 'DefaultAzureCredential' is not defined

In [5]:
# # testing openai connection
# from autogen_agentchat.agents import AssistantAgent
# from autogen_agentchat.ui import Console

# async def get_weather(city: str) -> str:
#     """Get the weather for a given city."""
#     return f"The weather in {city} is 73 degrees and Sunny."


# # Define an AssistantAgent with the model, tool, system message, and reflection enabled.
# # The system message instructs the agent via natural language.
# agent = AssistantAgent(
#     name="weather_agent",
#     model_client=autogen_openai_client,
#     tools=[get_weather],
#     system_message="You are a helpful assistant.",
#     reflect_on_tool_use=True,
#     model_client_stream=True,  # Enable streaming tokens from the model client.
# )


# # Run the agent and stream the messages to the console.
# async def main() -> None:
#     await Console(agent.run_stream(task="What is the weather in New York?"))


# # NOTE: if running this inside a Python script you'll need to use asyncio.run(main()).
# await main()

### Parameters

In [6]:
SEARCH_MAX_RESULTS = 10
TEMPERATURE = 0.0
SEED = 1234
USE_TEXT_SEARCH = "hybrid"
USE_VECTOR_SEARCH = "hybrid"
USE_SEMANTIC_RANKER = True
USE_SEMANTIC_CAPTIONS = False
MINIMUM_SEARCH_SCORE = 0.022
MINIMUM_RERANKER_SCORE = 2.21
COSINE_SIMILARITY_THRESHOLD = 0.90
VECTOR_WEIGHT = 1.75

RESPONSE_TOKEN_LIMIT = 512
CHATGPT_TOKEN_LIMIT = 128000

### Define Azure search functions

In [7]:
@dataclass
class Document:
    id: Optional[str]
    parent_id: Optional[str]
    title: Optional[str]
    pr_name: Optional[str]
    date_modified: Optional[str]
    cover_image_url: Optional[str]
    full_url: Optional[str]
    content_category: Optional[str]
    chunks: Optional[str]
    embedding: Optional[list[float]]
    captions: list[QueryCaptionResult]
    score: Optional[float] = None
    reranker_score: Optional[float] = None

    def serialize_for_results(self) -> dict[str, Any]:
        return {
            "id": self.id,
            "parent_id": self.parent_id,
            "title": self.title,
            "cover_image_url": self.cover_image_url,
            "full_url": self.full_url,
            "content_category": self.content_category,
            "chunks": self.chunks,
            "embedding": Document.trim_embedding(self.embedding),
            "captions": (
                [
                    {
                        "additional_properties": caption.additional_properties,
                        "text": caption.text,
                        "highlights": caption.highlights,
                    }
                    for caption in self.captions
                ]
                if self.captions
                else []
            ),
            "score": self.score,
            "reranker_score": self.reranker_score,
        }

    @classmethod
    def trim_embedding(cls, embedding: Optional[list[float]]) -> Optional[list[float]]:
        """Returns a trimmed list of floats from the vector embedding."""
        if embedding:
            if len(embedding) > 2:
                # Format the embedding list to show the first 2 items followed by the count of the remaining items."""
                return f"[{embedding[0]}, {embedding[1]} ...+{len(embedding) - 2} more]"
            else:
                return str(embedding)

        return None

In [8]:
async def compute_text_embedding(q: str):
    SUPPORTED_DIMENSIONS_MODEL = {
        "text-embedding-ada-002": False,
        "text-embedding-3-small": True,
        "text-embedding-3-large": True,
    }

    class ExtraArgs(TypedDict, total=False):
        dimensions: int

    dimensions_args: ExtraArgs = (
        {"dimensions": OPENAI_EMB_DIMENSIONS}
        if SUPPORTED_DIMENSIONS_MODEL[OPENAI_EMB_MODEL]
        else {}
    )
    embedding = await openai_client.embeddings.create(  # noqa: F704
        # Azure OpenAI takes the deployment name as the model name
        model=(
            AZURE_OPENAI_EMB_DEPLOYMENT
            if AZURE_OPENAI_EMB_DEPLOYMENT
            else OPENAI_EMB_MODEL
        ),
        input=q,
        **dimensions_args,
    )
    query_vector = embedding.data[0].embedding
    return VectorizedQuery(
        vector=query_vector, k_nearest_neighbors=50, fields="embedding"
    )

In [None]:
async def perform_search_and_fetch_documents(
    endpoint: str,
    index_name: str,
    azure_credential: AsyncTokenCredential,
    query_text: str,
    vectors: list[VectorQuery],
    usetextsearch: bool,
    usevectorsearch: bool,
    usesemanticranker: bool,
    top: int,
    usesemanticcaptions: bool,
    filter: str,
    # query_language: str,
    # query_speller: str,
) -> list[Document]:

    async with SearchClient(
        endpoint=endpoint,
        index_name=index_name,
        credential=azure_credential,
    ) as search_client:
        search_text = query_text if usetextsearch else ""
        search_vectors = vectors if usevectorsearch else []
        if usesemanticranker:
            results: AsyncSearchItemPaged = await search_client.search(
                search_text=search_text,
                filter=filter,
                top=top,
                query_caption=(
                    "extractive|highlight-false" if usesemanticcaptions else None
                ),
                vector_queries=search_vectors,
                query_type=QueryType.SEMANTIC,
                # query_language=query_language,
                # query_speller=query_speller,
                semantic_configuration_name="default",
                semantic_query=query_text,
            )
        else:
            results: AsyncSearchItemPaged = await search_client.search(
                search_text=search_text,
                filter=filter,
                top=top,
                vector_queries=search_vectors,
            )

        documents = []
        async for page in results.by_page():
            async for document in page:
                documents.append(
                    Document(
                        id=document.get("id"),
                        parent_id=document.get("parent_id"),
                        title=document.get("title"),
                        pr_name=document.get("pr_name"),
                        date_modified=document.get("date_modified"),
                        cover_image_url=document.get("cover_image_url"),
                        full_url=document.get("full_url"),
                        content_category=document.get("content_category"),
                        chunks=document.get("chunks"),
                        embedding=document.get("embedding"),
                        captions=cast(
                            list[QueryCaptionResult], document.get("@search.captions")
                        ),
                        score=document.get("@search.score"),
                        reranker_score=document.get("@search.reranker_score"),
                    )
                )

        return documents


def get_citation(sourcepage: str, use_image_citation: bool) -> str:
    if use_image_citation:
        return sourcepage
    else:
        path, ext = os.path.splitext(sourcepage)
        if ext.lower() == ".png":
            page_idx = path.rfind("-")
            page_number = int(path[page_idx + 1 :])
            return f"{path[:page_idx]}.pdf#page={page_number}"

        return sourcepage


def nonewlines(s: str) -> str:
    return s.replace("\n", " ").replace("\r", " ")


async def get_sources_content(
    results: list[Document], use_semantic_captions: bool, use_image_citation: bool
) -> list[str]:
    if use_semantic_captions:
        return [
            {
                "index_id": doc.id or "",  # noqa: F821
                "article_id": doc.parent_id or "",  # noqa: F821
                "title": doc.title or "",  # noqa: F821
                "pr_name": doc.pr_name or "",  # noqa: F821
                "url": get_citation(
                    (doc.full_url or ""), use_image_citation
                )  # noqa: F821
                + " ["
                + (doc.date_modified or "no date")  # noqa: F821
                + "]",
                "captions": nonewlines(
                    " . ".join([cast(str, c.text) for c in (doc["captions"] or [])])
                ),  # noqa: F821
                "search_score": doc.score or "",
                "reranker_score": doc.reranker_score or "",
                "embedding": doc.embedding or "",
            }
            for doc in results
        ]
    else:
        return [
            {
                "index_id": doc.id or "",
                "article_id": doc.parent_id or "",
                "title": doc.title or "",
                "pr_name": doc.pr_name or "",
                "url": get_citation((doc.full_url or ""), use_image_citation)
                + " ["
                + (doc.date_modified or "no date")
                + "]",
                "chunk": (get_citation((doc.title or ""), use_image_citation))
                + ": "
                + nonewlines(doc.chunks or ""),
                "search_score": doc.score or "",
                "reranker_score": doc.reranker_score or "",
                "embedding": doc.embedding or "",
            }
            for doc in results
        ]

### Define tool functions

In [10]:
def fetch_vaccination_history(user_id: int) -> List[Dict]:
    """Retrieve user's past vaccinations from the database."""
    conn = sqlite3.connect(r"..\data\vaccination_db.sqlite")
    cursor = conn.cursor()
    # sql query to filtered by user_id in descending vaccination date
    query = """
        SELECT vr.record_id, v.vaccine_name, vr.dose_number, vr.vaccination_date, p.polyclinic_name 
        FROM VaccineRecords vr
        JOIN Vaccines v ON vr.vaccine_id = v.vaccine_id
        JOIN Polyclinics p ON vr.polyclinic_id = p.polyclinic_id
        WHERE vr.user_id = ?
        ORDER BY vr.vaccination_date DESC;
    """
    cursor.execute(query, (user_id,))
    records = cursor.fetchall()
    conn.close()
    vaccination_history_records = [
        {
            "record_id": r[0],
            "vaccine_name": r[1],
            "dose_number": r[2],
            "date": r[3],
            "polyclinic": r[4],
        }
        for r in records
    ]
    return vaccination_history_records

In [11]:
def fetch_user_profile(user_id: int) -> Optional[Dict]:
    """Retrieve user profile details."""
    conn = sqlite3.connect(r"..\data\vaccination_db.sqlite")
    cursor = conn.cursor()
    query = "SELECT user_id, name, email, date_of_birth FROM Users WHERE user_id = ?;"
    cursor.execute(query, (user_id,))
    user = cursor.fetchone()
    conn.close()
    if user:
        return {
            "user_id": user[0],
            "name": user[1],
            "email": user[2],
            "date_of_birth": user[3],
        }
    return None

In [12]:
def check_available_slots(vaccine_name: str, polyclinic_name: str) -> List[Dict]:
    """Find available slots for a specific vaccine."""
    conn = sqlite3.connect(r"..\data\vaccination_db.sqlite")
    cursor = conn.cursor()
    query = """
        SELECT bs.slot_id, p.polyclinic_name, bs.slot_datetime 
        FROM BookingSlots bs
        JOIN Polyclinics p ON bs.polyclinic_id = p.polyclinic_id
        JOIN Vaccines v ON bs.vaccine_id = v.vaccine_id
        WHERE v.vaccine_name = ? AND bs.is_booked = 0
    """
    params = [vaccine_name]
    query += " AND p.polyclinic_name LIKE ?"
    params.append(f"%{polyclinic_name}%")
    query += " ORDER BY bs.slot_datetime ASC;"
    cursor.execute(query, tuple(params))
    slots = cursor.fetchall()
    conn.close()
    return [{"slot_id": s[0], "polyclinic": s[1], "datetime": s[2]} for s in slots]

In [13]:
def book_appointment(
    slot_id: int, user_id: int, action: str, new_slot_id: int = None
) -> str:
    """Handles booking, rescheduling, or cancelling an appointment."""
    conn = sqlite3.connect(r"..\data\vaccination_db.sqlite")
    cursor = conn.cursor()

    if action == "new":
        # Try to book a new appointment if the slot is available
        cursor.execute(
            "SELECT is_booked FROM BookingSlots WHERE slot_id = ?;", (slot_id,)
        )
        result = cursor.fetchone()

        if result and result[0] == 0:  # Slot is available
            cursor.execute(
                "UPDATE BookingSlots SET is_booked = 1 WHERE slot_id = ?;", (slot_id,)
            )
            # conn.commit()
            conn.close()
            return "Appointment booked successfully."
        conn.close()
        return "Slot is no longer available."

    elif action == "reschedule":
        # Cancel the old appointment (set is_booked to 0)
        cursor.execute(
            "SELECT is_booked FROM BookingSlots WHERE slot_id = ?;", (slot_id,)
        )
        result = cursor.fetchone()

        if result and result[0] == 1:  # Slot is booked
            cursor.execute(
                "UPDATE BookingSlots SET is_booked = 0 WHERE slot_id = ?;", (slot_id,)
            )
            # conn.commit()
            print(
                "Existing appointment has been cancelled. Rebooking new appointment slot.."
            )

        # Book the new slot if available
        cursor.execute(
            "SELECT is_booked FROM BookingSlots WHERE slot_id = ?;", (new_slot_id,)
        )
        result = cursor.fetchone()

        if result and result[0] == 0:  # Slot is available
            cursor.execute(
                "UPDATE BookingSlots SET is_booked = 1 WHERE slot_id = ?;",
                (new_slot_id,),
            )
            # conn.commit()
            conn.close()
            return "Appointment successfully rescheduled."
        conn.close()
        return "New slot is not available."

    elif action == "cancel":
        # Cancel the old appointment (set is_booked to 0)
        cursor.execute(
            "SELECT is_booked FROM BookingSlots WHERE slot_id = ?;", (slot_id,)
        )
        result = cursor.fetchone()

        if result and result[0] == 1:  # Slot is booked
            cursor.execute(
                "UPDATE BookingSlots SET is_booked = 0 WHERE slot_id = ?;", (slot_id,)
            )
            # conn.commit()
            conn.close()
            return "Appointment successfully cancelled."

        conn.close()
        return "No appointment found to cancel."

    conn.close()
    return "Invalid action."

In [None]:
async def recommend_vaccines(user_id: int):
    """
    Recommend vaccines based on user profile and vaccination history.
    """
    # Step 1: Fetch user profile (age, gender)
    conn = sqlite3.connect(r"..\data\vaccination_db.sqlite")
    cursor = conn.cursor()
    cursor.execute("SELECT date_of_birth FROM Users WHERE user_id = ?", (user_id,))
    user = cursor.fetchone()
    if not user:
        return {"error": "User not found"}

    birth_date = datetime.datetime.strptime(user[0], "%Y-%m-%d").date()
    age = (datetime.date.today() - birth_date).days // 365

    # cursor.execute("SELECT gender FROM Users WHERE user_id = ?", (user_id,))
    # gender = cursor.fetchone()[0]
    gender = "male"  # missing gender info in data for now

    print(age, gender)

    # Step 2: Query Azure Index for recommended vaccines
    query_text = f"vaccination guidelines for a {age}-year-old {gender}"
    vectors = [await compute_text_embedding(query_text)]
    documents = await perform_search_and_fetch_documents(
        endpoint=AZURE_SEARCH_ENDPOINT,
        index_name=AZURE_SEARCH_INDEX,
        azure_credential=azure_credential,
        query_text=query_text,
        vectors=vectors,
        usetextsearch=USE_TEXT_SEARCH,
        usevectorsearch=USE_VECTOR_SEARCH,
        usesemanticranker=USE_SEMANTIC_RANKER,
        top=5,  # Get top 5 recommendations
        usesemanticcaptions=USE_SEMANTIC_CAPTIONS,
        filter="parent_id eq '1434610_content_js'",
        # filter=None
    )

    sources_content = await get_sources_content(documents, USE_SEMANTIC_CAPTIONS, False)
    content = " ".join(doc["chunk"] for doc in sources_content)

    vaccination_history_records = fetch_vaccination_history(user_id)

    # Step 3: Use LLM to recommend vaccinations
    vaccine_recommendation_prompt = f"""
    Given the following vaccination guidelines, list the recommended vaccines for a {age}-year-old {gender}.
    Exclude any vaccines that are irrelevant for the age group and vaccinations that user has already taken based on the vaccination history. Provide a brief purpose for each vaccine.

    Guidelines:
    {content}

    Vaccination history:
    {vaccination_history_records}

    Respond in a numbered bullet format of the recommended vaccine and its purpose.
    """

    messages = build_messages(
        model=AZURE_OPENAI_CHATGPT_MODEL,
        system_prompt=vaccine_recommendation_prompt,
        max_tokens=CHATGPT_TOKEN_LIMIT - RESPONSE_TOKEN_LIMIT,
    )

    chat_completion: ChatCompletion = await openai_client.chat.completions.create(
        # Azure OpenAI takes the deployment name as the model name
        model=AZURE_OPENAI_CHATGPT_DEPLOYMENT,
        messages=messages,
        temperature=0,
        max_tokens=RESPONSE_TOKEN_LIMIT,
        n=1,
        stream=False,
        seed=SEED,
    )

    return chat_completion.choices[0].message.content

### Handoff workflow setup

#### Define the message protocols for agents to communicate

In [15]:
# UserLogin is a message published by the runtime when a user logs in and starts a new session.


class UserLogin(BaseModel):
    pass


# UserTask is a message containing the chat history of the user session.
# When an AI agent hands off a task to other agents, it also publishes a UserTask message.


class UserTask(BaseModel):
    context: List[LLMMessage]


# AgentResponse is a message published by the AI agents and the Human Agent,
# it also contains the chat history as well as a topic type for the customer to reply to.


class AgentResponse(BaseModel):
    reply_to_topic_type: str
    context: List[LLMMessage]

#### Define AIAgent and UserAgent class

In [None]:
class AIAgent(RoutedAgent):
    def __init__(
        self,
        description: str,
        system_message: SystemMessage,
        model_client: ChatCompletionClient,
        tools: List[Tool],
        delegate_tools: List[Tool],
        agent_topic_type: str,
        user_topic_type: str,
    ) -> None:
        super().__init__(description)
        self._system_message = system_message
        self._model_client = model_client
        self._tools = dict([(tool.name, tool) for tool in tools])
        self._tool_schema = [tool.schema for tool in tools]
        self._delegate_tools = dict([(tool.name, tool) for tool in delegate_tools])
        self._delegate_tool_schema = [tool.schema for tool in delegate_tools]
        self._agent_topic_type = agent_topic_type
        self._user_topic_type = user_topic_type

    @message_handler
    async def handle_task(self, message: UserTask, ctx: MessageContext) -> None:
        # Send the task to the LLM.
        llm_result = await self._model_client.create(  # noqa: F704
            messages=[self._system_message] + message.context,
            tools=self._tool_schema + self._delegate_tool_schema,
            cancellation_token=ctx.cancellation_token,
        )
        print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
        # Process the LLM result.
        while isinstance(llm_result.content, list) and all(
            isinstance(m, FunctionCall) for m in llm_result.content
        ):
            tool_call_results: List[FunctionExecutionResult] = []
            delegate_targets: List[Tuple[str, UserTask]] = []
            # Process each function call.
            for call in llm_result.content:
                arguments = json.loads(call.arguments)
                if call.name in self._tools:
                    # Execute the tool directly.
                    result = await self._tools[call.name].run_json(
                        arguments, ctx.cancellation_token
                    )
                    result_as_str = self._tools[call.name].return_value_as_string(
                        result
                    )
                    tool_call_results.append(
                        FunctionExecutionResult(
                            call_id=call.id,
                            content=result_as_str,
                            is_error=False,
                            name=call.name,
                        )
                    )
                elif call.name in self._delegate_tools:
                    # Execute the tool to get the delegate agent's topic type.
                    result = await self._delegate_tools[
                        call.name
                    ].run_json(  # noqa: F704
                        arguments, ctx.cancellation_token
                    )
                    topic_type = self._delegate_tools[call.name].return_value_as_string(
                        result
                    )
                    # Create the context for the delegate agent, including the function call and the result.
                    delegate_messages = list(message.context) + [
                        AssistantMessage(content=[call], source=self.id.type),
                        FunctionExecutionResultMessage(
                            content=[
                                FunctionExecutionResult(
                                    call_id=call.id,
                                    content=f"Transferred to {topic_type}. Adopt persona immediately.",
                                    is_error=False,
                                    name=call.name,
                                )
                            ]
                        ),
                    ]
                    delegate_targets.append(
                        (topic_type, UserTask(context=delegate_messages))
                    )
                else:
                    raise ValueError(f"Unknown tool: {call.name}")
            if len(delegate_targets) > 0:
                # Delegate the task to other agents by publishing messages to the corresponding topics.
                for topic_type, task in delegate_targets:
                    print(
                        f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}",
                        flush=True,
                    )
                    await self.publish_message(
                        task, topic_id=TopicId(topic_type, source=self.id.key)
                    )
            if len(tool_call_results) > 0:
                print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
                # Make another LLM call with the results.
                message.context.extend(
                    [
                        AssistantMessage(
                            content=llm_result.content, source=self.id.type
                        ),
                        FunctionExecutionResultMessage(content=tool_call_results),
                    ]
                )
                llm_result = await self._model_client.create(
                    messages=[self._system_message] + message.context,
                    tools=self._tool_schema + self._delegate_tool_schema,
                    cancellation_token=ctx.cancellation_token,
                )
                print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
            else:
                # The task has been delegated, so we are done.
                return
        # The task has been completed, publish the final result.
        assert isinstance(llm_result.content, str)
        message.context.append(
            AssistantMessage(content=llm_result.content, source=self.id.type)
        )
        await self.publish_message(
            AgentResponse(
                context=message.context, reply_to_topic_type=self._agent_topic_type
            ),
            topic_id=TopicId(self._user_topic_type, source=self.id.key),
        )

In [17]:
class UserAgent(RoutedAgent):
    def __init__(
        self, description: str, user_topic_type: str, agent_topic_type: str
    ) -> None:
        super().__init__(description)
        self._user_topic_type = user_topic_type
        self._agent_topic_type = agent_topic_type

    @message_handler
    async def handle_user_login(self, message: UserLogin, ctx: MessageContext) -> None:
        print(f"{'-'*80}\nUser login, session ID: {self.id.key}.", flush=True)
        # Get the user's initial input after login.
        user_input = input("User: ")
        print(f"{'-'*80}\n{self.id.type}:\n{user_input}")
        await self.publish_message(
            UserTask(context=[UserMessage(content=user_input, source="User")]),
            topic_id=TopicId(self._agent_topic_type, source=self.id.key),
        )

    # User Login: First Interaction
    # Triggered when a user logs in.
    # Asks for user input and sends it to the Triage Agent.
    # The Triage Agent then determines which agent should handle the request.

    @message_handler
    async def handle_task_result(
        self, message: AgentResponse, ctx: MessageContext
    ) -> None:
        # Get the user's input after receiving a response from an agent.
        user_input = input("User (type 'exit' to close the session): ")
        print(f"{'-'*80}\n{self.id.type}:\n{user_input}", flush=True)
        if user_input.strip().lower() == "exit":
            print(f"{'-'*80}\nUser session ended, session ID: {self.id.key}.")
            return
        message.context.append(UserMessage(content=user_input, source="User"))
        await self.publish_message(
            UserTask(context=message.context),
            topic_id=TopicId(message.reply_to_topic_type, source=self.id.key),
        )

    # Handling Agent Responses: Continuing the Conversation
    # Waits for an agent's response.
    # Prompts the user for their next input.
    # If the user types "exit", the session ends.
    # Otherwise, the conversation continues by forwarding the updated context.

#### Create FunctionTools and Topics

In [1]:
fetch_vaccination_history_tool = FunctionTool(
    fetch_vaccination_history,
    description="Use to retrieve user's vaccination history records based on user id.",
)
fetch_user_profile_tool = FunctionTool(
    fetch_user_profile,
    description="Use to retrieve user profile information such as gender and date of birth based on user id.",
)
recommend_vaccines_tool = FunctionTool(
    recommend_vaccines,
    description="Provide personalised vaccine recommendations based on user's vaccination history, age and gender.",
)
check_slots_tool = FunctionTool(
    check_available_slots,
    description="Check for available vaccination appointment slots based on vaccine name, polyclinic name and date.",
)
book_appointment_tool = FunctionTool(
    book_appointment,
    description="User to book, cancel or reschedule a vaccination appointment.",
)

# These tools can be passed to an agent system to be executed or used by other agents.
# description parameter provides context for how the tool should be used.

NameError: name 'FunctionTool' is not defined

In [19]:
# define the topic types each of the agents will subscribe to
vaccine_records_topic_type = "VaccineRecordsAgent"
vaccine_recommendation_topic_type = "VaccineRecommenderAgent"
appointment_topic_type = "AppointmentAgent"

triage_agent_topic_type = "TriageAgent"
user_topic_type = "User"

In [20]:
def transfer_to_vaccine_records_agent() -> str:
    return vaccine_records_topic_type


def transfer_to_recommender_agent() -> str:
    return vaccine_recommendation_topic_type


def transfer_to_appointment_agent() -> str:
    return appointment_topic_type


def transfer_back_to_triage() -> str:
    return triage_agent_topic_type


transfer_to_vaccine_records_agent_tool = FunctionTool(
    transfer_to_vaccine_records_agent,
    description="Use for retrieval of vaccination records history.",
)
transfer_to_recommender_agent_tool = FunctionTool(
    transfer_to_recommender_agent,
    description="Use for recommendation of vaccinations for user based on user's vaccination history, age and gender.",
)
transfer_to_appointment_agent_tool = FunctionTool(
    transfer_to_appointment_agent,
    description="Use for vaccination-related appointments enquiry, booking, cancellation and rescheduling.",
)
transfer_back_to_triage_tool = FunctionTool(
    transfer_back_to_triage,
    description="Call this if the user brings up a topic outside of your purview.",
)

#### Create Agents

In [21]:
runtime = SingleThreadedAgentRuntime()

In [22]:
triage_agent_prompt = """
You are an intelligent triage assistant for a vaccination enquiry and booking system. Your goal is to efficiently guide users by gathering key details and directing them to the appropriate service.
Start by introducing yourself briefly. Ask clear, natural, and relevant questions to collect necessary information without overwhelming the user. Be polite, concise, and proactive.
Gather information to direct the customer to the right agent. 
If the user requests a vaccination appointment but does not specify a preferred date or location or vaccine name, ask them to provide the missing details before proceeding.
If the request is unclear, politely ask for more details before routing them.
"""

In [None]:
triage_agent_type = await AIAgent.register(  # noqa: F704
    runtime,
    type=triage_agent_topic_type,  # Using the topic type as the agent type.
    factory=lambda: AIAgent(
        description="A triage agent.",  # The agent's role description, which indicates that it is a customer service bot for triaging issues.
        system_message=SystemMessage(content=triage_agent_prompt),
        model_client=autogen_openai_client,
        tools=[],
        delegate_tools=[  # delegate tools to transfer tasks to other agents
            transfer_to_vaccine_records_agent_tool,
            transfer_to_recommender_agent_tool,
            transfer_to_appointment_agent_tool,
        ],
        agent_topic_type=triage_agent_topic_type,  # defines the context of the agent
        user_topic_type=user_topic_type,
    ),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
# subscribes the triage agent to its topic (triage_agent_topic_type).
# This ensures that the agent will receive messages that are published to this specific topic.
# The subscription enables the triage agent to handle and respond to incoming messages directed at it.
await runtime.add_subscription(  # noqa: F704
    TypeSubscription(
        topic_type=triage_agent_topic_type, agent_type=triage_agent_type.type
    )
)
await runtime.add_subscription(  # noqa: F704
    TypeSubscription(
        topic_type=appointment_topic_type, agent_type=triage_agent_type.type
    )
)

In [None]:
vaccine_records_agent_type = await AIAgent.register(  # noqa: F704
    runtime,
    type=vaccine_records_topic_type,  # Using the topic type as the agent type.  listens for messages under the SalesAgent topic.
    factory=lambda: AIAgent(
        description="An agent responsible for retrieving user vaccination history.",
        system_message=SystemMessage(
            content="You are responsible for fetching a user's vaccination records."
            "Given a user ID, retrieve their vaccination history."
            "If no records are found, inform the requesting agent."
        ),
        model_client=autogen_openai_client,
        tools=[
            fetch_vaccination_history_tool
        ],  # agent can execute orders when the user agrees to buy.
        delegate_tools=[
            transfer_back_to_triage_tool,
            transfer_to_recommender_agent_tool,
        ],  # If necessary, the agent can transfer the user back to the Triage Agent.
        agent_topic_type=vaccine_records_topic_type,
        user_topic_type=user_topic_type,
    ),
)
# Add subscriptions for the sales agent: it will receive messages published to its own topic only.
# Sales Agent subscribes to the SalesAgent topic, meaning it will only process messages published to that topic.
await runtime.add_subscription(  # noqa: F704
    TypeSubscription(
        topic_type=vaccine_records_topic_type,
        agent_type=vaccine_records_agent_type.type,
    )
)

In [None]:
vaccine_recommender_agent_type = await AIAgent.register(  # noqa: F704
    runtime,
    type=vaccine_recommendation_topic_type,  # Using the topic type as the agent type.  listens for messages under the SalesAgent topic.
    factory=lambda: AIAgent(
        description="An agent responsible for recommending vaccines based on user vaccination history, age, and gender.",
        system_message=SystemMessage(
            content="You are responsible for providing personalized vaccine recommendations."
            "Given a user's vaccination history, age, and gender, suggest appropriate vaccines."
            "Exclude vaccines the user has already received. Provide a brief purpose for each recommended vaccine."
        ),
        model_client=autogen_openai_client,
        tools=[
            fetch_vaccination_history_tool,
            fetch_user_profile_tool,
            recommend_vaccines_tool,
        ],  # agent can execute orders when the user agrees to buy.
        delegate_tools=[
            transfer_back_to_triage_tool,
            transfer_to_appointment_agent_tool,
        ],  # If necessary, the agent can transfer the user back to the Triage Agent.
        agent_topic_type=vaccine_recommendation_topic_type,
        user_topic_type=user_topic_type,
    ),
)
# Add subscriptions for the sales agent: it will receive messages published to its own topic only.
# Sales Agent subscribes to the SalesAgent topic, meaning it will only process messages published to that topic.
await runtime.add_subscription(  # noqa: F704
    TypeSubscription(
        topic_type=vaccine_recommendation_topic_type,
        agent_type=vaccine_recommender_agent_type.type,
    )
)

In [None]:
appointment_agent_type = await AIAgent.register(  # noqa: F704
    runtime,
    type=appointment_topic_type,  # Using the topic type as the agent type.  listens for messages under the SalesAgent topic.
    factory=lambda: AIAgent(
        description="An agent responsible for managing vaccination appointments, including checking availability and booking or modifying appointments.",
        system_message=SystemMessage(
            content="You are responsible for managing vaccination appointments."
            "You help users check available slots, book new appointments, reschedule existing ones, or cancel appointments."
            "Ensure all necessary information is provided, such as vaccine name, polyclinic location, and preferred date."
            "If any information is missing, request clarification before proceeding."
        ),
        model_client=autogen_openai_client,
        tools=[
            check_slots_tool,
            book_appointment_tool,
        ],  # agent can execute orders when the user agrees to buy.
        delegate_tools=[
            transfer_back_to_triage_tool,
            transfer_to_recommender_agent_tool,
        ],  # If necessary, the agent can transfer the user back to the Triage Agent.
        agent_topic_type=appointment_topic_type,
        user_topic_type=user_topic_type,
    ),
)
# Add subscriptions for the sales agent: it will receive messages published to its own topic only.
# Sales Agent subscribes to the SalesAgent topic, meaning it will only process messages published to that topic.
await runtime.add_subscription(  # noqa: F704
    TypeSubscription(
        topic_type=appointment_topic_type, agent_type=appointment_agent_type.type
    )
)

In [None]:
user_agent_type = await UserAgent.register(  # noqa: F704
    runtime,
    type=user_topic_type,  # agent listens to messages under the "User" topic.
    factory=lambda: UserAgent(
        description="A user agent.",
        user_topic_type=user_topic_type,
        agent_topic_type=triage_agent_topic_type,  # Start with the triage agent.
    ),
)
# Add subscriptions for the user agent: it will receive messages published to its own topic only.
# Ensures the User Agent only processes messages under the "User" topic.
await runtime.add_subscription(  # noqa: F704
    TypeSubscription(topic_type=user_topic_type, agent_type=user_agent_type.type)
)

### Running the workflow

In [None]:
# Start the runtime.
runtime.start()

# Create a new session for the user.
session_id = str(uuid.uuid4())
await runtime.publish_message(  # noqa: F704
    UserLogin(), topic_id=TopicId(user_topic_type, source=session_id)
)

# Run until completion.
await runtime.stop_when_idle()  # noqa: F704

--------------------------------------------------------------------------------
User login, session ID: 26d96ce4-978e-41f1-b53b-64c948520116.
--------------------------------------------------------------------------------
User:

--------------------------------------------------------------------------------
TriageAgent:
Hello! I'm here to assist you with vaccination enquiries and bookings. How can I help you today?
--------------------------------------------------------------------------------
User:
check my vaccination history
--------------------------------------------------------------------------------
TriageAgent:
[FunctionCall(id='call_Twc4P3ejAdsR5LeQymbv3guv', arguments='{}', name='transfer_to_vaccine_records_agent')]
--------------------------------------------------------------------------------
TriageAgent:
Delegating to VaccineRecordsAgent
--------------------------------------------------------------------------------
VaccineRecordsAgent:
Please provide me with your u

## End