Skip to content
Permalink
Browse files

optionally re-add the "write on async/await path" code rather than re…

…lying on the backlog; add supporting code for both; heartbeat should also help clear down anything in the backlog
  • Loading branch information...
mgravell committed Mar 7, 2019
1 parent 1ad2f95 commit 4ebacd62076e5441bba3873130e30863386f371d
Showing with 96 additions and 27 deletions.
  1. +95 −26 src/StackExchange.Redis/PhysicalBridge.cs
  2. +1 −1 toys/TestConsoleBaseline/TestConsoleBaseline.csproj
@@ -489,6 +489,8 @@ internal void OnHeartbeat(bool ifConnectedOnly)
bool runThisTime = false;
try
{
CheckBacklogForTimeouts();

runThisTime = !isDisposed && Interlocked.CompareExchange(ref beating, 1, 0) == 0;
if (!runThisTime) return;

@@ -687,28 +689,13 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
{
// we can't get it *instantaneously*; is there
// perhaps a backlog and active backlog processor?
bool haveBacklog;
lock (_backlog)
{
haveBacklog = _backlog.Count != 0;
}
if (haveBacklog)
{
PushToBacklog(message);
return WriteResult.Success; // queued counts as success
}
if (PushToBacklog(message, onlyIfExists: true)) return WriteResult.Success; // queued counts as success

// no backlog... try to wait with the timeout;
// if we *still* can't get it: that counts as
// an actual timeout
token = _singleWriterMutex.TryWait();
if (!token.Success)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
message.Complete();
return WriteResult.TimeoutBeforeWrite;
}
if (!token.Success) return TimedOutBeforeWrite(message);
}

var result = WriteMessageInsideLock(physical, message);
@@ -730,15 +717,18 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void PushToBacklog(Message message)
private bool PushToBacklog(Message message, bool onlyIfExists)
{
bool startWorker;
bool wasEmpty;
lock (_backlog)
{
startWorker = _backlog.Count == 0;
wasEmpty = _backlog.Count == 0;
if (wasEmpty & onlyIfExists) return false;

_backlog.Enqueue(message);
}
if (startWorker) StartBacklogProcessor();
if (wasEmpty) StartBacklogProcessor();
return true;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void StartBacklogProcessor()
@@ -754,6 +744,27 @@ private void StartBacklogProcessor()
if (bridge != null) bridge.ProcessBacklog();
};

private void CheckBacklogForTimeouts() // check the head of the backlog queue, consuming anything that looks dead
{
lock (_backlog)
{
var now = Environment.TickCount;
var timeout = TimeoutMilliseconds;
while (_backlog.Count != 0)
{
var message = _backlog.Peek();
if (message.IsInternalCall) break; // don't stomp these (not that they should have the async timeout flag, but...)

if (!message.HasAsyncTimedOut(now, timeout, out var _)) break; // not a timeout - we can stop looking
_backlog.Dequeue(); // consume it for real

// tell the message that it failed
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this);
}
}
}

private void ProcessBacklog()
{
LockToken token = default;
@@ -780,7 +791,7 @@ private void ProcessBacklog()

try
{
if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var elapsed))
if (message.HasAsyncTimedOut(Environment.TickCount, timeout, out var _))
{
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this);
@@ -817,28 +828,59 @@ private void ProcessBacklog()
}
}

private WriteResult TimedOutBeforeWrite(Message message)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
message.Complete();
return WriteResult.TimeoutBeforeWrite;
}

/// <summary>
/// This writes a message to the output stream
/// </summary>
/// <param name="physical">The phsyical connection to write to.</param>
/// <param name="message">The message to be written.</param>
internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnection physical, Message message)
{
/* design decision/choice; the code works fine either way, but if this is
* set to *true*, then when we can't take the writer-lock *right away*,
* we push the message to the backlog (starting a worker if needed)
*
* otherwise, we go for a TryWaitAsync and rely on the await machinery
*
* "true" seems to give faster times *when under heavy contention*, based on profiling
* but it involves the backlog concept; "false" works well under low contention, and
* makes more use of async
*/
const bool ALWAYS_USE_BACKLOG_IF_CANNOT_GET_SYNC_LOCK = true;

Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point

bool releaseLock = true;
bool releaseLock = true; // fine to default to true, as it doesn't matter until token is a "success"
LockToken token = default;
try
{
// try to acquire it synchronously
// note: timeout is specified in mutex-constructor
token = _singleWriterMutex.TryWait(options: WaitOptions.NoDelay);

if (!token.Success) // (in particular, me might hand the lifetime to CompleteWriteAndReleaseLockAsync)
if (!token.Success)
{
PushToBacklog(message);
return new ValueTask<WriteResult>(WriteResult.Success); // queued counts as success
// we can't get it *instantaneously*; is there
// perhaps a backlog and active backlog processor?
if (PushToBacklog(message, onlyIfExists: !ALWAYS_USE_BACKLOG_IF_CANNOT_GET_SYNC_LOCK))
return new ValueTask<WriteResult>(WriteResult.Success); // queued counts as success

// no backlog... try to wait with the timeout;
// if we *still* can't get it: that counts as
// an actual timeout
var pending = _singleWriterMutex.TryWaitAsync(options: WaitOptions.DisableAsyncContext);
if (!pending.IsCompletedSuccessfully) return WriteMessageTakingWriteLockAsync_Awaited(pending, physical, message);

token = pending.Result; // fine since we know we got a result
if (!token.Success) return new ValueTask<WriteResult>(TimedOutBeforeWrite(message));
}

var result = WriteMessageInsideLock(physical, message);
@@ -867,6 +909,33 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
}
}

private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(ValueTask<LockToken> pending, PhysicalConnection physical, Message message)
{
try
{
using (var token = await pending)
{
if (!token.Success) return TimedOutBeforeWrite(message);

var result = WriteMessageInsideLock(physical, message);

if (result == WriteResult.Success)
{
result = await physical.FlushAsync(false);
}

UnmarkActiveMessage(message);
physical.SetIdle();

return result;
}
}
catch (Exception ex)
{
return HandleWriteException(message, ex);
}
}

private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask<WriteResult> flush, Message message)
{
using (lockToken)
@@ -15,6 +15,6 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="StackExchange.Redis" Version="[2.0.545]" /> <!-- [1.2.6] for previous major -->
<PackageReference Include="StackExchange.Redis" Version="[2.0.558]" /> <!-- [1.2.6] for previous major -->
</ItemGroup>
</Project>

0 comments on commit 4ebacd6

Please sign in to comment.
You can’t perform that action at this time.