Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added dependency resolver to the AutoConfigurationModel to resolve su…

…bscription handlers.
  • Loading branch information...
commit 0231d23e22998fd04484173fa18075130d1e8d1d 1 parent 6547062
@derekgreer authored
View
2  VERSION.txt
@@ -1 +1 @@
-VERSION = "1.3.2"
+VERSION = "1.3.3"
View
4 packages.rb
@@ -2,8 +2,8 @@
[ "Machine.Specifications", "0.5.6.0" ],
[ "ExpectedObjects", "1.0.0.2" ],
[ "Moq", "4.0.10827" ],
-[ "RabbitMQ.Client", "2.8.6" ],
-[ "Newtonsoft.Json", "4.5.9" ]
+[ "RabbitMQ.Client", "2.8.7" ],
+[ "Newtonsoft.Json", "4.5.10" ]
]
configatron.packages = packages
View
3  rakefile.rb
@@ -33,7 +33,8 @@
assemblyinfo :versioning do |asm|
asm.output_file = "src/CommonAssemblyInfo.cs"
- asm.version = "#{VERSION}"
+ asm.version = "#{VERSION}".gsub(/-[a-zA-Z].*$/) { }
+
end
task :clean do
View
26 src/RabbitBus.Specs/Infrastructure/DependencyAutoMessageHandler.cs
@@ -0,0 +1,26 @@
+using RabbitBus.Specs.TestTypes;
+
+namespace RabbitBus.Specs.Infrastructure
+{
+ public class DependencyAutoMessageHandler : IMessageHandler<DependencyAutoMessage>
+ {
+ public DependencyAutoMessageHandler(INeededDependency neededDependency)
+ {
+ }
+
+ public static AutoMessage Message { get; set; }
+
+ public void Handle(IMessageContext<DependencyAutoMessage> messageContext)
+ {
+ Message = messageContext.Message;
+ }
+ }
+
+ public interface INeededDependency
+ {
+ }
+
+ class NeededDependency : INeededDependency
+ {
+ }
+}
View
18 src/RabbitBus.Specs/Infrastructure/TestDependencyResolver.cs
@@ -0,0 +1,18 @@
+using System;
+using RabbitBus.Configuration;
+
+namespace RabbitBus.Specs.Infrastructure
+{
+ class TestDependencyResolver : IDependencyResolver
+ {
+ public object Resolve(Type handlerType)
+ {
+ if (handlerType == typeof (DependencyAutoMessageHandler))
+ {
+ return new DependencyAutoMessageHandler(null);
+ }
+
+ return Activator.CreateInstance(handlerType);
+ }
+ }
+}
View
154 src/RabbitBus.Specs/Integration/ConventionConfigurationSpecs.cs
@@ -25,6 +25,7 @@ public class when_subscribing_by_convention_with_custom_consume_configuration_co
.WithAssembly(Assembly.GetExecutingAssembly())
.WithConsumeConfigurationConvention(new NamedAutoMessageConsumeConfigurationConvention(SpecId, SpecId))
.WithSubscriptionConvention(new MessageHandlerSubscrptionConvention())
+ .WithDependencyResolver(new TestDependencyResolver())
.Build())
.Build();
_bus.Connect();
@@ -36,9 +37,12 @@ public class when_subscribing_by_convention_with_custom_consume_configuration_co
_exchange.Close();
};
- Because of = () => new Action(() => _exchange.Publish(new AutoMessage("test"))).BlockUntil(() => AutoMessageHandler.Message != null)();
+ Because of =
+ () =>
+ new Action(() => _exchange.Publish(new AutoMessage("test"))).BlockUntil(() => AutoMessageHandler.Message != null)();
- It should_receive_the_message_when_published = () => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
+ It should_receive_the_message_when_published =
+ () => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
}
[Integration]
@@ -56,6 +60,7 @@ public class when_subscribing_by_convention_without_explicit_consume_registratio
.Configure(new AutoConfigurationModelBuilder()
.WithAssembly(Assembly.GetExecutingAssembly())
.WithSubscriptionConvention(new MessageHandlerSubscrptionConvention())
+ .WithDependencyResolver(new TestDependencyResolver())
.Build())
.Build();
_bus.Connect();
@@ -67,7 +72,9 @@ public class when_subscribing_by_convention_without_explicit_consume_registratio
_exchange.Close();
};
- Because of = () => new Action(() => _exchange.Publish(new AutoMessage("test"))).BlockUntil(() => AutoMessageHandler.Message != null)();
+ Because of =
+ () =>
+ new Action(() => _exchange.Publish(new AutoMessage("test"))).BlockUntil(() => AutoMessageHandler.Message != null)();
It should_receive_the_message_when_published =
() => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
@@ -81,29 +88,67 @@ public class when_subscribing_by_convention_using_the_calling_assembly
static Bus _bus;
Establish context = () =>
- {
- _exchange = new RabbitExchange("localhost", "AutoMessage", ExchangeType.Direct, false, true);
-
- _bus = new BusBuilder()
- .Configure(new AutoConfigurationModelBuilder()
- .WithCallingAssembly()
- .WithSubscriptionConvention(new MessageHandlerSubscrptionConvention())
- .Build())
- .Build();
- _bus.Connect();
- };
+ {
+ _exchange = new RabbitExchange("localhost", "AutoMessage", ExchangeType.Direct, false, true);
+
+ _bus = new BusBuilder()
+ .Configure(new AutoConfigurationModelBuilder()
+ .WithCallingAssembly()
+ .WithSubscriptionConvention(new MessageHandlerSubscrptionConvention())
+ .WithDependencyResolver(new TestDependencyResolver())
+ .Build())
+ .Build();
+ _bus.Connect();
+ };
Cleanup after = () =>
- {
- _bus.Close();
- _exchange.Close();
- };
+ {
+ _bus.Close();
+ _exchange.Close();
+ };
Because of =
() =>
new Action(() => _exchange.Publish(new AutoMessage("test"))).BlockUntil(() => AutoMessageHandler.Message != null)();
- It should_receive_the_message_when_published = () => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
+ It should_receive_the_message_when_published =
+ () => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
+ }
+
+ [Integration]
+ [Subject("Convention configuration")]
+ public class when_subscribing_by_convention_with_handler_dependencies
+ {
+ static RabbitExchange _exchange;
+ static Bus _bus;
+
+ Establish context = () =>
+ {
+ _exchange = new RabbitExchange("localhost", "DependencyAutoMessage", ExchangeType.Direct, false, true);
+
+ _bus = new BusBuilder()
+ .Configure(new AutoConfigurationModelBuilder()
+ .WithCallingAssembly()
+ .WithSubscriptionConvention(new MessageHandlerSubscrptionConvention())
+ .WithDependencyResolver(new TestDependencyResolver())
+ .Build())
+ .Build();
+ _bus.Connect();
+ };
+
+ Cleanup after = () =>
+ {
+ _bus.Close();
+ _exchange.Close();
+ };
+
+ Because of =
+ () =>
+ new Action(() => _exchange.Publish(new DependencyAutoMessage("test"))).BlockUntil(
+ () => DependencyAutoMessageHandler.Message != null)();
+
+ It should_receive_the_message_when_published =
+ () => DependencyAutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
}
[Integration]
@@ -114,27 +159,31 @@ public class when_subscribing_by_convention_using_the_default_conventions
static Bus _bus;
Establish context = () =>
- {
- _exchange = new RabbitExchange("localhost", "AutoMessage", ExchangeType.Direct, false, true);
-
- _bus = new BusBuilder()
- .Configure(new AutoConfigurationModelBuilder()
- .WithCallingAssembly()
- .WithDefaultConventions()
- .Build())
- .Build();
- _bus.Connect();
- };
+ {
+ _exchange = new RabbitExchange("localhost", "AutoMessage", ExchangeType.Direct, false, true);
+
+ _bus = new BusBuilder()
+ .Configure(new AutoConfigurationModelBuilder()
+ .WithCallingAssembly()
+ .WithDefaultConventions()
+ .WithDependencyResolver(new TestDependencyResolver())
+ .Build())
+ .Build();
+ _bus.Connect();
+ };
Cleanup after = () =>
- {
- _bus.Close();
- _exchange.Close();
- };
+ {
+ _bus.Close();
+ _exchange.Close();
+ };
- Because of = () => new Action(() => _exchange.Publish(new AutoMessage("test"))).BlockUntil(() => AutoMessageHandler.Message != null)();
+ Because of =
+ () =>
+ new Action(() => _exchange.Publish(new AutoMessage("test"))).BlockUntil(() => AutoMessageHandler.Message != null)();
- It should_receive_the_message_when_published = () => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
+ It should_receive_the_message_when_published =
+ () => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
}
[Integration]
@@ -149,10 +198,13 @@ public class when_configuring_publish_by_convention
{
_queue = new RabbitQueue("localhost", SpecId, ExchangeType.Direct, SpecId, false, true, false, true);
+ var builder = new BusBuilder();
+
_bus = new BusBuilder()
.Configure(new AutoConfigurationModelBuilder()
.WithAssembly(Assembly.GetExecutingAssembly())
.WithPublishConfigurationConvention(new TestPublishConfigurationConvention(SpecId))
+ .WithDependencyResolver(new TestDependencyResolver())
.Build())
.Build();
_bus.Connect();
@@ -166,7 +218,8 @@ public class when_configuring_publish_by_convention
Because of = () => _bus.Publish(new AutoMessage("test"));
- It should_be_able_to_publish_messages_matching_convention = () => _queue.GetMessage<AutoMessage>().Text.ShouldEqual("test");
+ It should_be_able_to_publish_messages_matching_convention =
+ () => _queue.GetMessage<AutoMessage>().Text.ShouldEqual("test");
}
[Integration]
@@ -176,23 +229,22 @@ public class when_configuring_publish_by_convention_using_the_default_convention
static Bus _bus;
Establish context = () =>
- {
- _bus = new BusBuilder()
- .Configure(new AutoConfigurationModelBuilder()
- .WithCallingAssembly()
- .WithDefaultConventions()
- .Build())
- .Build();
- _bus.Connect();
- };
+ {
+ _bus = new BusBuilder()
+ .Configure(new AutoConfigurationModelBuilder()
+ .WithCallingAssembly()
+ .WithDefaultConventions()
+ .WithDependencyResolver(new TestDependencyResolver())
+ .Build())
+ .Build();
+ _bus.Connect();
+ };
- Cleanup after = () =>
- {
- _bus.Close();
- };
+ Cleanup after = () => { _bus.Close(); };
Because of = () => _bus.Publish(new AutoMessage("test"));
- It should_be_able_to_publish_messages_matching_convention = () => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
+ It should_be_able_to_publish_messages_matching_convention =
+ () => AutoMessageHandler.Message.ProvideDefault().Text.ShouldEqual("test");
}
}
View
3  src/RabbitBus.Specs/RabbitBus.Specs.csproj
@@ -61,6 +61,7 @@
<Compile Include="Infrastructure\AutoMessageHandler.cs" />
<Compile Include="Infrastructure\ByteExtensions.cs" />
<Compile Include="Infrastructure\ConsoleLogger.cs" />
+ <Compile Include="Infrastructure\DependencyAutoMessageHandler.cs" />
<Compile Include="Infrastructure\Enumerator.cs" />
<Compile Include="Infrastructure\BusExtensions.cs" />
<Compile Include="Infrastructure\ErrorSerializationStrategy.cs" />
@@ -70,6 +71,7 @@
<Compile Include="Infrastructure\RabbitHandlers.cs" />
<Compile Include="Infrastructure\RabbitService.cs" />
<Compile Include="Infrastructure\StatusUpdateHandler.cs" />
+ <Compile Include="Infrastructure\TestDependencyResolver.cs" />
<Compile Include="Infrastructure\TestPublishConfigurationConvention.cs" />
<Compile Include="Infrastructure\NamedAutoMessageConsumeConfigurationConvention.cs" />
<Compile Include="Infrastructure\Wait.cs" />
@@ -104,6 +106,7 @@
<Compile Include="Infrastructure\RabbitExchange.cs" />
<Compile Include="Infrastructure\RabbitQueue.cs" />
<Compile Include="TestTypes\AutoMessage.cs" />
+ <Compile Include="TestTypes\DependencyAutoMessage.cs" />
<Compile Include="TestTypes\ReplyMessage.cs" />
<Compile Include="TestTypes\RequestMessage.cs" />
<Compile Include="TestTypes\StatusUpdate.cs" />
View
1  src/RabbitBus.Specs/TestTypes/AutoMessage.cs
@@ -7,7 +7,6 @@ public class AutoMessage
{
public AutoMessage()
{
-
}
public AutoMessage(string text)
View
13 src/RabbitBus.Specs/TestTypes/DependencyAutoMessage.cs
@@ -0,0 +1,13 @@
+using System;
+
+namespace RabbitBus.Specs.TestTypes
+{
+ [Serializable]
+ public class DependencyAutoMessage : AutoMessage
+ {
+ public DependencyAutoMessage(string text)
+ {
+ Text = text;
+ }
+ }
+}
View
40 src/RabbitBus/Configuration/AutoConfigurationModelBuilder.cs
@@ -8,11 +8,26 @@ public class AutoConfigurationModelBuilder
{
readonly IList<Assembly> _assemblies = new List<Assembly>();
- readonly IList<IConsumeConfigurationConvention> _consumeConfigurationConventions = new List<IConsumeConfigurationConvention>();
+ readonly IList<IConsumeConfigurationConvention> _consumeConfigurationConventions =
+ new List<IConsumeConfigurationConvention>();
- readonly IList<IPublishConfigurationConvention> _publishConfigurationConventions = new List<IPublishConfigurationConvention>();
+ readonly IList<IPublishConfigurationConvention> _publishConfigurationConventions =
+ new List<IPublishConfigurationConvention>();
readonly IList<ISubscriptionConvention> _subscriptionConventions = new List<ISubscriptionConvention>();
+ IDependencyResolver _dependencyResolver = new DefaultDependencyResolver();
+
+ public IAutoConfigurationModel Build()
+ {
+ return new AutoConfigurationModel
+ {
+ Assemblies = _assemblies,
+ ConsumeConfigurationConventions = _consumeConfigurationConventions,
+ PublishConfigurationConventions = _publishConfigurationConventions,
+ SubscriptionConventions = _subscriptionConventions,
+ DependencyResolver = _dependencyResolver
+ };
+ }
public AutoConfigurationModelBuilder WithAssembly(Assembly assembly)
{
@@ -26,13 +41,15 @@ public AutoConfigurationModelBuilder WithCallingAssembly()
return this;
}
- public AutoConfigurationModelBuilder WithPublishConfigurationConvention(IPublishConfigurationConvention publishConfigurationConvention)
+ public AutoConfigurationModelBuilder WithPublishConfigurationConvention(
+ IPublishConfigurationConvention publishConfigurationConvention)
{
_publishConfigurationConventions.Add(publishConfigurationConvention);
return this;
}
- public AutoConfigurationModelBuilder WithConsumeConfigurationConvention(IConsumeConfigurationConvention consumeConfigurationConvention)
+ public AutoConfigurationModelBuilder WithConsumeConfigurationConvention(
+ IConsumeConfigurationConvention consumeConfigurationConvention)
{
_consumeConfigurationConventions.Add(consumeConfigurationConvention);
return this;
@@ -44,20 +61,15 @@ public AutoConfigurationModelBuilder WithSubscriptionConvention(ISubscriptionCon
return this;
}
- public IAutoConfigurationModel Build()
+ public AutoConfigurationModelBuilder WithDefaultConventions()
{
- return new AutoConfigurationModel
- {
- Assemblies = _assemblies,
- ConsumeConfigurationConventions = _consumeConfigurationConventions,
- PublishConfigurationConventions = _publishConfigurationConventions,
- SubscriptionConventions = _subscriptionConventions
- };
+ _subscriptionConventions.Add(new MessageHandlerSubscrptionConvention());
+ return this;
}
- public AutoConfigurationModelBuilder WithDefaultConventions()
+ public AutoConfigurationModelBuilder WithDependencyResolver(IDependencyResolver dependencyResolver)
{
- _subscriptionConventions.Add(new MessageHandlerSubscrptionConvention());
+ _dependencyResolver = dependencyResolver;
return this;
}
}
View
12 src/RabbitBus/Configuration/DefaultDependencyResolver.cs
@@ -0,0 +1,12 @@
+using System;
+
+namespace RabbitBus.Configuration
+{
+ class DefaultDependencyResolver : IDependencyResolver
+ {
+ public object Resolve(Type handlerType)
+ {
+ return Activator.CreateInstance(handlerType);
+ }
+ }
+}
View
1  src/RabbitBus/Configuration/IAutoConfigurationModel.cs
@@ -9,5 +9,6 @@ public interface IAutoConfigurationModel
IEnumerable<IConsumeConfigurationConvention> ConsumeConfigurationConventions { get; set; }
IEnumerable<IPublishConfigurationConvention> PublishConfigurationConventions { get; set; }
IEnumerable<ISubscriptionConvention> SubscriptionConventions { get; set; }
+ IDependencyResolver DependencyResolver { get; set; }
}
}
View
9 src/RabbitBus/Configuration/IDependencyResolver.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace RabbitBus.Configuration
+{
+ public interface IDependencyResolver
+ {
+ object Resolve(Type handlerType);
+ }
+}
View
1  src/RabbitBus/Configuration/Internal/AutoConfigurationModel.cs
@@ -9,5 +9,6 @@ class AutoConfigurationModel : IAutoConfigurationModel
public IEnumerable<IConsumeConfigurationConvention> ConsumeConfigurationConventions { get; set; }
public IEnumerable<IPublishConfigurationConvention> PublishConfigurationConventions { get; set; }
public IEnumerable<ISubscriptionConvention> SubscriptionConventions { get; set; }
+ public IDependencyResolver DependencyResolver { get; set; }
}
}
View
8 src/RabbitBus/Configuration/Internal/AutoConfigurator.cs
@@ -24,7 +24,7 @@ void ConfigureSubscriptions(IConfigurationModel configurationModel, IAutoConfigu
{
if (convention.ShouldRegister(handlerType))
{
- object handler = Activator.CreateInstance(handlerType);
+ object handler = autoConfigurationModel.DependencyResolver.Resolve(handlerType);
Type messageType = convention.GetMessageType(handler);
MethodInfo openGetMessageHandlerMethodInfo =
typeof (ISubscriptionConvention).GetMethod("GetMessageHandler", BindingFlags.Instance | BindingFlags.Public);
@@ -36,16 +36,14 @@ void ConfigureSubscriptions(IConfigurationModel configurationModel, IAutoConfigu
}
}
- void ConfigurePublish(IConfigurationModel configurationModel, IAutoConfigurationModel autoConfigurationModel,
- Type type)
+ void ConfigurePublish(IConfigurationModel configurationModel, IAutoConfigurationModel autoConfigurationModel, Type type)
{
foreach (IPublishConfigurationConvention convention in autoConfigurationModel.PublishConfigurationConventions)
{
if (convention.ShouldRegister(type))
{
PublishInfo publishInfo = GetPublishInfo(type, convention);
- configurationModel.PublishRouteConfiguration.AddPolicy<MappingRouteInfoLookupStrategy<IPublishInfo>>(type,
- publishInfo);
+ configurationModel.PublishRouteConfiguration.AddPolicy<MappingRouteInfoLookupStrategy<IPublishInfo>>(type, publishInfo);
}
}
}
View
2  src/RabbitBus/Configuration/MessageHandlerSubscrptionConvention.cs
@@ -9,7 +9,7 @@ public bool ShouldRegister(Type handlerType)
{
return
handlerType.GetInterfaces()
- .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof (IMessageHandler<>));
+ .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof (IMessageHandler<>));
}
public Action<IMessageContext<TMessage>> GetMessageHandler<TMessage>(object handler)
View
2  src/RabbitBus/RabbitBus.csproj
@@ -56,6 +56,8 @@
</Compile>
<Compile Include="Configuration\AutoConfigurationModelBuilder.cs" />
<Compile Include="Configuration\BinarySerializationStrategy.cs" />
+ <Compile Include="Configuration\DefaultDependencyResolver.cs" />
+ <Compile Include="Configuration\IDependencyResolver.cs" />
<Compile Include="Configuration\IMessage.cs" />
<Compile Include="Configuration\MessageHandlerSubscrptionConvention.cs" />
<Compile Include="Configuration\IAutoConfigurationModel.cs" />
View
465 src/RabbitBus/Subscription.cs
@@ -12,233 +12,240 @@
namespace RabbitBus
{
- class Subscription<TMessage> : ISubscription
- {
- readonly Action<IMessageContext<TMessage>> _callback;
- readonly TimeSpan _callbackTimeout;
- readonly IConsumeInfo _consumeInfo;
- readonly IDeadLetterStrategy _deadLetterStrategy;
- readonly Action<IErrorContext> _defaultErrorCallback;
- readonly ISerializationStrategy _defaultSerializationStrategy;
- readonly IDictionary _exchangeArguments;
- readonly IMessagePublisher _messagePublisher;
- readonly string _routingKey;
- readonly Stopwatch _stopwatch = new Stopwatch();
- readonly SubscriptionType _subscriptionType;
- IConnection _connection;
- QueueingBasicConsumer _consumer;
- Thread _thread;
- bool _threadCancelled;
-
- public Subscription(IConnection connection, IDeadLetterStrategy deadLetterStrategy,
- ISerializationStrategy defaultSerializationStrategy, IConsumeInfo consumeInfo, string routingKey,
- Action<IMessageContext<TMessage>> callback, IDictionary exchangeArguments,
- Action<IErrorContext> defaultErrorCallback, IMessagePublisher messagePublisher,
- SubscriptionType subscriptionType, TimeSpan callbackTimeout)
- {
- _connection = connection;
- _deadLetterStrategy = deadLetterStrategy;
- _defaultSerializationStrategy = defaultSerializationStrategy;
- _consumeInfo = consumeInfo;
- _routingKey = routingKey ?? _consumeInfo.DefaultRoutingKey;
- _callback = callback;
- _defaultErrorCallback = defaultErrorCallback;
- _messagePublisher = messagePublisher;
- _subscriptionType = subscriptionType;
- _callbackTimeout = callbackTimeout;
- _exchangeArguments = exchangeArguments;
- }
-
- public void Start()
- {
- try
- {
- ILogger logger = Logger.Current;
- IModel channel = _connection.CreateModel();
- channel.ModelShutdown += channel_ModelShutdown;
-
- if (_consumeInfo.ExchangeName != string.Empty)
- {
- channel.ExchangeDeclare(_consumeInfo.ExchangeName, _consumeInfo.ExchangeType, _consumeInfo.IsExchangeDurable,
- _consumeInfo.IsExchangeAutoDelete, null);
- }
- channel.QueueDeclare(_consumeInfo.QueueName, _consumeInfo.IsQueueDurable, _consumeInfo.Exclusive,
- _consumeInfo.IsQueueAutoDelete, _exchangeArguments);
- if (_consumeInfo.ExchangeName != string.Empty)
- {
- channel.QueueBind(_consumeInfo.QueueName, _consumeInfo.ExchangeName, _routingKey, _exchangeArguments);
- }
-
- _consumer = new QueueingBasicConsumer(channel);
- channel.BasicQos(0, _consumeInfo.QualityOfService, false);
- channel.BasicConsume(_consumeInfo.QueueName, _consumeInfo.IsAutoAcknowledge, _consumer);
-
- string log;
- _thread = new Thread(() =>
- {
- _stopwatch.Start();
-
- log =
- string.Format(
- "Starting thread for subscription to messages from host: {0}, port: {1}, exchange: {2}, queue: {3}, routingKey: {4}",
- _connection.Endpoint.HostName,
- _connection.Endpoint.Port,
- _consumeInfo.ExchangeName,
- _consumeInfo.QueueName,
- _routingKey);
- logger.Write(log, TraceEventType.Information);
- while (true)
- {
- if (WaitExceeded() || _threadCancelled)
- {
- break;
- }
-
- BasicDeliverEventArgs eventArgs = null;
-
- try
- {
- object eArgs = null;
- _consumer.Queue.Dequeue(1000, out eArgs);
-
- if (eArgs != null)
- {
- eventArgs = (BasicDeliverEventArgs) eArgs;
- logger.Write(string.Format("Message received: {0} bytes", eventArgs.Body.Length),
- TraceEventType.Information);
- ISerializationStrategy serializationStrategy = _consumeInfo.SerializationStrategy ??
- _defaultSerializationStrategy;
- object message = serializationStrategy.Deserialize<TMessage>(eventArgs.Body);
-
- var messageContext = new MessageContext<TMessage>(_deadLetterStrategy, (TMessage) message, _consumeInfo,
- channel,
- eventArgs.DeliveryTag, eventArgs.Redelivered,
- eventArgs.Exchange, eventArgs.RoutingKey,
- eventArgs.BasicProperties, eventArgs.Body, _messagePublisher);
-
- _callback(messageContext);
-
- if (_subscriptionType == SubscriptionType.RemoteProcedure)
- {
- log =
- string.Format(
- "Terminating RPC subscription to messages from host: {0}, port: {1}, exchange: {2}, queue: {3}, routingKey: {4}",
- _connection.Endpoint.HostName,
- _connection.Endpoint.Port,
- _consumeInfo.ExchangeName,
- _consumeInfo.QueueName,
- _routingKey);
- logger.Write(log, TraceEventType.Information);
- break;
- }
- }
- }
- catch (EndOfStreamException)
- {
- logger.Write("Received EndOfStreamException.", TraceEventType.Information);
- InvokeErrorCallback(eventArgs, channel);
- channel.Dispose();
- channel = null;
- logger.Write("Subscription terminated.", TraceEventType.Information);
- break;
- }
- catch (AlreadyClosedException e)
- {
- Logger.Current.Write(string.Format("An AlreadyClosedException occurred: {0} {1}", e.Message, e.StackTrace), TraceEventType.Error);
- InvokeErrorCallback(eventArgs, channel);
- break;
- }
- catch (Exception e)
- {
- Logger.Current.Write("An exception occurred while dequeuing a message: " + e.Message, TraceEventType.Error);
- InvokeErrorCallback(eventArgs, channel);
- }
- }
- });
-
- _thread.Start();
-
- log = string.Format("Subscribed to messages from host: {0}, port: {1}, exchange: {2}, queue: {3}, routingKey: {4}",
- _connection.Endpoint.HostName,
- _connection.Endpoint.Port,
- _consumeInfo.ExchangeName,
- _consumeInfo.QueueName,
- _routingKey);
- Logger.Current.Write(new LogEntry {Message = log});
- }
- catch (Exception e)
- {
- Logger.Current.Write("An exception occurred starting the subscription: " + e.Message, TraceEventType.Error);
- }
- }
-
- void channel_ModelShutdown(IModel model, ShutdownEventArgs reason)
- {
- try
- {
- Logger.Current.Write("Closing the channel ... ", TraceEventType.Information);
- model.Close();
- _threadCancelled = true;
- Logger.Current.Write("Channel closed.", TraceEventType.Information);
- }
- catch (Exception e)
- {
- Logger.Current.Write("An exception occurred closing the channel: " + e.Message, TraceEventType.Error);
- }
- }
-
- void InvokeErrorCallback(BasicDeliverEventArgs eventArgs, IModel channel)
- {
- try
- {
- Action<IErrorContext> errorCallback = _consumeInfo.ErrorCallback ?? _defaultErrorCallback;
- errorCallback(new ErrorContext(channel, eventArgs));
- }
- catch (Exception exception)
- {
- Logger.Current.Write("An exception occurred invoking the registered error callback: " + exception.Message,
- TraceEventType.Error);
- }
- }
-
- public void Stop()
- {
- string log =
- string.Format(
- "Stopping subscription to messages from host: {0}, port: {1}, exchange: {2}, queue: {3}, routingKey: {4}",
- _connection.Endpoint.HostName,
- _connection.Endpoint.Port,
- _consumeInfo.ExchangeName,
- _consumeInfo.QueueName,
- _routingKey);
- Logger.Current.Write(log, TraceEventType.Information);
- _threadCancelled = true;
- _thread.Join();
- _threadCancelled = false;
- _thread = null;
- }
-
- public void Renew(IConnection connection)
- {
- Stop();
- _connection = connection;
- Start();
- }
-
- bool WaitExceeded()
- {
- if (_callbackTimeout == TimeSpan.MinValue)
- {
- return false;
- }
-
- return _stopwatch.ElapsedTicks > _callbackTimeout.Ticks;
- }
- }
-
- public enum SubscriptionType
- {
- Subscription,
- RemoteProcedure
- }
+ class Subscription<TMessage> : ISubscription
+ {
+ readonly Action<IMessageContext<TMessage>> _callback;
+ readonly TimeSpan _callbackTimeout;
+ readonly IConsumeInfo _consumeInfo;
+ readonly IDeadLetterStrategy _deadLetterStrategy;
+ readonly Action<IErrorContext> _defaultErrorCallback;
+ readonly ISerializationStrategy _defaultSerializationStrategy;
+ readonly IDictionary _exchangeArguments;
+ readonly IMessagePublisher _messagePublisher;
+ readonly string _routingKey;
+ readonly Stopwatch _stopwatch = new Stopwatch();
+ readonly SubscriptionType _subscriptionType;
+ IConnection _connection;
+ QueueingBasicConsumer _consumer;
+ Thread _thread;
+ bool _threadCancelled;
+
+ public Subscription(IConnection connection, IDeadLetterStrategy deadLetterStrategy,
+ ISerializationStrategy defaultSerializationStrategy, IConsumeInfo consumeInfo,
+ string routingKey,
+ Action<IMessageContext<TMessage>> callback, IDictionary exchangeArguments,
+ Action<IErrorContext> defaultErrorCallback, IMessagePublisher messagePublisher,
+ SubscriptionType subscriptionType, TimeSpan callbackTimeout)
+ {
+ _connection = connection;
+ _deadLetterStrategy = deadLetterStrategy;
+ _defaultSerializationStrategy = defaultSerializationStrategy;
+ _consumeInfo = consumeInfo;
+ _routingKey = routingKey ?? _consumeInfo.DefaultRoutingKey;
+ _callback = callback;
+ _defaultErrorCallback = defaultErrorCallback;
+ _messagePublisher = messagePublisher;
+ _subscriptionType = subscriptionType;
+ _callbackTimeout = callbackTimeout;
+ _exchangeArguments = exchangeArguments;
+ }
+
+ public void Start()
+ {
+ try
+ {
+ IModel channel = _connection.CreateModel();
+ channel.ModelShutdown += ChannelModelShutdown;
+
+ if (_consumeInfo.ExchangeName != string.Empty)
+ {
+ channel.ExchangeDeclare(_consumeInfo.ExchangeName, _consumeInfo.ExchangeType,
+ _consumeInfo.IsExchangeDurable,
+ _consumeInfo.IsExchangeAutoDelete, null);
+ }
+ channel.QueueDeclare(_consumeInfo.QueueName, _consumeInfo.IsQueueDurable, _consumeInfo.Exclusive,
+ _consumeInfo.IsQueueAutoDelete, _exchangeArguments);
+ if (_consumeInfo.ExchangeName != string.Empty)
+ {
+ channel.QueueBind(_consumeInfo.QueueName, _consumeInfo.ExchangeName, _routingKey, _exchangeArguments);
+ }
+
+ _consumer = new QueueingBasicConsumer(channel);
+ channel.BasicQos(0, _consumeInfo.QualityOfService, false);
+ channel.BasicConsume(_consumeInfo.QueueName, _consumeInfo.IsAutoAcknowledge, _consumer);
+ _thread = new Thread(() => Subscribe(channel));
+ _thread.Start();
+
+ string log =
+ string.Format(
+ "Subscribed to messages from host: {0}, port: {1}, exchange: {2}, queue: {3}, routingKey: {4}",
+ _connection.Endpoint.HostName,
+ _connection.Endpoint.Port,
+ _consumeInfo.ExchangeName,
+ _consumeInfo.QueueName,
+ _routingKey);
+ Logger.Current.Write(new LogEntry {Message = log});
+ }
+ catch (Exception e)
+ {
+ Logger.Current.Write("An exception occurred starting the subscription: " + e.Message,
+ TraceEventType.Error);
+ }
+ }
+
+ public void Stop()
+ {
+ string log =
+ string.Format(
+ "Stopping subscription to messages from host: {0}, port: {1}, exchange: {2}, queue: {3}, routingKey: {4}",
+ _connection.Endpoint.HostName,
+ _connection.Endpoint.Port,
+ _consumeInfo.ExchangeName,
+ _consumeInfo.QueueName,
+ _routingKey);
+ Logger.Current.Write(log, TraceEventType.Information);
+ _threadCancelled = true;
+ _thread.Join();
+ _threadCancelled = false;
+ _thread = null;
+ }
+
+ public void Renew(IConnection connection)
+ {
+ Stop();
+ _connection = connection;
+ Start();
+ }
+
+ void Subscribe(IModel channel)
+ {
+ ILogger logger = Logger.Current;
+ _stopwatch.Start();
+
+ string log =
+ string.Format(
+ "Starting thread for subscription to messages from host: {0}, port: {1}, exchange: {2}, queue: {3}, routingKey: {4}",
+ _connection.Endpoint.HostName,
+ _connection.Endpoint.Port,
+ _consumeInfo.ExchangeName,
+ _consumeInfo.QueueName,
+ _routingKey);
+ logger.Write(log, TraceEventType.Information);
+
+ while (true)
+ {
+ if (WaitExceeded() || _threadCancelled)
+ {
+ break;
+ }
+
+ BasicDeliverEventArgs eventArgs = null;
+
+ try
+ {
+ object eArgs = null;
+ _consumer.Queue.Dequeue(1000, out eArgs);
+
+ if (eArgs != null)
+ {
+ eventArgs = (BasicDeliverEventArgs) eArgs;
+ logger.Write(string.Format("Message received: {0} bytes", eventArgs.Body.Length), TraceEventType.Information);
+ ISerializationStrategy serializationStrategy = _consumeInfo.SerializationStrategy ??
+ _defaultSerializationStrategy;
+ object message = serializationStrategy.Deserialize<TMessage>(eventArgs.Body);
+
+ var messageContext = new MessageContext<TMessage>(_deadLetterStrategy, (TMessage) message,
+ _consumeInfo,
+ channel,
+ eventArgs.DeliveryTag, eventArgs.Redelivered,
+ eventArgs.Exchange, eventArgs.RoutingKey,
+ eventArgs.BasicProperties, eventArgs.Body,
+ _messagePublisher);
+
+ _callback(messageContext);
+
+ if (_subscriptionType == SubscriptionType.RemoteProcedure)
+ {
+ log =
+ string.Format(
+ "Terminating RPC subscription to messages from host: {0}, port: {1}, exchange: {2}, queue: {3}, routingKey: {4}",
+ _connection.Endpoint.HostName,
+ _connection.Endpoint.Port,
+ _consumeInfo.ExchangeName,
+ _consumeInfo.QueueName,
+ _routingKey);
+ logger.Write(log, TraceEventType.Information);
+ break;
+ }
+ }
+ }
+ catch (EndOfStreamException)
+ {
+ logger.Write("Received EndOfStreamException.", TraceEventType.Information);
+ InvokeErrorCallback(eventArgs, channel);
+ channel.Dispose();
+ channel = null;
+ logger.Write("Subscription terminated.", TraceEventType.Information);
+ break;
+ }
+ catch (AlreadyClosedException e)
+ {
+ Logger.Current.Write(string.Format("An AlreadyClosedException occurred: {0} {1}", e.Message, e.StackTrace), TraceEventType.Error);
+ InvokeErrorCallback(eventArgs, channel);
+ break;
+ }
+ catch (Exception e)
+ {
+ Logger.Current.Write("An exception occurred while dequeuing a message: " + e.Message, TraceEventType.Error);
+ InvokeErrorCallback(eventArgs, channel);
+ }
+ }
+ }
+
+ void ChannelModelShutdown(IModel model, ShutdownEventArgs reason)
+ {
+ try
+ {
+ Logger.Current.Write("Closing the channel ... ", TraceEventType.Information);
+ model.Close();
+ _threadCancelled = true;
+ Logger.Current.Write("Channel closed.", TraceEventType.Information);
+ }
+ catch (Exception e)
+ {
+ Logger.Current.Write("An exception occurred closing the channel: " + e.Message, TraceEventType.Error);
+ }
+ }
+
+ void InvokeErrorCallback(BasicDeliverEventArgs eventArgs, IModel channel)
+ {
+ try
+ {
+ Action<IErrorContext> errorCallback = _consumeInfo.ErrorCallback ?? _defaultErrorCallback;
+ errorCallback(new ErrorContext(channel, eventArgs));
+ }
+ catch (Exception exception)
+ {
+ Logger.Current.Write(
+ "An exception occurred invoking the registered error callback: " + exception.Message,
+ TraceEventType.Error);
+ }
+ }
+
+ bool WaitExceeded()
+ {
+ if (_callbackTimeout == TimeSpan.MinValue)
+ {
+ return false;
+ }
+
+ return _stopwatch.ElapsedTicks > _callbackTimeout.Ticks;
+ }
+ }
+
+ public enum SubscriptionType
+ {
+ Subscription,
+ RemoteProcedure
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.