From 552c0f0f7a7502c0337b13699de51a11b29723a2 Mon Sep 17 00:00:00 2001 From: "Brian D. Johnson" Date: Fri, 26 Feb 2016 20:19:50 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6188 - reset BaseDestination.lastActiveTime each time a message is delivered to the broker. --- .../broker/region/BaseDestination.java | 3 +- .../broker/region/DestinationGCTest.java | 37 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 528c2ec3c27..5ae2d286af3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -523,6 +523,7 @@ public void messageConsumed(ConnectionContext context, MessageReference messageR */ @Override public void messageDelivered(ConnectionContext context, MessageReference messageReference) { + this.lastActiveTime = 0L; if (advisoryForDelivery) { broker.messageDelivered(context, messageReference); } @@ -777,7 +778,7 @@ public void markForGC(long timeStamp) { @Override public boolean canGC() { boolean result = false; - if (isGcIfInactive()&& this.lastActiveTime != 0l) { + if (isGcIfInactive() && this.lastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) { if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) { result = true; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java index d0936935913..d75f8e3a041 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java @@ -17,12 +17,14 @@ package org.apache.activemq.broker.region; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; @@ -37,8 +39,13 @@ import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class DestinationGCTest { + protected static final Logger logger = LoggerFactory.getLogger(DestinationGCTest.class); + private final ActiveMQQueue queue = new ActiveMQQueue("TEST"); private final ActiveMQQueue otherQueue = new ActiveMQQueue("TEST-OTHER"); @@ -137,4 +144,34 @@ public boolean isSatisified() throws Exception { } })); } + + @Test(timeout = 60000) + public void testDestinationGcAnonymousProducer() throws Exception { + + final ActiveMQQueue q = new ActiveMQQueue("Q.TEST.ANONYMOUS.PRODUCER"); + + brokerService.getAdminView().addQueue(q.getPhysicalName()); + assertEquals(2, brokerService.getAdminView().getQueues().length); + + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + final Connection connection = factory.createConnection(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // wait for the queue to be marked for GC + logger.info("Waiting for '{}' to be marked for GC...", q); + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerService.getDestination(q).canGC(); + } + }, Wait.MAX_WAIT_MILLIS, 500L); + + // create anonymous producer and send a message + logger.info("Sending PERSISTENT message to QUEUE '{}'", q.getPhysicalName()); + final MessageProducer producer = session.createProducer(null); + producer.send(q, session.createTextMessage()); + producer.close(); + + assertFalse(brokerService.getDestination(q).canGC()); + } }