Skip to content

Commit

Permalink
Allow exchange type and durability, queue durability and queue auto-d…
Browse files Browse the repository at this point in the history
…elete to be specified; closes #696
  • Loading branch information
Charles Duffy authored and kimchy committed Feb 22, 2011
1 parent 953fcbc commit d0780f0
Showing 1 changed file with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {

private final String rabbitQueue;
private final String rabbitExchange;
private final String rabbitExchangeType;
private final String rabbitRoutingKey;
private final boolean rabbitExchangeDurable;
private final boolean rabbitQueueDurable;
private final boolean rabbitQueueAutoDelete;

private final int bulkSize;
private final TimeValue bulkTimeout;
Expand Down Expand Up @@ -84,7 +88,11 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {

rabbitQueue = XContentMapValues.nodeStringValue(rabbitSettings.get("queue"), "elasticsearch");
rabbitExchange = XContentMapValues.nodeStringValue(rabbitSettings.get("exchange"), "elasticsearch");
rabbitExchangeType = XContentMapValues.nodeStringValue(rabbitSettings.get("exchange_type"), "direct");
rabbitRoutingKey = XContentMapValues.nodeStringValue(rabbitSettings.get("routing_key"), "elasticsearch");
rabbitExchangeDurable = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("exchange_durable"), "true")).booleanValue();
rabbitQueueDurable = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("queue_durable"), "true")).booleanValue();
rabbitQueueAutoDelete = Boolean.valueOf(XContentMapValues.nodeStringValue(rabbitSettings.get("queue_auto_delete"), "false")).booleanValue();
} else {
rabbitHost = ConnectionFactory.DEFAULT_HOST;
rabbitPort = ConnectionFactory.DEFAULT_AMQP_PORT;
Expand All @@ -93,7 +101,11 @@ public class RabbitmqRiver extends AbstractRiverComponent implements River {
rabbitVhost = ConnectionFactory.DEFAULT_VHOST;

rabbitQueue = "elasticsearch";
rabbitQueueAutoDelete = false;
rabbitQueueDurable = true;
rabbitExchange = "elasticsearch";
rabbitExchangeType = "direct";
rabbitExchangeDurable = true;
rabbitRoutingKey = "elasticsearch";
}

Expand Down Expand Up @@ -167,8 +179,8 @@ private class Consumer implements Runnable {
QueueingConsumer consumer = new QueueingConsumer(channel);
// define the queue
try {
channel.exchangeDeclare(rabbitExchange/*exchange*/, "direct"/*type*/, true /*durable*/);
channel.queueDeclare(rabbitQueue/*queue*/, true /*durable*/, false/*exclusive*/, false/*autoDelete*/, null);
channel.exchangeDeclare(rabbitExchange/*exchange*/, rabbitExchangeType/*type*/, true /*durable*/);
channel.queueDeclare(rabbitQueue/*queue*/, rabbitQueueDurable/*durable*/, false/*exclusive*/, rabbitQueueAutoDelete/*autoDelete*/, null);
channel.queueBind(rabbitQueue/*queue*/, rabbitExchange/*exchange*/, rabbitRoutingKey/*routingKey*/);
channel.basicConsume(rabbitQueue/*queue*/, false/*noAck*/, consumer);
} catch (Exception e) {
Expand Down Expand Up @@ -278,7 +290,7 @@ private class Consumer implements Runnable {
}

@Override public void onFailure(Throwable e) {
logger.warn("failed to execute bulk for delivery tags , not ack'ing", e, deliveryTags);
logger.warn("failed to execute bulk for delivery tags [{}], not ack'ing", e, deliveryTags);
}
});
}
Expand Down

0 comments on commit d0780f0

Please sign in to comment.