Skip to content

Commit b100339

Browse files
stephentoubCopilotSteveSandersonMS
authored
Serialize event dispatch in .NET and Go SDKs (#791)
* Serialize event dispatch in .NET and Go SDKs In .NET, StreamJsonRpc dispatches notifications concurrently on the thread pool. The old code invoked user event handlers inline from DispatchEvent, which meant handlers could run concurrently and out of order. In Go, the JSON-RPC read loop is single-threaded, so user handlers were already serialized. However, broadcast handlers (tool calls, permission requests) ran inline on the read loop, which deadlocked when a handler issued an RPC request back through the same connection. This PR decouples user handler dispatch from the transport by routing events through a channel (Go) / Channel<T> (.NET). A single consumer goroutine/task drains the channel and invokes user handlers serially, in FIFO order. This matches the guarantees provided by the Node.js and Python SDKs (which get natural serialization from their single-threaded event loops) while fitting Go's and .NET's multi-threaded runtimes. Broadcast handlers (tool calls, permission requests) are fired as fire-and-forget directly from the dispatch entry point, outside the channel, so a stalled handler cannot block event delivery. This matches the existing Node.js (void this._executeToolAndRespond()) and Python (asyncio.ensure_future()) behavior. Go changes: - Add eventCh channel to Session; start processEvents consumer goroutine - dispatchEvent enqueues to channel and fires broadcast handler goroutine - Close channel on Disconnect to stop the consumer - Update unit tests and E2E tests for async delivery .NET changes: - Add unbounded Channel<SessionEvent> to CopilotSession; start ProcessEventsAsync consumer task in constructor - DispatchEvent enqueues to channel and fires broadcast handler task - Complete channel on DisposeAsync - Per-handler error catching via ImmutableArray iteration - Cache handler array snapshot to avoid repeated allocation - Inline broadcast error handling into HandleBroadcastEventAsync - Update Should_Receive_Session_Events test to await async delivery - Add Handler_Exception_Does_Not_Halt_Event_Delivery test - Add DisposeAsync_From_Handler_Does_Not_Deadlock test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix panic on send to closed event channel in Go SDK Protect dispatchEvent with a recover guard so that a notification arriving after Disconnect does not crash the process. Also wrap the channel close in sync.Once so Disconnect is safe to call more than once. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Steve Sanderson <SteveSandersonMS@users.noreply.github.com>
1 parent 10c4d02 commit b100339

File tree

10 files changed

+411
-135
lines changed

10 files changed

+411
-135
lines changed

dotnet/src/Client.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ public async Task<CopilotSession> CreateSessionAsync(SessionConfig config, Cance
412412

413413
// Create and register the session before issuing the RPC so that
414414
// events emitted by the CLI (e.g. session.start) are not dropped.
415-
var session = new CopilotSession(sessionId, connection.Rpc);
415+
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
416416
session.RegisterTools(config.Tools ?? []);
417417
session.RegisterPermissionHandler(config.OnPermissionRequest);
418418
if (config.OnUserInputRequest != null)
@@ -516,7 +516,7 @@ public async Task<CopilotSession> ResumeSessionAsync(string sessionId, ResumeSes
516516

517517
// Create and register the session before issuing the RPC so that
518518
// events emitted by the CLI (e.g. session.start) are not dropped.
519-
var session = new CopilotSession(sessionId, connection.Rpc);
519+
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
520520
session.RegisterTools(config.Tools ?? []);
521521
session.RegisterPermissionHandler(config.OnPermissionRequest);
522522
if (config.OnUserInputRequest != null)

dotnet/src/Session.cs

Lines changed: 113 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
* Copyright (c) Microsoft Corporation. All rights reserved.
33
*--------------------------------------------------------------------------------------------*/
44

5+
using GitHub.Copilot.SDK.Rpc;
56
using Microsoft.Extensions.AI;
7+
using Microsoft.Extensions.Logging;
68
using StreamJsonRpc;
9+
using System.Collections.Immutable;
710
using System.Text.Json;
811
using System.Text.Json.Nodes;
912
using System.Text.Json.Serialization;
10-
using GitHub.Copilot.SDK.Rpc;
13+
using System.Threading.Channels;
1114

1215
namespace GitHub.Copilot.SDK;
1316

@@ -52,22 +55,27 @@ namespace GitHub.Copilot.SDK;
5255
/// </example>
5356
public sealed partial class CopilotSession : IAsyncDisposable
5457
{
55-
/// <summary>
56-
/// Multicast delegate used as a thread-safe, insertion-ordered handler list.
57-
/// The compiler-generated add/remove accessors use a lock-free CAS loop over the backing field.
58-
/// Dispatch reads the field once (inherent snapshot, no allocation).
59-
/// Expected handler count is small (typically 1–3), so Delegate.Combine/Remove cost is negligible.
60-
/// </summary>
61-
private event SessionEventHandler? EventHandlers;
6258
private readonly Dictionary<string, AIFunction> _toolHandlers = [];
6359
private readonly JsonRpc _rpc;
60+
private readonly ILogger _logger;
61+
6462
private volatile PermissionRequestHandler? _permissionHandler;
6563
private volatile UserInputHandler? _userInputHandler;
64+
private ImmutableArray<SessionEventHandler> _eventHandlers = ImmutableArray<SessionEventHandler>.Empty;
65+
6666
private SessionHooks? _hooks;
6767
private readonly SemaphoreSlim _hooksLock = new(1, 1);
6868
private SessionRpc? _sessionRpc;
6969
private int _isDisposed;
7070

71+
/// <summary>
72+
/// Channel that serializes event dispatch. <see cref="DispatchEvent"/> enqueues;
73+
/// a single background consumer (<see cref="ProcessEventsAsync"/>) dequeues and
74+
/// invokes handlers one at a time, preserving arrival order.
75+
/// </summary>
76+
private readonly Channel<SessionEvent> _eventChannel = Channel.CreateUnbounded<SessionEvent>(
77+
new() { SingleReader = true });
78+
7179
/// <summary>
7280
/// Gets the unique identifier for this session.
7381
/// </summary>
@@ -93,15 +101,20 @@ public sealed partial class CopilotSession : IAsyncDisposable
93101
/// </summary>
94102
/// <param name="sessionId">The unique identifier for this session.</param>
95103
/// <param name="rpc">The JSON-RPC connection to the Copilot CLI.</param>
104+
/// <param name="logger">Logger for diagnostics.</param>
96105
/// <param name="workspacePath">The workspace path if infinite sessions are enabled.</param>
97106
/// <remarks>
98107
/// This constructor is internal. Use <see cref="CopilotClient.CreateSessionAsync"/> to create sessions.
99108
/// </remarks>
100-
internal CopilotSession(string sessionId, JsonRpc rpc, string? workspacePath = null)
109+
internal CopilotSession(string sessionId, JsonRpc rpc, ILogger logger, string? workspacePath = null)
101110
{
102111
SessionId = sessionId;
103112
_rpc = rpc;
113+
_logger = logger;
104114
WorkspacePath = workspacePath;
115+
116+
// Start the asynchronous processing loop.
117+
_ = ProcessEventsAsync();
105118
}
106119

107120
private Task<T> InvokeRpcAsync<T>(string method, object?[]? args, CancellationToken cancellationToken)
@@ -186,7 +199,7 @@ public async Task<string> SendAsync(MessageOptions options, CancellationToken ca
186199
CancellationToken cancellationToken = default)
187200
{
188201
var effectiveTimeout = timeout ?? TimeSpan.FromSeconds(60);
189-
var tcs = new TaskCompletionSource<AssistantMessageEvent?>();
202+
var tcs = new TaskCompletionSource<AssistantMessageEvent?>(TaskCreationOptions.RunContinuationsAsynchronously);
190203
AssistantMessageEvent? lastAssistantMessage = null;
191204

192205
void Handler(SessionEvent evt)
@@ -236,7 +249,9 @@ void Handler(SessionEvent evt)
236249
/// Multiple handlers can be registered and will all receive events.
237250
/// </para>
238251
/// <para>
239-
/// Handler exceptions are allowed to propagate so they are not lost.
252+
/// Handlers are invoked serially in event-arrival order on a background thread.
253+
/// A handler will never be called concurrently with itself or with other handlers
254+
/// on the same session.
240255
/// </para>
241256
/// </remarks>
242257
/// <example>
@@ -259,27 +274,53 @@ void Handler(SessionEvent evt)
259274
/// </example>
260275
public IDisposable On(SessionEventHandler handler)
261276
{
262-
EventHandlers += handler;
263-
return new ActionDisposable(() => EventHandlers -= handler);
277+
ImmutableInterlocked.Update(ref _eventHandlers, array => array.Add(handler));
278+
return new ActionDisposable(() => ImmutableInterlocked.Update(ref _eventHandlers, array => array.Remove(handler)));
264279
}
265280

266281
/// <summary>
267-
/// Dispatches an event to all registered handlers.
282+
/// Enqueues an event for serial dispatch to all registered handlers.
268283
/// </summary>
269284
/// <param name="sessionEvent">The session event to dispatch.</param>
270285
/// <remarks>
271-
/// This method is internal. Handler exceptions are allowed to propagate so they are not lost.
272-
/// Broadcast request events (external_tool.requested, permission.requested) are handled
273-
/// internally before being forwarded to user handlers.
286+
/// This method is non-blocking. Broadcast request events (external_tool.requested,
287+
/// permission.requested) are fired concurrently so that a stalled handler does not
288+
/// block event delivery. The event is then placed into an in-memory channel and
289+
/// processed by a single background consumer (<see cref="ProcessEventsAsync"/>),
290+
/// which guarantees user handlers see events one at a time, in order.
274291
/// </remarks>
275292
internal void DispatchEvent(SessionEvent sessionEvent)
276293
{
277-
// Handle broadcast request events (protocol v3) before dispatching to user handlers.
278-
// Fire-and-forget: the response is sent asynchronously via RPC.
279-
HandleBroadcastEventAsync(sessionEvent);
294+
// Fire broadcast work concurrently (fire-and-forget with error logging).
295+
// This is done outside the channel so broadcast handlers don't block the
296+
// consumer loop — important when a secondary client's handler intentionally
297+
// never completes (multi-client permission scenario).
298+
_ = HandleBroadcastEventAsync(sessionEvent);
299+
300+
// Queue the event for serial processing by user handlers.
301+
_eventChannel.Writer.TryWrite(sessionEvent);
302+
}
280303

281-
// Reading the field once gives us a snapshot; delegates are immutable.
282-
EventHandlers?.Invoke(sessionEvent);
304+
/// <summary>
305+
/// Single-reader consumer loop that processes events from the channel.
306+
/// Ensures user event handlers are invoked serially and in FIFO order.
307+
/// </summary>
308+
private async Task ProcessEventsAsync()
309+
{
310+
await foreach (var sessionEvent in _eventChannel.Reader.ReadAllAsync())
311+
{
312+
foreach (var handler in _eventHandlers)
313+
{
314+
try
315+
{
316+
handler(sessionEvent);
317+
}
318+
catch (Exception ex)
319+
{
320+
LogEventHandlerError(ex);
321+
}
322+
}
323+
}
283324
}
284325

285326
/// <summary>
@@ -355,37 +396,44 @@ internal async Task<PermissionRequestResult> HandlePermissionRequestAsync(JsonEl
355396
/// Implements the protocol v3 broadcast model where tool calls and permission requests
356397
/// are broadcast as session events to all clients.
357398
/// </summary>
358-
private async void HandleBroadcastEventAsync(SessionEvent sessionEvent)
399+
private async Task HandleBroadcastEventAsync(SessionEvent sessionEvent)
359400
{
360-
switch (sessionEvent)
401+
try
361402
{
362-
case ExternalToolRequestedEvent toolEvent:
363-
{
364-
var data = toolEvent.Data;
365-
if (string.IsNullOrEmpty(data.RequestId) || string.IsNullOrEmpty(data.ToolName))
366-
return;
367-
368-
var tool = GetTool(data.ToolName);
369-
if (tool is null)
370-
return; // This client doesn't handle this tool; another client will.
371-
372-
await ExecuteToolAndRespondAsync(data.RequestId, data.ToolName, data.ToolCallId, data.Arguments, tool);
373-
break;
374-
}
375-
376-
case PermissionRequestedEvent permEvent:
377-
{
378-
var data = permEvent.Data;
379-
if (string.IsNullOrEmpty(data.RequestId) || data.PermissionRequest is null)
380-
return;
381-
382-
var handler = _permissionHandler;
383-
if (handler is null)
384-
return; // This client doesn't handle permissions; another client will.
385-
386-
await ExecutePermissionAndRespondAsync(data.RequestId, data.PermissionRequest, handler);
387-
break;
388-
}
403+
switch (sessionEvent)
404+
{
405+
case ExternalToolRequestedEvent toolEvent:
406+
{
407+
var data = toolEvent.Data;
408+
if (string.IsNullOrEmpty(data.RequestId) || string.IsNullOrEmpty(data.ToolName))
409+
return;
410+
411+
var tool = GetTool(data.ToolName);
412+
if (tool is null)
413+
return; // This client doesn't handle this tool; another client will.
414+
415+
await ExecuteToolAndRespondAsync(data.RequestId, data.ToolName, data.ToolCallId, data.Arguments, tool);
416+
break;
417+
}
418+
419+
case PermissionRequestedEvent permEvent:
420+
{
421+
var data = permEvent.Data;
422+
if (string.IsNullOrEmpty(data.RequestId) || data.PermissionRequest is null)
423+
return;
424+
425+
var handler = _permissionHandler;
426+
if (handler is null)
427+
return; // This client doesn't handle permissions; another client will.
428+
429+
await ExecutePermissionAndRespondAsync(data.RequestId, data.PermissionRequest, handler);
430+
break;
431+
}
432+
}
433+
}
434+
catch (Exception ex) when (ex is not OperationCanceledException)
435+
{
436+
LogBroadcastHandlerError(ex);
389437
}
390438
}
391439

@@ -707,6 +755,11 @@ public async Task LogAsync(string message, SessionLogRequestLevel? level = null,
707755
/// <returns>A task representing the dispose operation.</returns>
708756
/// <remarks>
709757
/// <para>
758+
/// The caller should ensure the session is idle (e.g., <see cref="SendAndWaitAsync"/>
759+
/// has returned) before disposing. If the session is not idle, in-flight event handlers
760+
/// or tool handlers may observe failures.
761+
/// </para>
762+
/// <para>
710763
/// Session state on disk (conversation history, planning state, artifacts) is
711764
/// preserved, so the conversation can be resumed later by calling
712765
/// <see cref="CopilotClient.ResumeSessionAsync"/> with the session ID. To
@@ -735,6 +788,8 @@ public async ValueTask DisposeAsync()
735788
return;
736789
}
737790

791+
_eventChannel.Writer.TryComplete();
792+
738793
try
739794
{
740795
await InvokeRpcAsync<object>(
@@ -749,12 +804,18 @@ await InvokeRpcAsync<object>(
749804
// Connection is broken or closed
750805
}
751806

752-
EventHandlers = null;
807+
_eventHandlers = ImmutableInterlocked.InterlockedExchange(ref _eventHandlers, ImmutableArray<SessionEventHandler>.Empty);
753808
_toolHandlers.Clear();
754809

755810
_permissionHandler = null;
756811
}
757812

813+
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in broadcast event handler")]
814+
private partial void LogBroadcastHandlerError(Exception exception);
815+
816+
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in session event handler")]
817+
private partial void LogEventHandlerError(Exception exception);
818+
758819
internal record SendMessageRequest
759820
{
760821
public string SessionId { get; init; } = string.Empty;

dotnet/test/Harness/TestHelper.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public static class TestHelper
1010
CopilotSession session,
1111
TimeSpan? timeout = null)
1212
{
13-
var tcs = new TaskCompletionSource<AssistantMessageEvent>();
13+
var tcs = new TaskCompletionSource<AssistantMessageEvent>(TaskCreationOptions.RunContinuationsAsynchronously);
1414
using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(60));
1515

1616
AssistantMessageEvent? finalAssistantMessage = null;
@@ -78,7 +78,7 @@ public static async Task<T> GetNextEventOfTypeAsync<T>(
7878
CopilotSession session,
7979
TimeSpan? timeout = null) where T : SessionEvent
8080
{
81-
var tcs = new TaskCompletionSource<T>();
81+
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
8282
using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(60));
8383

8484
using var subscription = session.On(evt =>

dotnet/test/MultiClientTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,10 @@ public async Task Both_Clients_See_Tool_Request_And_Completion_Events()
109109
});
110110

111111
// Set up event waiters BEFORE sending the prompt to avoid race conditions
112-
var client1Requested = new TaskCompletionSource<bool>();
113-
var client2Requested = new TaskCompletionSource<bool>();
114-
var client1Completed = new TaskCompletionSource<bool>();
115-
var client2Completed = new TaskCompletionSource<bool>();
112+
var client1Requested = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
113+
var client2Requested = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
114+
var client1Completed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
115+
var client2Completed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
116116

117117
using var sub1 = session1.On(evt =>
118118
{

0 commit comments

Comments
 (0)