Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge branch 'release' into dev
  • Loading branch information
DamianEdwards committed Mar 17, 2014
2 parents fcfb472 + 3e66b43 commit 9e0f838
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 35 deletions.
47 changes: 20 additions & 27 deletions src/Microsoft.AspNet.SignalR.ServiceBus/ServiceBusConnection.cs
Expand Up @@ -36,53 +36,43 @@ public ServiceBusConnection(ServiceBusScaleoutConfiguration configuration, Trace
{
_namespaceManager = NamespaceManager.CreateFromConnectionString(_connectionString);
_factory = MessagingFactory.CreateFromConnectionString(_connectionString);

if (configuration.RetryPolicy != null)
{
_factory.RetryPolicy = configuration.RetryPolicy;
}
else
{
_factory.RetryPolicy = RetryExponential.Default;
}
}
catch (ConfigurationErrorsException)
{
_trace.TraceError("The configured Service Bus connection string contains an invalid property. Check the exception details for more information.");
}

if (configuration.RetryPolicy != null)
{
_factory.RetryPolicy = configuration.RetryPolicy;
}
else
{
_factory.RetryPolicy = RetryExponential.Default;
}

_idleSubscriptionTimeout = configuration.IdleSubscriptionTimeout;
_configuration = configuration;
}

[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope", Justification = "The disposable is returned to the caller")]
public ServiceBusConnectionContext Subscribe(IList<string> topicNames,
Action<int, IEnumerable<BrokeredMessage>> handler,
Action<int, Exception> errorHandler,
Action<int> openStream)
public void Subscribe(ServiceBusConnectionContext connectionContext)
{
if (topicNames == null)
{
throw new ArgumentNullException("topicNames");
}

if (handler == null)
if (connectionContext == null)
{
throw new ArgumentNullException("handler");
throw new ArgumentNullException("connectionContext");
}

_trace.TraceInformation("Subscribing to {0} topic(s) in the service bus...", topicNames.Count);
_trace.TraceInformation("Subscribing to {0} topic(s) in the service bus...", connectionContext.TopicNames.Count);

var connectionContext = new ServiceBusConnectionContext(_configuration, _namespaceManager, topicNames, _trace, handler, errorHandler, openStream);
connectionContext.NamespaceManager = _namespaceManager;

for (var topicIndex = 0; topicIndex < topicNames.Count; ++topicIndex)
for (var topicIndex = 0; topicIndex < connectionContext.TopicNames.Count; ++topicIndex)
{
Retry(() => CreateTopic(connectionContext, topicIndex));
}

_trace.TraceInformation("Subscription to {0} topics in the service bus Topic service completed successfully.", topicNames.Count);

return connectionContext;
_trace.TraceInformation("Subscription to {0} topics in the service bus Topic service completed successfully.", connectionContext.TopicNames.Count);
}

private void CreateTopic(ServiceBusConnectionContext connectionContext, int topicIndex)
Expand Down Expand Up @@ -223,7 +213,10 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
// Close the factory
_factory.Close();
if (_factory != null)
{
_factory.Close();
}
}
}

Expand Down
Expand Up @@ -14,7 +14,6 @@ namespace Microsoft.AspNet.SignalR.ServiceBus
{
public class ServiceBusConnectionContext : IDisposable
{
private readonly NamespaceManager _namespaceManager;
private readonly ServiceBusScaleoutConfiguration _configuration;

private readonly SubscriptionContext[] _subscriptions;
Expand All @@ -32,8 +31,9 @@ public class ServiceBusConnectionContext : IDisposable

public bool IsDisposed { get; private set; }

public NamespaceManager NamespaceManager { get; set; }

public ServiceBusConnectionContext(ServiceBusScaleoutConfiguration configuration,
NamespaceManager namespaceManager,
IList<string> topicNames,
TraceSource traceSource,
Action<int, IEnumerable<BrokeredMessage>> handler,
Expand All @@ -45,7 +45,6 @@ public class ServiceBusConnectionContext : IDisposable
throw new ArgumentNullException("topicNames");
}

_namespaceManager = namespaceManager;
_configuration = configuration;

_subscriptions = new SubscriptionContext[topicNames.Count];
Expand Down Expand Up @@ -116,10 +115,21 @@ protected virtual void Dispose(bool disposing)
{
for (int i = 0; i < TopicNames.Count; i++)
{
_topicClients[i].Close();
SubscriptionContext subscription = _subscriptions[i];
subscription.Receiver.Close();
_namespaceManager.DeleteSubscription(subscription.TopicPath, subscription.Name);
// BUG #2937: We need to null check here because the given topic/subscription
// may never have actually been created due to the lock being released
// between each retry attempt
var topicClient = _topicClients[i];
if (topicClient != null)
{
topicClient.Close();
}

var subscription = _subscriptions[i];
if (subscription != null)
{
subscription.Receiver.Close();
NamespaceManager.DeleteSubscription(subscription.TopicPath, subscription.Name);
}
}

IsDisposed = true;
Expand Down
Expand Up @@ -44,6 +44,8 @@ public ServiceBusMessageBus(IDependencyResolver resolver, ServiceBusScaleoutConf
.Select(topicIndex => SignalRTopicPrefix + "_" + configuration.TopicPrefix + "_" + topicIndex)
.ToArray();

_connectionContext = new ServiceBusConnectionContext(configuration, _topics, _trace, OnMessage, OnError, Open);

ThreadPool.QueueUserWorkItem(Subscribe);
}

Expand Down Expand Up @@ -87,7 +89,7 @@ private void OnMessage(int topicIndex, IEnumerable<BrokeredMessage> messages)

private void Subscribe(object state)
{
_connectionContext = _connection.Subscribe(_topics, OnMessage, OnError, Open);
_connection.Subscribe(_connectionContext);
}

private void TraceMessages(IList<Message> messages, string messageType)
Expand Down

0 comments on commit 9e0f838

Please sign in to comment.