diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index e7af56a69..29e8299b9 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -41,6 +41,7 @@ internal sealed class PhysicalBridge : IDisposable private int _backlogProcessorIsRunning = 0; private int _backlogCurrentEnqueued = 0; private long _backlogTotalEnqueued = 0; + private Exception? _abandonPendingBacklogException; private int activeWriters = 0; private int beating; @@ -483,11 +484,18 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti private void AbandonPendingBacklog(Exception ex) { + // Peeking at the backlog, checking message and then dequeuing is not thread-safe. + // CheckBacklogForTimeouts() depends on this being set to properly complete dequeued messages. + Volatile.Write(ref _abandonPendingBacklogException, ex); + while (BacklogTryDequeue(out Message? next)) { Multiplexer.OnMessageFaulted(next, ex); next.SetExceptionAndComplete(ex, this); } + + // Best effort cleanup to avoid false positive thread safey check failures in CheckBacklogForTimeouts(). + if (_backlogStatus != BacklogStatus.CheckingForTimeout) Interlocked.CompareExchange(ref _abandonPendingBacklogException, null, ex); } internal void OnFullyEstablished(PhysicalConnection connection, string source) @@ -888,24 +896,29 @@ private void CheckBacklogForTimeouts() var now = Environment.TickCount; var timeout = TimeoutMilliseconds; - // Because peeking at the backlog, checking message and then dequeuing, is not thread-safe, we do have to use - // a lock here, for mutual exclusion of backlog DEQUEUERS. Unfortunately. - // But we reduce contention by only locking if we see something that looks timed out. + // Peeking at the backlog, checking message and then dequeuing is not thread-safe. + // Because AbandonPendingBacklog() is the only dequeuer that can run concurrently, + // locking can be avoided by throwing the AbandonPendingBacklog() exception here. while (_backlog.TryPeek(out Message? message)) { // See if the message has pass our async timeout threshold // Note: All timed out messages must be dequeued, even when no completion is needed, to be able to dequeue and complete other timed out messages. if (!message.HasTimedOut(now, timeout, out var _)) break; // not a timeout - we can stop looking - lock (_backlog) + if (!BacklogTryDequeue(out var message2)) message2 = null; // consume it for real + if (message != message2) { - // Peek again since we didn't have lock before... - // and rerun the exact same checks as above, note that it may be a different message now - if (!_backlog.TryPeek(out message)) break; - if (!message.HasTimedOut(now, timeout, out var _)) break; + var ex = Volatile.Read(ref _abandonPendingBacklogException); + var isAbandonPendingBacklog = ex != null; + ex ??= new RedisException("Thread safety bug detected! A queue message disappeared when AbandonPendingBacklog() was not running."); + message2?.SetExceptionAndComplete(ex, this); - if (!BacklogTryDequeue(out var message2) || (message != message2)) // consume it for real + if (isAbandonPendingBacklog) { - throw new RedisException("Thread safety bug detected! A queue message disappeared while we had the backlog lock"); + break; + } + else + { + throw ex; } } @@ -976,20 +989,15 @@ private async Task ProcessBacklogAsync() if (isDisposed && BacklogHasItems) { _backlogStatus = BacklogStatus.NotifyingDisposed; - // Because peeking at the backlog, checking message and then dequeuing, is not thread-safe, we do have to use - // a lock here, for mutual exclusion of backlog DEQUEUERS. Unfortunately. - // But we reduce contention by only locking if we see something that looks timed out. + // Peeking at the backlog, checking message and then dequeuing is not thread-safe. + // CheckBacklogForTimeouts() depends on not running concurrently with this. while (BacklogHasItems) { - Message? message = null; - lock (_backlog) + if (!BacklogTryDequeue(out Message? message)) { - if (!BacklogTryDequeue(out message)) - { - break; - } + break; } - + var ex = ExceptionFactory.Timeout(Multiplexer, "The message was in the backlog when connection was disposed", message, ServerEndPoint, WriteResult.TimeoutBeforeWrite, this); message.SetExceptionAndComplete(ex, this); } @@ -1073,17 +1081,13 @@ private async Task ProcessBridgeBacklogAsync() // If we can't write them, abort and wait for the next heartbeat or activation to try this again. while (IsConnected && physical?.HasOutputPipe == true) { - Message? message; _backlogStatus = BacklogStatus.CheckingForWork; - lock (_backlog) + // Note that we're actively taking it off the queue here, not peeking + // If there's nothing left in queue, we're done. + if (!BacklogTryDequeue(out Message? message)) { - // Note that we're actively taking it off the queue here, not peeking - // If there's nothing left in queue, we're done. - if (!BacklogTryDequeue(out message)) - { - break; - } + break; } try diff --git a/tests/StackExchange.Redis.Tests/Issues/Issue2430Tests.cs b/tests/StackExchange.Redis.Tests/Issues/Issue2430Tests.cs new file mode 100644 index 000000000..79a402578 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/Issues/Issue2430Tests.cs @@ -0,0 +1,49 @@ +using System; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests.Issues +{ + public class Issue2430Tests : TestBase + { + public Issue2430Tests(ITestOutputHelper output) : base(output) { } + + [Fact] + public void Execute() + { + var options = new ConfigurationOptions() + { + AbortOnConnectFail = false, + ConnectTimeout = 1, + ConnectRetry = 0, + SyncTimeout = 1, + AllowAdmin = true, + EndPoints = { GetConfiguration() }, + }; + + using var conn = ConnectionMultiplexer.Connect(options, Writer); + var db = conn.GetDatabase(); + + // Disconnect and don't allow re-connection + conn.AllowConnect = false; + var server = conn.GetServerSnapshot()[0]; + server.SimulateConnectionFailure(SimulatedFailureType.All); + + // Increasing the number of backlog items increases the chance of a concurrency issue occurring + var backlogTasks = new Task[100000]; + for (int i = 0; i < backlogTasks.Length; i++) + backlogTasks[i] = db.PingAsync(); + + Assert.True(Task.WaitAny(backlogTasks, 5000) != -1, "Timeout."); + conn.Dispose(); + + foreach (var task in backlogTasks) + { + Assert.True(task.IsCompleted, "Not completed."); + Assert.True(task.IsFaulted, "Not faulted."); + Assert.True(task.Exception!.InnerException is RedisTimeoutException or RedisConnectionException or ObjectDisposedException, $"Wrong exception: {task.Exception.InnerException.Message}"); + } + } + } +}