Skip to content

Commit

Permalink
Fixed regression where server restarts don't cause the client to catc…
Browse files Browse the repository at this point in the history
…h back up for the InProcessMessageBus.

- Added unit test
Fixes #24
  • Loading branch information
davidfowl committed Apr 13, 2012
1 parent e8040b5 commit 1d6ce39
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
21 changes: 20 additions & 1 deletion SignalR.Tests/InProcessMessageBusFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void ReturnsAllMessagesWhenLastMessageIdIsLessThanAllMessages()
var result = bus.GetMessages(new[] { "foo" }, "1", CancellationToken.None).Result;
Assert.Equal(2, result.Messages.Count);
}

[Fact]
public void ReturnsMessagesGreaterThanLastMessageIdWhenLastMessageIdNotInStore()
{
Expand All @@ -45,6 +45,25 @@ public void ReturnsMessagesGreaterThanLastMessageIdWhenLastMessageIdNotInStore()
var result = bus.GetMessages(new[] { "foo" }, "3", CancellationToken.None).Result;
Assert.Equal(2, result.Messages.Count);
}

[Fact]
public void GetAllSinceReturnsAllMessagesIfIdGreaterThanMaxId()
{
var trace = new TraceManager();
var bus = new InProcessMessageBus(trace, false);

for (int i = 0; i < 10; i++)
{
bus.Send("testclient", "a", i).Wait();
}

var result = bus.GetMessages(new[] { "a" }, "100", CancellationToken.None).Result;
Assert.Equal(10, result.Messages.Count);
for (int i = 0; i < 10; i++)
{
Assert.Equal(i, result.Messages[i].Value);
}
}
}
}
}
14 changes: 11 additions & 3 deletions SignalR/MessageBus/InProcessMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,15 @@ public Task<MessageResult> GetMessages(IEnumerable<string> eventKeys, string id,
_cacheLock.EnterReadLock();
T uuid = _idGenerator.ConvertFromString(id);

if (uuid.CompareTo(_lastMessageId) >= 0)
if (uuid.CompareTo(_lastMessageId) > 0)
{
// BUG 24: Connection already has the latest message, so reset the id
// This can happen if the server is reset (appdomain or entire server incase of self host)
_trace.Source.TraceInformation("MessageBus: Connection asking for message id {0} when the largest is {1}. Resetting id", id, _lastMessageId);
Debug.WriteLine("MessageBus: Connection asking for message id {0} when the largest is {1}. Resetting id", id, _lastMessageId);
uuid = default(T);
}
else if (uuid.CompareTo(_lastMessageId) == 0)
{
// Connection already has the latest message, so start wating
_trace.Source.TraceInformation("MessageBus: Connection waiting for new messages from id {0}", id);
Expand Down Expand Up @@ -158,7 +166,7 @@ public Task Send(string connectionId, string eventKey, object value)
Broadcast(eventKey, message);
}
finally
{
{
_cacheLock.ExitWriteLock();
}

Expand Down Expand Up @@ -270,7 +278,7 @@ private Task<MessageResult> WaitForMessages(IEnumerable<string> eventKeys, Cance
{
return;
}
if (Interlocked.Exchange(ref callbackCalled, 1) == 0)
{
tcs.TrySetResult(GetMessageResult(messages));
Expand Down

0 comments on commit 1d6ce39

Please sign in to comment.