Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #19 from michael-alford/master

Version 1.3.1:  Lenient when re-declaring exchanges
  • Loading branch information...
commit 59e204c1a47fe41b76e535b7de11f425ad54f44a 2 parents 72fa933 + 8f71b4b
@michaelpage michaelpage authored
View
2  pom.xml
@@ -12,7 +12,7 @@
<groupId>com.bluelock</groupId>
<artifactId>camel-spring-amqp</artifactId>
- <version>1.3</version>
+ <version>1.3.1</version>
<packaging>jar</packaging>
<name>Camel :: Spring-AMQP</name>
View
50 src/main/java/amqp/spring/camel/component/SpringAMQPConsumer.java
@@ -45,7 +45,7 @@
private static transient final Logger LOG = LoggerFactory.getLogger(SpringAMQPConsumer.class);
private static final String TTL_QUEUE_ARGUMENT = "x-message-ttl";
private static final String HA_POLICY_ARGUMENT = "x-ha-policy";
-
+
protected SpringAMQPEndpoint endpoint;
private RabbitMQMessageListener messageListener;
@@ -86,14 +86,14 @@ public void doShutdown() throws Exception {
@Override
public void onCreate(Connection connection) {
- LOG.info("Network connection created to broker for endpoint {}", this.getEndpoint().getEndpointUri());
+ LOG.info("Network connection created to broker for endpoint {}", this.getEndpoint());
}
@Override
public void onClose(Connection connection) {
// This event is received when the consumer initiates a close,
// but this event is _not_ received when RabbitMQ is the one that breaks the connection.
- LOG.info("Network connection closed to broker for endpoint {}", this.getEndpoint().getEndpointUri());
+ LOG.info("Network connection closed to broker for endpoint {}", this.getEndpoint());
}
//We have to ask the RabbitMQ Template for converters, the interface doesn't have a way to get MessageConverter
@@ -267,6 +267,10 @@ public void execute(final Runnable task, long startTimeout) {
private SpringAMQPEndpoint endpoint;
private Runnable delegateTask;
+ // Retry every 30 seconds upon error
+ public static final long RECOVERY_INTERVAL_MILLISECONDS = 30000L;
+
+
public SpringAMQPExecutorTask(SpringAMQPEndpoint endpoint, Runnable delegateTask) {
this.endpoint = endpoint;
this.delegateTask = delegateTask;
@@ -274,7 +278,7 @@ public SpringAMQPExecutorTask(SpringAMQPEndpoint endpoint, Runnable delegateTask
@Override
public void run() {
- boolean error = false;
+ boolean error;
do {
try {
@@ -283,18 +287,13 @@ public void run() {
delegateTask.run();
} catch (Exception e) {
error = true;
- LOG.error(e.getMessage(), e);
- }
-
- long timeout = System.currentTimeMillis() + SimpleMessageListenerContainer.DEFAULT_RECOVERY_INTERVAL;
-
- try {
- while (System.currentTimeMillis() < timeout) {
- Thread.sleep(200);
+ LOG.error("Error consuming endpoint " + endpoint + ". " + e.getMessage(), e);
+ try {
+ Thread.sleep(RECOVERY_INTERVAL_MILLISECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Unrecoverable interruption on consumer restart");
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Unrecoverable interruption on consumer restart");
}
} while (error);
}
@@ -308,14 +307,15 @@ protected void declareAMQPEntities() {
protected org.springframework.amqp.core.Exchange declareExchange() {
org.springframework.amqp.core.Exchange exchange = this.endpoint.createAMQPExchange();
if (this.endpoint.isUsingDefaultExchange()) {
- LOG.info("Using default exchange, will not declare one");
+ LOG.info("Using default exchange; will not declare one for endpoint {}.", endpoint);
} else {
try {
this.endpoint.amqpAdministration.declareExchange(exchange);
- LOG.info("Declared exchange {}", exchange.getName());
+ LOG.info("Declared exchange {} for endpoint {}.", exchange.getName(), endpoint);
} catch (AmqpIOException e) {
- LOG.warn(String.format("Could not declare exchange %s, possible re-declaration of a different type?", exchange.getName()), e);
- throw e;
+ LOG.warn(String.format("Could not declare exchange %s for endpoint %s; possible re-declaration of a different type?", exchange.getName(), endpoint.toString()), e);
+ // Be lenient: Do not re-throw Exception because the exchange may already exist but just declared
+ // with different attributes, so let's go ahead and declare the queue and binding anyway.
} catch (AmqpConnectException e) {
LOG.error(String.format("Consumer cannot connect to broker for endpoint %s", this.endpoint.toString()), e);
throw e;
@@ -335,7 +335,7 @@ protected Queue declareQueue() {
//Declare queue
Queue queue = new Queue(this.endpoint.queueName, this.endpoint.durable, this.endpoint.exclusive, this.endpoint.autodelete, queueArguments);
this.endpoint.getAmqpAdministration().declareQueue(queue);
- LOG.info("Declared queue {}", queue.getName());
+ LOG.info("Declared queue {} for endpoint {}.", queue.getName(), endpoint);
return queue;
}
@@ -357,19 +357,19 @@ protected Binding declareBinding(org.springframework.amqp.core.Exchange exchange
else
binding = mapConfig.whereAll(keyValues).match();
- //Is this a fanout exchange? Just bind the queue and exchange directly
+ //Is this a fanout exchange? Just bind the queue and exchange directly
} else if(exchange instanceof FanoutExchange) {
binding = BindingBuilder.bind(queue).to((FanoutExchange) exchange);
- //Perform routing key binding for direct or topic exchanges
+ //Perform routing key binding for direct or topic exchanges
} else {
binding = BindingBuilder.bind(queue).to(exchange).with(this.endpoint.routingKey).noargs();
}
- if(this.endpoint.isUsingDefaultExchange()) {
- LOG.info("Default exchange is implicitly bound to every queue, with a routing key equal to the queue name");
+ if (this.endpoint.isUsingDefaultExchange()) {
+ LOG.info("Using default exchange for endpoint {}. Default exchange is implicitly bound to every queue, with a routing key equal to the queue name.", endpoint);
} else if (binding != null) {
- LOG.info("Declaring binding {}", binding.getRoutingKey());
+ LOG.info("Declaring binding {} for endpoint {}.", binding.getRoutingKey(), endpoint);
this.endpoint.getAmqpAdministration().declareBinding(binding);
}
Please sign in to comment.
Something went wrong with that request. Please try again.