Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Models will now be tracked and cleaned up on Dispose.

  • Loading branch information...
commit ffafdf0251da9f9433bb2e342af87bb0069b353a 1 parent 35ec542
@ahazelwood ahazelwood authored
Showing with 15 additions and 2 deletions.
  1. +15 −2 EasyNetQ/RabbitBus.cs
View
17 EasyNetQ/RabbitBus.cs
@@ -19,7 +19,8 @@ public class RabbitBus : IBus, IRawByteBus
private readonly IDictionary<int, string> responseQueueNameCache = new ConcurrentDictionary<int, string>();
private readonly ISet<string> publishExchanges = new HashSet<string>();
- private readonly ISet<string> requestExchanges = new HashSet<string>();
+ private readonly ISet<string> requestExchanges = new HashSet<string>();
+ private List<IModel> modelList = new List<IModel>();
private const string rpcExchange = "easy_net_q_rpc";
private const bool noAck = false;
@@ -181,6 +182,7 @@ public void SubscribeAsync<T>(string subscriptionId, Func<T, Task> onMessage)
Action subscribeAction = () =>
{
var channel = connection.CreateModel();
+ modelList.Add( channel );
DeclarePublishExchange(channel, typeName);
channel.BasicQos(0, prefetchCount, false);
@@ -258,6 +260,7 @@ public void SubscribeAsync<T>(string subscriptionId, Func<T, Task> onMessage)
private void SubscribeToResponse<TResponse>(Action<TResponse> onResponse, string returnQueueName)
{
var responseChannel = connection.CreateModel();
+ modelList.Add( responseChannel );
// respond queue is transient, only exists for the lifetime of the service.
var respondQueue = responseChannel.QueueDeclare(
@@ -299,6 +302,7 @@ private void RequestPublish<TRequest>(TRequest request, string returnQueueName)
{
var requestTypeName = serializeType(typeof(TRequest));
var requestChannel = connection.CreateModel();
+ modelList.Add( requestChannel );
// declare the exchange, binding and queue here. No need to set the mandatory flag, the recieving queue
// will already have been declared, so in the case of no responder being present, message will collect
@@ -343,6 +347,7 @@ private void RequestPublish<TRequest>(TRequest request, string returnQueueName)
Action subscribeAction = () =>
{
var requestChannel = connection.CreateModel();
+ modelList.Add( requestChannel );
DeclareRequestResponseStructure(requestChannel, requestTypeName);
var consumer = consumerFactory.CreateConsumer(requestChannel,
@@ -477,7 +482,15 @@ private void DeclareRequestResponseStructure(IModel channel, string requestTypeN
public void Dispose()
{
if (disposed) return;
-
+
+ // Abort all Models
+ if ( modelList.Count > 0 ) {
+ foreach ( var model in modelList ) {
+ if ( null != model )
+ model.Abort();
+ }
+ }
+
threadLocalPublishChannel.Dispose();
consumerFactory.Dispose();
connection.Dispose();
Please sign in to comment.
Something went wrong with that request. Please try again.