Skip to content

Commit

Permalink
WIP - investigate and resolve async-write-path bugs (#1057)
Browse files Browse the repository at this point in the history
Replace SemaphoreSlim with MutexSlim, due to major impact from https://github.com/dotnet/corefx/issues/35393
  • Loading branch information
mgravell committed Feb 19, 2019
1 parent a804031 commit 6873941
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 75 deletions.
21 changes: 12 additions & 9 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2126,19 +2126,22 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process
return CompletedTask<T>.Default(state);
}

if (message.IsFireAndForget)
TaskCompletionSource<T> tcs = null;
ResultBox<T> source = null;
if (!message.IsFireAndForget)
{
tcs = TaskSource.Create<T>(state);
source = ResultBox<T>.Get(tcs);
}
var write = TryPushMessageToBridgeAsync(message, processor, source, ref server);
if (!write.IsCompletedSuccessfully) return ExecuteAsyncImpl_Awaited<T>(this, write, tcs, message, server);

if (tcs == null)
{
TryPushMessageToBridgeAsync(message, processor, null, ref server);
return CompletedTask<T>.Default(null); // F+F explicitly does not get async-state
}
else
{
var tcs = TaskSource.Create<T>(state);
var source = ResultBox<T>.Get(tcs);
var write = TryPushMessageToBridgeAsync(message, processor, source, ref server);

if (!write.IsCompletedSuccessfully) return ExecuteAsyncImpl_Awaited<T>(this, write, tcs, message, server);

var result = write.Result;
if (result != WriteResult.Success)
{
Expand All @@ -2157,7 +2160,7 @@ private static async Task<T> ExecuteAsyncImpl_Awaited<T>(ConnectionMultiplexer @
var ex = @this.GetException(result, message, server);
ThrowFailed(tcs, ex);
}
return await tcs.Task.ForAwait();
return tcs == null ? default(T) : await tcs.Task.ForAwait();
}

internal Exception GetException(WriteResult result, Message message, ServerEndPoint server)
Expand Down
128 changes: 79 additions & 49 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial.Threading;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;

namespace StackExchange.Redis
Expand Down Expand Up @@ -52,6 +53,8 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti
Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString();
completionManager = new CompletionManager(Multiplexer, Name);
TimeoutMilliseconds = timeoutMilliseconds;
_singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds,
scheduler: Multiplexer?.SocketManager?.SchedulerPool);
}
private readonly int TimeoutMilliseconds;

Expand Down Expand Up @@ -634,7 +637,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
return true;
}

private readonly SemaphoreSlim _SingleWriterLock = new SemaphoreSlim(1);
private readonly MutexSlim _singleWriterMutex;

private Message _activeMesssage;

Expand Down Expand Up @@ -681,34 +684,37 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
}
}

private async Task<WriteResult> WriteMessageTakingDelayedWriteLockAsync(PhysicalConnection physical, Message message)
private async ValueTask<WriteResult> WriteMessageTakingDelayedWriteLockAsync(MutexSlim.AwaitableLockToken pendingLock, PhysicalConnection physical, Message message)
{
bool haveLock = false;
try
{
// WriteMessageTakingWriteLockAsync will have checked for immediate availability,
// so this is the fallback case - fine to go straight to "await"
haveLock = await _SingleWriterLock.WaitAsync(TimeoutMilliseconds).ForAwait();
if (!haveLock)

// note: timeout is specified in mutex-constructor
using (var token = await pendingLock)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
}
if (!token.Success)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
}

var result = WriteMessageInsideLock(physical, message);
var result = WriteMessageInsideLock(physical, message);

if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false).ForAwait();
}
if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false).ForAwait();
}

physical.SetIdle();
return result;
physical.SetIdle();
UnmarkActiveMessage(message);
return result;
}
}
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { if (haveLock) ReleaseSingleWriterLock(message); }
}

[Obsolete("prefer async")]
Expand All @@ -717,32 +723,33 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point

bool haveLock = false;
try
{
haveLock = _SingleWriterLock.Wait(TimeoutMilliseconds);
if (!haveLock)
using (var token = _singleWriterMutex.TryWait())
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
}
if (!token.Success)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
}

var result = WriteMessageInsideLock(physical, message);
var result = WriteMessageInsideLock(physical, message);

if (result == WriteResult.Success)
{
if (result == WriteResult.Success)
{
#pragma warning disable CS0618
result = physical.FlushSync(false, TimeoutMilliseconds);
result = physical.FlushSync(false, TimeoutMilliseconds);
#pragma warning restore CS0618
}
}

physical.SetIdle();
return result;
UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
}
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { if (haveLock) ReleaseSingleWriterLock(message); }
}

/// <summary>
Expand All @@ -755,12 +762,24 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point

bool haveLock = false;
bool releaseLock = false;
MutexSlim.LockToken token = default;
try
{
// try to acquire it synchronously
haveLock = _SingleWriterLock.Wait(0);
if (!haveLock) return new ValueTask<WriteResult>(WriteMessageTakingDelayedWriteLockAsync(physical, message));
// note: timeout is specified in mutex-constructor
var pending = _singleWriterMutex.TryWaitAsync(options: MutexSlim.WaitOptions.DisableAsyncContext);
if (!pending.IsCompletedSuccessfully) return WriteMessageTakingDelayedWriteLockAsync(pending, physical, message);

releaseLock = true;
token = pending.GetResult(); // we can't use "using" for this, because we might not want to kill it yet
if (!token.Success) // (in particular, we might hand the lifetime to CompleteWriteAndReleaseLockAsync)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return new ValueTask<WriteResult>(WriteResult.TimeoutBeforeWrite);
}

var result = WriteMessageInsideLock(physical, message);

Expand All @@ -769,25 +788,38 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
var flush = physical.FlushAsync(false);
if (!flush.IsCompletedSuccessfully)
{
haveLock = false; // so we don't release prematurely
return new ValueTask<WriteResult>(CompleteWriteAndReleaseLockAsync(flush, message));
releaseLock = false; // so we don't release prematurely
return CompleteWriteAndReleaseLockAsync(token, flush, message);
}

result = flush.Result; // we know it was completed, this is fine
}

UnmarkActiveMessage(message);
physical.SetIdle();

return new ValueTask<WriteResult>(result);
}
catch (Exception ex) { return new ValueTask<WriteResult>(HandleWriteException(message, ex)); }
finally { if (haveLock) ReleaseSingleWriterLock(message); }
finally
{
if (releaseLock) token.Dispose();
}
}

private async Task<WriteResult> CompleteWriteAndReleaseLockAsync(ValueTask<WriteResult> flush, Message message)
private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(MutexSlim.LockToken lockToken, ValueTask<WriteResult> flush, Message message)
{
try { return await flush.ForAwait(); }
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { ReleaseSingleWriterLock(message); }
using (lockToken)
{
try
{
var result = await flush.ForAwait();
UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
catch (Exception ex) { return HandleWriteException(message, ex); }
}
}

private WriteResult HandleWriteException(Message message, Exception ex)
Expand All @@ -797,11 +829,9 @@ private WriteResult HandleWriteException(Message message, Exception ex)
return WriteResult.WriteFailure;
}

/// <summary>
/// Atomically clears <c>_activeMesssage</c> if it still refers to <paramref name="message"/>,
/// then releases the <c>_SingleWriterLock</c> semaphore.
/// </summary>
/// <param name="message">The message whose write has finished; the active-message marker is only reset when it is this instance.</param>
// NOTE(review): every call site visible in this listing guards the call with a "have lock" check;
// presumably it must only be invoked by the current holder of the lock - confirm against callers.
private void ReleaseSingleWriterLock(Message message)
{
Interlocked.CompareExchange(ref _activeMesssage, null, message); // remove if it is us
_SingleWriterLock.Release();
}
/// <summary>
/// Atomically clears <c>_activeMesssage</c>, but only if it still refers to <paramref name="message"/>;
/// if a different message is recorded there, the field is left untouched.
/// </summary>
/// <param name="message">The message expected to be the currently active one.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void UnmarkActiveMessage(Message message)
=> Interlocked.CompareExchange(ref _activeMesssage, null, message); // remove if it is us

private State ChangeState(State newState)
{
Expand Down
4 changes: 2 additions & 2 deletions src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
else
{
unableToConnectError = true;
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. "
+ PerfCounterHelper.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions);
}
err += PerfCounterHelper.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/StackExchange.Redis/ServerSelectionStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ internal sealed class ServerSelectionStrategy
{
public const int NoSlot = -1, MultipleSlots = -2;
private const int RedisClusterSlotCount = 16384;
private static readonly ushort[] crc16tab =
{
private static ReadOnlySpan<ushort> s_crc16tab => new ushort[]
{ // this syntax allows a special-case population implementation by the compiler/JIT
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
Expand Down Expand Up @@ -72,6 +72,7 @@ private static unsafe int GetClusterSlot(in RedisKey key)
{
var blob = (byte[])key;
fixed (byte* ptr = blob)
fixed (ushort* crc16tab = s_crc16tab)
{
int offset = 0, count = blob.Length, start, end;
if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0
Expand Down
2 changes: 1 addition & 1 deletion src/StackExchange.Redis/StackExchange.Redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.0.9" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.0.18" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.1" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" />
Expand Down
13 changes: 1 addition & 12 deletions toys/TestConsole/Program.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace TestConsole
{
internal static class Program
{
public static async Task Main()
public static void Main()
{
using (var muxer = await ConnectionMultiplexer.ConnectAsync("127.0.0.1"))
{
var db = muxer.GetDatabase();
var sub = muxer.GetSubscriber();
Console.WriteLine("subscribing");
ChannelMessageQueue queue = await sub.SubscribeAsync("yolo");
Console.WriteLine("subscribed");
}
}
}
}

0 comments on commit 6873941

Please sign in to comment.