Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrency issue between AbandonPendingBacklog() and CheckBacklogForTimeouts(), and remove backlog locking #2430

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 32 additions & 28 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ internal sealed class PhysicalBridge : IDisposable
private int _backlogProcessorIsRunning = 0;
private int _backlogCurrentEnqueued = 0;
private long _backlogTotalEnqueued = 0;
private Exception? _abandonPendingBacklogException;

private int activeWriters = 0;
private int beating;
Expand Down Expand Up @@ -483,11 +484,18 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti

private void AbandonPendingBacklog(Exception ex)
{
// Peeking at the backlog, checking message and then dequeuing is not thread-safe.
// CheckBacklogForTimeouts() depends on this being set to properly complete dequeued messages.
Volatile.Write(ref _abandonPendingBacklogException, ex);

while (BacklogTryDequeue(out Message? next))
{
Multiplexer.OnMessageFaulted(next, ex);
next.SetExceptionAndComplete(ex, this);
}

// Best effort cleanup to avoid false positive thread safey check failures in CheckBacklogForTimeouts().
if (_backlogStatus != BacklogStatus.CheckingForTimeout) Interlocked.CompareExchange(ref _abandonPendingBacklogException, null, ex);
}

internal void OnFullyEstablished(PhysicalConnection connection, string source)
Expand Down Expand Up @@ -888,24 +896,29 @@ private void CheckBacklogForTimeouts()
var now = Environment.TickCount;
var timeout = TimeoutMilliseconds;

// Because peeking at the backlog, checking message and then dequeuing, is not thread-safe, we do have to use
// a lock here, for mutual exclusion of backlog DEQUEUERS. Unfortunately.
// But we reduce contention by only locking if we see something that looks timed out.
// Peeking at the backlog, checking message and then dequeuing is not thread-safe.
// Because AbandonPendingBacklog() is the only dequeuer that can run concurrently,
// locking can be avoided by throwing the AbandonPendingBacklog() exception here.
while (_backlog.TryPeek(out Message? message))
{
// See if the message has pass our async timeout threshold
// Note: All timed out messages must be dequeued, even when no completion is needed, to be able to dequeue and complete other timed out messages.
if (!message.HasTimedOut(now, timeout, out var _)) break; // not a timeout - we can stop looking
lock (_backlog)
if (!BacklogTryDequeue(out var message2)) message2 = null; // consume it for real
if (message != message2)
{
// Peek again since we didn't have lock before...
// and rerun the exact same checks as above, note that it may be a different message now
if (!_backlog.TryPeek(out message)) break;
if (!message.HasTimedOut(now, timeout, out var _)) break;
var ex = Volatile.Read(ref _abandonPendingBacklogException);
var isAbandonPendingBacklog = ex != null;
ex ??= new RedisException("Thread safety bug detected! A queue message disappeared when AbandonPendingBacklog() was not running.");
message2?.SetExceptionAndComplete(ex, this);

if (!BacklogTryDequeue(out var message2) || (message != message2)) // consume it for real
if (isAbandonPendingBacklog)
{
throw new RedisException("Thread safety bug detected! A queue message disappeared while we had the backlog lock");
break;
}
else
{
throw ex;
}
}

Expand Down Expand Up @@ -976,20 +989,15 @@ private async Task ProcessBacklogAsync()
if (isDisposed && BacklogHasItems)
{
_backlogStatus = BacklogStatus.NotifyingDisposed;
// Because peeking at the backlog, checking message and then dequeuing, is not thread-safe, we do have to use
// a lock here, for mutual exclusion of backlog DEQUEUERS. Unfortunately.
// But we reduce contention by only locking if we see something that looks timed out.
// Peeking at the backlog, checking message and then dequeuing is not thread-safe.
// CheckBacklogForTimeouts() depends on not running concurrently with this.
while (BacklogHasItems)
{
Message? message = null;
lock (_backlog)
if (!BacklogTryDequeue(out Message? message))
{
if (!BacklogTryDequeue(out message))
{
break;
}
break;
}

var ex = ExceptionFactory.Timeout(Multiplexer, "The message was in the backlog when connection was disposed", message, ServerEndPoint, WriteResult.TimeoutBeforeWrite, this);
message.SetExceptionAndComplete(ex, this);
}
Expand Down Expand Up @@ -1073,17 +1081,13 @@ private async Task ProcessBridgeBacklogAsync()
// If we can't write them, abort and wait for the next heartbeat or activation to try this again.
while (IsConnected && physical?.HasOutputPipe == true)
{
Message? message;
_backlogStatus = BacklogStatus.CheckingForWork;

lock (_backlog)
// Note that we're actively taking it off the queue here, not peeking
// If there's nothing left in queue, we're done.
if (!BacklogTryDequeue(out Message? message))
{
// Note that we're actively taking it off the queue here, not peeking
// If there's nothing left in queue, we're done.
if (!BacklogTryDequeue(out message))
{
break;
}
break;
}

try
Expand Down
49 changes: 49 additions & 0 deletions tests/StackExchange.Redis.Tests/Issues/Issue2430Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace StackExchange.Redis.Tests.Issues
{
public class Issue2430Tests : TestBase
{
public Issue2430Tests(ITestOutputHelper output) : base(output) { }

[Fact]
public void Execute()
{
var options = new ConfigurationOptions()
{
AbortOnConnectFail = false,
ConnectTimeout = 1,
ConnectRetry = 0,
SyncTimeout = 1,
AllowAdmin = true,
EndPoints = { GetConfiguration() },
};

using var conn = ConnectionMultiplexer.Connect(options, Writer);
var db = conn.GetDatabase();

// Disconnect and don't allow re-connection
conn.AllowConnect = false;
var server = conn.GetServerSnapshot()[0];
server.SimulateConnectionFailure(SimulatedFailureType.All);

// Increasing the number of backlog items increases the chance of a concurrency issue occurring
var backlogTasks = new Task[100000];
for (int i = 0; i < backlogTasks.Length; i++)
backlogTasks[i] = db.PingAsync();

Assert.True(Task.WaitAny(backlogTasks, 5000) != -1, "Timeout.");
conn.Dispose();

foreach (var task in backlogTasks)
{
Assert.True(task.IsCompleted, "Not completed.");
Assert.True(task.IsFaulted, "Not faulted.");
Assert.True(task.Exception!.InnerException is RedisTimeoutException or RedisConnectionException or ObjectDisposedException, $"Wrong exception: {task.Exception.InnerException.Message}");
}
}
}
}