From b9822b96bfc5934b39d07357896748ed751e4b61 Mon Sep 17 00:00:00 2001 From: vsadov <8218165+VSadov@users.noreply.github.com> Date: Mon, 20 Apr 2026 17:54:39 -0700 Subject: [PATCH 1/4] reset _continuation on completion --- .../Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs index 3ce4906d5b2b70..8ba828efe51751 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @@ -225,6 +225,7 @@ private void SignalCompletion() _continuationState = null; object? context = _capturedContext; _capturedContext = null; + _continuation = ManualResetValueTaskSourceCoreShared.s_sentinel; if (context is null) { From e61ae102fc23967ad6aed67dff1da14045742a55 Mon Sep 17 00:00:00 2001 From: vsadov <8218165+VSadov@users.noreply.github.com> Date: Tue, 21 Apr 2026 17:59:44 -0700 Subject: [PATCH 2/4] some theoretical races and retentions --- .../Sources/ManualResetValueTaskSourceCore.cs | 75 +++++++----- .../Threading/Channels/AsyncOperation.cs | 107 +++++++++++------- 2 files changed, 111 insertions(+), 71 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs index 8ba828efe51751..758e3e7942a548 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @@ -34,8 +34,6 @@ public struct ManualResetValueTaskSourceCore private TResult? _result; /// The current version of this value, used to help prevent misuse. private short _version; - /// Whether the current operation has completed. - private bool _completed; /// Whether to force continuations to run asynchronously. private bool _runContinuationsAsynchronously; @@ -51,13 +49,18 @@ public bool RunContinuationsAsynchronously public void Reset() { // Reset/update state for the next use/await of this instance. + // Order of assignments is unimportant here. + // The outer user always ensures that the state is not accessed across + // reset point by when implementing Rent/Return operations. _version++; + Debug.Assert(_continuation == null || IsCompleted); _continuation = null; + Debug.Assert(_continuationState == null); + Debug.Assert(_capturedContext == null); _continuationState = null; _capturedContext = null; _error = null; _result = default; - _completed = false; } /// Completes with a successful result. @@ -79,13 +82,16 @@ public void SetException(Exception error) /// Gets the operation version. public short Version => _version; + /// Gets whether the operation has completed. + internal bool IsCompleted => ReferenceEquals(Volatile.Read(ref _continuation), ManualResetValueTaskSourceCoreShared.s_sentinel); + /// Gets the status of the operation. /// Opaque value that was provided to the 's constructor. public ValueTaskSourceStatus GetStatus(short token) { ValidateToken(token); return - Volatile.Read(ref _continuation) is null || !_completed ? ValueTaskSourceStatus.Pending : + !IsCompleted ? ValueTaskSourceStatus.Pending : _error is null ? ValueTaskSourceStatus.Succeeded : _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : ValueTaskSourceStatus.Faulted; @@ -96,7 +102,7 @@ public ValueTaskSourceStatus GetStatus(short token) [StackTraceHidden] public TResult GetResult(short token) { - if (token != _version || !_completed || _error is not null) + if (token != _version || !IsCompleted || _error is not null) { ThrowForFailedGetResult(); } @@ -125,6 +131,17 @@ public void OnCompleted(Action continuation, object? state, short token } ValidateToken(token); + // We need to store the state before the CompareExchange, so that if it completes immediately + // after the CompareExchange, it'll find the state already stored. If someone misuses this + // and schedules multiple continuations erroneously, we could end up using the wrong state. + // Make a best-effort attempt to catch such misuse. + if (_continuationState is not null) + { + ThrowHelper.ThrowInvalidOperationException(); + } + _continuationState = state; + + Debug.Assert(_capturedContext is null); if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0) { _capturedContext = ExecutionContext.Capture(); @@ -151,35 +168,36 @@ public void OnCompleted(Action continuation, object? state, short token } } - // We need to set the continuation state before we swap in the delegate, so that - // if there's a race between this and SetResult/Exception and SetResult/Exception - // sees the _continuation as non-null, it'll be able to invoke it with the state - // stored here. However, this also means that if this is used incorrectly (e.g. - // awaited twice concurrently), _continuationState might get erroneously overwritten. - // To minimize the chances of that, we check preemptively whether _continuation - // is already set to something other than the completion sentinel. - object? storedContinuation = _continuation; - if (storedContinuation is null) + // Try to set the provided continuation into _continuation. If this succeeds, that means the operation + // has not yet completed, and the completer will be responsible for invoking the callback. If this fails, + // that means the operation has already completed, and we must invoke the callback, but because we're still + // inside the awaiter's OnCompleted method and we want to avoid possible stack dives, we must invoke + // the continuation asynchronously rather than synchronously. + Action? prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (prevContinuation is null) { - _continuationState = state; - storedContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null); - if (storedContinuation is null) - { - // Operation hadn't already completed, so we're done. The continuation will be - // invoked when SetResult/Exception is called at some later point. - return; - } + // Operation hadn't already completed, so we're done. The continuation will be + // invoked when SetResult/Exception is called at some later point. + return; } - // Operation already completed, so we need to queue the supplied callback. - // At this point the storedContinuation should be the sentinal; if it's not, the instance was misused. - Debug.Assert(storedContinuation is not null, $"{nameof(storedContinuation)} is null"); - if (!ReferenceEquals(storedContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel)) + // Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order + // to avoid stack diving; this path happens in the rare race when we're setting up to await and the + // object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted. + + // We no longer need the stored values as we will be passing the state when queuing directly + _continuationState = null; + object? capturedContext = _capturedContext; + _capturedContext = null; + + // If the set failed because there's already a delegate in _continuation, but that delegate is + // something other than the completion sentinel, something went wrong, which should only happen if + // the instance was erroneously used, likely to hook up multiple continuations. + if (!ReferenceEquals(prevContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel)) { ThrowHelper.ThrowInvalidOperationException(); } - object? capturedContext = _capturedContext; switch (capturedContext) { case null: @@ -209,11 +227,10 @@ private void ValidateToken(short token) /// Signals that the operation has completed. Invoked after the result or error has been set. private void SignalCompletion() { - if (_completed) + if (IsCompleted) { ThrowHelper.ThrowInvalidOperationException(); } - _completed = true; Action? continuation = Volatile.Read(ref _continuation) ?? diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs index be40c6d7924131..b22501ca78a818 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs @@ -139,7 +139,7 @@ private CancellationToken CancellationToken #endif /// Gets whether the operation has completed. - internal bool IsCompleted => ReferenceEquals(_continuation, s_completedSentinel); + internal bool IsCompleted => ReferenceEquals(Volatile.Read(ref _continuation), s_completedSentinel); /// Completes the operation with a failed state and the specified error. /// The error. @@ -267,12 +267,13 @@ private void SetCompletionAndInvokeContinuation() Debug.Assert(_continuation is not null); object? ctx = _capturedContext; + _capturedContext = null; + ExecutionContext? ec = ctx is null ? null : ctx as ExecutionContext ?? (ctx as CapturedSchedulerAndExecutionContext)?._executionContext; - _capturedContext = null; if (ec is null) { Action c = _continuation!; @@ -359,43 +360,52 @@ public void OnCompleted(Action continuation, object? state, short token // inside the awaiter's OnCompleted method and we want to avoid possible stack dives, we must invoke // the continuation asynchronously rather than synchronously. Action? prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null); - if (prevContinuation is not null) + if (prevContinuation is null) { - // If the set failed because there's already a delegate in _continuation, but that delegate is - // something other than s_completedSentinel, something went wrong, which should only happen if - // the instance was erroneously used, likely to hook up multiple continuations. - Debug.Assert(IsCompleted, $"Expected IsCompleted"); - if (!ReferenceEquals(prevContinuation, s_completedSentinel)) - { - Debug.Assert(prevContinuation != s_availableSentinel, "Continuation was the available sentinel."); - ThrowMultipleContinuations(); - } + // Operation hadn't already completed, so we're done. The continuation will be + // invoked when SetResult/Exception is called at some later point. + return; + } - // Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order - // to avoid stack diving; this path happens in the rare race when we're setting up to await and the - // object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted. - if (_capturedContext is null) - { - ChannelUtilities.UnsafeQueueUserWorkItem(continuation, state); - } - else if (sc is not null) - { - sc.Post(static s => - { - var t = (KeyValuePair, object?>)s!; - t.Key(t.Value); - }, new KeyValuePair, object?>(continuation, state)); - } - else if (ts is not null) - { - Debug.Assert(ts is not null); - Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); - } - else + // Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order + // to avoid stack diving; this path happens in the rare race when we're setting up to await and the + // object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted. + + // We no longer need the stored values as we will be passing the state when queuing directly + _continuationState = null; + object? capturedContext = _capturedContext; + _capturedContext = null; + + // If the set failed because there's already a delegate in _continuation, but that delegate is + // something other than the completion sentinel, something went wrong, which should only happen if + // the instance was erroneously used, likely to hook up multiple continuations. + Debug.Assert(prevContinuation != s_availableSentinel, "Continuation was the available sentinel."); + Debug.Assert(IsCompleted, $"Expected IsCompleted"); + if (!ReferenceEquals(prevContinuation, s_completedSentinel)) + { + ThrowMultipleContinuations(); + } + + if (capturedContext is null) + { + ChannelUtilities.UnsafeQueueUserWorkItem(continuation, state); + } + else if (sc is not null) + { + sc.Post(static s => { - Debug.Assert(_capturedContext is ExecutionContext); - ChannelUtilities.QueueUserWorkItem(continuation, state); - } + var t = (KeyValuePair, object?>)s!; + t.Key(t.Value); + }, new KeyValuePair, object?>(continuation, state)); + } + else if (ts is not null) + { + Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); + } + else + { + Debug.Assert(capturedContext is ExecutionContext); + ChannelUtilities.QueueUserWorkItem(continuation, state); } } @@ -470,11 +480,20 @@ void IValueTaskSource.GetResult(short token) if (_pooled) { - Volatile.Write(ref _continuation, s_availableSentinel); // only after fetching all needed data + ClearRetainedState(); + // enable reuse only after fetching all needed data + Volatile.Write(ref _continuation, s_availableSentinel); } error?.Throw(); } + + internal virtual void ClearRetainedState() + { + _error = null; + Debug.Assert(_continuationState == null); + Debug.Assert(_capturedContext == null); + } } /// Represents an asynchronous operation with a result on a channel. @@ -515,13 +534,21 @@ public TResult GetResult(short token) if (_pooled) { - Volatile.Write(ref _continuation, s_availableSentinel); // only after fetching all needed data + ClearRetainedState(); + // enable reuse only after fetching all needed data + Volatile.Write(ref _continuation, s_availableSentinel); } error?.Throw(); return result!; } + internal override void ClearRetainedState() + { + base.ClearRetainedState(); + _result = default; + } + /// Attempts to take ownership of the pooled instance. /// true if the instance is now owned by the caller, in which case its state has been reset; otherwise, false. public bool TryOwnAndReset() @@ -529,10 +556,6 @@ public bool TryOwnAndReset() Debug.Assert(_pooled, "Should only be used for pooled objects"); if (ReferenceEquals(Interlocked.CompareExchange(ref _continuation, null, s_availableSentinel), s_availableSentinel)) { - _continuationState = null; - _result = default; - _error = null; - _capturedContext = null; return true; } From 4e7b20301977ccb680b4e62b3dacf17eb1e9c394 Mon Sep 17 00:00:00 2001 From: Vladimir Sadov Date: Tue, 21 Apr 2026 18:33:14 -0700 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs index 758e3e7942a548..a3a4f19796b37f 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @@ -51,7 +51,7 @@ public void Reset() // Reset/update state for the next use/await of this instance. // Order of assignments is unimportant here. // The outer user always ensures that the state is not accessed across - // reset point by when implementing Rent/Return operations. + // the reset point when implementing Rent/Return operations. _version++; Debug.Assert(_continuation == null || IsCompleted); _continuation = null; @@ -242,7 +242,7 @@ private void SignalCompletion() _continuationState = null; object? context = _capturedContext; _capturedContext = null; - _continuation = ManualResetValueTaskSourceCoreShared.s_sentinel; + Volatile.Write(ref _continuation, ManualResetValueTaskSourceCoreShared.s_sentinel); if (context is null) { From 8f5499718f68c3394c63f75d230dda611a56b4d5 Mon Sep 17 00:00:00 2001 From: vsadov <8218165+VSadov@users.noreply.github.com> Date: Tue, 21 Apr 2026 19:25:49 -0700 Subject: [PATCH 4/4] PR feedback --- .../Tasks/Sources/ManualResetValueTaskSourceCore.cs | 5 ++++- .../src/System/Threading/Channels/AsyncOperation.cs | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs index a3a4f19796b37f..f9a4e2bb10e08a 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Sources/ManualResetValueTaskSourceCore.cs @@ -242,7 +242,10 @@ private void SignalCompletion() _continuationState = null; object? context = _capturedContext; _capturedContext = null; - Volatile.Write(ref _continuation, ManualResetValueTaskSourceCoreShared.s_sentinel); + // NB: this write must happen after setting result/error, but + // _continuation is set to not-null via CAS after setting result/error, + // and this write is after that. + _continuation = ManualResetValueTaskSourceCoreShared.s_sentinel; if (context is null) { diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs index b22501ca78a818..16f6741f90567d 100644 --- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs @@ -491,6 +491,8 @@ void IValueTaskSource.GetResult(short token) internal virtual void ClearRetainedState() { _error = null; + Debug.Assert((object?)Next == null); + Debug.Assert((object?)Previous == null); Debug.Assert(_continuationState == null); Debug.Assert(_capturedContext == null); }