Skip to content
Permalink
Browse files
ARTEMIS-3778 Streamline Expiration Reaping
Instead of holding a thread and an iterator, we should instead keep moving to next references
without holding any threads. Just with callbacks.
  • Loading branch information
clebertsuconic committed Apr 14, 2022
1 parent ccfd4b7 commit f4bdacbc4cb46b308f80391940890c74bf111ecd
Showing 5 changed files with 43 additions and 61 deletions.
@@ -36,14 +36,14 @@
public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {

private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
private ScheduledExecutorService scheduledExecutorService;
protected ScheduledExecutorService scheduledExecutorService;
private boolean startedOwnScheduler;

/** initialDelay < 0 would mean no initial delay, use the period instead */
private long initialDelay;
private long period;
private TimeUnit timeUnit;
private final Executor executor;
protected final Executor executor;
private volatile boolean isStarted;
private ScheduledFuture future;
private final boolean onDemand;
@@ -30,11 +30,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@@ -1846,43 +1844,37 @@ private final class ExpiryReaper extends ActiveMQScheduledComponent {
super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
}

volatile CountDownLatch inUseLatch;
private Iterator<Queue> iterator;

private Queue currentQueue;

@Override
public void stop() {
super.stop();
// this will do a best effort to stop the current latch.
// no big deal if it failed. this is just to optimize this component stop.
CountDownLatch latch = inUseLatch;
if (latch != null) {
latch.countDown();
public void run() {
if (iterator != null) {
logger.debugf("A previous reaping call has not finished yet, and it is currently working on %s", currentQueue);
return;
}

iterator = iterableOf(getLocalQueues()).iterator();

moveNext();
}

private void done() {
executor.execute(this::moveNext);
}

@Override
public void run() {
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
for (Queue queue : iterableOf(getLocalQueues())) {
if (!isStarted()) {
break;
}
try {
CountDownLatch latch = new CountDownLatch(1);
this.inUseLatch = latch;
queue.expireReferences(latch::countDown);
// the idea is in fact to block the Reaper while the Queue is executing reaping.
// This would avoid another eventual expiry to be called if the period for reaping is too small
// This should also avoid bursts in CPU consumption because of the expiry reaping
if (!latch.await(10, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(new TimeoutException(queue.getName().toString()));
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
}
private void moveNext() {
if (!iterator.hasNext() || !this.isStarted()) {
iterator = null;
currentQueue = null;
return;
}

currentQueue = iterator.next();

// we will expire messages on this queue, once done we move to the next queue
currentQueue.expireReferences(this::done);
}
}

@@ -1823,10 +1823,6 @@ void slowConsumerDetected(String sessionID,
@Message(id = 224012, value = "error releasing resources", format = Message.Format.MESSAGE_FORMAT)
void largeMessageErrorReleasingResources(@Cause Exception e);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224013, value = "failed to expire messages for queue", format = Message.Format.MESSAGE_FORMAT)
void errorExpiringMessages(@Cause Exception e);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224014, value = "Failed to close session", format = Message.Format.MESSAGE_FORMAT)
void errorClosingSession(@Cause Exception e);
@@ -295,8 +295,6 @@ private void checkIDSupplier(NodeStore<MessageReference> nodeStore) {

private AddressSettingsRepositoryListener addressSettingsRepositoryListener;

private final ExpiryScanner expiryScanner = new ExpiryScanner();

private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);

private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
@@ -2361,13 +2359,10 @@ public void expireReferences(Runnable done) {
}


if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) {
if (expiryScanner.scannerRunning.incrementAndGet() == 1) {
expiryScanner.doneCallback = done;
}
getExecutor().execute(expiryScanner);
if (!queueDestroyed) {
getExecutor().execute(new ExpiryScanner(done));
} else {
// expire is already happening on this queue, move on!
// queue is destroyed, move on
if (done != null) {
done.run();
}
@@ -2388,24 +2383,25 @@ public boolean isExpirationRedundant() {

class ExpiryScanner implements Runnable {

public Runnable doneCallback;
public AtomicInteger scannerRunning = new AtomicInteger(0);
private final Runnable doneCallback;

ExpiryScanner(Runnable doneCallback) {
this.doneCallback = doneCallback;
}

LinkedListIterator<MessageReference> iter = null;

@Override
public void run() {

boolean expired = false;
boolean hasElements = false;
int elementsIterated = 0;
int elementsExpired = 0;

boolean rescheduled = false;

LinkedList<MessageReference> expiredMessages = new LinkedList<>();
synchronized (QueueImpl.this) {
if (queueDestroyed) {
return;
}

if (logger.isDebugEnabled()) {
logger.debug("Scanning for expires on " + QueueImpl.this.getName());
}
@@ -2422,7 +2418,7 @@ public void run() {
}

try {
while (postOffice.isStarted() && iter.hasNext()) {
while (!queueDestroyed && postOffice.isStarted() && iter.hasNext()) {
hasElements = true;
MessageReference ref = iter.next();
if (ref.getMessage().isExpired()) {
@@ -2433,14 +2429,16 @@ public void run() {
iter.remove();
}
if (++elementsIterated >= MAX_DELIVERIES_IN_LOOP) {
logger.debug("Breaking loop of expiring");
scannerRunning.incrementAndGet();
logger.debugf("Expiry Scanner on %s ran for %s iteration, scheduling a new one", QueueImpl.this.getName(), elementsIterated);
rescheduled = true;
getExecutor().execute(this);
break;
}
}
} finally {
if (scannerRunning.decrementAndGet() == 0) {
if (!rescheduled) {
logger.debugf("Scanning for expires on %s done", QueueImpl.this.getName());

if (server.hasBrokerQueuePlugins()) {
try {
server.callBrokerQueuePlugins((p) -> p.afterExpiryScan(QueueImpl.this));
@@ -2454,12 +2452,8 @@ public void run() {

if (doneCallback != null) {
doneCallback.run();
doneCallback = null;
}
}

logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");

}
}

@@ -1670,7 +1670,7 @@ public void testMoveExpire() throws Exception {
clearDataRecreateServerDirs();

Configuration config = createDefaultInVMConfig().setJournalDirectory(getJournalDir()).setJournalSyncNonTransactional(false).setJournalCompactMinFiles(0) // disable compact
.setMessageExpiryScanPeriod(500);
.setMessageExpiryScanPeriod(10);

server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);

0 comments on commit f4bdacb

Please sign in to comment.