Skip to content

Commit

Permalink
Fix async deadlock issue when sending attention fails due to network …
Browse files Browse the repository at this point in the history
…failure (#1766)
  • Loading branch information
cheenamalhotra committed Oct 4, 2022
1 parent f2517d4 commit 81055cf
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public TimeoutState(int value)

private const int AttentionTimeoutSeconds = 5;

private static readonly ContextCallback s_readAdyncCallbackComplete = ReadAsyncCallbackComplete;
private static readonly ContextCallback s_readAsyncCallbackComplete = ReadAsyncCallbackComplete;

// Ticks to consider a connection "good" after a successful I/O (10,000 ticks = 1 ms)
// The resolution of the timer is typically in the range 10 to 16 milliseconds according to msdn.
Expand Down Expand Up @@ -2337,9 +2337,9 @@ private void OnTimeoutAsync(object state)
}
}

private bool OnTimeoutSync()
private bool OnTimeoutSync(bool asyncClose = false)
{
return OnTimeoutCore(TimeoutState.Running, TimeoutState.ExpiredSync);
return OnTimeoutCore(TimeoutState.Running, TimeoutState.ExpiredSync, asyncClose);
}

/// <summary>
Expand All @@ -2348,8 +2348,9 @@ private bool OnTimeoutSync()
/// </summary>
/// <param name="expectedState">the state that is the expected current state, state will change only if this is correct</param>
/// <param name="targetState">the state that will be changed to if the expected state is correct</param>
/// <param name="asyncClose">any close action to be taken by an async task to avoid deadlock.</param>
/// <returns>boolean value indicating whether the call changed the timeout state</returns>
private bool OnTimeoutCore(int expectedState, int targetState)
private bool OnTimeoutCore(int expectedState, int targetState, bool asyncClose = false)
{
Debug.Assert(targetState == TimeoutState.ExpiredAsync || targetState == TimeoutState.ExpiredSync, "OnTimeoutCore must have an expiry state as the targetState");

Expand Down Expand Up @@ -2382,7 +2383,7 @@ private bool OnTimeoutCore(int expectedState, int targetState)
{
try
{
SendAttention(mustTakeWriteLock: true);
SendAttention(mustTakeWriteLock: true, asyncClose);
}
catch (Exception e)
{
Expand Down Expand Up @@ -2927,7 +2928,7 @@ public void ReadAsyncCallback(IntPtr key, PacketHandle packet, uint error)
// synchrnously and then call OnTimeoutSync to force an atomic change of state.
if (TimeoutHasExpired)
{
OnTimeoutSync();
OnTimeoutSync(true);
}

// try to change to the stopped state but only do so if currently in the running state
Expand Down Expand Up @@ -2958,7 +2959,7 @@ public void ReadAsyncCallback(IntPtr key, PacketHandle packet, uint error)
{
if (_executionContext != null)
{
ExecutionContext.Run(_executionContext, s_readAdyncCallbackComplete, source);
ExecutionContext.Run(_executionContext, s_readAsyncCallbackComplete, source);
}
else
{
Expand Down Expand Up @@ -3441,7 +3442,7 @@ private void CancelWritePacket()

#pragma warning disable 0420 // a reference to a volatile field will not be treated as volatile

private Task SNIWritePacket(PacketHandle packet, out uint sniError, bool canAccumulate, bool callerHasConnectionLock)
private Task SNIWritePacket(PacketHandle packet, out uint sniError, bool canAccumulate, bool callerHasConnectionLock, bool asyncClose = false)
{
// Check for a stored exception
var delayedException = Interlocked.Exchange(ref _delayedWriteAsyncCallbackException, null);
Expand Down Expand Up @@ -3534,7 +3535,7 @@ private Task SNIWritePacket(PacketHandle packet, out uint sniError, bool canAccu
{
SqlClientEventSource.Log.TryTraceEvent("TdsParserStateObject.SNIWritePacket | Info | State Object Id {0}, Write async returned error code {1}", _objectID, (int)error);
AddError(_parser.ProcessSNIError(this));
ThrowExceptionAndWarning();
ThrowExceptionAndWarning(false, asyncClose);
}
AssertValidState();
completion.SetResult(null);
Expand Down Expand Up @@ -3569,7 +3570,7 @@ private Task SNIWritePacket(PacketHandle packet, out uint sniError, bool canAccu
{
SqlClientEventSource.Log.TryTraceEvent("TdsParserStateObject.SNIWritePacket | Info | State Object Id {0}, Write async returned error code {1}", _objectID, (int)sniError);
AddError(_parser.ProcessSNIError(this));
ThrowExceptionAndWarning(callerHasConnectionLock);
ThrowExceptionAndWarning(callerHasConnectionLock, asyncClose);
}
AssertValidState();
}
Expand All @@ -3581,7 +3582,7 @@ private Task SNIWritePacket(PacketHandle packet, out uint sniError, bool canAccu
internal abstract uint WritePacket(PacketHandle packet, bool sync);

// Sends an attention signal - executing thread will consume attn.
internal void SendAttention(bool mustTakeWriteLock = false)
internal void SendAttention(bool mustTakeWriteLock = false, bool asyncClose = false)
{
if (!_attentionSent)
{
Expand Down Expand Up @@ -3623,7 +3624,7 @@ internal void SendAttention(bool mustTakeWriteLock = false)

uint sniError;
_parser._asyncWrite = false; // stop async write
SNIWritePacket(attnPacket, out sniError, canAccumulate: false, callerHasConnectionLock: false);
SNIWritePacket(attnPacket, out sniError, canAccumulate: false, callerHasConnectionLock: false, asyncClose);
SqlClientEventSource.Log.TryTraceEvent("TdsParserStateObject.SendAttention | Info | State Object Id {0}, Sent Attention.", _objectID);
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2401,9 +2401,9 @@ private void OnTimeoutAsync(object state)
}
}

private bool OnTimeoutSync()
private bool OnTimeoutSync(bool asyncClose = false)
{
return OnTimeoutCore(TimeoutState.Running, TimeoutState.ExpiredSync);
return OnTimeoutCore(TimeoutState.Running, TimeoutState.ExpiredSync, asyncClose);
}

/// <summary>
Expand All @@ -2412,8 +2412,9 @@ private bool OnTimeoutSync()
/// </summary>
/// <param name="expectedState">the state that is the expected current state, state will change only if this is correct</param>
/// <param name="targetState">the state that will be changed to if the expected state is correct</param>
/// <param name="asyncClose">any close action to be taken by an async task to avoid deadlock.</param>
/// <returns>boolean value indicating whether the call changed the timeout state</returns>
private bool OnTimeoutCore(int expectedState, int targetState)
private bool OnTimeoutCore(int expectedState, int targetState, bool asyncClose = false)
{
Debug.Assert(targetState == TimeoutState.ExpiredAsync || targetState == TimeoutState.ExpiredSync, "OnTimeoutCore must have an expiry state as the targetState");

Expand Down Expand Up @@ -2447,7 +2448,7 @@ private bool OnTimeoutCore(int expectedState, int targetState)
{
try
{
SendAttention(mustTakeWriteLock: true);
SendAttention(mustTakeWriteLock: true, asyncClose);
}
catch (Exception e)
{
Expand Down Expand Up @@ -2988,7 +2989,7 @@ public void ReadAsyncCallback(IntPtr key, IntPtr packet, UInt32 error)
// synchrnously and then call OnTimeoutSync to force an atomic change of state.
if (TimeoutHasExpired)
{
OnTimeoutSync();
OnTimeoutSync(asyncClose: true);
}

// try to change to the stopped state but only do so if currently in the running state
Expand Down Expand Up @@ -3475,7 +3476,7 @@ private void CancelWritePacket()

#pragma warning disable 420 // a reference to a volatile field will not be treated as volatile

private Task SNIWritePacket(SNIHandle handle, SNIPacket packet, out UInt32 sniError, bool canAccumulate, bool callerHasConnectionLock)
private Task SNIWritePacket(SNIHandle handle, SNIPacket packet, out UInt32 sniError, bool canAccumulate, bool callerHasConnectionLock, bool asyncClose = false)
{
// Check for a stored exception
var delayedException = Interlocked.Exchange(ref _delayedWriteAsyncCallbackException, null);
Expand Down Expand Up @@ -3566,7 +3567,7 @@ private Task SNIWritePacket(SNIHandle handle, SNIPacket packet, out UInt32 sniEr
SqlClientEventSource.Log.TryTraceEvent("<sc.TdsParser.WritePacket|Info> write async returned error code {0}", (int)error);
AddError(_parser.ProcessSNIError(this));
ThrowExceptionAndWarning();
ThrowExceptionAndWarning(false, asyncClose);
}
AssertValidState();
completion.SetResult(null);
Expand Down Expand Up @@ -3603,7 +3604,7 @@ private Task SNIWritePacket(SNIHandle handle, SNIPacket packet, out UInt32 sniEr
{
SqlClientEventSource.Log.TryTraceEvent("<sc.TdsParser.WritePacket|Info> write async returned error code {0}", (int)sniError);
AddError(_parser.ProcessSNIError(this));
ThrowExceptionAndWarning(callerHasConnectionLock);
ThrowExceptionAndWarning(callerHasConnectionLock, false);
}
AssertValidState();
}
Expand All @@ -3613,7 +3614,7 @@ private Task SNIWritePacket(SNIHandle handle, SNIPacket packet, out UInt32 sniEr
#pragma warning restore 420

// Sends an attention signal - executing thread will consume attn.
internal void SendAttention(bool mustTakeWriteLock = false)
internal void SendAttention(bool mustTakeWriteLock = false, bool asyncClose = false)
{
if (!_attentionSent)
{
Expand Down Expand Up @@ -3660,7 +3661,7 @@ internal void SendAttention(bool mustTakeWriteLock = false)

UInt32 sniError;
_parser._asyncWrite = false; // stop async write
SNIWritePacket(Handle, attnPacket, out sniError, canAccumulate: false, callerHasConnectionLock: false);
SNIWritePacket(Handle, attnPacket, out sniError, canAccumulate: false, callerHasConnectionLock: false, asyncClose);
SqlClientEventSource.Log.TryTraceEvent("<sc.TdsParser.SendAttention|{0}> Send Attention ASync.", "Info");
}
finally
Expand Down

0 comments on commit 81055cf

Please sign in to comment.