diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java index 76b20f9ddd5..efa0cabef69 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java @@ -28,14 +28,18 @@ /** This is for components with a scheduled at a fixed rate. */ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable { + private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class); private final ScheduledExecutorService scheduledExecutorService; private long period; + private long millisecondsPeriod; private TimeUnit timeUnit; private final Executor executor; private ScheduledFuture future; private final boolean onDemand; + long lastTime = 0; + private final AtomicInteger delayed = new AtomicInteger(0); public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService, @@ -58,6 +62,8 @@ public synchronized void start() { if (future != null) { return; } + + this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS); if (onDemand) { return; } @@ -113,11 +119,6 @@ public synchronized void stop() { } - @Override - public void run() { - delayed.decrementAndGet(); - } - @Override public synchronized boolean isStarted() { return future != null; @@ -132,10 +133,30 @@ private void restartIfNeeded() { } } + final Runnable runForExecutor = new Runnable() { + @Override + public void run() { + if (onDemand && delayed.get() > 0) { + delayed.decrementAndGet(); + } + + if (!onDemand && lastTime > 0) { + if (System.currentTimeMillis() - lastTime < millisecondsPeriod) { + logger.trace("Execution ignored due to too many simultaneous executions, probably a previous delayed execution"); + return; + } + } + + lastTime = System.currentTimeMillis(); + + ActiveMQScheduledComponent.this.run(); + } + }; + final Runnable runForScheduler = new Runnable() { @Override public void run() { - executor.execute(ActiveMQScheduledComponent.this); + executor.execute(runForExecutor); } }; diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java new file mode 100644 index 00000000000..bf920e74083 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class ActiveMQScheduledComponentTest { + + @Rule + public ThreadLeakCheckRule rule = new ThreadLeakCheckRule(); + + ScheduledExecutorService scheduledExecutorService; + ExecutorService executorService; + @Before + public void before() { + scheduledExecutorService = new ScheduledThreadPoolExecutor(5); + executorService = Executors.newSingleThreadExecutor(); + } + + @After + public void after() { + executorService.shutdown(); + scheduledExecutorService.shutdown(); + } + + @Test + public void testAccumulation() throws Exception { + final AtomicInteger count = new AtomicInteger(0); + + + final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, false) { + public void run() { + if (count.get() == 0) { + try { + Thread.sleep(800); + } + catch (Exception e) { + } + } + count.incrementAndGet(); + } + }; + + local.start(); + + Thread.sleep(1000); + + local.stop(); + + Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5); + } + +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java index 53f07b8eb74..8ef7e082cdb 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalSync.java @@ -38,7 +38,6 @@ public JDBCJournalSync(ScheduledExecutorService scheduledExecutorService, @Override public void run() { - super.run(); if (journal.isStarted()) { journal.sync(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java index 541743997b4..b0f46150103 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageSyncTimer.java @@ -78,7 +78,6 @@ synchronized void addSync(OperationContext ctx) { @Override public void run() { - super.run(); tick(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 61b46a01458..c8a6966ed92 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -27,13 +27,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; @@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueInfo; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -112,8 +113,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private Reaper reaperRunnable; - private volatile Thread reaperThread; - private final long reaperPeriod; private final int reaperPriority; @@ -198,12 +197,6 @@ public synchronized void stop() throws Exception { if (reaperRunnable != null) reaperRunnable.stop(); - if (reaperThread != null) { - reaperThread.join(); - - reaperThread = null; - } - addressManager.clear(); queueInfos.clear(); @@ -1244,12 +1237,9 @@ public synchronized void startExpiryScanner() { if (reaperPeriod > 0) { if (reaperRunnable != null) reaperRunnable.stop(); - reaperRunnable = new Reaper(); - reaperThread = new Thread(reaperRunnable, "activemq-expiry-reaper-thread"); - - reaperThread.setPriority(reaperPriority); + reaperRunnable = new Reaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), reaperPeriod, TimeUnit.MILLISECONDS, false); - reaperThread.start(); + reaperRunnable.start(); } } @@ -1268,48 +1258,38 @@ private ServerMessage createQueueInfoMessage(final NotificationType type, final return message; } - private final class Reaper implements Runnable { + private final class Reaper extends ActiveMQScheduledComponent { - private final CountDownLatch latch = new CountDownLatch(1); - - public void stop() { - latch.countDown(); + Reaper(ScheduledExecutorService scheduledExecutorService, + Executor executor, + long checkPeriod, + TimeUnit timeUnit, + boolean onDemand) { + super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand); } @Override public void run() { // The reaper thread should be finished case the PostOffice is gone // This is to avoid leaks on PostOffice between stops and starts - while (isStarted()) { - try { - if (latch.await(reaperPeriod, TimeUnit.MILLISECONDS)) - return; - } - catch (InterruptedException e1) { - throw new ActiveMQInterruptedException(e1); - } - if (!isStarted()) - return; - - Map nameMap = addressManager.getBindings(); + Map nameMap = addressManager.getBindings(); - List queues = new ArrayList<>(); + List queues = new ArrayList<>(); - for (Binding binding : nameMap.values()) { - if (binding.getType() == BindingType.LOCAL_QUEUE) { - Queue queue = (Queue) binding.getBindable(); + for (Binding binding : nameMap.values()) { + if (binding.getType() == BindingType.LOCAL_QUEUE) { + Queue queue = (Queue) binding.getBindable(); - queues.add(queue); - } + queues.add(queue); } + } - for (Queue queue : queues) { - try { - queue.expireReferences(); - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorExpiringMessages(e); - } + for (Queue queue : queues) { + try { + queue.expireReferences(); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorExpiringMessages(e); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java index 6fc24093bb8..f4ab032fc79 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java @@ -74,7 +74,6 @@ public synchronized FileStoreMonitor addStore(FileStore store) { @Override public void run() { - super.run(); tick(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 1514748ac1f..99ce43f1aba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1100,13 +1100,17 @@ public synchronized void cancel(final MessageReference reference, final long tim } } - @Override public void expire(final MessageReference ref) throws Exception { - if (expiryAddress != null) { + SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref); + if (messageExpiryAddress == null) { + messageExpiryAddress = expiryAddressFromAddressSettings(ref); + } + + if (messageExpiryAddress != null) { if (logger.isTraceEnabled()) { - logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName()); + logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName()); } - move(null, expiryAddress, ref, false, AckReason.EXPIRED); + move(null, messageExpiryAddress, ref, false, AckReason.EXPIRED); } else { if (logger.isTraceEnabled()) { @@ -1116,6 +1120,40 @@ public void expire(final MessageReference ref) throws Exception { } } + private SimpleString expiryAddressFromMessageAddress(MessageReference ref) { + SimpleString messageAddress = extractAddress(ref.getMessage()); + SimpleString expiryAddress = null; + + if (messageAddress == null || messageAddress.equals(getAddress())) { + expiryAddress = getExpiryAddress(); + } + + return expiryAddress; + } + + private SimpleString expiryAddressFromAddressSettings(MessageReference ref) { + SimpleString messageAddress = extractAddress(ref.getMessage()); + SimpleString expiryAddress = null; + + if (messageAddress != null) { + AddressSettings addressSettings = addressSettingsRepository.getMatch(messageAddress.toString()); + + expiryAddress = addressSettings.getExpiryAddress(); + } + + return expiryAddress; + } + + private SimpleString extractAddress(ServerMessage message) { + if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { + return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS); + } + else { + return message.getAddress(); + } + } + + @Override public SimpleString getExpiryAddress() { return this.expiryAddress; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java index 8bce62d9cf7..ff72c37111a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/reload/ReloadManagerImpl.java @@ -44,7 +44,6 @@ public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, Exec @Override public void run() { - super.run(); tick(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java new file mode 100644 index 00000000000..3ce31d473d1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.cluster.distribution; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ExpireWhileLoadBalanceTest extends ClusterTestBase { + + @Before + public void setUp() throws Exception { + super.setUp(); + + setupServer(0, isFileStorage(), true); + setupServer(1, isFileStorage(), true); + setupServer(2, isFileStorage(), true); + + for (int i = 0; i < 3; i++) { + servers[i].getConfiguration().setMessageExpiryScanPeriod(100); + } + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, true, 0, 1, 2); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.STRICT, 1, true, 1, 0, 2); + + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.STRICT, 1, true, 2, 0, 1); + + startServers(0, 1, 2); + + setupSessionFactory(0, true); + setupSessionFactory(1, true); + setupSessionFactory(2, true); + } + + @Test + public void testSend() throws Exception { + waitForTopology(getServer(0), 3); + waitForTopology(getServer(1), 3); + waitForTopology(getServer(2), 3); + + SimpleString expiryQueue = SimpleString.toSimpleString("expiryQueue"); + + AddressSettings as = new AddressSettings(); + as.setDeadLetterAddress(expiryQueue); + as.setExpiryAddress(expiryQueue); + + for (int i = 0; i <= 2; i++) { + createQueue(i, "queues.testaddress", "queue0", null, true); + getServer(i).createQueue(expiryQueue, expiryQueue, null, true, false); + getServer(i).getAddressSettingsRepository().addMatch("queues.*", as); + + } + + // this will pause all the cluster bridges + for (ClusterConnection clusterConnection : getServer(0).getClusterManager().getClusterConnections()) { + for (MessageFlowRecord record : ((ClusterConnectionImpl) clusterConnection).getRecords().values()) { + record.getBridge().pause(); + } + } + + ClientSessionFactory sf = sfs[0]; + + ClientSession session = sf.createSession(false, false); + ClientProducer producer = session.createProducer("queues.testaddress"); + + for (int i = 0; i < 1000; i++) { + ClientMessage message = session.createMessage(true); + message.setExpiration(500); + producer.send(message); + } + + session.commit(); + + session.start(); + + ClientConsumer consumer = session.createConsumer("expiryQueue"); + for (int i = 0; i < 1000; i++) { + ClientMessage message = consumer.receive(2000); + Assert.assertNotNull(message); + message.acknowledge(); + } + + session.commit(); + + session.close(); + + } +}