Permalink
Browse files

When failing to parse a bulk message, we ack the delivery tag twice, …

…closes #9.
  • Loading branch information...
kimchy committed Apr 11, 2012
1 parent 548ddc9 commit 0eb8960dbd7d8c8997b7d3164a008967e2a6afc7
Showing with 4 additions and 4 deletions.
  1. +4 −4 src/main/java/org/elasticsearch/river/rabbitmq/RabbitmqRiver.java
@@ -238,15 +238,15 @@ public void run() {
while ((task = consumer.nextDelivery(bulkTimeout.millis())) != null) {
try {
bulkRequestBuilder.add(task.getBody(), 0, task.getBody().length, false);
+ deliveryTags.add(task.getEnvelope().getDeliveryTag());
} catch (Exception e) {
logger.warn("failed to parse request for delivery tag [{}], ack'ing...", e, task.getEnvelope().getDeliveryTag());
try {
channel.basicAck(task.getEnvelope().getDeliveryTag(), false);
- } catch (IOException e1) {
+ } catch (Exception e1) {
logger.warn("failed to ack on failure [{}]", e1, task.getEnvelope().getDeliveryTag());
}
}
- deliveryTags.add(task.getEnvelope().getDeliveryTag());
if (bulkRequestBuilder.numberOfActions() >= bulkSize) {
break;
}
@@ -272,7 +272,7 @@ public void run() {
for (Long deliveryTag : deliveryTags) {
try {
channel.basicAck(deliveryTag, false);
- } catch (IOException e1) {
+ } catch (Exception e1) {
logger.warn("failed to ack [{}]", e1, deliveryTag);
}
}
@@ -290,7 +290,7 @@ public void onResponse(BulkResponse response) {
for (Long deliveryTag : deliveryTags) {
try {
channel.basicAck(deliveryTag, false);
- } catch (IOException e1) {
+ } catch (Exception e1) {
logger.warn("failed to ack [{}]", e1, deliveryTag);
}
}

0 comments on commit 0eb8960

Please sign in to comment.