Permalink
Browse files

Make checking for existing subscriptions cheaper.

  • Loading branch information...
1 parent cb103d7 commit 4ee1e06443d065aee34fb2b0c81137240665e2c2 @davidfowl davidfowl committed Sep 1, 2012
Showing with 32 additions and 25 deletions.
  1. +32 −25 SignalR/MessageBus/MessageBus.cs
@@ -135,14 +135,7 @@ public IDisposable Subscribe(ISubscriber subscriber, string cursor, Func<Message
foreach (var topic in topics)
{
- lock (topic.Subscriptions)
- {
- if (!topic.Subscriptions.Contains(subscription))
- {
- // Add this subscription to the list of subs
- topic.Subscriptions.Add(subscription);
- }
- }
+ topic.AddSubscription(subscription);
}
if (!String.IsNullOrEmpty(cursor))
@@ -168,14 +161,8 @@ public IDisposable Subscribe(ISubscriber subscriber, string cursor, Func<Message
// Add or update the cursor (in case it already exists)
subscription.AddOrUpdateCursor(eventKey, id, topic);
- lock (topic.Subscriptions)
- {
- if (!topic.Subscriptions.Contains(subscription))
- {
- // Add it to the list of subs
- topic.Subscriptions.Add(subscription);
- }
- }
+ // Add it to the list of subs
+ topic.AddSubscription(subscription);
};
Action<string> eventRemoved = eventKey => RemoveEvent(subscription, eventKey);
@@ -213,11 +200,7 @@ private void RemoveEvent(Subscription subscription, string eventKey)
Topic topic;
if (_topics.TryGetValue(eventKey, out topic))
{
- lock (topic.Subscriptions)
- {
- topic.Subscriptions.Remove(subscription);
- }
-
+ topic.RemoveSubscription(subscription);
subscription.RemoveCursor(eventKey);
}
}
@@ -247,7 +230,7 @@ internal class Subscription : IDisposable
{
private readonly List<Cursor> _cursors;
private readonly Func<MessageResult, Task<bool>> _callback;
- private readonly int _messageBufferSize;
+ private readonly int _maxMessages;
private readonly object _lockObj = new object();
private int _disposed;
@@ -273,12 +256,12 @@ public IList<Cursor> Cursors
public string Identity { get; private set; }
- public Subscription(string identity, IEnumerable<Cursor> cursors, Func<MessageResult, Task<bool>> callback, int messageBufferSize)
+ public Subscription(string identity, IEnumerable<Cursor> cursors, Func<MessageResult, Task<bool>> callback, int maxMessages)
{
Identity = identity;
_cursors = new List<Cursor>(cursors);
_callback = callback;
- _messageBufferSize = messageBufferSize;
+ _maxMessages = maxMessages;
}
public Task<bool> Invoke(MessageResult result)
@@ -357,7 +340,7 @@ private void WorkImpl(ConcurrentDictionary<string, Topic> topics, TaskCompletion
{
Cursor cursor = Cursors[i];
- MessageStoreResult<Message> storeResult = cursor.Topic.Store.GetMessages(cursor.Id, _messageBufferSize);
+ MessageStoreResult<Message> storeResult = cursor.Topic.Store.GetMessages(cursor.Id, _maxMessages);
ulong next = storeResult.FirstMessageId + (ulong)storeResult.Messages.Count;
cursor.Id = next;
@@ -686,6 +669,8 @@ public override string ToString()
internal class Topic
{
+ private HashSet<string> _subs = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
+
public IList<Subscription> Subscriptions { get; private set; }
public MessageStore<Message> Store { get; private set; }
@@ -694,6 +679,28 @@ public Topic()
Subscriptions = new List<Subscription>();
Store = new MessageStore<Message>(DefaultMessageStoreSize);
}
+
+ public void AddSubscription(Subscription subscription)
+ {
+ lock (Subscriptions)
+ {
+ if (_subs.Add(subscription.Identity))
+ {
+ Subscriptions.Add(subscription);
+ }
+ }
+ }
+
+ public void RemoveSubscription(Subscription subscription)
+ {
+ lock (Subscriptions)
+ {
+ if (_subs.Remove(subscription.Identity))
+ {
+ Subscriptions.Remove(subscription);
+ }
+ }
+ }
}
/// <summary>

0 comments on commit 4ee1e06

Please sign in to comment.