Skip to content

Commit ad797d1

Browse files
authored
.NET Agents - Add streaming support to orchestrations (#12455)
### Motivation and Context <!-- Thank you for your contribution to the semantic-kernel repo! Please help reviewers and future users, providing the following information: 1. Why is this change required? 2. What problem does it solve? 3. What scenario does it contribute to? 4. If it fixes an open issue, please link to the issue here. --> Enable support for streamed agent responses for any multi-agent orchestration. ### Description <!-- Describe your changes, the overall approach, the underlying design. These notes will help understanding how your code works. Thanks! --> - Added streaming result callback on `AgentOrchestration` - Updated samples ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [X] The code builds clean without any errors or warnings - [X] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [X] All unit tests pass, and I have added new tests where possible - [X] I didn't break anyone 😄
1 parent bbc66df commit ad797d1

File tree

16 files changed

+231
-70
lines changed

16 files changed

+231
-70
lines changed

dotnet/samples/GettingStartedWithAgents/Orchestration/Step01_Concurrent.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,21 @@ namespace GettingStarted.Orchestration;
1414
/// </summary>
1515
public class Step01_Concurrent(ITestOutputHelper output) : BaseOrchestrationTest(output)
1616
{
17-
[Fact]
18-
public async Task ConcurrentTaskAsync()
17+
[Theory]
18+
[InlineData(false)]
19+
[InlineData(true)]
20+
public async Task ConcurrentTaskAsync(bool streamedResponse)
1921
{
2022
// Define the agents
2123
ChatCompletionAgent physicist =
2224
this.CreateAgent(
2325
instructions: "You are an expert in physics. You answer questions from a physics perspective.",
26+
name: "Physicist",
2427
description: "An expert in physics");
2528
ChatCompletionAgent chemist =
2629
this.CreateAgent(
2730
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective.",
31+
name: "Chemist",
2832
description: "An expert in chemistry");
2933

3034
// Create a monitor to capture agent responses (via ResponseCallback)
@@ -36,8 +40,9 @@ public async Task ConcurrentTaskAsync()
3640
ConcurrentOrchestration orchestration =
3741
new(physicist, chemist)
3842
{
39-
ResponseCallback = monitor.ResponseCallback,
4043
LoggerFactory = this.LoggerFactory,
44+
ResponseCallback = monitor.ResponseCallback,
45+
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
4146
};
4247

4348
// Start the runtime

dotnet/samples/GettingStartedWithAgents/Orchestration/Step02_Sequential.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ namespace GettingStarted.Orchestration;
1515
/// </summary>
1616
public class Step02_Sequential(ITestOutputHelper output) : BaseOrchestrationTest(output)
1717
{
18-
[Fact]
19-
public async Task SequentialTaskAsync()
18+
[Theory]
19+
[InlineData(false)]
20+
[InlineData(true)]
21+
public async Task SequentialTaskAsync(bool streamedResponse)
2022
{
2123
// Define the agents
2224
ChatCompletionAgent analystAgent =
@@ -58,8 +60,9 @@ give format and make it polished. Output the final improved copy as a single tex
5860
SequentialOrchestration orchestration =
5961
new(analystAgent, writerAgent, editorAgent)
6062
{
63+
LoggerFactory = this.LoggerFactory,
6164
ResponseCallback = monitor.ResponseCallback,
62-
LoggerFactory = this.LoggerFactory
65+
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
6366
};
6467

6568
// Start the runtime

dotnet/samples/GettingStartedWithAgents/Orchestration/Step03_GroupChat.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ namespace GettingStarted.Orchestration;
2020
/// </remarks>
2121
public class Step03_GroupChat(ITestOutputHelper output) : BaseOrchestrationTest(output)
2222
{
23-
[Fact]
24-
public async Task GroupChatAsync()
23+
[Theory]
24+
[InlineData(false)]
25+
[InlineData(true)]
26+
public async Task GroupChatAsync(bool streamedResponse)
2527
{
2628
// Define the agents
2729
ChatCompletionAgent writer =
@@ -62,8 +64,9 @@ Consider suggestions when refining an idea.
6264
writer,
6365
editor)
6466
{
65-
ResponseCallback = monitor.ResponseCallback,
6667
LoggerFactory = this.LoggerFactory,
68+
ResponseCallback = monitor.ResponseCallback,
69+
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
6770
};
6871

6972
// Start the runtime

dotnet/samples/GettingStartedWithAgents/Orchestration/Step03a_GroupChatWithHumanInTheLoop.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ Consider suggestions when refining an idea.
4343
If not, provide insight on how to refine suggested copy without example.
4444
""");
4545

46+
// Create a monitor to capture agent responses (via ResponseCallback)
47+
// to display at the end of this sample. (optional)
48+
// NOTE: Create your own callback to capture responses in your application or service.
49+
OrchestrationMonitor monitor = new();
50+
4651
// Define the orchestration
4752
GroupChatOrchestration orchestration =
4853
new(
@@ -59,7 +64,8 @@ Consider suggestions when refining an idea.
5964
writer,
6065
editor)
6166
{
62-
LoggerFactory = this.LoggerFactory
67+
LoggerFactory = this.LoggerFactory,
68+
ResponseCallback = monitor.ResponseCallback,
6369
};
6470

6571
// Start the runtime

dotnet/samples/GettingStartedWithAgents/Orchestration/Step03b_GroupChatWithAIManager.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ You are in a debate. Feel free to challenge the other participants with respect.
107107
You are in a debate. Feel free to challenge the other participants with respect.
108108
""");
109109

110+
// Create a monitor to capture agent responses (via ResponseCallback)
111+
// to display at the end of this sample. (optional)
112+
// NOTE: Create your own callback to capture responses in your application or service.
113+
OrchestrationMonitor monitor = new();
114+
110115
// Define the orchestration
111116
const string topic = "What does a good life mean to you personally?";
112117
Kernel kernel = this.CreateKernelWithChatCompletion();
@@ -127,7 +132,8 @@ You are in a debate. Feel free to challenge the other participants with respect.
127132
immigrant,
128133
doctor)
129134
{
130-
LoggerFactory = this.LoggerFactory
135+
LoggerFactory = this.LoggerFactory,
136+
ResponseCallback = monitor.ResponseCallback,
131137
};
132138

133139
// Start the runtime

dotnet/samples/GettingStartedWithAgents/Orchestration/Step04_Handoff.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ namespace GettingStarted.Orchestration;
1616
/// </summary>
1717
public class Step04_Handoff(ITestOutputHelper output) : BaseOrchestrationTest(output)
1818
{
19-
[Fact]
20-
public async Task OrderSupportAsync()
19+
[Theory]
20+
[InlineData(false)]
21+
[InlineData(true)]
22+
public async Task OrderSupportAsync(bool streamedResponse)
2123
{
2224
// Define the agents & tools
2325
ChatCompletionAgent triageAgent =
@@ -76,8 +78,9 @@ public async Task OrderSupportAsync()
7678
Console.WriteLine($"\n# INPUT: {input}\n");
7779
return ValueTask.FromResult(new ChatMessageContent(AuthorRole.User, input));
7880
},
81+
LoggerFactory = this.LoggerFactory,
7982
ResponseCallback = monitor.ResponseCallback,
80-
LoggerFactory = this.LoggerFactory
83+
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
8184
};
8285

8386
// Start the runtime

dotnet/samples/GettingStartedWithAgents/Orchestration/Step04a_HandoffWithStructuredInput.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public async Task HandoffStructuredInputAsync()
4040
description: "An agent that handles .NET related issues");
4141
dotnetAgent.Kernel.Plugins.Add(plugin);
4242

43+
// Create a monitor to capture agent responses (via ResponseCallback)
44+
// to display at the end of this sample. (optional)
45+
// NOTE: Create your own callback to capture responses in your application or service.
46+
OrchestrationMonitor monitor = new();
47+
4348
// Define the orchestration
4449
HandoffOrchestration<GithubIssue, string> orchestration =
4550
new(OrchestrationHandoffs
@@ -49,7 +54,8 @@ public async Task HandoffStructuredInputAsync()
4954
pythonAgent,
5055
dotnetAgent)
5156
{
52-
LoggerFactory = this.LoggerFactory
57+
LoggerFactory = this.LoggerFactory,
58+
ResponseCallback = monitor.ResponseCallback,
5359
};
5460

5561
GithubIssue input =

dotnet/samples/GettingStartedWithAgents/Orchestration/Step05_Magentic.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ public class Step05_Magentic(ITestOutputHelper output) : BaseOrchestrationTest(o
2828
/// </summary>
2929
protected override bool ForceOpenAI => true;
3030

31-
[Fact]
32-
public async Task MagenticTaskAsync()
31+
[Theory]
32+
[InlineData(false)]
33+
[InlineData(true)]
34+
public async Task MagenticTaskAsync(bool streamedResponse)
3335
{
3436
// Define the agents
3537
Kernel researchKernel = CreateKernelWithOpenAIChatCompletion(ResearcherModel);
@@ -64,8 +66,9 @@ await agentsClient.Administration.CreateAgentAsync(
6466
MagenticOrchestration orchestration =
6567
new(manager, researchAgent, coderAgent)
6668
{
67-
ResponseCallback = monitor.ResponseCallback,
6869
LoggerFactory = this.LoggerFactory,
70+
ResponseCallback = monitor.ResponseCallback,
71+
StreamingResponseCallback = streamedResponse ? monitor.StreamingResultCallback : null,
6972
};
7073

7174
// Start the runtime

dotnet/src/Agents/Orchestration/AgentActor.cs

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6-
using System.Runtime.CompilerServices;
76
using System.Threading;
87
using System.Threading.Tasks;
98
using Microsoft.Extensions.Logging;
@@ -51,10 +50,7 @@ protected AgentActor(AgentId id, IAgentRuntime runtime, OrchestrationContext con
5150
/// <summary>
5251
/// Optionally overridden to create custom invocation options for the agent.
5352
/// </summary>
54-
protected virtual AgentInvokeOptions? CreateInvokeOptions()
55-
{
56-
return null;
57-
}
53+
protected virtual AgentInvokeOptions CreateInvokeOptions(Func<ChatMessageContent, Task> messageHandler) => new() { OnIntermediateMessage = messageHandler };
5854

5955
/// <summary>
6056
/// Optionally overridden to introduce custom filtering logic for the response callback.
@@ -89,8 +85,7 @@ protected ValueTask<ChatMessageContent> InvokeAsync(ChatMessageContent input, Ca
8985
}
9086

9187
/// <summary>
92-
/// Invokes the agent with multiple chat messages.
93-
/// Processes the response items and consolidates the messages into a single <see cref="ChatMessageContent"/>.
88+
/// Invokes the agent with input messages and responds with both streamed and regular messages.
9489
/// </summary>
9590
/// <param name="input">The list of chat messages to send.</param>
9691
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
@@ -99,51 +94,77 @@ protected async ValueTask<ChatMessageContent> InvokeAsync(IList<ChatMessageConte
9994
{
10095
this.Context.Cancellation.ThrowIfCancellationRequested();
10196

102-
AgentResponseItem<ChatMessageContent>[] responses =
103-
await this.Agent.InvokeAsync(
104-
input,
105-
this.Thread,
106-
this.GetInvokeOptions(),
107-
cancellationToken).ToArrayAsync(cancellationToken).ConfigureAwait(false);
108-
109-
AgentResponseItem<ChatMessageContent>? firstResponse = responses.FirstOrDefault();
110-
this.Thread ??= firstResponse?.Thread;
97+
ChatMessageContent? response = null;
11198

112-
// The vast majority of responses will be a single message. Responses with multiple messages will have their content merged.
113-
ChatMessageContent response = new(firstResponse?.Message.Role ?? AuthorRole.Assistant, string.Join("\n\n", responses.Select(response => response.Message)))
99+
AgentInvokeOptions options = this.GetInvokeOptions(HandleMessage);
100+
if (this.Context.StreamingResponseCallback == null)
101+
{
102+
// No need to utilize streaming if no callback is provided
103+
await this.InvokeAsync(input, options, cancellationToken).ConfigureAwait(false);
104+
}
105+
else
114106
{
115-
AuthorName = firstResponse?.Message.AuthorName,
116-
};
107+
await this.InvokeStreamingAsync(input, options, cancellationToken).ConfigureAwait(false);
108+
}
109+
110+
return response ?? new ChatMessageContent(AuthorRole.Assistant, string.Empty);
117111

118-
if (this.Context.ResponseCallback is not null && !this.ResponseCallbackFilter(response))
112+
async Task HandleMessage(ChatMessageContent message)
119113
{
120-
await this.Context.ResponseCallback.Invoke(response).ConfigureAwait(false);
114+
response = message; // Keep track of most recent response for both invocation modes
115+
116+
if (this.Context.ResponseCallback is not null && !this.ResponseCallbackFilter(message))
117+
{
118+
await this.Context.ResponseCallback.Invoke(message).ConfigureAwait(false);
119+
}
121120
}
121+
}
122122

123-
return response;
123+
private async Task InvokeAsync(IList<ChatMessageContent> input, AgentInvokeOptions options, CancellationToken cancellationToken)
124+
{
125+
AgentResponseItem<ChatMessageContent>? lastResponse =
126+
await this.Agent.InvokeAsync(
127+
input,
128+
this.Thread,
129+
options,
130+
cancellationToken).LastOrDefaultAsync(cancellationToken).ConfigureAwait(false);
131+
132+
this.Thread ??= lastResponse?.Thread;
124133
}
125134

126-
/// <summary>
127-
/// Invokes the agent and streams chat message responses asynchronously.
128-
/// Yields each streaming message as it becomes available.
129-
/// </summary>
130-
/// <param name="input">The chat message content to send.</param>
131-
/// <param name="cancellationToken">A cancellation token that can be used to cancel the stream.</param>
132-
/// <returns>An asynchronous stream of <see cref="StreamingChatMessageContent"/> responses.</returns>
133-
protected async IAsyncEnumerable<StreamingChatMessageContent> InvokeStreamingAsync(ChatMessageContent input, [EnumeratorCancellation] CancellationToken cancellationToken)
135+
private async Task InvokeStreamingAsync(IList<ChatMessageContent> input, AgentInvokeOptions options, CancellationToken cancellationToken)
134136
{
135-
this.Context.Cancellation.ThrowIfCancellationRequested();
137+
IAsyncEnumerable<AgentResponseItem<StreamingChatMessageContent>> streamedResponses =
138+
this.Agent.InvokeStreamingAsync(
139+
input,
140+
this.Thread,
141+
options,
142+
cancellationToken);
143+
144+
StreamingChatMessageContent? lastStreamedResponse = null;
145+
await foreach (AgentResponseItem<StreamingChatMessageContent> streamedResponse in streamedResponses.ConfigureAwait(false))
146+
{
147+
this.Context.Cancellation.ThrowIfCancellationRequested();
148+
149+
this.Thread ??= streamedResponse.Thread;
150+
151+
await HandleStreamedMessage(lastStreamedResponse, isFinal: false).ConfigureAwait(false);
152+
153+
lastStreamedResponse = streamedResponse.Message;
154+
}
136155

137-
var responseStream = this.Agent.InvokeStreamingAsync([input], this.Thread, this.GetInvokeOptions(), cancellationToken);
156+
await HandleStreamedMessage(lastStreamedResponse, isFinal: true).ConfigureAwait(false);
138157

139-
await foreach (AgentResponseItem<StreamingChatMessageContent> response in responseStream.ConfigureAwait(false))
158+
async ValueTask HandleStreamedMessage(StreamingChatMessageContent? streamedResponse, bool isFinal)
140159
{
141-
this.Thread ??= response.Thread;
142-
yield return response.Message;
160+
if (this.Context.StreamingResponseCallback != null && streamedResponse != null)
161+
{
162+
await this.Context.StreamingResponseCallback.Invoke(streamedResponse, isFinal).ConfigureAwait(false);
163+
}
143164
}
144165
}
145166

146-
private AgentInvokeOptions? GetInvokeOptions() => this._options ??= this.CreateInvokeOptions();
167+
private AgentInvokeOptions GetInvokeOptions(Func<ChatMessageContent, Task> messageHandler) => this._options ??= this.CreateInvokeOptions(messageHandler);
147168

148169
private static string VerifyDescription(Agent agent)
149170
{

dotnet/src/Agents/Orchestration/AgentOrchestration.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ namespace Microsoft.SemanticKernel.Agents.Orchestration;
1919
/// <param name="response">The agent response</param>
2020
public delegate ValueTask OrchestrationResponseCallback(ChatMessageContent response);
2121

22+
/// <summary>
23+
/// Called to expose the streamed response produced by any agent.
24+
/// </summary>
25+
/// <param name="response">The agent response</param>
26+
/// <param name="isFinal">Indicates whether the streamed content is the final chunk of the message.</param>
27+
public delegate ValueTask OrchestrationStreamingCallback(StreamingChatMessageContent response, bool isFinal);
28+
2229
/// <summary>
2330
/// Called when human interaction is requested.
2431
/// </summary>
@@ -74,6 +81,11 @@ protected AgentOrchestration(params Agent[] members)
7481
/// </summary>
7582
public OrchestrationResponseCallback? ResponseCallback { get; init; }
7683

84+
/// <summary>
85+
/// Optional callback that is invoked for every streamed response chunk produced by an agent.
86+
/// </summary>
87+
public OrchestrationStreamingCallback? StreamingResponseCallback { get; init; }
88+
7789
/// <summary>
7890
/// Gets the list of member targets involved in the orchestration.
7991
/// </summary>
@@ -102,7 +114,13 @@ public async ValueTask<OrchestrationResult<TOutput>> InvokeAsync(
102114

103115
CancellationTokenSource orchestrationCancelSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
104116

105-
OrchestrationContext context = new(this.OrchestrationLabel, topic, this.ResponseCallback, this.LoggerFactory, cancellationToken);
117+
OrchestrationContext context =
118+
new(this.OrchestrationLabel,
119+
topic,
120+
this.ResponseCallback,
121+
this.StreamingResponseCallback,
122+
this.LoggerFactory,
123+
cancellationToken);
106124

107125
ILogger logger = this.LoggerFactory.CreateLogger(this.GetType());
108126

dotnet/src/Agents/Orchestration/Handoff/HandoffActor.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public HandoffActor(AgentId id, IAgentRuntime runtime, OrchestrationContext cont
5959
protected override bool ResponseCallbackFilter(ChatMessageContent response) => response.Role == AuthorRole.Tool;
6060

6161
/// <inheritdoc/>
62-
protected override AgentInvokeOptions? CreateInvokeOptions()
62+
protected override AgentInvokeOptions CreateInvokeOptions(Func<ChatMessageContent, Task> messageHandler)
6363
{
6464
// Clone kernel to avoid modifying the original
6565
Kernel kernel = this.Agent.Kernel.Clone();
@@ -71,7 +71,8 @@ public HandoffActor(AgentId id, IAgentRuntime runtime, OrchestrationContext cont
7171
new()
7272
{
7373
Kernel = kernel,
74-
KernelArguments = new(new PromptExecutionSettings { FunctionChoiceBehavior = FunctionChoiceBehavior.Auto() })
74+
KernelArguments = new(new PromptExecutionSettings { FunctionChoiceBehavior = FunctionChoiceBehavior.Auto() }),
75+
OnIntermediateMessage = messageHandler,
7576
};
7677

7778
return options;

0 commit comments

Comments
 (0)