Skip to content

Commit

Permalink
Ensure that _activeMessage is always cleared, including if an excepti…
Browse files Browse the repository at this point in the history
…on is thrown while trying to send the message or flush the connection. Leaving _activeMessage set causes WriteMessageInsideLock to always return a "NoConnectionAvailable" error indefinitely (#1374)

Co-authored-by: Nick Craver <craver@stackoverflow.com>
  • Loading branch information
hamish-omny and Nick Craver committed Mar 18, 2020
1 parent 4f58848 commit 58e8d63
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -737,12 +737,15 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
#pragma warning restore CS0618
}

UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { token.Dispose(); }
finally
{
UnmarkActiveMessage(message);
token.Dispose();
}

}

Expand Down Expand Up @@ -887,7 +890,6 @@ private void ProcessBacklog()
}

_backlogStatus = BacklogStatus.MarkingInactive;
UnmarkActiveMessage(message);
if (result != WriteResult.Success)
{
_backlogStatus = BacklogStatus.RecordingWriteFailure;
Expand All @@ -901,6 +903,10 @@ private void ProcessBacklog()
_backlogStatus = BacklogStatus.RecordingFault;
HandleWriteException(message, ex);
}
finally
{
UnmarkActiveMessage(message);
}
}
_backlogStatus = BacklogStatus.SettingIdle;
physical.SetIdle();
Expand Down Expand Up @@ -985,21 +991,25 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect

result = flush.Result; // we know it was completed, this is fine
}

UnmarkActiveMessage(message);

physical.SetIdle();

return new ValueTask<WriteResult>(result);
}
catch (Exception ex) { return new ValueTask<WriteResult>(HandleWriteException(message, ex)); }
finally
{
if (releaseLock & token.Success)
if (token.Success)
{
UnmarkActiveMessage(message);

if (releaseLock)
{
#if DEBUG
RecordLockDuration(lockTaken);
RecordLockDuration(lockTaken);
#endif
token.Dispose();
token.Dispose();
}
}
}
}
Expand Down Expand Up @@ -1029,8 +1039,7 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va
{
result = await physical.FlushAsync(false).ForAwait();
}

UnmarkActiveMessage(message);

physical.SetIdle();

#if DEBUG
Expand All @@ -1043,6 +1052,10 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va
{
return HandleWriteException(message, ex);
}
finally
{
UnmarkActiveMessage(message);
}
}

private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask<WriteResult> flush, Message message, int lockTaken)
Expand All @@ -1052,7 +1065,6 @@ private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken
try
{
var result = await flush.ForAwait();
UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
Expand Down

0 comments on commit 58e8d63

Please sign in to comment.