Skip to content

Commit

Permalink
Exchange name used by producer can be overriden by message header
Browse files Browse the repository at this point in the history
  • Loading branch information
drasil committed Jul 26, 2013
1 parent 88f2a1d commit 2695c62
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -23,6 +23,8 @@ The routing key is optional, but Queue Name and Exchange Name are required for c

Producers can also defer the routing key to the message header, where the ROUTING_KEY header could be set to the appropriate routing key.

Producers can override the exchange name specified in the URI by setting the EXCHANGE_NAME Camel message header.

Options to the URI include the exchange type, which defaults to direct if none is specified.

For header based exchanges, the URI is similar but name/value pairs can be specified in place of the routing key. For example:
Expand Down
Expand Up @@ -24,6 +24,7 @@ public class SpringAMQPComponent extends DefaultComponent {
protected AmqpTemplate amqpTemplate;
protected AmqpAdmin amqpAdministration;
public static final String ROUTING_KEY_HEADER = "ROUTING_KEY";
public static final String EXCHANGE_NAME_HEADER = "EXCHANGE_NAME";

public SpringAMQPComponent() {
this.connectionFactory = new CachingConnectionFactory();
Expand Down
Expand Up @@ -139,11 +139,14 @@ public void run() {

String routingKeyHeader = message.getHeader(SpringAMQPComponent.ROUTING_KEY_HEADER, String.class);
String routingKey = routingKeyHeader != null ? routingKeyHeader : endpoint.routingKey;

String exchangeNameHeader = message.getHeader(SpringAMQPComponent.EXCHANGE_NAME_HEADER, String.class);
String exchangeName = exchangeNameHeader != null ? exchangeNameHeader : endpoint.exchangeName;

try {
if(exchange.getPattern().isOutCapable()) {
LOG.debug("Synchronous send and request for exchange {}", exchange.getExchangeId());
Message amqpResponse = endpoint.getAmqpTemplate().sendAndReceive(endpoint.exchangeName, routingKey, inMessage.toAMQPMessage(msgConverter));
Message amqpResponse = endpoint.getAmqpTemplate().sendAndReceive(exchangeName, routingKey, inMessage.toAMQPMessage(msgConverter));
SpringAMQPMessage camelResponse = SpringAMQPMessage.fromAMQPMessage(msgConverter, amqpResponse);

Boolean isExceptionCaught = (Boolean)camelResponse.getHeader(SpringAMQPMessage.IS_EXCEPTION_CAUGHT);
Expand All @@ -161,7 +164,7 @@ public void run() {
}
} else {
LOG.debug("Synchronous send for exchange {}", exchange.getExchangeId());
endpoint.getAmqpTemplate().send(endpoint.exchangeName, routingKey, inMessage.toAMQPMessage(msgConverter));
endpoint.getAmqpTemplate().send(exchangeName, routingKey, inMessage.toAMQPMessage(msgConverter));
}
} catch (Throwable t) {
LOG.error("Could not deliver message via AMQP", t);
Expand Down

0 comments on commit 2695c62

Please sign in to comment.