From b5f1780edb57b5008a15776aa14110aef5f7658c Mon Sep 17 00:00:00 2001 From: Clynt Neiko Rupinta Date: Fri, 22 May 2026 10:05:13 +0800 Subject: [PATCH] fix(transit/delta-message): ReceiveAllAsync survives transient transport disconnects via _receiveEof (fixes #297) --- .../DeltaMessageTransit.cs | 46 +++- ...aMessageTransitReceiveAllReconnectTests.cs | 209 ++++++++++++++++++ 2 files changed, 248 insertions(+), 7 deletions(-) create mode 100644 tests/NetConduit.Transit.DeltaMessage.UnitTests/DeltaMessageTransitReceiveAllReconnectTests.cs diff --git a/src/NetConduit.Transit.DeltaMessage/DeltaMessageTransit.cs b/src/NetConduit.Transit.DeltaMessage/DeltaMessageTransit.cs index ea48d65..855d05d 100644 --- a/src/NetConduit.Transit.DeltaMessage/DeltaMessageTransit.cs +++ b/src/NetConduit.Transit.DeltaMessage/DeltaMessageTransit.cs @@ -36,6 +36,11 @@ public sealed class DeltaMessageTransit : IAsyncDisposable // sender lock, and a cross-lock null write races with ComputeDelta and produces // either NRE or silent state divergence (issue #300). private int _resyncRequested; + // Set once when the inbound channel reaches real EOF (read returns 0 bytes). + // ReceiveAllAsync consults this — NOT IsConnected — so transient transport + // disconnects during auto-reconnect do not prematurely terminate the + // enumerable (#297). Mirrors the MessageTransit pattern established by #177. + private volatile bool _receiveEof; private volatile bool _disposed; private volatile bool _readyFired; private readonly object _readyLock = new(); @@ -344,19 +349,41 @@ private static bool RequiresFullState(List ops) => /// /// Receives all state updates as an async enumerable. /// + /// + /// Termination conditions: the supplied is + /// cancelled, the transit is disposed, or the inbound channel reaches real + /// end-of-stream. Transient transport disconnects do NOT terminate the + /// enumerable — when auto-reconnect is configured, iteration resumes after the + /// mux re-establishes the connection (#297). + /// public async IAsyncEnumerable ReceiveAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - while (!cancellationToken.IsCancellationRequested && IsConnected) + while (!cancellationToken.IsCancellationRequested && !_disposed) { - var state = await ReceiveAsync(cancellationToken).ConfigureAwait(false); - if (state is not null) + T? state; + try { - yield return state; + state = await ReceiveAsync(cancellationToken).ConfigureAwait(false); } - else if (!IsConnected) + catch (OperationCanceledException) { - break; + yield break; } + catch (ObjectDisposedException) + { + yield break; + } + + // EOF is signalled exclusively via _receiveEof. A null return is + // legitimate mid-stream control traffic (resync request received, + // delta-before-baseline triggering a resync request to the peer) — + // breaking on null would conflate control frames with real EOF and + // bail on every resync handshake (#297). + if (_receiveEof) + yield break; + + if (state is not null) + yield return state; } } @@ -468,7 +495,11 @@ private async ValueTask WriteMessageAsync(ReadOnlyMemory data, Cancellatio while (prefixRead < 4) { var bytesRead = await _readChannel.ReadAsync(lengthPrefix.AsMemory(prefixRead, 4 - prefixRead), cancellationToken).ConfigureAwait(false); - if (bytesRead == 0) return (null, 0); + if (bytesRead == 0) + { + _receiveEof = true; + return (null, 0); + } prefixRead += bytesRead; } @@ -486,6 +517,7 @@ private async ValueTask WriteMessageAsync(ReadOnlyMemory data, Cancellatio if (read == 0) { ArrayPool.Shared.Return(data); + _receiveEof = true; return (null, 0); } totalRead += read; diff --git a/tests/NetConduit.Transit.DeltaMessage.UnitTests/DeltaMessageTransitReceiveAllReconnectTests.cs b/tests/NetConduit.Transit.DeltaMessage.UnitTests/DeltaMessageTransitReceiveAllReconnectTests.cs new file mode 100644 index 0000000..d8d508c --- /dev/null +++ b/tests/NetConduit.Transit.DeltaMessage.UnitTests/DeltaMessageTransitReceiveAllReconnectTests.cs @@ -0,0 +1,209 @@ +using System.Buffers.Binary; +using System.Text.Json.Nodes; +using NetConduit.Enums; +using NetConduit.Events; +using NetConduit.Interfaces; +using NetConduit.Models; + +namespace NetConduit.Transit.DeltaMessage.UnitTests; + +/// +/// Regression for #297: 's loop +/// condition was IsConnected, which flips false on every transient transport +/// disconnect — terminating the enumerable mid-session even when auto-reconnect was +/// configured to recover. The fix mirrors MessageTransit.ReceiveAllAsync by +/// using a _receiveEof flag that is set only on real end-of-stream. +/// +public sealed class DeltaMessageTransitReceiveAllReconnectTests +{ + [Fact(Timeout = 30000)] + public async Task ReceiveAllAsync_TransportDisconnectMidStream_ContinuesAfterReconnect() + { + var read = new ScriptedReadChannel(); + await using var transit = new DeltaMessageTransit( + writeChannel: null, + readChannel: read, + JsonContext.Default.JsonObject); + + // Frame 1: full state. + read.EnqueueFrame(BuildFullStateFrame(new JsonObject { ["v"] = 1 })); + + var collected = new List(); + // Gate the foreach body so we can deterministically toggle IsConnected + // BEFORE the iterator re-evaluates its while condition for iteration 2. + var releaseAfterFirst = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); + + var consumer = Task.Run(async () => + { + await foreach (var s in transit.ReceiveAllAsync(cts.Token)) + { + collected.Add(s); + if (collected.Count == 1) + { + await releaseAfterFirst.Task.WaitAsync(cts.Token); + } + if (collected.Count >= 2) break; + } + }, cts.Token); + + await WaitUntilAsync(() => collected.Count == 1, cts.Token); + + // Iterator is parked inside the foreach body (awaiting releaseAfterFirst). + // The iterator's next MoveNextAsync (and thus its while-check) has NOT + // run yet. Toggle the channel offline to simulate a transient transport + // disconnect, then release the body so the iterator re-evaluates `while`. + read.SetConnected(false); + releaseAfterFirst.SetResult(); + + // Allow the iterator to wake, re-enter the while check, and (pre-fix) + // bail out. Post-fix, it parks in ReadAsync waiting for more data. + await Task.Delay(100, cts.Token); + + // Simulate transport reconnect: channel back online and a new frame + // arrives. Pre-fix this is never delivered because the enumerable + // already terminated. Post-fix the iterator is still parked and yields it. + read.SetConnected(true); + read.EnqueueFrame(BuildFullStateFrame(new JsonObject { ["v"] = 2 })); + + await consumer.WaitAsync(TimeSpan.FromSeconds(10)); + + Assert.Equal(2, collected.Count); + Assert.Equal(1, collected[0]["v"]!.GetValue()); + Assert.Equal(2, collected[1]["v"]!.GetValue()); + } + + [Fact(Timeout = 30000)] + public async Task ReceiveAllAsync_ReadReturnsZero_TerminatesViaEof() + { + var read = new ScriptedReadChannel(); + await using var transit = new DeltaMessageTransit( + writeChannel: null, + readChannel: read, + JsonContext.Default.JsonObject); + + read.EnqueueFrame(BuildFullStateFrame(new JsonObject { ["v"] = 1 })); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); + var collected = new List(); + + var consumer = Task.Run(async () => + { + await foreach (var s in transit.ReceiveAllAsync(cts.Token)) + { + collected.Add(s); + } + }, cts.Token); + + await WaitUntilAsync(() => collected.Count == 1, cts.Token); + + // Real EOF (read returns 0) — the enumerable MUST terminate. + read.SignalEof(); + await consumer; + + Assert.Single(collected); + } + + private static async Task WaitUntilAsync(Func predicate, CancellationToken ct) + { + while (!predicate()) + { + ct.ThrowIfCancellationRequested(); + await Task.Delay(10, ct); + } + } + + private static byte[] BuildFullStateFrame(JsonObject state) + { + var json = System.Text.Encoding.UTF8.GetBytes(state.ToJsonString()); + var payload = new byte[1 + json.Length]; + payload[0] = 0x00; + Buffer.BlockCopy(json, 0, payload, 1, json.Length); + var frame = new byte[4 + payload.Length]; + BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(0, 4), payload.Length); + Buffer.BlockCopy(payload, 0, frame, 4, payload.Length); + return frame; + } + + /// + /// IReadChannel test double whose IsConnected can be toggled and whose ReadAsync + /// parks on a TaskCompletionSource until either a frame is enqueued or EOF is signalled. + /// + private sealed class ScriptedReadChannel : IReadChannel + { + private readonly object _lock = new(); + private readonly Queue _buffer = new(); + private TaskCompletionSource _dataAvailable = new(TaskCreationOptions.RunContinuationsAsynchronously); + private bool _eof; + private bool _connected = true; + + public void EnqueueFrame(byte[] frame) + { + lock (_lock) + { + foreach (var b in frame) _buffer.Enqueue(b); + _dataAvailable.TrySetResult(); + } + } + + public void SetConnected(bool value) + { + lock (_lock) { _connected = value; } + } + + public void SignalEof() + { + lock (_lock) + { + _eof = true; + _dataAvailable.TrySetResult(); + } + } + + public async ValueTask ReadAsync(Memory buffer, CancellationToken ct = default) + { + while (true) + { + Task wait; + lock (_lock) + { + if (_buffer.Count > 0) + { + var take = Math.Min(buffer.Length, _buffer.Count); + for (var i = 0; i < take; i++) + buffer.Span[i] = _buffer.Dequeue(); + return take; + } + if (_eof) return 0; + if (_dataAvailable.Task.IsCompleted) + _dataAvailable = new(TaskCreationOptions.RunContinuationsAsynchronously); + wait = _dataAvailable.Task; + } + await wait.WaitAsync(ct).ConfigureAwait(false); + } + } + + public string ChannelId => "test"; + public ChannelState State => ChannelState.Open; + public bool IsReady => true; + public bool IsConnected + { + get { lock (_lock) { return _connected; } } + } + public ChannelPriority Priority => ChannelPriority.Normal; + public ChannelStats Stats { get; } = new(); + public ChannelCloseReason? CloseReason => null; + public Exception? CloseException => null; + + public event EventHandler? Ready { add { } remove { } } + public event EventHandler? Connected { add { } remove { } } + public event EventHandler? Disconnected { add { } remove { } } + public event EventHandler? Closed { add { } remove { } } + + public Task WaitForReadyAsync(CancellationToken ct = default) => Task.CompletedTask; + public ValueTask CloseAsync(CancellationToken ct = default) => ValueTask.CompletedTask; + public Stream AsStream() => throw new NotSupportedException(); + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + public void Dispose() { } + } +}