Permalink
Browse files

Added comments to the MessageBus for the recent memory leak fix

  • Loading branch information...
1 parent aa80c91 commit aa285dd994dd67e3e6ddb2162721e9acbe5ddaed @NTaylorMullen NTaylorMullen committed May 24, 2013
Showing with 21 additions and 1 deletion.
  1. +21 −1 src/Microsoft.AspNet.SignalR.Core/Messaging/MessageBus.cs
@@ -192,8 +192,10 @@ protected ulong Save(Message message)
throw new ArgumentNullException("message");
}
- // Don't mark topics as active when publishing
+ // GetTopic will return a topic for the given key. If topic exists and is Dying,
+ // it will revive it and mark it as NoSubscriptions
Topic topic = GetTopic(message.Key);
+ // Mark the topic as used so it doesn't immediately expire (if it was in that state before).
topic.MarkUsed();
return topic.Store.Add(message);
@@ -230,6 +232,7 @@ public virtual IDisposable Subscribe(ISubscriber subscriber, string cursor, Func
foreach (var key in subscriber.EventKeys)
{
+ // Create or retrieve topic and set it as HasSubscriptions
Topic topic = SubscribeTopic(key);
// Set the subscription for this topic
@@ -390,6 +393,7 @@ internal void GarbageCollectTopics()
for (int i = 0; i < overflow && i < candidates.Count; i++)
{
var pair = candidates[i];
+ // We only want to kill the topic if it's in the NoSubscriptions or Dying state.
if (InterlockedHelper.CompareExchangeOr(ref pair.Value.State, TopicState.Dead, TopicState.NoSubscriptions, TopicState.Dying))
{
// Kill it
@@ -403,8 +407,16 @@ internal void GarbageCollectTopics()
private void DestroyTopic(string key, Topic topic)
{
+ // The goal of this function is to destroy topics after 2 garbage collect cycles
+ // This first if statement will transition a topic into the dying state on the first GC cycle
+ // but it will prevent the code path from hitting the second if statement
if (Interlocked.CompareExchange(ref topic.State, TopicState.Dying, TopicState.NoSubscriptions) == TopicState.Dying)
{
+ // If we've hit this if statement we're on the second GC cycle with this soon to be
+ // destroyed topic. At this point we move the Topic State into the Dead state as
+ // long as it has not been revived from the dying state. We check if the state is
+ // still dying again to ensure that the topic has not been transitioned into a new
+ // state since we've decided to destroy it.
if (Interlocked.CompareExchange(ref topic.State, TopicState.Dead, TopicState.Dying) == TopicState.Dying)
{
DestroyTopicCore(key, topic);
@@ -446,13 +458,18 @@ internal Topic GetTopic(string key)
BeforeTopicMarked(key, topic);
}
+ // If the topic was dying revive it to the NoSubscriptions state. This is used to ensure
+ // that in the scaleout case that even if we're publishing to a topic with no subscriptions
+ // that we keep it around in case a user hops nodes.
oldState = Interlocked.CompareExchange(ref topic.State, TopicState.NoSubscriptions, TopicState.Dying);
if (AfterTopicMarked != null)
{
AfterTopicMarked(key, topic, topic.State);
}
+ // If the topic is currently dead then we're racing with the DestroyTopicCore function, therefore
+ // loop around until we're able to create a new topic
} while (oldState == TopicState.Dead);
if (AfterTopicMarkedSuccessfully != null)
@@ -481,13 +498,16 @@ internal Topic SubscribeTopic(string key)
BeforeTopicMarked(key, topic);
}
+ // Transition into the HasSubscriptions state as long as the topic is not dead
InterlockedHelper.CompareExchangeOr(ref topic.State, TopicState.HasSubscriptions, TopicState.NoSubscriptions, TopicState.Dying);
if (AfterTopicMarked != null)
{
AfterTopicMarked(key, topic, topic.State);
}
+ // If we were unable to transition into the HasSubscription state that means we're in the Dead state.
+ // Loop around until we're able to create the topic new
} while (topic.State != TopicState.HasSubscriptions);
if (AfterTopicMarkedSuccessfully != null)

1 comment on commit aa285dd

@davidfowl
Member

Awrsome

Please sign in to comment.