Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
<!-- ════════════════════════════════════════════════════════════ -->
<!-- Testing -->
<!-- ════════════════════════════════════════════════════════════ -->
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="10.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="18.0.1" />
<PackageVersion Include="WTelegramClient" Version="4.4.1" />
<PackageVersion Include="xunit" Version="2.9.3" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ internal async Task ProcessAsync(NeedsLlmReplyEvent request)
var terminalState = LlmReplyTerminalState.Completed;
var errorCode = string.Empty;
var errorSummary = string.Empty;
TurnStreamingReplySink? streamingSink = TryBuildStreamingSink(request, actor);
using TurnStreamingReplySink? streamingSink = TryBuildStreamingSink(request, actor);

try
{
Expand Down
267 changes: 254 additions & 13 deletions agents/Aevatar.GAgents.ChannelRuntime/TurnStreamingReplySink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,36 @@ namespace Aevatar.GAgents.ChannelRuntime;
/// different turns run on different sink instances by construction.
/// </summary>
/// <remarks>
/// <para>
/// The sink is responsible only for accumulating and throttling deltas; placeholder send, edit
/// dispatch, and streaming disable/fallback decisions are all owned by the conversation actor.
/// Chunk dispatches are awaited so the actor observes chunks in the same order they arrived from
/// the LLM stream, matching the ordering invariant required by the edit-in-place protocol.
/// </para>
/// <para>
/// Throttling rules:
/// <list type="bullet">
/// <item>The first delta and any delta that arrives after the throttle window has fully elapsed
/// dispatches immediately, so the user sees movement as soon as the LLM produces text.</item>
/// <item>Deltas inside an active throttle window do not get silently dropped — the latest
/// accumulated text is stashed in <c>_pendingText</c> and a deferred flush timer fires at window
/// close to publish it. Multiple deltas in the same window collapse onto the latest text.</item>
/// <item>While a dispatch is in flight, additional deltas (caller-driven or timer-driven) are
/// likewise stashed; the dispatch loop reflushes the most recent <c>_pendingText</c> after the
/// in-flight chunk completes (reflush-on-conflict).</item>
/// <item><see cref="FinalizeAsync"/> bypasses the throttle so the actor sees the complete text
/// once the stream ends; if a dispatch is in flight, the final text reflushes after it and
/// <see cref="FinalizeAsync"/> awaits the dispatch loop's drain signal before returning so the
/// caller (the inbox runtime) does not race the ready event past the final chunk.</item>
/// </list>
/// </para>
/// <para>
/// Concurrency: caller code (<c>NyxIdConversationReplyGenerator</c>) awaits each
/// <see cref="OnDeltaAsync"/> call serially, but the throttle timer fires on the
/// <see cref="TimeProvider"/>'s scheduling thread and may race with caller-driven flushes. All
/// mutable state is guarded by <c>_lock</c>; chunk dispatches are serialized through
/// <c>_dispatchInProgress</c> so the conversation actor observes a strict ordering of edits.
/// </para>
/// </remarks>
internal sealed class TurnStreamingReplySink : IStreamingReplySink
internal sealed class TurnStreamingReplySink : IStreamingReplySink, IDisposable
{
private readonly IActor _targetActor;
private readonly string _correlationId;
Expand All @@ -28,9 +52,20 @@ internal sealed class TurnStreamingReplySink : IStreamingReplySink
private readonly TimeProvider _timeProvider;
private readonly ILogger? _logger;

private readonly object _lock = new();
private string _lastEmittedText = string.Empty;
private DateTimeOffset _lastEmitAt = DateTimeOffset.MinValue;
private int _chunksEmitted;
private string _pendingText = string.Empty;
private bool _hasPending;
private ITimer? _flushTimer;
private bool _dispatchInProgress;
private bool _disposed;
// Signaled by the dispatch loop when it fully drains. FinalizeAsync awaits this when a
// dispatch is already in flight so the caller does not race the inbox runtime's
// LlmReplyReadyEvent past the final chunk dispatch (the ConversationGAgent
// processed-command guard would otherwise drop the late chunk).
private TaskCompletionSource<bool>? _drainTcs;

public TurnStreamingReplySink(
IActor targetActor,
Expand All @@ -52,27 +87,224 @@ public TurnStreamingReplySink(
_logger = logger;
}

public int ChunksEmitted => _chunksEmitted;
public int ChunksEmitted
{
get
{
lock (_lock) return _chunksEmitted;
}
}

public Task OnDeltaAsync(string accumulatedText, CancellationToken ct) =>
FlushAsync(accumulatedText, isFinal: false, ct);

/// <summary>
/// Applies the final accumulated text, bypassing the throttle so the actor can drive the final
/// edit once the stream ends.
/// edit once the stream ends. If a dispatch is already in flight, the final text is stashed and
/// this call awaits the dispatch loop's drain signal so the final chunk is on the wire before
/// the caller proceeds (the inbox runtime sends LlmReplyReadyEvent immediately after).
/// </summary>
public Task FinalizeAsync(string finalText, CancellationToken ct) =>
FlushAsync(finalText, isFinal: true, ct);

public void Dispose()
{
TaskCompletionSource<bool>? signal;
lock (_lock)
{
if (_disposed) return;
_disposed = true;
_flushTimer?.Dispose();
_flushTimer = null;
_hasPending = false;
_pendingText = string.Empty;
signal = _drainTcs;
_drainTcs = null;
}
// Unblock any FinalizeAsync caller still awaiting the drain signal. Disposing while a
// finalize is in flight is treated as "drained" — no further dispatches will run, so
// continuing to await would hang.
signal?.TrySetResult(false);
}

private async Task FlushAsync(string text, bool isFinal, CancellationToken ct)
{
if (string.IsNullOrWhiteSpace(text))
return;
if (string.Equals(text, _lastEmittedText, StringComparison.Ordinal))
return;
if (!isFinal && (_timeProvider.GetUtcNow() - _lastEmitAt) < _throttle)
return;

string? toDispatch = null;
Task? drainTask = null;

lock (_lock)
{
if (_disposed)
return;

if (string.Equals(text, _lastEmittedText, StringComparison.Ordinal))
{
// Already on the wire; clear any deferred copy so the timer doesn't republish
// identical content. Even for isFinal we can return here: the final text is
// already the most recent dispatched chunk, so the actor will see it before
// any subsequent ready event.
_pendingText = string.Empty;
_hasPending = false;
return;
}

if (_dispatchInProgress)
{
// A dispatch is in flight. Stash the latest text; the dispatch loop's reflush
// step picks up _pendingText after the in-flight chunk completes. No timer is
// needed because the loop polls _hasPending after every dispatch.
_pendingText = text;
_hasPending = true;
if (isFinal)
{
// Block FinalizeAsync until the dispatch loop drains the stashed final text.
// Without this wait, ChannelLlmReplyInboxRuntime sends LlmReplyReadyEvent
// first and ConversationGAgent's processed-command guard drops the late
// final chunk.
_drainTcs ??= new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
drainTask = _drainTcs.Task;
}
}
else
{
var elapsed = _timeProvider.GetUtcNow() - _lastEmitAt;
if (isFinal || elapsed >= _throttle)
{
CancelTimerLocked();
_pendingText = string.Empty;
_hasPending = false;
_dispatchInProgress = true;
toDispatch = text;
}
else
{
// Inside the throttle window: stash the latest text and arm the deferred
// flush timer if it isn't already armed. Subsequent deltas in this same
// window will simply overwrite _pendingText (collapse on the latest
// accumulated text).
_pendingText = text;
_hasPending = true;
if (_flushTimer is null)
{
var delay = _throttle - elapsed;
if (delay < TimeSpan.Zero)
delay = TimeSpan.Zero;
_flushTimer = _timeProvider.CreateTimer(
OnFlushTimerFired,
state: null,
dueTime: delay,
period: Timeout.InfiniteTimeSpan);
}
}
}
}

if (toDispatch is not null)
await DispatchLoopAsync(toDispatch, ct).ConfigureAwait(false);

if (drainTask is not null)
await drainTask.ConfigureAwait(false);
}

private void OnFlushTimerFired(object? state)
{
string? toDispatch = null;

lock (_lock)
{
_flushTimer?.Dispose();
_flushTimer = null;

if (_disposed || !_hasPending)
return;

// A caller-driven dispatch is already in flight. Its reflush loop will pick up
// _pendingText, so don't start a second concurrent dispatch.
if (_dispatchInProgress)
return;

if (string.Equals(_pendingText, _lastEmittedText, StringComparison.Ordinal))
{
_pendingText = string.Empty;
_hasPending = false;
return;
}

_dispatchInProgress = true;
toDispatch = _pendingText;
_pendingText = string.Empty;
_hasPending = false;
}

if (toDispatch is not null)
{
// Fire-and-forget: errors are caught inside DispatchLoopAsync so the timer never
// surfaces an unobserved exception. CancellationToken.None because the per-turn
// CancellationToken belongs to the caller's await chain, not the timer.
_ = DispatchLoopAsync(toDispatch, CancellationToken.None);
}
}

private async Task DispatchLoopAsync(string firstText, CancellationToken ct)
{
var current = firstText;
TaskCompletionSource<bool>? drainSignal = null;
try
{
while (true)
{
await DispatchOneAsync(current, ct).ConfigureAwait(false);

string? next;
lock (_lock)
{
if (_disposed || !_hasPending)
{
_dispatchInProgress = false;
drainSignal = _drainTcs;
_drainTcs = null;
break;
}

if (string.Equals(_pendingText, _lastEmittedText, StringComparison.Ordinal))
{
_pendingText = string.Empty;
_hasPending = false;
_dispatchInProgress = false;
drainSignal = _drainTcs;
_drainTcs = null;
break;
}

next = _pendingText;
_pendingText = string.Empty;
_hasPending = false;
}

current = next!;
}
}
catch (Exception ex)
{
TaskCompletionSource<bool>? errSignal;
lock (_lock)
{
_dispatchInProgress = false;
errSignal = _drainTcs;
_drainTcs = null;
}
errSignal?.TrySetException(ex);
throw;
}

drainSignal?.TrySetResult(true);
}

private async Task DispatchOneAsync(string text, CancellationToken ct)
{
var chunk = new LlmReplyStreamChunkEvent
{
CorrelationId = _correlationId,
Expand All @@ -94,10 +326,13 @@ private async Task FlushAsync(string text, bool isFinal, CancellationToken ct)

try
{
await _targetActor.HandleEventAsync(envelope, ct);
_lastEmittedText = text;
_lastEmitAt = _timeProvider.GetUtcNow();
_chunksEmitted++;
await _targetActor.HandleEventAsync(envelope, ct).ConfigureAwait(false);
lock (_lock)
{
_lastEmittedText = text;
_lastEmitAt = _timeProvider.GetUtcNow();
_chunksEmitted++;
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
Expand All @@ -111,4 +346,10 @@ private async Task FlushAsync(string text, bool isFinal, CancellationToken ct)
_correlationId);
}
}

private void CancelTimerLocked()
{
_flushTimer?.Dispose();
_flushTimer = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<PackageReference Include="FluentAssertions" />
<PackageReference Include="Google.Protobuf" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="NSubstitute" />
<PackageReference Include="xunit" />
Expand Down
Loading
Loading