Permalink
Browse files

Added initial connection reattempt feature.

  • Loading branch information...
1 parent 559b396 commit c4781017288c039d3a922b99d824fcb11de01ee4 @derekgreer committed Sep 12, 2012
View
@@ -2,8 +2,8 @@
[ "Machine.Specifications", "0.5.6.0" ],
[ "ExpectedObjects", "1.0.0.2" ],
[ "Moq", "4.0.10827" ],
-[ "RabbitMQ.Client", "2.8.4" ],
-[ "Newtonsoft.Json", "4.5.3" ]
+[ "RabbitMQ.Client", "2.8.6" ],
+[ "Newtonsoft.Json", "4.5.9" ]
]
configatron.packages = packages
@@ -9,9 +9,9 @@
<projectUrl>http://github.com/derekgreer/rabbitBus</projectUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>RabbitMQ client library.</description>
- <tags>RabbitBus RabbitMQ Client AMQP Queue,Bus</tags>
+ <tags>RabbitBus RabbitMQ Client AMQP Queue Bus</tags>
<dependencies>
- <dependency id="RabbitMQ.Client" version="2.8.4" />
+ <dependency id="RabbitMQ.Client" version="[2.8.6,3.0)" />
</dependencies>
</metadata>
<files>
@@ -10,7 +10,7 @@
<tags>RabbitBus RabbitMQ Client AMQP Queue Bus JSON</tags>
<dependencies>
<dependency id="RabbitBus" version="" />
- <dependency id="Newtonsoft.Json" version="4.5.3" />
+ <dependency id="Newtonsoft.Json" version="[4.5.9,5.0)" />
</dependencies>
</metadata>
<files>
@@ -9,7 +9,7 @@ class ConsoleLogger : ILogger
public void Write(LogEntry logEntry)
{
int id = Thread.CurrentThread.ManagedThreadId;
- Console.WriteLine(string.Format("Thread Id:{0} -{1}:{2}", id, logEntry.Severity, logEntry.Message));
+ Console.WriteLine(string.Format("Thread Id:{0} [{1}] - {2}", id, logEntry.Severity, logEntry.Message));
}
}
}
@@ -5,7 +5,7 @@ namespace RabbitBus.Specs.Infrastructure
{
public static class Wait
{
- const int Timeout = 30;
+ const int Timeout = 15;
public static void Until(Func<bool> predicate)
{
@@ -1,4 +1,5 @@
using System;
+using System.Threading;
using Machine.Specifications;
using Moq;
using RabbitBus.Specs.Infrastructure;
@@ -9,6 +10,39 @@
namespace RabbitBus.Specs.Integration
{
[Integration]
+ [Subject("Connection unavailable")]
+ public class when__the_broker_is_unavailable
+ {
+ static Bus _bus;
+ static bool _connectionEstablished;
+ static RabbitService _service;
+ static bool _connectionFailed;
+
+ Establish context = () =>
+ {
+ _service = new RabbitService();
+ _service.Stop();
+
+ _bus = new BusBuilder().Configure(ctx => ctx.WithLogger(new ConsoleLogger())).Build();
+ _bus.ConnectionEstablished += (sender, e) => { _connectionEstablished = true; };
+ _bus.ConnectionFailed += (sender, e) => { _connectionFailed = true; };
+ new Thread(() => _bus.Connect()).Start();
+ Wait.Until(() => _connectionFailed);
+ };
+
+
+ Cleanup after = () =>
+ {
+ _bus.Close();
+ _service.Start();
+ };
+
+ Because of = () => new Action(() => _service.Start()).BlockUntil(() => _connectionEstablished)();
+
+ It should_connect_when_the_broker_becomes_available = () => _connectionEstablished.ShouldBeTrue();
+ }
+
+ [Integration]
[Subject("Connection interruption")]
public class when_the_connection_is_restarted
{
@@ -80,7 +114,10 @@ public class when_publishing_an_event_when_the_connection_is_down_with_default_q
_rabbitQueue.Delete().Close();
};
- Because of = () => new Action(() => _actualMessage = _rabbitQueue.GetMessage<TestMessage>()).BlockUntil(() => _actualMessage.Text != "default")();
+ Because of =
+ () =>
+ new Action(() => _actualMessage = _rabbitQueue.GetMessage<TestMessage>()).BlockUntil(
+ () => _actualMessage.Text != "default")();
It should_publish_the_event_when_the_connection_is_restored = () => _actualMessage.Text.ShouldEqual("test");
}
@@ -187,11 +224,11 @@ public class when_configuring_a_reconnection_timeout_value
_bus = new BusBuilder()
.Configure(ctx => ctx
.WithLogger(new ConsoleLogger())
- .WithReconnectionAttemptInterval(TimeSpan.FromSeconds(5))
+ .WithReconnectionAttemptInterval(TimeSpan.FromSeconds(5))
.WithConnectionUnavailableQueueStrategy(new MemoryQueueStrategy())
.Publish<TestMessage>().WithExchange(SpecId, cfg => cfg.Not.AutoDelete().Durable())).Build();
_bus.Connect();
-
+
_bus.ConnectionEstablished += (b, e) => { _connectionRestablished = true; };
};
@@ -203,6 +240,7 @@ public class when_configuring_a_reconnection_timeout_value
Because of = () => new Action(() => new RabbitService().Restart()).BlockUntil(() => _connectionRestablished)();
- It should_reconnect_using_the_configured_timeout = () => _mockTimeProvider.Verify(x => x.Sleep(TimeSpan.FromSeconds(5)));
+ It should_reconnect_using_the_configured_timeout =
+ () => _mockTimeProvider.Verify(x => x.Sleep(TimeSpan.FromSeconds(5)));
}
}
View
@@ -8,6 +8,7 @@
using RabbitBus.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
+using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
namespace RabbitBus
@@ -149,45 +150,83 @@ public void Connect()
Connect("amqp://guest:guest@localhost:5672/%2f");
}
- public void Connect(string ampqUri)
+ public void Connect(string amqpUri)
{
+ Connect(amqpUri, TimeSpan.FromSeconds(30));
+ }
+
+ public void Connect(string amqpUri, TimeSpan timeout)
+ {
+ var amqpTcpEndpoint = new AmqpTcpEndpoint(new Uri(amqpUri));
+
+ Logger.Current.Write(string.Format("Establishing connection to host:{0}, port:{1}",
+ amqpTcpEndpoint.HostName, amqpTcpEndpoint.Port), TraceEventType.Information);
_connectionFactory = new ConnectionFactory
{
- Uri = ampqUri
+ Uri = amqpUri
};
_messagePublisher = new MessagePublisher(_connectionFactory.UserName,
_configurationModel.PublishRouteConfiguration,
_configurationModel.ConsumeRouteConfiguration,
_configurationModel.DefaultSerializationStrategy,
_configurationModel.ConnectionDownQueueStrategy);
- InitializeConnection(_connectionFactory);
+ InitializeConnection(_connectionFactory, timeout);
RegisterAutoSubscriptions(_configurationModel);
}
-
- void InitializeConnection(ConnectionFactory connectionFactory)
+
+ void InitializeConnection(ConnectionFactory connectionFactory, TimeSpan timeout)
{
- Logger.Current.Write("Initializing connection ...", TraceEventType.Information);
- _connection = connectionFactory.CreateConnection();
- // ------------------------------------------------------------------------------------------
- // Closing/disposing channels on IConnection.ConnectionShutdown causes a deadlock, so
- // the ISession.SessionShutdown event is used here to infer a connection shutdown.
- // ------------------------------------------------------------------------------------------
- ((ConnectionBase) _connection).m_session0.SessionShutdown += UnexpectedConnectionShutdown;
- _connection.CallbackException += _connection_CallbackException;
- _messagePublisher.SetConnection(_connection);
- _configurationModel.DefaultDeadLetterStrategy.SetConnection(_connection);
+ var timeoutInterval = TimeSpan.FromSeconds(10);
+ IConnection connection = null;
+ var stopwatch = new Stopwatch();
+ stopwatch.Start();
- Logger.Current.Write(new LogEntry
- {
- Message = string.Format("Connected to the RabbitMQ node on host:{0}, port:{1}.",
- _connection.Endpoint.HostName, _connection.Endpoint.Port)
- });
+ while(connection == null)
+ {
+ Logger.Current.Write("Initializing connection ...", TraceEventType.Information);
+
+ try
+ {
+ connection = connectionFactory.CreateConnection();
+ // ------------------------------------------------------------------------------------------
+ // Closing/disposing channels on IConnection.ConnectionShutdown causes a deadlock, so
+ // the ISession.SessionShutdown event is used here to infer a connection shutdown.
+ // ------------------------------------------------------------------------------------------
+ ((ConnectionBase) connection).m_session0.SessionShutdown += UnexpectedConnectionShutdown;
+ connection.CallbackException += ConnectionCallbackException;
+ _messagePublisher.SetConnection(connection);
+ _configurationModel.DefaultDeadLetterStrategy.SetConnection(connection);
+
+ Logger.Current.Write(new LogEntry
+ {
+ Message = string.Format("Connected to the RabbitMQ node on host:{0}, port:{1}.",
+ connection.Endpoint.HostName, connection.Endpoint.Port)
+ });
+
+ _connection = connection;
+ OnConnectionEstablished(EventArgs.Empty);
+ }
+ catch (BrokerUnreachableException e)
+ {
+ OnConnectionFailed(EventArgs.Empty);
+ Logger.Current.Write(string.Format("The connection initialization failed because the RabbitMQ broker was unavailable. Reattempting connection in {0} seconds.",
+ timeoutInterval.Seconds), TraceEventType.Warning);
+ TimeProvider.Current.Sleep(timeoutInterval);
+ }
+
+ if(stopwatch.Elapsed > timeout)
+ {
+ break;
+ }
+ }
- OnConnectionEstablished(EventArgs.Empty);
+ if(connection == null)
+ {
+ Logger.Current.Write("A connection to the RabbitMQ broker could not be established within the allotted time frame", TraceEventType.Critical);
+ }
}
- // kludge to work around a rabbitMQ API deadlock bug
void UnexpectedConnectionShutdown(ISession session, ShutdownEventArgs reason)
{
Logger.Current.Write("Connection was shut down.", TraceEventType.Information);
@@ -202,7 +241,7 @@ void UnexpectedConnectionShutdown(ISession session, ShutdownEventArgs reason)
}
}
- void _connection_CallbackException(object sender, CallbackExceptionEventArgs e)
+ void ConnectionCallbackException(object sender, CallbackExceptionEventArgs e)
{
Logger.Current.Write("CallbackException received: " + e.Exception.Message, TraceEventType.Information);
}
@@ -239,7 +278,7 @@ void Reconnect(TimeSpan timeSpan)
Logger.Current.Write(string.Format("Attempting reconnect with last known configuration in {0} seconds.",
timeSpan.ToString("ss")), TraceEventType.Information);
TimeProvider.Current.Sleep(_configurationModel.ReconnectionInterval);
- InitializeConnection(_connectionFactory);
+ InitializeConnection(_connectionFactory, TimeSpan.MinValue);
}
catch (Exception)
{
@@ -286,12 +325,20 @@ static void OnConsumeError(IErrorContext errorContext)
public event EventHandler ConnectionEstablished;
- public void OnConnectionEstablished(EventArgs e)
+ protected void OnConnectionEstablished(EventArgs e)
{
EventHandler handler = ConnectionEstablished;
if (handler != null) handler(this, e);
}
+ public event EventHandler ConnectionFailed;
+
+ protected void OnConnectionFailed(EventArgs e)
+ {
+ EventHandler handler = ConnectionFailed;
+ if (handler != null) handler(this, e);
+ }
+
~Bus()
{
Dispose(false);
Oops, something went wrong.

0 comments on commit c478101

Please sign in to comment.