From 3a1f2055ff8c3701263d6ee1df68b9d17d346b5a Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 18 Apr 2024 18:40:26 +0200 Subject: [PATCH 1/7] Fix data race leading to a deadlock. --- .../HttpClientHandlerTest.Http3.cs | 3 ++ .../src/System/Net/Quic/QuicConnection.cs | 11 +++++-- .../src/System/Net/Quic/QuicStream.cs | 31 +++++++++++++++++-- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs index 16abc260386b4..1b9e2aef9d44d 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs @@ -1098,7 +1098,10 @@ public async Task ConnectionAttemptCanceled_AuthorityNotBlocked() } catch (Exception ex) // Ignore exception and continue until a viable connection is established. { + System.Console.WriteLine(ex.ToString()); _output.WriteLine(ex.ToString()); + var qe = Assert.IsType(ex); + Assert.Equal(QuicError.ConnectionAborted, qe.QuicError); } } await connection.HandleRequestAsync(); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index e568eeb6f97b7..d56152e7e787f 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -424,7 +424,7 @@ public async ValueTask OpenOutboundStreamAsync(QuicStreamType type, await stream.StartAsync(cancellationToken).ConfigureAwait(false); } - catch + catch (Exception ex) { if (stream is not null) { @@ -433,8 +433,15 @@ public async ValueTask OpenOutboundStreamAsync(QuicStreamType type, // Propagate ODE if disposed in the meantime. ObjectDisposedException.ThrowIf(_disposed == 1, this); + + // In case of an incoming race when the connection is closed by the peer just before we open the stream, + // we receive QUIC_STATUS_ABORTED from MsQuic, but we don't know how the connection was closed. To + // distinguish this case, we throw ConnectionAborted without ApplicationErrorCode. In such a case, we + // can expect the connection close exception to be already reported on the connection level (or very soon). + bool connectionAbortedByPeer = ex is QuicException qe && qe.QuicError == QuicError.ConnectionAborted && qe.ApplicationErrorCode is null; + // Propagate connection error if present. - if (_acceptQueue.Reader.Completion.IsFaulted) + if (_acceptQueue.Reader.Completion.IsFaulted || connectionAbortedByPeer) { await _acceptQueue.Reader.Completion.ConfigureAwait(false); } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index 6af0f5c5c099f..e8e15a2b3558c 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -162,13 +162,24 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QuicStreamT try { QUIC_HANDLE* handle; - ThrowHelper.ThrowIfMsQuicError(MsQuicApi.Api.StreamOpen( + int status = MsQuicApi.Api.StreamOpen( connectionHandle, type == QuicStreamType.Unidirectional ? QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL : QUIC_STREAM_OPEN_FLAGS.NONE, &NativeCallback, (void*)GCHandle.ToIntPtr(context), - &handle), - "StreamOpen failed"); + &handle); + + if (status == QUIC_STATUS_ABORTED) + { + // Connection has been closed by the peer (either at transport or application level), + // we won't be receiving any event callback for shutdown on this stream, so we don't + // necessarily know which error to report. So we throw an exception which we can distinguish + // at the caller (ConnectionAborted normally has App error code) and throw the correct + // exception from there. + throw new QuicException(QuicError.ConnectionAborted, null, ""); + } + + ThrowHelper.ThrowIfMsQuicError(status, "StreamOpen failed"); _handle = new MsQuicContextSafeHandle(handle, context, SafeHandleType.Stream, connectionHandle); _handle.Disposable = _sendBuffers; } @@ -245,6 +256,20 @@ internal ValueTask StartAsync(CancellationToken cancellationToken = default) int status = MsQuicApi.Api.StreamStart( _handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); + + if (status == QUIC_STATUS_ABORTED) + { + // Connection has been closed by the peer (either at transport or application level), + // we won't be receiving any event callback for shutdown on this stream, so we don't + // necessarily know which error to report. So we throw an exception which we can distinguish + // at the caller (ConnectionAborted normally has App error code) and throw the correct + // exception from there. + // + // Also, avoid setting _startedTcs so that we don't try to wait for SHUTDOWN_COMPLETE + // in DisposeAsync(). + throw new QuicException(QuicError.ConnectionAborted, null, ""); + } + if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) { _startedTcs.TrySetException(exception); From 3e99cd79316cd846938c0bfa8b90eac6ea1f40ee Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 18 Apr 2024 18:42:04 +0200 Subject: [PATCH 2/7] Remove unwanted change --- .../tests/FunctionalTests/HttpClientHandlerTest.Http3.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs index 1b9e2aef9d44d..16abc260386b4 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http3.cs @@ -1098,10 +1098,7 @@ public async Task ConnectionAttemptCanceled_AuthorityNotBlocked() } catch (Exception ex) // Ignore exception and continue until a viable connection is established. { - System.Console.WriteLine(ex.ToString()); _output.WriteLine(ex.ToString()); - var qe = Assert.IsType(ex); - Assert.Equal(QuicError.ConnectionAborted, qe.QuicError); } } await connection.HandleRequestAsync(); From 2819becf07823bd0614aa49f5737e77b69069b4a Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 22 Apr 2024 11:00:53 +0200 Subject: [PATCH 3/7] Code review feedback --- .../System/Net/Quic/Internal/ThrowHelper.cs | 24 ++++++++++++++---- .../src/System/Net/Quic/QuicStream.cs | 25 +++---------------- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs index 114c39c49c1e5..69322bc476b53 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs @@ -27,13 +27,27 @@ internal static QuicException GetOperationAbortedException(string? message = nul return new QuicException(QuicError.OperationAborted, null, message ?? SR.net_quic_operationaborted); } - internal static bool TryGetStreamExceptionForMsQuicStatus(int status, [NotNullWhen(true)] out Exception? exception) + internal static bool TryGetStreamExceptionForMsQuicStatus(int status, [NotNullWhen(true)] out Exception? exception, bool streamWasSuccessfullyStarted = true, string? message = null) { if (status == QUIC_STATUS_ABORTED) { - // If status == QUIC_STATUS_ABORTED, we will receive an event later, which will complete the task source. - exception = null; - return false; + // Connection has been closed by the peer (either at transport or application level), + if (streamWasSuccessfullyStarted) + { + // we will receive an event later, which will complete the stream with concrete + // information why the connection was aborted. + exception = null; + return false; + } + else + { + // we won't be receiving any event callback for shutdown on this stream, so we don't + // necessarily know which error to report. So we throw an exception which we can distinguish + // at the caller (ConnectionAborted normally has App error code) and throw the correct + // exception from there. + exception = new QuicException(QuicError.ConnectionAborted, null, ""); + return true; + } } else if (status == QUIC_STATUS_INVALID_STATE) { @@ -43,7 +57,7 @@ internal static bool TryGetStreamExceptionForMsQuicStatus(int status, [NotNullWh } else if (StatusFailed(status)) { - exception = GetExceptionForMsQuicStatus(status); + exception = GetExceptionForMsQuicStatus(status, message: message); return true; } exception = null; diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs index e8e15a2b3558c..320ca9dbcda84 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs @@ -169,17 +169,11 @@ internal unsafe QuicStream(MsQuicContextSafeHandle connectionHandle, QuicStreamT (void*)GCHandle.ToIntPtr(context), &handle); - if (status == QUIC_STATUS_ABORTED) + if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? ex, streamWasSuccessfullyStarted: false, message: "StreamOpen failed")) { - // Connection has been closed by the peer (either at transport or application level), - // we won't be receiving any event callback for shutdown on this stream, so we don't - // necessarily know which error to report. So we throw an exception which we can distinguish - // at the caller (ConnectionAborted normally has App error code) and throw the correct - // exception from there. - throw new QuicException(QuicError.ConnectionAborted, null, ""); + throw ex; } - ThrowHelper.ThrowIfMsQuicError(status, "StreamOpen failed"); _handle = new MsQuicContextSafeHandle(handle, context, SafeHandleType.Stream, connectionHandle); _handle.Disposable = _sendBuffers; } @@ -257,20 +251,7 @@ internal ValueTask StartAsync(CancellationToken cancellationToken = default) _handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT); - if (status == QUIC_STATUS_ABORTED) - { - // Connection has been closed by the peer (either at transport or application level), - // we won't be receiving any event callback for shutdown on this stream, so we don't - // necessarily know which error to report. So we throw an exception which we can distinguish - // at the caller (ConnectionAborted normally has App error code) and throw the correct - // exception from there. - // - // Also, avoid setting _startedTcs so that we don't try to wait for SHUTDOWN_COMPLETE - // in DisposeAsync(). - throw new QuicException(QuicError.ConnectionAborted, null, ""); - } - - if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception)) + if (ThrowHelper.TryGetStreamExceptionForMsQuicStatus(status, out Exception? exception, streamWasSuccessfullyStarted: false)) { _startedTcs.TrySetException(exception); } From f923936a3c8e4338e1bb6d2f7c73a071c991b88c Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 22 Apr 2024 16:42:47 +0200 Subject: [PATCH 4/7] Fix hang --- .../src/System/Net/Quic/QuicConnection.cs | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index d56152e7e787f..1d8bacbea0f5d 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -541,12 +541,12 @@ private unsafe int HandleEventShutdownInitiatedByTransport(ref SHUTDOWN_INITIATE { Exception exception = ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetExceptionForMsQuicStatus(data.Status, (long)data.ErrorCode)); _connectedTcs.TrySetException(exception); - _acceptQueue.Writer.TryComplete(exception); + CompleteAndDrainAcceptQueue(exception); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventShutdownInitiatedByPeer(ref SHUTDOWN_INITIATED_BY_PEER_DATA data) { - _acceptQueue.Writer.TryComplete(ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetConnectionAbortedException((long)data.ErrorCode))); + CompleteAndDrainAcceptQueue(ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetConnectionAbortedException((long)data.ErrorCode))); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventShutdownComplete() @@ -555,7 +555,7 @@ private unsafe int HandleEventShutdownComplete() _tlsSecret?.WriteSecret(); Exception exception = ExceptionDispatchInfo.SetCurrentStackTrace(_disposed == 1 ? new ObjectDisposedException(GetType().FullName) : ThrowHelper.GetOperationAbortedException()); - _acceptQueue.Writer.TryComplete(exception); + CompleteAndDrainAcceptQueue(exception); _connectedTcs.TrySetException(exception); _shutdownTokenSource.Cancel(); _shutdownTcs.TrySetResult(final: true); @@ -666,6 +666,22 @@ private static unsafe int NativeCallback(QUIC_HANDLE* connection, void* context, } } + private void CompleteAndDrainAcceptQueue(Exception? ex) + { + if (_acceptQueue.Writer.TryComplete(ex)) + { + // also drain the queue. Because stream shutdown events are indicated before connection shutdown, + // the QuicStream instances have already been signaled and closed internally. We only need to dispose them, + // which in this situation should complete synchronously. + while (_acceptQueue.Reader.TryRead(out QuicStream? stream)) + { + ValueTask task = stream.DisposeAsync(); + Debug.Assert(task.IsCompletedSuccessfully); + task.GetAwaiter().GetResult(); + } + } + } + /// /// If not closed explicitly by , closes the connection with the . /// And releases all resources associated with the connection. @@ -719,10 +735,6 @@ public async ValueTask DisposeAsync() } // Flush the queue and dispose all remaining streams. - _acceptQueue.Writer.TryComplete(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(GetType().FullName))); - while (_acceptQueue.Reader.TryRead(out QuicStream? stream)) - { - await stream.DisposeAsync().ConfigureAwait(false); - } + CompleteAndDrainAcceptQueue(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(GetType().FullName))); } } From 1eac51136378a67241c03c9dada3cde709a3e5e1 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 22 Apr 2024 17:55:57 +0200 Subject: [PATCH 5/7] Add assert --- .../System.Net.Quic/src/System/Net/Quic/QuicConnection.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 1d8bacbea0f5d..014061ae87c47 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -680,6 +680,8 @@ private void CompleteAndDrainAcceptQueue(Exception? ex) task.GetAwaiter().GetResult(); } } + + Debug.Assert(_acceptQueue.Reader.Completion.IsCompleted); } /// From 2ea2729ee6b2304e0c886f5b5d419685004ccc16 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Tue, 23 Apr 2024 09:51:55 +0200 Subject: [PATCH 6/7] Fix potential crash --- .../src/System/Net/Quic/QuicConnection.cs | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 014061ae87c47..7a2a14163e050 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -541,12 +541,12 @@ private unsafe int HandleEventShutdownInitiatedByTransport(ref SHUTDOWN_INITIATE { Exception exception = ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetExceptionForMsQuicStatus(data.Status, (long)data.ErrorCode)); _connectedTcs.TrySetException(exception); - CompleteAndDrainAcceptQueue(exception); + CompleteAcceptQueue(exception, false); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventShutdownInitiatedByPeer(ref SHUTDOWN_INITIATED_BY_PEER_DATA data) { - CompleteAndDrainAcceptQueue(ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetConnectionAbortedException((long)data.ErrorCode))); + CompleteAcceptQueue(ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetConnectionAbortedException((long)data.ErrorCode)), false); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventShutdownComplete() @@ -555,7 +555,7 @@ private unsafe int HandleEventShutdownComplete() _tlsSecret?.WriteSecret(); Exception exception = ExceptionDispatchInfo.SetCurrentStackTrace(_disposed == 1 ? new ObjectDisposedException(GetType().FullName) : ThrowHelper.GetOperationAbortedException()); - CompleteAndDrainAcceptQueue(exception); + CompleteAcceptQueue(exception, true); _connectedTcs.TrySetException(exception); _shutdownTokenSource.Cancel(); _shutdownTcs.TrySetResult(final: true); @@ -666,22 +666,26 @@ private static unsafe int NativeCallback(QUIC_HANDLE* connection, void* context, } } - private void CompleteAndDrainAcceptQueue(Exception? ex) + private void CompleteAcceptQueue(Exception? ex, bool drain) { - if (_acceptQueue.Writer.TryComplete(ex)) + _acceptQueue.Writer.TryComplete(ex); + + if (drain) { - // also drain the queue. Because stream shutdown events are indicated before connection shutdown, - // the QuicStream instances have already been signaled and closed internally. We only need to dispose them, - // which in this situation should complete synchronously. + // This should be only called after connection SHUTDOWN_COMPLETE has been indicated. + // At that point, all streams have been already shut down internally and we need + // only to close the handle via dispose, so DisposeAsync below should complete + // synchronously (which is necessary for this method to be callable from MsQuic + // event callback). while (_acceptQueue.Reader.TryRead(out QuicStream? stream)) { ValueTask task = stream.DisposeAsync(); Debug.Assert(task.IsCompletedSuccessfully); task.GetAwaiter().GetResult(); } - } - Debug.Assert(_acceptQueue.Reader.Completion.IsCompleted); + Debug.Assert(_acceptQueue.Reader.Completion.IsCompleted); + } } /// @@ -737,6 +741,6 @@ public async ValueTask DisposeAsync() } // Flush the queue and dispose all remaining streams. - CompleteAndDrainAcceptQueue(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(GetType().FullName))); + CompleteAcceptQueue(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(GetType().FullName)), true); } } From 5586d18de13ec43752ed316830704b583d62fa44 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 25 Apr 2024 11:19:05 +0200 Subject: [PATCH 7/7] Code review feedback --- .../System/Net/Quic/Internal/ThrowHelper.cs | 3 ++ .../src/System/Net/Quic/QuicConnection.cs | 54 ++++++++----------- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs index 69322bc476b53..bb1cc7d25b7b3 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Internal/ThrowHelper.cs @@ -64,6 +64,9 @@ internal static bool TryGetStreamExceptionForMsQuicStatus(int status, [NotNullWh return false; } + // see TryGetStreamExceptionForMsQuicStatus for explanation + internal static bool IsConnectionAbortedWhenStartingStreamException(Exception ex) => ex is QuicException qe && qe.QuicError == QuicError.ConnectionAborted && qe.ApplicationErrorCode is null; + internal static Exception GetExceptionForMsQuicStatus(int status, long? errorCode = default, string? message = null) { Exception ex = GetExceptionInternal(status, errorCode, message); diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs index 7a2a14163e050..f266acb265a13 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/QuicConnection.cs @@ -108,6 +108,11 @@ static async ValueTask StartConnectAsync(QuicClientConnectionOpt /// private int _disposed; + /// + /// Completed when connection shutdown is initiated. + /// + private TaskCompletionSource _connectionCloseTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly ValueTaskSource _connectedTcs = new ValueTaskSource(); private readonly ResettableValueTaskSource _shutdownTcs = new ResettableValueTaskSource() { @@ -435,15 +440,14 @@ public async ValueTask OpenOutboundStreamAsync(QuicStreamType type, ObjectDisposedException.ThrowIf(_disposed == 1, this); // In case of an incoming race when the connection is closed by the peer just before we open the stream, - // we receive QUIC_STATUS_ABORTED from MsQuic, but we don't know how the connection was closed. To - // distinguish this case, we throw ConnectionAborted without ApplicationErrorCode. In such a case, we - // can expect the connection close exception to be already reported on the connection level (or very soon). - bool connectionAbortedByPeer = ex is QuicException qe && qe.QuicError == QuicError.ConnectionAborted && qe.ApplicationErrorCode is null; + // we receive QUIC_STATUS_ABORTED from MsQuic, but we don't know how the connection was closed. We throw + // special exception and handle it here where we can determine the shutdown reason. + bool connectionAbortedByPeer = ThrowHelper.IsConnectionAbortedWhenStartingStreamException(ex); // Propagate connection error if present. - if (_acceptQueue.Reader.Completion.IsFaulted || connectionAbortedByPeer) + if (_connectionCloseTcs.Task.IsFaulted || connectionAbortedByPeer) { - await _acceptQueue.Reader.Completion.ConfigureAwait(false); + await _connectionCloseTcs.Task.ConfigureAwait(false); } throw; } @@ -541,12 +545,15 @@ private unsafe int HandleEventShutdownInitiatedByTransport(ref SHUTDOWN_INITIATE { Exception exception = ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetExceptionForMsQuicStatus(data.Status, (long)data.ErrorCode)); _connectedTcs.TrySetException(exception); - CompleteAcceptQueue(exception, false); + _connectionCloseTcs.TrySetException(exception); + _acceptQueue.Writer.TryComplete(exception); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventShutdownInitiatedByPeer(ref SHUTDOWN_INITIATED_BY_PEER_DATA data) { - CompleteAcceptQueue(ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetConnectionAbortedException((long)data.ErrorCode)), false); + Exception exception = ExceptionDispatchInfo.SetCurrentStackTrace(ThrowHelper.GetConnectionAbortedException((long)data.ErrorCode)); + _connectionCloseTcs.TrySetException(exception); + _acceptQueue.Writer.TryComplete(exception); return QUIC_STATUS_SUCCESS; } private unsafe int HandleEventShutdownComplete() @@ -555,7 +562,8 @@ private unsafe int HandleEventShutdownComplete() _tlsSecret?.WriteSecret(); Exception exception = ExceptionDispatchInfo.SetCurrentStackTrace(_disposed == 1 ? new ObjectDisposedException(GetType().FullName) : ThrowHelper.GetOperationAbortedException()); - CompleteAcceptQueue(exception, true); + _connectionCloseTcs.TrySetException(exception); + _acceptQueue.Writer.TryComplete(exception); _connectedTcs.TrySetException(exception); _shutdownTokenSource.Cancel(); _shutdownTcs.TrySetResult(final: true); @@ -666,28 +674,6 @@ private static unsafe int NativeCallback(QUIC_HANDLE* connection, void* context, } } - private void CompleteAcceptQueue(Exception? ex, bool drain) - { - _acceptQueue.Writer.TryComplete(ex); - - if (drain) - { - // This should be only called after connection SHUTDOWN_COMPLETE has been indicated. - // At that point, all streams have been already shut down internally and we need - // only to close the handle via dispose, so DisposeAsync below should complete - // synchronously (which is necessary for this method to be callable from MsQuic - // event callback). - while (_acceptQueue.Reader.TryRead(out QuicStream? stream)) - { - ValueTask task = stream.DisposeAsync(); - Debug.Assert(task.IsCompletedSuccessfully); - task.GetAwaiter().GetResult(); - } - - Debug.Assert(_acceptQueue.Reader.Completion.IsCompleted); - } - } - /// /// If not closed explicitly by , closes the connection with the . /// And releases all resources associated with the connection. @@ -741,6 +727,10 @@ public async ValueTask DisposeAsync() } // Flush the queue and dispose all remaining streams. - CompleteAcceptQueue(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(GetType().FullName)), true); + _acceptQueue.Writer.TryComplete(ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(GetType().FullName))); + while (_acceptQueue.Reader.TryRead(out QuicStream? stream)) + { + await stream.DisposeAsync().ConfigureAwait(false); + } } }