From 94409bd38279c8f5c64bc8fe7b13b689aa45a2e0 Mon Sep 17 00:00:00 2001
From: eanzhao
Date: Sat, 25 Apr 2026 11:17:10 +0800
Subject: [PATCH 1/2] Defer throttled streaming chunks via pending-flush timer

Previously a delta that arrived inside the throttle window was silently
dropped until the next delta showed up; when the LLM stalled near the
window edge (cold-start, router handoff, tail-of-thought) the user saw
the reply freeze for the rest of the interval before the next batch
landed.

Stash the latest accumulated text on each in-window delta and arm a
one-shot deferred flush timer that publishes the stash when the window
closes. Multiple in-window deltas collapse onto the latest text. While a
dispatch is in flight, additional deltas (caller-driven or timer-driven)
are stashed and the dispatch loop reflushes the most recent _pendingText
after the in-flight chunk completes (reflush-on-conflict), so the
conversation actor still observes strict edit ordering.

Make the sink IDisposable so ChannelLlmReplyInboxRuntime cleans up an
unfired timer on paths that skip FinalizeAsync (interactive reply, empty
reply, exception).

Adopt FakeTimeProvider (namespace Microsoft.Extensions.Time.Testing, from
the Microsoft.Extensions.TimeProvider.Testing package) for deterministic
timer-driven test control. The hand-rolled FakeTimeProvider in
TurnStreamingReplySinkTests is replaced with the BCL-aligned one.

Refs #405 for the follow-up phase-state-machine + centralized
unavailable-guard refactor on ConversationGAgent.

Co-Authored-By: Claude Opus 4.7 (1M context) --- Directory.Packages.props | 1 + .../ChannelLlmReplyInboxRuntime.cs | 2 +- .../TurnStreamingReplySink.cs | 224 +++++++++++++++++- ...evatar.GAgents.ChannelRuntime.Tests.csproj | 1 + .../TurnStreamingReplySinkTests.cs | 91 ++++++- 5 files changed, 295 insertions(+), 24 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index a6529b1f..67ff150c 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -158,6 +158,7 @@ + diff --git a/agents/Aevatar.GAgents.ChannelRuntime/ChannelLlmReplyInboxRuntime.cs b/agents/Aevatar.GAgents.ChannelRuntime/ChannelLlmReplyInboxRuntime.cs index 7de9fc64..26cdf63e 100644 --- a/agents/Aevatar.GAgents.ChannelRuntime/ChannelLlmReplyInboxRuntime.cs +++ b/agents/Aevatar.GAgents.ChannelRuntime/ChannelLlmReplyInboxRuntime.cs @@ -142,7 +142,7 @@ internal async Task ProcessAsync(NeedsLlmReplyEvent request) var terminalState = LlmReplyTerminalState.Completed; var errorCode = string.Empty; var errorSummary = string.Empty; - TurnStreamingReplySink? streamingSink = TryBuildStreamingSink(request, actor); + using TurnStreamingReplySink? streamingSink = TryBuildStreamingSink(request, actor); try { diff --git a/agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs b/agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs index a0dc8726..063a28ef 100644 --- a/agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs +++ b/agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs @@ -13,12 +13,34 @@ namespace Aevatar.GAgents.ChannelRuntime; /// different turns run on different sink instances by construction. /// /// +/// /// The sink is responsible only for accumulating and throttling deltas; placeholder send, edit /// dispatch, and streaming disable/fallback decisions are all owned by the conversation actor. 
-/// Chunk dispatches are awaited so the actor observes chunks in the same order they arrived from -/// the LLM stream, matching the ordering invariant required by the edit-in-place protocol. +/// +/// +/// Throttling rules: +/// +/// The first delta and any delta that arrives after the throttle window has fully elapsed +/// dispatches immediately, so the user sees movement as soon as the LLM produces text. +/// Deltas inside an active throttle window do not get silently dropped — the latest +/// accumulated text is stashed in _pendingText and a deferred flush timer fires at window +/// close to publish it. Multiple deltas in the same window collapse onto the latest text. +/// While a dispatch is in flight, additional deltas (caller-driven or timer-driven) are +/// likewise stashed; the dispatch loop reflushes the most recent _pendingText after the +/// in-flight chunk completes (reflush-on-conflict). +/// bypasses the throttle so the actor sees the complete text +/// once the stream ends; if a dispatch is in flight, the final text reflushes after it. +/// +/// +/// +/// Concurrency: caller code (NyxIdConversationReplyGenerator) awaits each +/// call serially, but the throttle timer fires on the +/// 's scheduling thread and may race with caller-driven flushes. All +/// mutable state is guarded by _lock; chunk dispatches are serialized through +/// _dispatchInProgress so the conversation actor observes a strict ordering of edits. +/// /// -internal sealed class TurnStreamingReplySink : IStreamingReplySink +internal sealed class TurnStreamingReplySink : IStreamingReplySink, IDisposable { private readonly IActor _targetActor; private readonly string _correlationId; @@ -28,9 +50,15 @@ internal sealed class TurnStreamingReplySink : IStreamingReplySink private readonly TimeProvider _timeProvider; private readonly ILogger? 
_logger; + private readonly object _lock = new(); private string _lastEmittedText = string.Empty; private DateTimeOffset _lastEmitAt = DateTimeOffset.MinValue; private int _chunksEmitted; + private string _pendingText = string.Empty; + private bool _hasPending; + private ITimer? _flushTimer; + private bool _dispatchInProgress; + private bool _disposed; public TurnStreamingReplySink( IActor targetActor, @@ -52,27 +80,188 @@ public TurnStreamingReplySink( _logger = logger; } - public int ChunksEmitted => _chunksEmitted; + public int ChunksEmitted + { + get + { + lock (_lock) return _chunksEmitted; + } + } public Task OnDeltaAsync(string accumulatedText, CancellationToken ct) => FlushAsync(accumulatedText, isFinal: false, ct); /// /// Applies the final accumulated text, bypassing the throttle so the actor can drive the final - /// edit once the stream ends. + /// edit once the stream ends. If a dispatch is already in flight, the final text is stashed + /// and the in-flight dispatch's reflush loop publishes it on completion. /// public Task FinalizeAsync(string finalText, CancellationToken ct) => FlushAsync(finalText, isFinal: true, ct); + public void Dispose() + { + lock (_lock) + { + if (_disposed) return; + _disposed = true; + _flushTimer?.Dispose(); + _flushTimer = null; + _hasPending = false; + _pendingText = string.Empty; + } + } + private async Task FlushAsync(string text, bool isFinal, CancellationToken ct) { if (string.IsNullOrWhiteSpace(text)) return; - if (string.Equals(text, _lastEmittedText, StringComparison.Ordinal)) - return; - if (!isFinal && (_timeProvider.GetUtcNow() - _lastEmitAt) < _throttle) - return; + string? toDispatch = null; + + lock (_lock) + { + if (_disposed) + return; + + if (string.Equals(text, _lastEmittedText, StringComparison.Ordinal)) + { + // Already on the wire; clear any deferred copy so the timer doesn't republish + // identical content. 
+ _pendingText = string.Empty; + _hasPending = false; + return; + } + + if (_dispatchInProgress) + { + // A dispatch is in flight. Stash the latest text; the dispatch loop's reflush + // step picks up _pendingText after the in-flight chunk completes. No timer is + // needed because the loop polls _hasPending after every dispatch. + _pendingText = text; + _hasPending = true; + return; + } + + var elapsed = _timeProvider.GetUtcNow() - _lastEmitAt; + if (isFinal || elapsed >= _throttle) + { + CancelTimerLocked(); + _pendingText = string.Empty; + _hasPending = false; + _dispatchInProgress = true; + toDispatch = text; + } + else + { + // Inside the throttle window: stash the latest text and arm the deferred flush + // timer if it isn't already armed. Subsequent deltas in this same window will + // simply overwrite _pendingText (collapse on the latest accumulated text). + _pendingText = text; + _hasPending = true; + if (_flushTimer is null) + { + var delay = _throttle - elapsed; + if (delay < TimeSpan.Zero) + delay = TimeSpan.Zero; + _flushTimer = _timeProvider.CreateTimer( + OnFlushTimerFired, + state: null, + dueTime: delay, + period: Timeout.InfiniteTimeSpan); + } + } + } + + if (toDispatch is not null) + await DispatchLoopAsync(toDispatch, ct).ConfigureAwait(false); + } + + private void OnFlushTimerFired(object? state) + { + string? toDispatch = null; + + lock (_lock) + { + _flushTimer?.Dispose(); + _flushTimer = null; + + if (_disposed || !_hasPending) + return; + + // A caller-driven dispatch is already in flight. Its reflush loop will pick up + // _pendingText, so don't start a second concurrent dispatch. 
+ if (_dispatchInProgress) + return; + + if (string.Equals(_pendingText, _lastEmittedText, StringComparison.Ordinal)) + { + _pendingText = string.Empty; + _hasPending = false; + return; + } + + _dispatchInProgress = true; + toDispatch = _pendingText; + _pendingText = string.Empty; + _hasPending = false; + } + + if (toDispatch is not null) + { + // Fire-and-forget: errors are caught inside DispatchLoopAsync so the timer never + // surfaces an unobserved exception. CancellationToken.None because the per-turn + // CancellationToken belongs to the caller's await chain, not the timer. + _ = DispatchLoopAsync(toDispatch, CancellationToken.None); + } + } + + private async Task DispatchLoopAsync(string firstText, CancellationToken ct) + { + var current = firstText; + try + { + while (true) + { + await DispatchOneAsync(current, ct).ConfigureAwait(false); + + string? next; + lock (_lock) + { + if (_disposed || !_hasPending) + { + _dispatchInProgress = false; + return; + } + + if (string.Equals(_pendingText, _lastEmittedText, StringComparison.Ordinal)) + { + _pendingText = string.Empty; + _hasPending = false; + _dispatchInProgress = false; + return; + } + + next = _pendingText; + _pendingText = string.Empty; + _hasPending = false; + } + + current = next!; + } + } + catch + { + lock (_lock) + { + _dispatchInProgress = false; + } + throw; + } + } + + private async Task DispatchOneAsync(string text, CancellationToken ct) + { var chunk = new LlmReplyStreamChunkEvent { CorrelationId = _correlationId, @@ -94,10 +283,13 @@ private async Task FlushAsync(string text, bool isFinal, CancellationToken ct) try { - await _targetActor.HandleEventAsync(envelope, ct); - _lastEmittedText = text; - _lastEmitAt = _timeProvider.GetUtcNow(); - _chunksEmitted++; + await _targetActor.HandleEventAsync(envelope, ct).ConfigureAwait(false); + lock (_lock) + { + _lastEmittedText = text; + _lastEmitAt = _timeProvider.GetUtcNow(); + _chunksEmitted++; + } } catch (OperationCanceledException) when 
(ct.IsCancellationRequested) { @@ -111,4 +303,10 @@ private async Task FlushAsync(string text, bool isFinal, CancellationToken ct) _correlationId); } } + + private void CancelTimerLocked() + { + _flushTimer?.Dispose(); + _flushTimer = null; + } } diff --git a/test/Aevatar.GAgents.ChannelRuntime.Tests/Aevatar.GAgents.ChannelRuntime.Tests.csproj b/test/Aevatar.GAgents.ChannelRuntime.Tests/Aevatar.GAgents.ChannelRuntime.Tests.csproj index 16c90881..718da72b 100644 --- a/test/Aevatar.GAgents.ChannelRuntime.Tests/Aevatar.GAgents.ChannelRuntime.Tests.csproj +++ b/test/Aevatar.GAgents.ChannelRuntime.Tests/Aevatar.GAgents.ChannelRuntime.Tests.csproj @@ -24,6 +24,7 @@ + diff --git a/test/Aevatar.GAgents.ChannelRuntime.Tests/TurnStreamingReplySinkTests.cs b/test/Aevatar.GAgents.ChannelRuntime.Tests/TurnStreamingReplySinkTests.cs index 08336049..fafae5b1 100644 --- a/test/Aevatar.GAgents.ChannelRuntime.Tests/TurnStreamingReplySinkTests.cs +++ b/test/Aevatar.GAgents.ChannelRuntime.Tests/TurnStreamingReplySinkTests.cs @@ -3,6 +3,7 @@ using Aevatar.GAgents.Channel.Runtime; using FluentAssertions; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Time.Testing; using NSubstitute; namespace Aevatar.GAgents.ChannelRuntime.Tests; @@ -25,7 +26,7 @@ public async Task OnDeltaAsync_FirstDelta_DispatchesChunkEventToActor() } [Fact] - public async Task OnDeltaAsync_WithinThrottle_DropsSubsequentDeltas() + public async Task OnDeltaAsync_WithinThrottle_DefersUntilTimerFires() { var (actor, envelopes) = BuildRecordingActor(); var sink = CreateSink(actor, throttleMs: 750, out var time); @@ -36,8 +37,19 @@ public async Task OnDeltaAsync_WithinThrottle_DropsSubsequentDeltas() time.Advance(TimeSpan.FromMilliseconds(200)); await sink.OnDeltaAsync("chunk 1 more text", CancellationToken.None); + // Still inside the throttle window: only the first delta has dispatched. The subsequent + // two are stashed; the deferred flush timer has not yet fired. 
envelopes.Should().ContainSingle(); sink.ChunksEmitted.Should().Be(1); + + // Cross the throttle boundary so the deferred timer fires; only the latest stashed text + // should publish (collapse-on-latest), not every individual delta. + time.Advance(TimeSpan.FromMilliseconds(400)); + + envelopes.Should().HaveCount(2); + envelopes[1].Payload.Unpack().AccumulatedText + .Should().Be("chunk 1 more text"); + sink.ChunksEmitted.Should().Be(2); } [Fact] @@ -80,6 +92,44 @@ public async Task FinalizeAsync_NoNewText_DoesNotEmitRedundantChunk() envelopes.Should().ContainSingle(); } + [Fact] + public async Task FinalizeAsync_CancelsPendingFlushTimer() + { + var (actor, envelopes) = BuildRecordingActor(); + var sink = CreateSink(actor, throttleMs: 750, out var time); + + await sink.OnDeltaAsync("chunk one", CancellationToken.None); + time.Advance(TimeSpan.FromMilliseconds(200)); + await sink.OnDeltaAsync("chunk one two", CancellationToken.None); + await sink.FinalizeAsync("final text", CancellationToken.None); + + // Finalize should publish the final text immediately and prevent the deferred timer from + // firing afterwards (otherwise we'd see an extra "chunk one two" emission). + envelopes.Should().HaveCount(2); + envelopes[1].Payload.Unpack().AccumulatedText.Should().Be("final text"); + + time.Advance(TimeSpan.FromMilliseconds(2000)); + envelopes.Should().HaveCount(2); + } + + [Fact] + public async Task PendingTimerEqualsLastEmitted_DoesNotEmitDuplicate() + { + var (actor, envelopes) = BuildRecordingActor(); + var sink = CreateSink(actor, throttleMs: 750, out var time); + + await sink.OnDeltaAsync("hello", CancellationToken.None); + time.Advance(TimeSpan.FromMilliseconds(100)); + // A duplicate "hello" inside the throttle window should clear any deferred copy and not + // schedule a duplicate emission when the timer fires. 
+ await sink.OnDeltaAsync("hello", CancellationToken.None); + + time.Advance(TimeSpan.FromMilliseconds(1000)); + + envelopes.Should().ContainSingle(); + sink.ChunksEmitted.Should().Be(1); + } + [Fact] public async Task OnDeltaAsync_EmptyText_IsIgnored() { @@ -120,6 +170,36 @@ public async Task OnDeltaAsync_ActorDispatchThrows_DropsChunkWithoutPropagating( sink.ChunksEmitted.Should().Be(0); } + [Fact] + public async Task Dispose_PreventsLaterTimerFlush() + { + var (actor, envelopes) = BuildRecordingActor(); + var sink = CreateSink(actor, throttleMs: 750, out var time); + + await sink.OnDeltaAsync("first", CancellationToken.None); + time.Advance(TimeSpan.FromMilliseconds(100)); + await sink.OnDeltaAsync("first plus more", CancellationToken.None); + + sink.Dispose(); + time.Advance(TimeSpan.FromMilliseconds(2000)); + + // The deferred copy should be discarded by Dispose before the timer would have fired. + envelopes.Should().ContainSingle(); + } + + [Fact] + public async Task Dispose_AfterFinalize_IsIdempotent() + { + var (actor, _) = BuildRecordingActor(); + var sink = CreateSink(actor, throttleMs: 0, out _); + + await sink.OnDeltaAsync("first", CancellationToken.None); + await sink.FinalizeAsync("first plus", CancellationToken.None); + + sink.Dispose(); + sink.Dispose(); + } + private static TurnStreamingReplySink CreateSink( IActor actor, int throttleMs, @@ -162,13 +242,4 @@ private static (IActor actor, List envelopes) BuildRecordingActor .Do(call => envelopes.Add(call.Arg())); return (actor, envelopes); } - - private sealed class FakeTimeProvider(DateTimeOffset start) : TimeProvider - { - private DateTimeOffset _now = start; - - public void Advance(TimeSpan delta) => _now = _now.Add(delta); - - public override DateTimeOffset GetUtcNow() => _now; - } } From e4006fd12972827d4462174d52092085f70f55ec Mon Sep 17 00:00:00 2001 From: eanzhao Date: Sat, 25 Apr 2026 13:26:57 +0800 Subject: [PATCH 2/2] Wait for in-flight dispatch when finalizing stream `FinalizeAsync` 
previously stashed the final text and returned immediately when a dispatch was in flight. The inbox runtime then sent `LlmReplyReadyEvent` right after, racing the late final chunk past `ConversationGAgent`'s processed-command guard which would silently drop it. `TurnStreamingReplySink` now creates a `_drainTcs` when `FinalizeAsync` stashes onto an in-flight dispatch and awaits it. The dispatch loop signals that TCS once it fully drains (success, swallowed dispatch failure, or exception), so `FinalizeAsync` only returns after the final chunk has been dispatched. `Dispose` also releases any pending waiter so a finalize-then- dispose path cannot hang. Adds `FinalizeAsync_DispatchInFlight_WaitsForFinalChunkOnWire` regression covering the race using a gated mock actor. Codex review: https://github.com/aevatarAI/aevatar/pull/407#discussion_r3141420183 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../TurnStreamingReplySink.cs | 107 ++++++++++++------ .../TurnStreamingReplySinkTests.cs | 47 ++++++++ 2 files changed, 122 insertions(+), 32 deletions(-) diff --git a/agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs b/agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs index 063a28ef..a98e96e5 100644 --- a/agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs +++ b/agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs @@ -29,7 +29,9 @@ namespace Aevatar.GAgents.ChannelRuntime; /// likewise stashed; the dispatch loop reflushes the most recent _pendingText after the /// in-flight chunk completes (reflush-on-conflict). /// bypasses the throttle so the actor sees the complete text -/// once the stream ends; if a dispatch is in flight, the final text reflushes after it. +/// once the stream ends; if a dispatch is in flight, the final text reflushes after it and +/// awaits the dispatch loop's drain signal before returning so the +/// caller (the inbox runtime) does not race the ready event past the final chunk. 
/// /// /// @@ -59,6 +61,11 @@ internal sealed class TurnStreamingReplySink : IStreamingReplySink, IDisposable private ITimer? _flushTimer; private bool _dispatchInProgress; private bool _disposed; + // Signaled by the dispatch loop when it fully drains. FinalizeAsync awaits this when a + // dispatch is already in flight so the caller does not race the inbox runtime's + // LlmReplyReadyEvent past the final chunk dispatch (the ConversationGAgent + // processed-command guard would otherwise drop the late chunk). + private TaskCompletionSource? _drainTcs; public TurnStreamingReplySink( IActor targetActor, @@ -93,14 +100,16 @@ public Task OnDeltaAsync(string accumulatedText, CancellationToken ct) => /// /// Applies the final accumulated text, bypassing the throttle so the actor can drive the final - /// edit once the stream ends. If a dispatch is already in flight, the final text is stashed - /// and the in-flight dispatch's reflush loop publishes it on completion. + /// edit once the stream ends. If a dispatch is already in flight, the final text is stashed and + /// this call awaits the dispatch loop's drain signal so the final chunk is on the wire before + /// the caller proceeds (the inbox runtime sends LlmReplyReadyEvent immediately after). /// public Task FinalizeAsync(string finalText, CancellationToken ct) => FlushAsync(finalText, isFinal: true, ct); public void Dispose() { + TaskCompletionSource? signal; lock (_lock) { if (_disposed) return; @@ -109,7 +118,13 @@ public void Dispose() _flushTimer = null; _hasPending = false; _pendingText = string.Empty; + signal = _drainTcs; + _drainTcs = null; } + // Unblock any FinalizeAsync caller still awaiting the drain signal. Disposing while a + // finalize is in flight is treated as "drained" — no further dispatches will run, so + // continuing to await would hang. 
+ signal?.TrySetResult(false); } private async Task FlushAsync(string text, bool isFinal, CancellationToken ct) @@ -118,6 +133,7 @@ private async Task FlushAsync(string text, bool isFinal, CancellationToken ct) return; string? toDispatch = null; + Task? drainTask = null; lock (_lock) { @@ -127,7 +143,9 @@ private async Task FlushAsync(string text, bool isFinal, CancellationToken ct) if (string.Equals(text, _lastEmittedText, StringComparison.Ordinal)) { // Already on the wire; clear any deferred copy so the timer doesn't republish - // identical content. + // identical content. Even for isFinal we can return here: the final text is + // already the most recent dispatched chunk, so the actor will see it before + // any subsequent ready event. _pendingText = string.Empty; _hasPending = false; return; @@ -140,41 +158,55 @@ private async Task FlushAsync(string text, bool isFinal, CancellationToken ct) // needed because the loop polls _hasPending after every dispatch. _pendingText = text; _hasPending = true; - return; - } - - var elapsed = _timeProvider.GetUtcNow() - _lastEmitAt; - if (isFinal || elapsed >= _throttle) - { - CancelTimerLocked(); - _pendingText = string.Empty; - _hasPending = false; - _dispatchInProgress = true; - toDispatch = text; + if (isFinal) + { + // Block FinalizeAsync until the dispatch loop drains the stashed final text. + // Without this wait, ChannelLlmReplyInboxRuntime sends LlmReplyReadyEvent + // first and ConversationGAgent's processed-command guard drops the late + // final chunk. + _drainTcs ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + drainTask = _drainTcs.Task; + } } else { - // Inside the throttle window: stash the latest text and arm the deferred flush - // timer if it isn't already armed. Subsequent deltas in this same window will - // simply overwrite _pendingText (collapse on the latest accumulated text). 
- _pendingText = text; - _hasPending = true; - if (_flushTimer is null) + var elapsed = _timeProvider.GetUtcNow() - _lastEmitAt; + if (isFinal || elapsed >= _throttle) { - var delay = _throttle - elapsed; - if (delay < TimeSpan.Zero) - delay = TimeSpan.Zero; - _flushTimer = _timeProvider.CreateTimer( - OnFlushTimerFired, - state: null, - dueTime: delay, - period: Timeout.InfiniteTimeSpan); + CancelTimerLocked(); + _pendingText = string.Empty; + _hasPending = false; + _dispatchInProgress = true; + toDispatch = text; + } + else + { + // Inside the throttle window: stash the latest text and arm the deferred + // flush timer if it isn't already armed. Subsequent deltas in this same + // window will simply overwrite _pendingText (collapse on the latest + // accumulated text). + _pendingText = text; + _hasPending = true; + if (_flushTimer is null) + { + var delay = _throttle - elapsed; + if (delay < TimeSpan.Zero) + delay = TimeSpan.Zero; + _flushTimer = _timeProvider.CreateTimer( + OnFlushTimerFired, + state: null, + dueTime: delay, + period: Timeout.InfiniteTimeSpan); + } } } } if (toDispatch is not null) await DispatchLoopAsync(toDispatch, ct).ConfigureAwait(false); + + if (drainTask is not null) + await drainTask.ConfigureAwait(false); } private void OnFlushTimerFired(object? state) @@ -219,6 +251,7 @@ private void OnFlushTimerFired(object? state) private async Task DispatchLoopAsync(string firstText, CancellationToken ct) { var current = firstText; + TaskCompletionSource? 
drainSignal = null; try { while (true) @@ -231,7 +264,9 @@ private async Task DispatchLoopAsync(string firstText, CancellationToken ct) if (_disposed || !_hasPending) { _dispatchInProgress = false; - return; + drainSignal = _drainTcs; + _drainTcs = null; + break; } if (string.Equals(_pendingText, _lastEmittedText, StringComparison.Ordinal)) @@ -239,7 +274,9 @@ private async Task DispatchLoopAsync(string firstText, CancellationToken ct) _pendingText = string.Empty; _hasPending = false; _dispatchInProgress = false; - return; + drainSignal = _drainTcs; + _drainTcs = null; + break; } next = _pendingText; @@ -250,14 +287,20 @@ private async Task DispatchLoopAsync(string firstText, CancellationToken ct) current = next!; } } - catch + catch (Exception ex) { + TaskCompletionSource? errSignal; lock (_lock) { _dispatchInProgress = false; + errSignal = _drainTcs; + _drainTcs = null; } + errSignal?.TrySetException(ex); throw; } + + drainSignal?.TrySetResult(true); } private async Task DispatchOneAsync(string text, CancellationToken ct) diff --git a/test/Aevatar.GAgents.ChannelRuntime.Tests/TurnStreamingReplySinkTests.cs b/test/Aevatar.GAgents.ChannelRuntime.Tests/TurnStreamingReplySinkTests.cs index fafae5b1..e1f2551a 100644 --- a/test/Aevatar.GAgents.ChannelRuntime.Tests/TurnStreamingReplySinkTests.cs +++ b/test/Aevatar.GAgents.ChannelRuntime.Tests/TurnStreamingReplySinkTests.cs @@ -112,6 +112,53 @@ public async Task FinalizeAsync_CancelsPendingFlushTimer() envelopes.Should().HaveCount(2); } + [Fact] + public async Task FinalizeAsync_DispatchInFlight_WaitsForFinalChunkOnWire() + { + // Regression for the race where FinalizeAsync would return as soon as the final text + // was stashed (while a prior dispatch was still in flight), letting the inbox runtime + // send LlmReplyReadyEvent past the late final chunk and triggering the + // ConversationGAgent processed-command guard to drop it. 
+ var firstDispatchGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var envelopes = new List(); + var dispatchCount = 0; + + var actor = Substitute.For(); + actor.Id.Returns("target-actor"); + actor.HandleEventAsync(Arg.Any(), Arg.Any()) + .Returns(call => + { + envelopes.Add(call.Arg()); + dispatchCount++; + return dispatchCount == 1 ? firstDispatchGate.Task : Task.CompletedTask; + }); + + var sink = CreateSink(actor, throttleMs: 0, out _); + + // First dispatch enters the actor and suspends on firstDispatchGate. + var deltaTask = sink.OnDeltaAsync("first", CancellationToken.None); + + // FinalizeAsync must observe _dispatchInProgress and wait for the dispatch loop's drain + // signal — not return immediately after stashing the final text. + var finalizeTask = sink.FinalizeAsync("first plus final", CancellationToken.None); + + deltaTask.IsCompleted.Should().BeFalse(); + finalizeTask.IsCompleted.Should().BeFalse(); + envelopes.Should().ContainSingle("only the gated first chunk has been dispatched"); + + // Releasing the gate lets the loop dispatch the stashed final text; only then should + // FinalizeAsync complete. + firstDispatchGate.SetResult(); + + await deltaTask; + await finalizeTask; + + envelopes.Should().HaveCount(2); + envelopes[1].Payload.Unpack().AccumulatedText + .Should().Be("first plus final"); + sink.ChunksEmitted.Should().Be(2); + } + [Fact] public async Task PendingTimerEqualsLastEmitted_DoesNotEmitDuplicate() {