Skip to content

Commit

Permalink
Made some chages to InProcessMessageStore.
Browse files Browse the repository at this point in the history
- Returns all messages if requested message id > max message id
- Ensure multiple cleanup loops don't happen concurrently.
- Removed GetAll(key) from IMessageStore.
- Added unit tests
  • Loading branch information
davidfowl committed Sep 16, 2011
1 parent 37b337f commit 29e5768
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 33 deletions.
4 changes: 0 additions & 4 deletions SignalR.ScaleOut/PeerToPeerSQLSignalBusMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ public class PeerToPeerSQLSignalBusMessageStore : IMessageStore, ISignalBus {
.Unwrap();
}

public Task<IEnumerable<Message>> GetAll(string key) {
return _store.GetAll(key);
}

public Task<IEnumerable<Message>> GetAllSince(string key, long id) {
return _store.GetAllSince(key, id);
}
Expand Down
6 changes: 0 additions & 6 deletions SignalR.ScaleOut/SQLMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ public class SQLMessageStore : IMessageStore {
});
}

public Task<IEnumerable<Message>> GetAll(string key) {
return GetMessages(key, _getAllSQL.Replace("{TableName}", MessageTableName),
new[] { new SqlParameter("EventKey", key) }
);
}

public Task<IEnumerable<Message>> GetAllSince(string key, long id) {
return _queries.GetOrAdd(Tuple.Create(id, key),
GetMessages(key, _getAllSinceSQL.Replace("{TableName}", MessageTableName),
Expand Down
46 changes: 46 additions & 0 deletions SignalR.Tests/InProcessMessageStoreTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Linq;
using Xunit;

namespace SignalR.Tests {
public class InProcessMessageStoreTest {
[Fact]
public void GetAllSinceReturnsAllMessagesAfterIdOrderedById() {
var store = new InProcessMessageStore(garbageCollectMessages: false);

store.Save("a", "1").Wait();
store.Save("a", "2").Wait();
store.Save("a", "3").Wait();

var messages = store.GetAllSince("a", 1).Result.ToList();
Assert.Equal(2, messages.Count);
Assert.Equal("2", messages[0].Value);
Assert.Equal("3", messages[1].Value);
}

[Fact]
public void GetAllSinceReturnsAllMessagesIfIdGreaterThanMaxId() {
var store = new InProcessMessageStore(garbageCollectMessages: false);

for (int i = 0; i < 10; i++) {
store.Save("a", i).Wait();
}

var messages = store.GetAllSince("a", 100).Result.ToList();
Assert.Equal(10, messages.Count);
for (int i = 0; i < 10; i++) {
Assert.Equal(i, messages[i].Value);
}
}

[Fact]
public void GetLastIdReturnsMaxMessageId() {
var store = new InProcessMessageStore(garbageCollectMessages: false);

store.Save("a", "1").Wait();
store.Save("a", "2").Wait();
store.Save("a", "3").Wait();

Assert.Equal(3, store.GetLastId().Result);
}
}
}
1 change: 1 addition & 0 deletions SignalR.Tests/SignalR.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="InProcessMessageStoreTest.cs" />
<Compile Include="LongPollingTransportTest.cs" />
<Compile Include="DefaultActionResolverTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
Expand Down
3 changes: 2 additions & 1 deletion SignalR/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class Connection : IConnection {
private readonly string _clientId;
private readonly HashSet<string> _signals;
private readonly HashSet<string> _groups;
private readonly object _lockObj = new object();

public Connection(IMessageStore store,
Signaler signaler,
Expand Down Expand Up @@ -187,7 +188,7 @@ public class Connection : IConnection {
.Catch();
}

private Task<IEnumerable<Message>> GetMessages(long id, IEnumerable<string> signals) {
private Task<IEnumerable<Message>> GetMessages(long id, IEnumerable<string> signals) {
var pendingMessagesTasks = (from signal in signals
select _store.GetAllSince(signal, id)).ToArray();

Expand Down
1 change: 0 additions & 1 deletion SignalR/IMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace SignalR {
public interface IMessageStore {
Task<long?> GetLastId();
Task Save(string key, object value);
Task<IEnumerable<Message>> GetAll(string key);
Task<IEnumerable<Message>> GetAllSince(string key, long id);
}
}
56 changes: 35 additions & 21 deletions SignalR/InProcessMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,39 @@ namespace SignalR {
public class InProcessMessageStore : IMessageStore {
private readonly ConcurrentDictionary<string, SafeSet<Message>> _items = new ConcurrentDictionary<string, SafeSet<Message>>(StringComparer.OrdinalIgnoreCase);
// Interval to wait before cleaning up expired items
private static readonly TimeSpan _cleanupInterval = TimeSpan.FromSeconds(10);
private readonly TimeSpan _cleanupInterval = TimeSpan.FromSeconds(10);

private static long _messageId = 0;
private static object _idLocker = new object();
private long _lastMessageId = 0;
private readonly object _idLocker = new object();
private bool _gcRunning;

private readonly Timer _timer;

public InProcessMessageStore() {
_timer = new Timer(RemoveExpiredEntries, null, _cleanupInterval, _cleanupInterval);
public InProcessMessageStore()
: this(garbageCollectMessages: true) {
}

internal InProcessMessageStore(bool garbageCollectMessages) {
if (garbageCollectMessages) {
_timer = new Timer(RemoveExpiredEntries, null, _cleanupInterval, _cleanupInterval);
}
}

public Task Save(string key, object value) {
return Save(new Message(key, Interlocked.Increment(ref _messageId), value));
var message = new Message(key, Interlocked.Increment(ref _lastMessageId), value);
return Save(message);
}

protected internal Task Save(Message message) {
var key = message.SignalKey;
SafeSet<Message> list;
if (!_items.TryGetValue(key, out list)) {
list = new SafeSet<Message>();
_items.TryAdd(key, list);
}

var list = _items.GetOrAdd(key, _ => new SafeSet<Message>());
list.Add(message);
if (message.Id > _messageId) {

if (message.Id > _lastMessageId) {
lock (_idLocker) {
if (message.Id > _messageId) {
_messageId = message.Id;
if (message.Id > _lastMessageId) {
_lastMessageId = message.Id;
}
}
}
Expand All @@ -45,24 +51,24 @@ public class InProcessMessageStore : IMessageStore {
}

public Task<IEnumerable<Message>> GetAllSince(string key, long id) {
if (id > _lastMessageId) {
id = 0;
}

var items = GetAllCore(key).Where(item => item.Id > id)
.OrderBy(item => item.Id);
return TaskAsyncHelper.FromResult<IEnumerable<Message>>(items);

return TaskAsyncHelper.FromResult<IEnumerable<Message>>(items);
}

public Task<long?> GetLastId() {
if (_messageId > 0) {
return TaskAsyncHelper.FromResult<long?>(_messageId);
if (_lastMessageId > 0) {
return TaskAsyncHelper.FromResult<long?>(_lastMessageId);
}

return TaskAsyncHelper.FromResult<long?>(null);
}

public Task<IEnumerable<Message>> GetAll(string key) {
return TaskAsyncHelper.FromResult(GetAllCore(key));
}

private IEnumerable<Message> GetAllCore(string key) {
SafeSet<Message> list;
if (_items.TryGetValue(key, out list)) {
Expand All @@ -73,6 +79,12 @@ public class InProcessMessageStore : IMessageStore {
}

private void RemoveExpiredEntries(object state) {
if (_gcRunning) {
return;
}

_gcRunning = true;

// Take a snapshot of the entries
var entries = _items.ToList();

Expand All @@ -84,6 +96,8 @@ public class InProcessMessageStore : IMessageStore {
}
}
}

_gcRunning = false;
}
}
}

0 comments on commit 29e5768

Please sign in to comment.