Skip to content

Commit

Permalink
Fix SQL duplicate messages issue:
Browse files Browse the repository at this point in the history
- Ensure the state is reset at the beginning of the receive loop
- Added more tracing to detect unexpected states in the ObservableDbOperation state machine
- #2988
  • Loading branch information
DamianEdwards committed Aug 12, 2014
1 parent 4d6cc83 commit c557e36
Showing 1 changed file with 49 additions and 12 deletions.
61 changes: 49 additions & 12 deletions src/Microsoft.AspNet.SignalR.SqlServer/ObservableDbOperation.cs
Expand Up @@ -81,15 +81,19 @@ public void ExecuteReaderWithUpdates(Action<IDataRecord, DbOperation> processRec

var useNotifications = _dbBehavior.StartSqlDependencyListener();

if (useNotifications)
{
_notificationState = NotificationState.ProcessingUpdates;
}

var delays = _dbBehavior.UpdateLoopRetryDelays;

for (var i = 0; i < delays.Count; i++)
{
if (i == 0 && useNotifications)
{
// Reset the state to ProcessingUpdates if this is the start of the loop.
// This should be safe to do here without Interlocked because the state is protected
// in the other two cases using Interlocked, i.e. there should only be one instance of
// this running at any point in time.
_notificationState = NotificationState.ProcessingUpdates;
}

Tuple<int, int> retry = delays[i];
var retryDelay = retry.Item1;
var retryCount = retry.Item2;
Expand Down Expand Up @@ -158,15 +162,36 @@ public void ExecuteReaderWithUpdates(Action<IDataRecord, DbOperation> processRec

Queried();

if (recordCount > 0 ||
Interlocked.CompareExchange(ref _notificationState, NotificationState.AwaitingNotification,
NotificationState.ProcessingUpdates) == NotificationState.NotificationReceived)
if (recordCount > 0)
{
Trace.TraceVerbose("{0}Records received while setting up SQL notification, restarting the recieve loop", TracePrefix);
Trace.TraceVerbose("{0}Records were returned by the command that sets up the SQL notification, restarting the receive loop", TracePrefix);

i = -1;
break; // break the inner for loop
};
}
else
{
var previousState = Interlocked.CompareExchange(ref _notificationState, NotificationState.AwaitingNotification,
NotificationState.ProcessingUpdates);

if (previousState == NotificationState.AwaitingNotification)
{
Trace.TraceError("{0}A SQL notification was already running. Overlapping receive loops detected, this should never happen. BUG!", TracePrefix);

return;
}

if (previousState == NotificationState.NotificationReceived)
{
// Failed to change _notificationState from ProcessingUpdates to AwaitingNotification, it was already NotificationReceived

Trace.TraceVerbose("{0}The SQL notification fired before the receive loop returned, restarting the receive loop", TracePrefix);

i = -1;
break; // break the inner for loop
}

}

Trace.TraceVerbose("{0}No records received while setting up SQL notification", TracePrefix);

Expand Down Expand Up @@ -245,15 +270,27 @@ protected virtual void SqlDependency_OnChange(SqlNotificationEventArgs e, Action
}
}

if (Interlocked.CompareExchange(ref _notificationState,
NotificationState.NotificationReceived, NotificationState.ProcessingUpdates) == NotificationState.ProcessingUpdates)
var previousState = Interlocked.CompareExchange(ref _notificationState,
NotificationState.NotificationReceived, NotificationState.ProcessingUpdates);

if (previousState == NotificationState.NotificationReceived)
{
Trace.TraceError("{0}Overlapping SQL change notifications received, this should never happen, BUG!", TracePrefix);

return;
}
if (previousState == NotificationState.ProcessingUpdates)
{
// We're still in the original receive loop

// New updates will be retreived by the original reader thread
Trace.TraceVerbose("{0}Original reader processing is still in progress and will pick up the changes", TracePrefix);

return;
}

// _notificationState wasn't ProcessingUpdates (likely AwaitingNotification)

// Check notification args for issues
if (e.Type == SqlNotificationType.Change)
{
Expand Down

0 comments on commit c557e36

Please sign in to comment.