From 825b7bb72f635138d6578f9c19db88ac7ca77fcf Mon Sep 17 00:00:00 2001 From: metatechbe Date: Thu, 12 Mar 2015 16:53:30 +0100 Subject: [PATCH 1/2] AMQ-5659 Safety measure against infinite loop in "purge" (V2) --- .../org/apache/activemq/broker/region/Queue.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index c3de541c887..af2e4817187 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1223,6 +1223,8 @@ public QueueMessageReference getMessage(String id) { public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List list = null; + long previousDequeueCount = -1; + long previousDequeueCountRepeated = 0L; long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { doPageIn(true, false); // signal no expiry processing needed. @@ -1234,6 +1236,19 @@ public void purge() throws Exception { } for (MessageReference ref : list) { + long currentDequeueCount = this.destinationStatistics.getDequeues().getCount(); + if (previousDequeueCount == currentDequeueCount) { + previousDequeueCountRepeated++; + if (previousDequeueCountRepeated > 3) { + // Break the infinite loop in case the removal fails + // 3 times in a row -> error is fatal and not transient. + LOG.error("Aborted purge operation after attempting to delete messages"); + throw new RuntimeException("Purge operation failed to delete messages"); + } + } else { + previousDequeueCount = currentDequeueCount; + previousDequeueCountRepeated = 0L; + } try { QueueMessageReference r = (QueueMessageReference) ref; removeMessage(c, r); From 33ca67774be52b13799681c09583ec4112aac2ea Mon Sep 17 00:00:00 2001 From: metatechbe Date: Fri, 13 Mar 2015 10:23:57 +0100 Subject: [PATCH 2/2] Fix counter threshold --- .../main/java/org/apache/activemq/broker/region/Queue.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index af2e4817187..5eaa294ca2a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1224,7 +1224,7 @@ public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List list = null; long previousDequeueCount = -1; - long previousDequeueCountRepeated = 0L; + long previousDequeueCountRepeated = 1L; long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { doPageIn(true, false); // signal no expiry processing needed. @@ -1239,7 +1239,7 @@ public void purge() throws Exception { long currentDequeueCount = this.destinationStatistics.getDequeues().getCount(); if (previousDequeueCount == currentDequeueCount) { previousDequeueCountRepeated++; - if (previousDequeueCountRepeated > 3) { + if (previousDequeueCountRepeated >= 3) { // Break the infinite loop in case the removal fails // 3 times in a row -> error is fatal and not transient. LOG.error("Aborted purge operation after attempting to delete messages");