Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fixed race condition in InProcessMessageBus.
- Ignore messages in callback that are less than what we're waiting on.
  • Loading branch information
davidfowl committed Apr 2, 2012
1 parent dcf6e1d commit 8240e49
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions SignalR/MessageBus/InProcessMessageBus.cs
Expand Up @@ -96,7 +96,7 @@ public Task<MessageResult> GetMessages(IEnumerable<string> eventKeys, string id,
// Wait for new messages
_trace.Source.TraceInformation("MessageBus: New connection waiting for messages");
Debug.WriteLine("MessageBus: New connection waiting for messages");
return WaitForMessages(eventKeys, timeoutToken);
return WaitForMessages(eventKeys, timeoutToken, default(T));
}

try
Expand All @@ -110,7 +110,7 @@ public Task<MessageResult> GetMessages(IEnumerable<string> eventKeys, string id,
// Connection already has the latest message, so start wating
_trace.Source.TraceInformation("MessageBus: Connection waiting for new messages from id {0}", id);
Debug.WriteLine("MessageBus: Connection waiting for new messages from id {0}", (object)id);
return WaitForMessages(eventKeys, timeoutToken);
return WaitForMessages(eventKeys, timeoutToken, uuid);
}

var messages = eventKeys.SelectMany(key => GetMessagesSince(key, uuid));
Expand All @@ -126,7 +126,7 @@ public Task<MessageResult> GetMessages(IEnumerable<string> eventKeys, string id,
// Wait for new messages
_trace.Source.TraceInformation("MessageBus: Connection waiting for new messages from id {0}", id);
Debug.WriteLine("MessageBus: Connection waiting for new messages from id {0}", (object)id);
return WaitForMessages(eventKeys, timeoutToken);
return WaitForMessages(eventKeys, timeoutToken, uuid);
}
finally
{
Expand Down Expand Up @@ -240,7 +240,7 @@ private IList<InMemoryMessage<T>> GetMessagesSince(string eventKey, T id)
return snapshot.GetRange(startIndex, snapshot.Count - startIndex);
}

private Task<MessageResult> WaitForMessages(IEnumerable<string> eventKeys, CancellationToken timeoutToken)
private Task<MessageResult> WaitForMessages(IEnumerable<string> eventKeys, CancellationToken timeoutToken, T lastId)
{
var tcs = new TaskCompletionSource<MessageResult>();
int callbackCalled = 0;
Expand All @@ -265,8 +265,18 @@ private Task<MessageResult> WaitForMessages(IEnumerable<string> eventKeys, Cance
}
});

callback = messages =>
callback = receivedMessages =>
{
// REVIEW: Consider the case where lastId is a referene type and is null.
// What wouls this return? Does it matter?
var messages = receivedMessages.Where(m => m.Id.CompareTo(lastId) > 0)
.ToList();
if (messages.Count == 0)
{
return;
}
if (Interlocked.Exchange(ref callbackCalled, 1) == 0)
{
tcs.TrySetResult(GetMessageResult(messages));
Expand Down

0 comments on commit 8240e49

Please sign in to comment.