Permalink
Browse files

Fixed routing keys being part of message production... they were not …

…being correctly sent along, especially for header values
  • Loading branch information...
1 parent 21cbc69 commit deb309155c62d948a66e986e5ea0439f1ba477f9 @deckerego deckerego committed Mar 15, 2012
View
@@ -21,6 +21,8 @@ If you wish to use a routing key, URIs have the structure:
The routing key is optional, but Queue Name and Exchange Name are required for consumers. Just the Exchange Name is required for producers.
+Producers can also defer the routing key to the message header, where the ROUTING_KEY header could be set to the appropriate routing key.
+
Options to the URI include the exchange type, which defaults to direct if none is specified.
For header based exchanges, the URI is similar but name/value pairs can be specified in place of the routing key. For example:
@@ -109,5 +109,10 @@ public AmqpTemplate getAmqpTemplate() {
public void setAmqpTemplate(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
- }
+ }
+
+ public static Throwable findRootCause(Throwable t) {
+ if(t.getCause() == null) return t;
+ return findRootCause(t.getCause());
+ }
}
@@ -109,8 +109,8 @@ public void doStart() throws Exception {
if(this.endpoint.isUsingDefaultExchange()) {
LOG.info("Default exchange is implicitly bound to every queue, with a routing key equal to the queue name");
} else if (this.binding != null) {
+ LOG.info("Declaring binding {}", this.binding.getRoutingKey());
this.endpoint.getAmqpAdministration().declareBinding(binding);
- LOG.info("Declared binding {}", this.binding.getRoutingKey());
}
this.messageListener.start();
@@ -239,14 +239,17 @@ protected String createEndpointUri() {
}
org.springframework.amqp.core.Exchange createAMQPExchange() {
- if(this.exchangeType == null || "direct".equals(this.exchangeType)) {
+ if("direct".equals(this.exchangeType)) {
return new DirectExchange(this.exchangeName, this.durable, this.autodelete);
} else if("fanout".equals(this.exchangeType)) {
return new FanoutExchange(this.exchangeName, this.durable, this.autodelete);
} else if("headers".equals(this.exchangeType)) {
return new HeadersExchange(this.exchangeName, this.durable, this.autodelete);
} else if("topic".equals(this.exchangeType)) {
return new TopicExchange(this.exchangeName, this.durable, this.autodelete);
+ //We have a routing key but no explicit exchange type, assume topic exchange
+ } else if(this.routingKey != null) {
+ return new TopicExchange(this.exchangeName, this.durable, this.autodelete);
} else {
return new DirectExchange(this.exchangeName, this.durable, this.autodelete);
}
@@ -11,6 +11,7 @@
import org.apache.camel.impl.DefaultAsyncProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
@@ -59,10 +60,17 @@ public void doStart() throws Exception {
this.exchange = this.endpoint.createAMQPExchange();
if (this.endpoint.isUsingDefaultExchange()) {
- LOG.info("Using default exchange");
+ LOG.info("Using default exchange of type {}", this.exchange.getClass().getSimpleName());
} else {
- this.endpoint.amqpAdministration.declareExchange(this.exchange);
- LOG.info("Declared exchange {}", this.exchange.getName());
+ LOG.info("Declaring exchange {} of type {}", this.exchange.getName(), this.exchange.getClass().getSimpleName());
+ try {
+ this.endpoint.amqpAdministration.declareExchange(this.exchange);
+ } catch(AmqpIOException e) {
+ //The actual reason for failed exceptions is often swallowed up by Camel or Spring, find it
+ Throwable rootCause = SpringAMQPComponent.findRootCause(e);
+ LOG.error("Could not initialize exchange!", rootCause);
+ throw e;
+ }
}
//Initialize execution pool
@@ -111,15 +119,18 @@ public void run() {
LOG.warn("Cannot find RabbitMQ AMQP Template, falling back to simple message converter");
msgConverter = new SimpleMessageConverter();
}
+
+ String routingKeyHeader = message.getHeader(SpringAMQPComponent.ROUTING_KEY_HEADER, String.class);
+ String routingKey = routingKeyHeader != null ? routingKeyHeader : endpoint.routingKey;
if(exchange.getPattern().isOutCapable()) {
LOG.debug("Synchronous send and request for exchange {}", exchange.getExchangeId());
- Message amqpResponse = endpoint.getAmqpTemplate().sendAndReceive(endpoint.exchangeName, endpoint.routingKey, inMessage.toAMQPMessage(msgConverter));
+ Message amqpResponse = endpoint.getAmqpTemplate().sendAndReceive(endpoint.exchangeName, routingKey, inMessage.toAMQPMessage(msgConverter));
SpringAMQPMessage camelResponse = SpringAMQPMessage.fromAMQPMessage(msgConverter, amqpResponse);
exchange.setOut(camelResponse);
} else {
LOG.debug("Synchronous send for exchange {}", exchange.getExchangeId());
- endpoint.getAmqpTemplate().send(endpoint.exchangeName, endpoint.routingKey, inMessage.toAMQPMessage(msgConverter));
+ endpoint.getAmqpTemplate().send(endpoint.exchangeName, routingKey, inMessage.toAMQPMessage(msgConverter));
}
if(callback != null)
@@ -19,6 +19,14 @@ public void testCreateContext() throws Exception {
Assert.assertNotNull(component);
}
+ @Test
+ public void testFindRootCause() throws Exception {
+ IllegalStateException child = new IllegalStateException("Child Exception");
+ RuntimeException parent = new RuntimeException("Parent Exception", child);
+ RuntimeException grandparent = new RuntimeException("Grandparent Exception", parent);
+ Assert.assertEquals(child, SpringAMQPComponent.findRootCause(grandparent));
+ }
+
@Override
protected CamelContext createCamelContext() throws Exception {
ConnectionFactory factory = new CachingConnectionFactory();
@@ -54,7 +54,7 @@ public void testKeyValueParsing() throws Exception {
@Test
public void sendMessage() throws Exception {
- MockEndpoint mockEndpoint = context().getEndpoint("mock:test.a", MockEndpoint.class);
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:test.a");
mockEndpoint.expectedMessageCount(1);
context().createProducerTemplate().sendBodyAndHeader("spring-amqp:directExchange:test.a?durable=false&autodelete=true&exclusive=false", "sendMessage", "HeaderKey", "HeaderValue");
@@ -67,7 +67,7 @@ public void sendMessage() throws Exception {
@Test
public void sendAsyncMessage() throws Exception {
- MockEndpoint mockEndpoint = context().getEndpoint("mock:test.b", MockEndpoint.class);
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:test.b");
mockEndpoint.expectedMessageCount(1);
context().createProducerTemplate().asyncRequestBodyAndHeader("spring-amqp:directExchange:test.b?durable=false&autodelete=true&exclusive=false", "sendMessage", "HeaderKey", "HeaderValue");
@@ -80,7 +80,7 @@ public void sendAsyncMessage() throws Exception {
@Test
public void testHeaderAndExchange() throws Exception {
- MockEndpoint mockEndpointOne = context().getEndpoint("mock:test.b", MockEndpoint.class);
+ MockEndpoint mockEndpointOne = getMockEndpoint("mock:test.b");
mockEndpointOne.expectedMessageCount(1);
Map<String, Object> headersOne = new HashMap<String, Object>();
@@ -102,7 +102,7 @@ public void testHeaderAndExchange() throws Exception {
@Test
public void testHeaderOrExchange() throws Exception {
- MockEndpoint mockEndpointOne = context().getEndpoint("mock:test.d", MockEndpoint.class);
+ MockEndpoint mockEndpointOne = getMockEndpoint("mock:test.d");
mockEndpointOne.expectedMessageCount(2);
Map<String, Object> headersOne = new HashMap<String, Object>();
@@ -120,7 +120,7 @@ public void testHeaderOrExchange() throws Exception {
@Test
public void testDefaultExchange() throws Exception {
- MockEndpoint mockEndpointOne = context().getEndpoint("mock:test.e", MockEndpoint.class);
+ MockEndpoint mockEndpointOne = getMockEndpoint("mock:test.e");
mockEndpointOne.expectedMessageCount(1);
context().createProducerTemplate().sendBody("spring-amqp::test.e", "testBody");
@@ -130,7 +130,7 @@ public void testDefaultExchange() throws Exception {
@Test
public void sendMessageTTL() throws Exception {
- MockEndpoint mockEndpoint = context().getEndpoint("mock:test.a", MockEndpoint.class);
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:test.a");
mockEndpoint.expectedMessageCount(1);
context().createProducerTemplate().sendBodyAndHeader("spring-amqp:directExchange:test.a?durable=false&autodelete=true&exclusive=false&timeToLive=1000", "sendMessage", "HeaderKey", "HeaderValue");
@@ -71,6 +71,18 @@ public void testIsNotUsingDefaultExchangeFalse() {
Assert.assertFalse(endpoint.isUsingDefaultExchange());
}
+ @Test
+ public void testHashDelimiters() {
+ Component component = context().getComponent("spring-amqp", SpringAMQPComponent.class);
+ String remaining = "exchange1:#.routingKey1.#";
+ String uri = "spring-amqp"+remaining;
+
+ SpringAMQPEndpoint endpoint = new SpringAMQPEndpoint(component, uri, remaining, null, null);
+
+ //Ensure things can be printed correctly; setEndpoint(String) has had issues previously
+ Assert.assertNotNull(endpoint.toString());
+ }
+
@Override
protected CamelContext createCamelContext() throws Exception {
ConnectionFactory factory = new CachingConnectionFactory();
@@ -10,6 +10,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
@@ -27,7 +28,7 @@ public void testCreateContext() throws Exception {
@Test
public void restartProducer() throws Exception {
- Producer producer = context().getEndpoint("spring-amqp:myExchange:test.z?durable=false&autodelete=true&exclusive=false").createProducer();
+ Producer producer = context().getEndpoint("spring-amqp:fanoutExchange?durable=false&autodelete=true&exclusive=false").createProducer();
producer.start();
producer.stop();
}
@@ -72,6 +73,22 @@ public void sendUsingDefaultExchange() throws Exception {
context().createProducerTemplate().sendBody("direct:test.y", null);
}
+ @Test
+ public void headerRoutingKey() throws Exception {
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:test.v");
+ mockEndpoint.expectedMessageCount(1);
+ context().createProducerTemplate().sendBodyAndHeader("direct:test.v", new ProducerTestObject(), SpringAMQPComponent.ROUTING_KEY_HEADER, "test.v");
+ mockEndpoint.assertIsSatisfied();
+ }
+
+ @Test
+ public void uriRoutingKey() throws Exception {
+ MockEndpoint mockEndpoint = getMockEndpoint("mock:test.u");
+ mockEndpoint.expectedMessageCount(1);
+ context().createProducerTemplate().sendBody("direct:test.u", new ProducerTestObject());
+ mockEndpoint.assertIsSatisfied();
+ }
+
@Override
protected CamelContext createCamelContext() throws Exception {
CachingConnectionFactory factory = new CachingConnectionFactory();
@@ -89,10 +106,15 @@ protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:test.y").to("spring-amqp::test.y?durable=false&autodelete=true&exclusive=false");
- from("direct:test.z").to("spring-amqp:myExchange:test.z?durable=false&autodelete=true&exclusive=false");
- from("direct:test.x").to("spring-amqp:myExchange:test.x?durable=false&autodelete=true&exclusive=false");
- from("direct:test.w").to("spring-amqp:myExchange:test.w?durable=false&autodelete=true&exclusive=false");
+ from("direct:test.y").to("spring-amqp::?durable=false&autodelete=true&exclusive=false");
+ from("direct:test.z").to("spring-amqp:fanoutExchange?durable=false&autodelete=true&exclusive=false");
+ from("direct:test.x").to("spring-amqp:fanoutExchange?durable=false&autodelete=true&exclusive=false");
+ from("direct:test.w").to("spring-amqp:fanoutExchange?durable=false&autodelete=true&exclusive=false");
+ from("direct:test.v").to("spring-amqp:topicExchange?type=topic&durable=false&autodelete=true&exclusive=false");
+ from("direct:test.u").to("spring-amqp:topicExchange:test.u?durable=false&autodelete=true&exclusive=false");
+
+ from("spring-amqp:topicExchange:queue.v:#.v?type=topic&durable=false&type=direct&autodelete=true&exclusive=false").to("mock:test.v");
+ from("spring-amqp:topicExchange:queue.u:#.u?type=topic&durable=false&type=direct&autodelete=true&exclusive=false").to("mock:test.u");
}
};
}

0 comments on commit deb3091

Please sign in to comment.