Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public struct ManualResetValueTaskSourceCore<TResult>
private TResult? _result;
/// <summary>The current version of this value, used to help prevent misuse.</summary>
private short _version;
/// <summary>Whether the current operation has completed.</summary>
private bool _completed;
/// <summary>Whether to force continuations to run asynchronously.</summary>
private bool _runContinuationsAsynchronously;

Expand All @@ -51,13 +49,18 @@ public bool RunContinuationsAsynchronously
public void Reset()
{
// Reset/update state for the next use/await of this instance.
// Order of assignments is unimportant here.
// The outer user always ensures that the state is not accessed across
// the reset point when implementing Rent/Return operations.
_version++;
Debug.Assert(_continuation == null || IsCompleted);
_continuation = null;
Debug.Assert(_continuationState == null);
Debug.Assert(_capturedContext == null);
_continuationState = null;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still clearing _continuationState and _capturedContext here, not relying on assertions since the type is public.
The assertions are there to catch internal misuse.

_capturedContext = null;
_error = null;
_result = default;
_completed = false;
}

/// <summary>Completes with a successful result.</summary>
Expand All @@ -79,13 +82,16 @@ public void SetException(Exception error)
/// <summary>Gets the operation version.</summary>
public short Version => _version;

/// <summary>Gets whether the operation has completed.</summary>
internal bool IsCompleted => ReferenceEquals(Volatile.Read(ref _continuation), ManualResetValueTaskSourceCoreShared.s_sentinel);

/// <summary>Gets the status of the operation.</summary>
/// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param>
public ValueTaskSourceStatus GetStatus(short token)
{
ValidateToken(token);
return
Volatile.Read(ref _continuation) is null || !_completed ? ValueTaskSourceStatus.Pending :
!IsCompleted ? ValueTaskSourceStatus.Pending :
_error is null ? ValueTaskSourceStatus.Succeeded :
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
ValueTaskSourceStatus.Faulted;
Expand All @@ -96,7 +102,7 @@ public ValueTaskSourceStatus GetStatus(short token)
[StackTraceHidden]
public TResult GetResult(short token)
{
if (token != _version || !_completed || _error is not null)
if (token != _version || !IsCompleted || _error is not null)
{
ThrowForFailedGetResult();
}
Expand Down Expand Up @@ -125,6 +131,17 @@ public void OnCompleted(Action<object?> continuation, object? state, short token
}
ValidateToken(token);

// We need to store the state before the CompareExchange, so that if it completes immediately
// after the CompareExchange, it'll find the state already stored. If someone misuses this
// and schedules multiple continuations erroneously, we could end up using the wrong state.
// Make a best-effort attempt to catch such misuse.
if (_continuationState is not null)
{
ThrowHelper.ThrowInvalidOperationException();
}
_continuationState = state;

Debug.Assert(_capturedContext is null);
if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
{
_capturedContext = ExecutionContext.Capture();
Expand All @@ -151,35 +168,36 @@ public void OnCompleted(Action<object?> continuation, object? state, short token
}
}

// We need to set the continuation state before we swap in the delegate, so that
// if there's a race between this and SetResult/Exception and SetResult/Exception
// sees the _continuation as non-null, it'll be able to invoke it with the state
// stored here. However, this also means that if this is used incorrectly (e.g.
// awaited twice concurrently), _continuationState might get erroneously overwritten.
// To minimize the chances of that, we check preemptively whether _continuation
// is already set to something other than the completion sentinel.
object? storedContinuation = _continuation;
if (storedContinuation is null)
// Try to set the provided continuation into _continuation. If this succeeds, that means the operation
// has not yet completed, and the completer will be responsible for invoking the callback. If this fails,
// that means the operation has already completed, and we must invoke the callback, but because we're still
// inside the awaiter's OnCompleted method and we want to avoid possible stack dives, we must invoke
// the continuation asynchronously rather than synchronously.
Action<object?>? prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prevContinuation is null)
{
_continuationState = state;
storedContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (storedContinuation is null)
{
// Operation hadn't already completed, so we're done. The continuation will be
// invoked when SetResult/Exception is called at some later point.
return;
}
// Operation hadn't already completed, so we're done. The continuation will be
// invoked when SetResult/Exception is called at some later point.
return;
}

// Operation already completed, so we need to queue the supplied callback.
// At this point the storedContinuation should be the sentinal; if it's not, the instance was misused.
Debug.Assert(storedContinuation is not null, $"{nameof(storedContinuation)} is null");
if (!ReferenceEquals(storedContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel))
// Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order
// to avoid stack diving; this path happens in the rare race when we're setting up to await and the
// object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted.

// We no longer need the stored values as we will be passing the state when queuing directly
_continuationState = null;
object? capturedContext = _capturedContext;
_capturedContext = null;

// If the set failed because there's already a delegate in _continuation, but that delegate is
// something other than the completion sentinel, something went wrong, which should only happen if
// the instance was erroneously used, likely to hook up multiple continuations.
if (!ReferenceEquals(prevContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel))
{
ThrowHelper.ThrowInvalidOperationException();
}

object? capturedContext = _capturedContext;
switch (capturedContext)
{
case null:
Expand Down Expand Up @@ -209,11 +227,10 @@ private void ValidateToken(short token)
/// <summary>Signals that the operation has completed. Invoked after the result or error has been set.</summary>
private void SignalCompletion()
{
if (_completed)
if (IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException();
}
_completed = true;

Action<object?>? continuation =
Volatile.Read(ref _continuation) ??
Expand All @@ -225,6 +242,10 @@ private void SignalCompletion()
_continuationState = null;
object? context = _capturedContext;
_capturedContext = null;
// NB: this write must happen after setting result/error, but
// _continuation is set to not-null via CAS after setting result/error,
// and this write is after that.
_continuation = ManualResetValueTaskSourceCoreShared.s_sentinel;
Comment on lines +247 to +248
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In SignalCompletion, _continuation = ManualResetValueTaskSourceCoreShared.s_sentinel; is a non-volatile write. Since IsCompleted uses Volatile.Read(ref _continuation) to publish/observe completion, this should be a releasing write (e.g., Volatile.Write or an interlocked exchange) to ensure other threads can't observe completion before _result / _error are visible (especially on weak memory models like ARM64).

Suggested change
// and this write is after that.
_continuation = ManualResetValueTaskSourceCoreShared.s_sentinel;
// and this write is after that. Use a volatile write so observers that
// use Volatile.Read(ref _continuation) can't observe completion before
// the preceding writes are visible.
Volatile.Write(ref _continuation, ManualResetValueTaskSourceCoreShared.s_sentinel);

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As explained in the comments the interlocked operation that stores continuation already orders this write the way we want.


Comment thread
VSadov marked this conversation as resolved.
if (context is null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private CancellationToken CancellationToken
#endif

/// <summary>Gets whether the operation has completed.</summary>
internal bool IsCompleted => ReferenceEquals(_continuation, s_completedSentinel);
internal bool IsCompleted => ReferenceEquals(Volatile.Read(ref _continuation), s_completedSentinel);
Copy link
Copy Markdown
Member Author

@VSadov VSadov Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performing an ordinary read could allow prefetching _error and we could end up assuming that a failed/cancelled operation actually cleanly completed.


/// <summary>Completes the operation with a failed state and the specified error.</summary>
/// <param name="exception">The error.</param>
Expand Down Expand Up @@ -267,12 +267,13 @@ private void SetCompletionAndInvokeContinuation()
Debug.Assert(_continuation is not null);

object? ctx = _capturedContext;
_capturedContext = null;

ExecutionContext? ec =
ctx is null ? null :
ctx as ExecutionContext ??
(ctx as CapturedSchedulerAndExecutionContext)?._executionContext;

_capturedContext = null;
if (ec is null)
{
Action<object?> c = _continuation!;
Expand Down Expand Up @@ -359,43 +360,52 @@ public void OnCompleted(Action<object?> continuation, object? state, short token
// inside the awaiter's OnCompleted method and we want to avoid possible stack dives, we must invoke
// the continuation asynchronously rather than synchronously.
Action<object?>? prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prevContinuation is not null)
if (prevContinuation is null)
{
// If the set failed because there's already a delegate in _continuation, but that delegate is
// something other than s_completedSentinel, something went wrong, which should only happen if
// the instance was erroneously used, likely to hook up multiple continuations.
Debug.Assert(IsCompleted, $"Expected IsCompleted");
if (!ReferenceEquals(prevContinuation, s_completedSentinel))
{
Debug.Assert(prevContinuation != s_availableSentinel, "Continuation was the available sentinel.");
ThrowMultipleContinuations();
}
// Operation hadn't already completed, so we're done. The continuation will be
// invoked when SetResult/Exception is called at some later point.
return;
}

// Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order
// to avoid stack diving; this path happens in the rare race when we're setting up to await and the
// object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted.
if (_capturedContext is null)
{
ChannelUtilities.UnsafeQueueUserWorkItem(continuation, state);
}
else if (sc is not null)
{
sc.Post(static s =>
{
var t = (KeyValuePair<Action<object?>, object?>)s!;
t.Key(t.Value);
}, new KeyValuePair<Action<object?>, object?>(continuation, state));
}
else if (ts is not null)
{
Debug.Assert(ts is not null);
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
}
else
// Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order
// to avoid stack diving; this path happens in the rare race when we're setting up to await and the
// object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted.

// We no longer need the stored values as we will be passing the state when queuing directly
_continuationState = null;
object? capturedContext = _capturedContext;
_capturedContext = null;

// If the set failed because there's already a delegate in _continuation, but that delegate is
// something other than the completion sentinel, something went wrong, which should only happen if
// the instance was erroneously used, likely to hook up multiple continuations.
Debug.Assert(prevContinuation != s_availableSentinel, "Continuation was the available sentinel.");
Debug.Assert(IsCompleted, $"Expected IsCompleted");
if (!ReferenceEquals(prevContinuation, s_completedSentinel))
{
ThrowMultipleContinuations();
}

if (capturedContext is null)
{
ChannelUtilities.UnsafeQueueUserWorkItem(continuation, state);
}
else if (sc is not null)
{
sc.Post(static s =>
{
Debug.Assert(_capturedContext is ExecutionContext);
ChannelUtilities.QueueUserWorkItem(continuation, state);
}
var t = (KeyValuePair<Action<object?>, object?>)s!;
t.Key(t.Value);
}, new KeyValuePair<Action<object?>, object?>(continuation, state));
}
else if (ts is not null)
{
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
}
else
{
Debug.Assert(capturedContext is ExecutionContext);
ChannelUtilities.QueueUserWorkItem(continuation, state);
}
}

Expand Down Expand Up @@ -470,11 +480,22 @@ void IValueTaskSource.GetResult(short token)

if (_pooled)
{
Volatile.Write(ref _continuation, s_availableSentinel); // only after fetching all needed data
ClearRetainedState();
// enable reuse only after fetching all needed data
Volatile.Write(ref _continuation, s_availableSentinel);
}

error?.Throw();
}

internal virtual void ClearRetainedState()
{
_error = null;
Comment thread
VSadov marked this conversation as resolved.
Debug.Assert((object?)Next == null);
Debug.Assert((object?)Previous == null);
Debug.Assert(_continuationState == null);
Debug.Assert(_capturedContext == null);
}
Comment thread
VSadov marked this conversation as resolved.
}

/// <summary>Represents an asynchronous operation with a result on a channel.</summary>
Expand Down Expand Up @@ -515,24 +536,28 @@ public TResult GetResult(short token)

if (_pooled)
{
Volatile.Write(ref _continuation, s_availableSentinel); // only after fetching all needed data
ClearRetainedState();
// enable reuse only after fetching all needed data
Volatile.Write(ref _continuation, s_availableSentinel);
}

error?.Throw();
return result!;
}

internal override void ClearRetainedState()
{
base.ClearRetainedState();
_result = default;
}

/// <summary>Attempts to take ownership of the pooled instance.</summary>
/// <returns>true if the instance is now owned by the caller, in which case its state has been reset; otherwise, false.</returns>
public bool TryOwnAndReset()
{
Debug.Assert(_pooled, "Should only be used for pooled objects");
if (ReferenceEquals(Interlocked.CompareExchange(ref _continuation, null, s_availableSentinel), s_availableSentinel))
{
_continuationState = null;
_result = default;
_error = null;
_capturedContext = null;
return true;
Comment thread
VSadov marked this conversation as resolved.
}

Expand Down
Loading