From f4bdacbc4cb46b308f80391940890c74bf111ecd Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 13 Apr 2022 13:17:44 -0400 Subject: [PATCH] ARTEMIS-3778 Streamline Expiration Reaping Instead of holding a thread and an iterator, we should keep moving to the next reference without holding any threads, using only callbacks. --- .../server/ActiveMQScheduledComponent.java | 4 +- .../core/postoffice/impl/PostOfficeImpl.java | 54 ++++++++----------- .../core/server/ActiveMQServerLogger.java | 4 -- .../artemis/core/server/impl/QueueImpl.java | 40 ++++++-------- .../tests/integration/paging/PagingTest.java | 2 +- 5 files changed, 43 insertions(+), 61 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java index 5ff56e59034..6783f7a6723 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java @@ -36,14 +36,14 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable { private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class); - private ScheduledExecutorService scheduledExecutorService; + protected ScheduledExecutorService scheduledExecutorService; private boolean startedOwnScheduler; /** initialDelay < 0 would mean no initial delay, use the period instead */ private long initialDelay; private long period; private TimeUnit timeUnit; - private final Executor executor; + protected final Executor executor; private volatile boolean isStarted; private ScheduledFuture future; private final boolean onDemand; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index d6866856f69..94eb81845cd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -30,11 +30,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; @@ -1846,43 +1844,37 @@ private final class ExpiryReaper extends ActiveMQScheduledComponent { super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand); } - volatile CountDownLatch inUseLatch; + private Iterator iterator; + private Queue currentQueue; @Override - public void stop() { - super.stop(); - // this will do a best effort to stop the current latch. - // no big deal if it failed. this is just to optimize this component stop. 
- CountDownLatch latch = inUseLatch; - if (latch != null) { - latch.countDown(); + public void run() { + if (iterator != null) { + logger.debugf("A previous reaping call has not finished yet, and it is currently working on %s", currentQueue); + return; } + + iterator = iterableOf(getLocalQueues()).iterator(); + + moveNext(); } + private void done() { + executor.execute(this::moveNext); + } - @Override - public void run() { - // The reaper thread should be finished case the PostOffice is gone - // This is to avoid leaks on PostOffice between stops and starts - for (Queue queue : iterableOf(getLocalQueues())) { - if (!isStarted()) { - break; - } - try { - CountDownLatch latch = new CountDownLatch(1); - this.inUseLatch = latch; - queue.expireReferences(latch::countDown); - // the idea is in fact to block the Reaper while the Queue is executing reaping. - // This would avoid another eventual expiry to be called if the period for reaping is too small - // This should also avoid bursts in CPU consumption because of the expiry reaping - if (!latch.await(10, TimeUnit.SECONDS)) { - ActiveMQServerLogger.LOGGER.errorExpiringMessages(new TimeoutException(queue.getName().toString())); - } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorExpiringMessages(e); - } + private void moveNext() { + if (!iterator.hasNext() || !this.isStarted()) { + iterator = null; + currentQueue = null; + return; } + + currentQueue = iterator.next(); + + // we will expire messages on this queue, once done we move to the next queue + currentQueue.expireReferences(this::done); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 5568ea18433..76985fa71ef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1823,10 +1823,6 @@ void slowConsumerDetected(String sessionID, @Message(id = 224012, value = "error releasing resources", format = Message.Format.MESSAGE_FORMAT) void largeMessageErrorReleasingResources(@Cause Exception e); - @LogMessage(level = Logger.Level.ERROR) - @Message(id = 224013, value = "failed to expire messages for queue", format = Message.Format.MESSAGE_FORMAT) - void errorExpiringMessages(@Cause Exception e); - @LogMessage(level = Logger.Level.ERROR) @Message(id = 224014, value = "Failed to close session", format = Message.Format.MESSAGE_FORMAT) void errorClosingSession(@Cause Exception e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index b79308c2abf..9e63cbd9cc0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -295,8 +295,6 @@ private void checkIDSupplier(NodeStore nodeStore) { private AddressSettingsRepositoryListener addressSettingsRepositoryListener; - private final ExpiryScanner expiryScanner = new ExpiryScanner(); - private final ReusableLatch deliveriesInTransit = new ReusableLatch(0); private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis()); @@ -2361,13 +2359,10 @@ public void expireReferences(Runnable done) { } - if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) { - if (expiryScanner.scannerRunning.incrementAndGet() == 1) { - expiryScanner.doneCallback = done; - } - getExecutor().execute(expiryScanner); + if (!queueDestroyed) { + getExecutor().execute(new ExpiryScanner(done)); } else { - // expire is already happening on this queue, move on! 
+ // queue is destroyed, move on if (done != null) { done.run(); } @@ -2388,24 +2383,25 @@ public boolean isExpirationRedundant() { class ExpiryScanner implements Runnable { - public Runnable doneCallback; - public AtomicInteger scannerRunning = new AtomicInteger(0); + private final Runnable doneCallback; + + ExpiryScanner(Runnable doneCallback) { + this.doneCallback = doneCallback; + } + LinkedListIterator iter = null; @Override public void run() { - boolean expired = false; boolean hasElements = false; int elementsIterated = 0; int elementsExpired = 0; + boolean rescheduled = false; + LinkedList expiredMessages = new LinkedList<>(); synchronized (QueueImpl.this) { - if (queueDestroyed) { - return; - } - if (logger.isDebugEnabled()) { logger.debug("Scanning for expires on " + QueueImpl.this.getName()); } @@ -2422,7 +2418,7 @@ public void run() { } try { - while (postOffice.isStarted() && iter.hasNext()) { + while (!queueDestroyed && postOffice.isStarted() && iter.hasNext()) { hasElements = true; MessageReference ref = iter.next(); if (ref.getMessage().isExpired()) { @@ -2433,14 +2429,16 @@ public void run() { iter.remove(); } if (++elementsIterated >= MAX_DELIVERIES_IN_LOOP) { - logger.debug("Breaking loop of expiring"); - scannerRunning.incrementAndGet(); + logger.debugf("Expiry Scanner on %s ran for %s iteration, scheduling a new one", QueueImpl.this.getName(), elementsIterated); + rescheduled = true; getExecutor().execute(this); break; } } } finally { - if (scannerRunning.decrementAndGet() == 0) { + if (!rescheduled) { + logger.debugf("Scanning for expires on %s done", QueueImpl.this.getName()); + if (server.hasBrokerQueuePlugins()) { try { server.callBrokerQueuePlugins((p) -> p.afterExpiryScan(QueueImpl.this)); @@ -2454,12 +2452,8 @@ public void run() { if (doneCallback != null) { doneCallback.run(); - doneCallback = null; } } - - logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done"); - } } diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index c72f5da9801..9083fecd08e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -1670,7 +1670,7 @@ public void testMoveExpire() throws Exception { clearDataRecreateServerDirs(); Configuration config = createDefaultInVMConfig().setJournalDirectory(getJournalDir()).setJournalSyncNonTransactional(false).setJournalCompactMinFiles(0) // disable compact - .setMessageExpiryScanPeriod(500); + .setMessageExpiryScanPeriod(10); server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);