Skip to content

Commit

Permalink
Fix cancellation-related deadlock in BoundedChannel (#33883)
Browse files Browse the repository at this point in the history
If:
- a BoundedChannel is opted-in to synchronous continuations via someone setting AllowSynchronousContinuations on its options when constructed, and
- if a read operation is performed when there's no data available and with a cancelable cancellation token, and
- if cancellation is requested concurrently with a write being performed that will satisfy that read...

then there's the potential for a deadlock.  While holding a lock, the writer needs to unregister the cancellation for the reader's operation prior to trying to complete it, and needs to know that all cancellation-related work has quiesced before it proceeds to try to complete that reader.  If the cancellation is currently in progress then, it waits for the cancellation callback to complete.  However, if a synchronous continuation was used, then it's possible the user's synchronous continuation might call back into the bounded channel and perform an operation that might require taking that same lock.  Deadlock.

The fix is to simply not try to use synchronous continuations with bounded channel when a cancelable token is employed.
  • Loading branch information
stephentoub committed Mar 21, 2020
1 parent ee13e3e commit d7db838
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,15 @@ public override ValueTask<T> ReadAsync(CancellationToken cancellationToken)
}
}

// Otherwise, queue the reader.
var reader = new AsyncOperation<T>(parent._runContinuationsAsynchronously, cancellationToken);
// Otherwise, queue a reader. Note that in addition to checking whether synchronous continuations were requested,
// we also check whether the supplied cancellation token can be canceled. The writer calls UnregisterCancellation
// while holding the lock, and if a callback needs to be unregistered and is currently running, it needs to wait
// for that callback to complete so that the subsequent code knows it won't be contending with another thread
// trying to complete the operation. However, if we allowed a synchronous continuation from this operation, that
// cancellation callback could end up running arbitrary code, including code that called back into the reader or
// writer and tried to take the same lock held by the thread running UnregisterCancellation... deadlock. As such,
// we only allow synchronous continuations here if both a) the caller requested it and the token isn't cancelable.
var reader = new AsyncOperation<T>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken);
parent._blockedReaders.EnqueueTail(reader);
return reader.ValueTaskOfT;
}
Expand Down Expand Up @@ -193,8 +200,15 @@ public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationTo
}
}

// Otherwise, queue a reader.
var waiter = new AsyncOperation<bool>(parent._runContinuationsAsynchronously, cancellationToken);
// Otherwise, queue a reader. Note that in addition to checking whether synchronous continuations were requested,
// we also check whether the supplied cancellation token can be canceled. The writer calls UnregisterCancellation
// while holding the lock, and if a callback needs to be unregistered and is currently running, it needs to wait
// for that callback to complete so that the subsequent code knows it won't be contending with another thread
// trying to complete the operation. However, if we allowed a synchronous continuation from this operation, that
// cancellation callback could end up running arbitrary code, including code that called back into the reader or
// writer and tried to take the same lock held by the thread running UnregisterCancellation... deadlock. As such,
// we only allow synchronous continuations here if both a) the caller requested it and the token isn't cancelable.
var waiter = new AsyncOperation<bool>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken);
ChannelUtilities.QueueWaiter(ref _parent._waitingReadersTail, waiter);
return waiter.ValueTaskOfT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,16 +390,18 @@ public async Task WaitToWriteAsync_AfterFullThenRead_ReturnsTrue()
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public void AllowSynchronousContinuations_WaitToReadAsync_ContinuationsInvokedAccordingToSetting(bool allowSynchronousContinuations)
[MemberData(nameof(ThreeBools))]
public void AllowSynchronousContinuations_Reading_ContinuationsInvokedAccordingToSetting(bool allowSynchronousContinuations, bool cancelable, bool waitToReadAsync)
{
var c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { AllowSynchronousContinuations = allowSynchronousContinuations });

CancellationToken ct = cancelable ? new CancellationTokenSource().Token : CancellationToken.None;

int expectedId = Environment.CurrentManagedThreadId;
Task r = c.Reader.WaitToReadAsync().AsTask().ContinueWith(_ =>
Task t = waitToReadAsync ? (Task)c.Reader.WaitToReadAsync(ct).AsTask() : c.Reader.ReadAsync(ct).AsTask();
Task r = t.ContinueWith(_ =>
{
Assert.Equal(allowSynchronousContinuations, expectedId == Environment.CurrentManagedThreadId);
Assert.Equal(allowSynchronousContinuations && !cancelable, expectedId == Environment.CurrentManagedThreadId);
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

Assert.True(c.Writer.WriteAsync(42).IsCompletedSuccessfully);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public abstract partial class ChannelTestBase : TestBase
protected virtual bool RequiresSingleWriter => false;
protected virtual bool BuffersItems => true;

public static IEnumerable<object[]> ThreeBools =>
from b1 in new[] { false, true }
from b2 in new[] { false, true }
from b3 in new[] { false, true }
select new object[] { b1, b2, b3 };

[Fact]
public void ValidateDebuggerAttributes()
{
Expand Down

0 comments on commit d7db838

Please sign in to comment.