Skip to content

Python: Multi-agent orchestration: Handoff #12046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio

from semantic_kernel.agents import Agent, ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.contents import AuthorRole, ChatMessageContent
from semantic_kernel.functions import kernel_function

"""
The following sample demonstrates how to create a handoff orchestration that represents
a customer support triage system. The orchestration consists of 4 agents, each specialized
in a different area of customer support: triage, refunds, order status, and order returns.

Depending on the customer's request, agents can hand off the conversation to the appropriate
agent.

Human in the loop is achieved via a callback function similar to the one used in group chat
orchestration. Except that in the handoff orchestration, all agents have access to the
human response function, whereas in the group chat orchestration, only the manager has access
to the human response function.

This sample demonstrates the basic steps of creating and starting a runtime, creating
a handoff orchestration, invoking the orchestration, and finally waiting for the results.
"""


class OrderStatusPlugin:
@kernel_function
def check_order_status(self, order_id: str) -> str:
"""Check the status of an order."""
# Simulate checking the order status
return f"Order {order_id} is shipped and will arrive in 2-3 days."


class OrderRefundPlugin:
@kernel_function
def process_refund(self, order_id: str, reason: str) -> str:
"""Process a refund for an order."""
# Simulate processing a refund
print(f"Processing refund for order {order_id} due to: {reason}")
return f"Refund for order {order_id} has been processed successfully."


class OrderReturnPlugin:
@kernel_function
def process_return(self, order_id: str, reason: str) -> str:
"""Process a return for an order."""
# Simulate processing a return
print(f"Processing return for order {order_id} due to: {reason}")
return f"Return for order {order_id} has been processed successfully."


def get_agents() -> tuple[list[Agent], OrchestrationHandoffs]:
"""Return a list of agents that will participate in the Handoff orchestration and the handoff relationships.

Feel free to add or remove agents and handoff connections.
"""
support_agent = ChatCompletionAgent(
name="TriageAgent",
description="A customer support agent that triages issues.",
instructions="Handle customer requests.",
service=OpenAIChatCompletion(),
)

refund_agent = ChatCompletionAgent(
name="RefundAgent",
description="A customer support agent that handles refunds.",
instructions="Handle refund requests.",
service=OpenAIChatCompletion(),
plugins=[OrderRefundPlugin()],
)

order_status_agent = ChatCompletionAgent(
name="OrderStatusAgent",
description="A customer support agent that checks order status.",
instructions="Handle order status requests.",
service=OpenAIChatCompletion(),
plugins=[OrderStatusPlugin()],
)

order_return_agent = ChatCompletionAgent(
name="OrderReturnAgent",
description="A customer support agent that handles order returns.",
instructions="Handle order return requests.",
service=OpenAIChatCompletion(),
plugins=[OrderReturnPlugin()],
)

# Define the handoff relationships between agents
handoffs = (
OrchestrationHandoffs()
.add_many(
source_agent=support_agent.name,
target_agents={
refund_agent.name: "Transfer to this agent if the issue is refund related",
order_status_agent.name: "Transfer to this agent if the issue is order status related",
order_return_agent.name: "Transfer to this agent if the issue is order return related",
},
)
.add(
source_agent=refund_agent.name,
target_agent=support_agent.name,
description="Transfer to this agent if the issue is not refund related",
)
.add(
source_agent=order_status_agent.name,
target_agent=support_agent.name,
description="Transfer to this agent if the issue is not order status related",
)
.add(
source_agent=order_return_agent.name,
target_agent=support_agent.name,
description="Transfer to this agent if the issue is not order return related",
)
)

return [support_agent, refund_agent, order_status_agent, order_return_agent], handoffs


def agent_response_callback(message: ChatMessageContent) -> None:
"""Observer function to print the messages from the agents."""
print(f"{message.name}: {message.content}")


def human_response_function() -> ChatMessageContent:
"""Observer function to print the messages from the agents."""
user_input = input("User: ")
return ChatMessageContent(role=AuthorRole.USER, content=user_input)


async def main():
"""Main function to run the agents."""
# 1. Create a handoff orchestration with multiple agents
agents, handoffs = get_agents()
handoff_orchestration = HandoffOrchestration(
members=agents,
handoffs=handoffs,
agent_response_callback=agent_response_callback,
human_response_function=human_response_function,
)

# 2. Create a runtime and start it
runtime = InProcessRuntime()
runtime.start()

# 3. Invoke the orchestration with a task and the runtime
orchestration_result = await handoff_orchestration.invoke(
task="A customer is on the line.",
runtime=runtime,
)

# 4. Wait for the results
value = await orchestration_result.get()
print(value)

# 5. Stop the runtime after the invocation is complete
await runtime.stop_when_idle()

"""
Sample output:
TriageAgent: Hello! Thank you for reaching out. How can I assist you today?
User: I'd like to track the status of my order
OrderStatusAgent: Sure, I can help you with that. Could you please provide me with your order ID?
User: My order ID is 123
OrderStatusAgent: Your order with ID 123 has been shipped and is expected to arrive in 2-3 days. Is there anything
else I can assist you with?
User: I want to return another order of mine
OrderReturnAgent: I can help you with returning your order. Could you please provide the order ID for the return
and the reason you'd like to return it?
User: Order ID 321
OrderReturnAgent: Please provide the reason for returning the order with ID 321.
User: Broken item
Processing return for order 321 due to: Broken item
OrderReturnAgent: The return for your order with ID 321 has been successfully processed due to the broken item.
Is there anything else I can assist you with?
User: No, bye
Task is completed with summary: Handled order return for order ID 321 due to a broken item, and successfully
processed the return.
"""


if __name__ == "__main__":
asyncio.run(main())
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
from enum import Enum

from semantic_kernel.agents import Agent, ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.contents import AuthorRole, ChatMessageContent
from semantic_kernel.functions import kernel_function
from semantic_kernel.kernel_pydantic import KernelBaseModel

"""
The following sample demonstrates how to create a handoff orchestration that can triage
GitHub issues based on their content. The orchestration consists of 3 agents, each
specialized in a different area.

The input to the orchestration is not longer a string or a chat message, but a Pydantic
model (i.e. structure input). The model will get transformed into a chat message before
being passed to the agents. This allows the orchestration to become more flexible and
easier reusable.

This sample demonstrates the basic steps of creating and starting a runtime, creating
a handoff orchestration, invoking the orchestration, and finally waiting for the results.
"""


class GitHubLabels(Enum):
"""Enum representing GitHub labels."""

PYTHON = "python"
DOTNET = ".NET"
BUG = "bug"
ENHANCEMENT = "enhancement"
QUESTION = "question"
VECTORSTORE = "vectorstore"
AGENT = "agent"


class GithubIssue(KernelBaseModel):
"""Model representing a GitHub issue."""

id: str
title: str
body: str
labels: list[str] = []


class Plan(KernelBaseModel):
"""Model representing a plan for resolving a GitHub issue."""

tasks: list[str]


class GithubPlugin:
"""Plugin for GitHub related operations."""

@kernel_function
async def add_labels(self, issue_id: str, labels: list[GitHubLabels]) -> None:
"""Add labels to a GitHub issue."""
await asyncio.sleep(1) # Simulate network delay
print(f"Adding labels {labels} to issue {issue_id}")

@kernel_function(description="Create a plan to resolve the issue.")
async def create_plan(self, issue_id: str, plan: Plan) -> None:
"""Create tasks for a GitHub issue."""
await asyncio.sleep(1) # Simulate network delay
print(f"Creating plan for issue {issue_id} with tasks:\n{plan.model_dump_json(indent=2)}")


def get_agents() -> tuple[list[Agent], OrchestrationHandoffs]:
"""Return a list of agents that will participate in the Handoff orchestration and the handoff relationships.

Feel free to add or remove agents and handoff connections.
"""
triage_agent = ChatCompletionAgent(
name="TriageAgent",
description="An agent that triages GitHub issues",
instructions="Given a GitHub issue, triage it.",
service=OpenAIChatCompletion(),
)
python_agent = ChatCompletionAgent(
name="PythonAgent",
description="An agent that handles Python related issues",
instructions="You are an agent that handles Python related GitHub issues.",
service=OpenAIChatCompletion(),
plugins=[GithubPlugin()],
)
dotnet_agent = ChatCompletionAgent(
name="DotNetAgent",
description="An agent that handles .NET related issues",
instructions="You are an agent that handles .NET related GitHub issues.",
service=OpenAIChatCompletion(),
plugins=[GithubPlugin()],
)

# Define the handoff relationships between agents
handoffs = {
triage_agent.name: {
python_agent.name: "Transfer to this agent if the issue is Python related",
dotnet_agent.name: "Transfer to this agent if the issue is .NET related",
},
}

return [triage_agent, python_agent, dotnet_agent], handoffs


GithubIssueSample = GithubIssue(
id="12345",
title=(
"Bug: SQLite Error 1: 'ambiguous column name:' when including VectorStoreRecordKey in "
"VectorSearchOptions.Filter"
),
body=(
"Describe the bug"
"When using column names marked as [VectorStoreRecordData(IsFilterable = true)] in "
"VectorSearchOptions.Filter, the query runs correctly."
"However, using the column name marked as [VectorStoreRecordKey] in VectorSearchOptions.Filter, the query "
"throws exception 'SQLite Error 1: ambiguous column name: StartUTC"
""
"To Reproduce"
"Add a filter for the column marked [VectorStoreRecordKey]. Since that same column exists in both the "
"vec_TestTable and TestTable, the data for both columns cannot be returned."
""
"Expected behavior"
"The query should explicitly list the vec_TestTable column names to retrieve and should omit the "
"[VectorStoreRecordKey] column since it will be included in the primary TestTable columns."
""
"Platform"
""
"Microsoft.SemanticKernel.Connectors.Sqlite v1.46.0-preview"
"Additional context"
"Normal DBContext logging shows only normal context queries. Queries run by VectorizedSearchAsync() don't "
"appear in those logs and I could not find a way to enable logging in semantic search so that I could "
"actually see the exact query that is failing. It would have been very useful to see the failing semantic "
"query."
),
labels=[],
)


# The default input transform will attempt to serialize an object into a string by using
# `json.dump()`. However, an object of a Pydantic model type cannot be directly serialize
# by `json.dump()`. Thus, we will need a custom transform.
def custom_input_transform(input_message: GithubIssue) -> ChatMessageContent:
return ChatMessageContent(role=AuthorRole.USER, content=input_message.model_dump_json())


async def main():
"""Main function to run the agents."""
# 1. Create a handoff orchestration with multiple agents
# and a custom input transform.
# To enable structured input, you must specify the input transform
# and the generic types for the orchestration,
agents, handoffs = get_agents()
handoff_orchestration = HandoffOrchestration[GithubIssue, ChatMessageContent](
members=agents,
handoffs=handoffs,
input_transform=custom_input_transform,
)

# 2. Create a runtime and start it
runtime = InProcessRuntime()
runtime.start()

# 3. Invoke the orchestration with a task and the runtime
orchestration_result = await handoff_orchestration.invoke(
task=GithubIssueSample,
runtime=runtime,
)

# 4. Wait for the results
value = await orchestration_result.get(timeout=100)
print(value)

# 5. Stop the runtime when idle
await runtime.stop_when_idle()

"""
Sample output:
Adding labels [<GitHubLabels.BUG: 'bug'>, <GitHubLabels.DOTNET: '.NET'>, <GitHubLabels.VECTORSTORE: 'vectorstore'>]
to issue 12345
Creating plan for issue 12345 with tasks:
{
"tasks": [
"Investigate the issue to confirm the ambiguity in the SQL query when using VectorStoreRecordKey in filters.",
"Modify the query generation logic to explicitly list column names for vec_TestTable and prevent ambiguity.",
"Test the solution to ensure VectorStoreRecordKey can be used in filters without causing SQLite errors.",
"Update documentation to provide guidance on using VectorStoreRecordKey in filters to avoid similar issues.",
"Consider adding logging capability to track semantic search queries for easier debugging in the future."
]
}
Task is completed with summary: No handoff agent name provided and no human response function set. Ending task.
"""


if __name__ == "__main__":
asyncio.run(main())
2 changes: 2 additions & 0 deletions python/semantic_kernel/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
"ToolSpec": ".agent",
"ConcurrentOrchestration": ".orchestration.concurrent",
"SequentialOrchestration": ".orchestration.sequential",
"HandoffOrchestration": ".orchestration.handoffs",
"OrchestrationHandoffs": ".orchestration.handoffs",
"GroupChatOrchestration": ".orchestration.group_chat",
}

Expand Down
Loading
Loading