From c2593251ccedc2ace8cf12be5cc23bb042d221f8 Mon Sep 17 00:00:00 2001 From: clebert Date: Wed, 25 May 2022 11:32:14 -0400 Subject: [PATCH 1/2] NO-JIRA removing useless assertionRemainingMessages JMSTestCase is deprecated anyway. in its older form many many years ago a server would be reused over between tests. what forced us to make such verification to avoid messages from one test leaking into the next. This was because a server startup was expensive many years ago (less efficient code and the hardware available 10 years ago) with the current state of things this is not needed as the server will be started from scratch on every test --- .../org/apache/activemq/artemis/jms/tests/JMSTestCase.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/JMSTestCase.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/JMSTestCase.java index 45f5a4923e8..508e9c2cd03 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/JMSTestCase.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/JMSTestCase.java @@ -69,8 +69,6 @@ public void setUp() throws Exception { cf = new ActiveMQJMSConnectionFactory("tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true"); queueCf = new ActiveMQQueueConnectionFactory("tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true"); topicCf = new ActiveMQTopicConnectionFactory("tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true"); - - assertRemainingMessages(0); } protected final JMSContext createContext() { @@ -115,8 +113,6 @@ public void tearDown() throws Exception { } cf = null; - - assertRemainingMessages(0); } protected Connection createConnection(ConnectionFactory cf1) throws JMSException { From b3c1f51e8994f2cdbf35d3b038ce3f78deff30bb Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 25 May 2022 09:21:51 -0400 Subject: [PATCH 2/2] ARTEMIS-3848 High CPU usage on ReadWriteLocks This is caused by too many entries on the HashMap for ThreadLocals. Also: I'm reviewing some readlock usage on the StorageManager to simplify things a little bit. --- .../artemis/utils/actors/HandlerBase.java | 26 +++--- .../remoting/impl/netty/NettyConnection.java | 50 ++++++------ .../artemis/core/paging/PagingStore.java | 6 +- .../core/paging/impl/PagingStoreImpl.java | 81 ++++++++----------- .../core/persistence/StorageManager.java | 5 -- .../AbstractJournalStorageManager.java | 46 +++++------ .../impl/nullpm/NullStorageManager.java | 12 +-- pom.xml | 4 +- .../byteman/RaceOnCursorIteratorTest.java | 2 +- .../storage/PersistMultiThreadTest.java | 4 +- .../stress/paging/PageCursorStressTest.java | 12 +-- .../paging/impl/PagingManagerImplTest.java | 6 +- .../core/paging/impl/PagingStoreImplTest.java | 24 +++--- 13 files changed, 115 insertions(+), 163 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java index 7b6cc70d710..bc456e765cd 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java @@ -25,27 +25,23 @@ */ public abstract class HandlerBase { - //marker instance used to recognize if a thread is performing a packet handling - private static final Object DUMMY = Boolean.TRUE; + private static class Counter { + int count = 0; + } - // this cannot be static as the Actor will be used within another executor. For that reason - // each instance will have its own ThreadLocal. - // ... a thread that has its thread-local map populated with DUMMY while performing a handler - private final ThreadLocal inHandler = new ThreadLocal<>(); + /** an actor could be used within an OrderedExecutor. So we need this counter to decide if there's a Handler anywhere in the stack trace */ + private static final ThreadLocal inHandler = ThreadLocal.withInitial(() -> new Counter()); - protected void enter() { - assert inHandler.get() == null : "should be null"; - inHandler.set(DUMMY); + protected static void enter() { + inHandler.get().count++; } - public boolean inHandler() { - final Object dummy = inHandler.get(); - return dummy != null; + public static boolean inHandler() { + return inHandler.get().count > 0; } - protected void leave() { - assert inHandler.get() != null : "marker not set"; - inHandler.set(null); + protected static void leave() { + inHandler.get().count--; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 6d4d5d15d83..0a58e04f8fd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -62,7 +62,7 @@ public class NettyConnection implements Connection { * here for when the connection (or Netty Channel) becomes available again. */ private final List readyListeners = new ArrayList<>(); - private final FastThreadLocal> localListenersPool = new FastThreadLocal<>(); + private static final FastThreadLocal> readyListenersPool = new FastThreadLocal<>(); private final boolean batchingEnabled; @@ -138,51 +138,47 @@ public boolean isOpen() { @Override public final void fireReady(final boolean ready) { - ArrayList readyToCall = localListenersPool.get(); - if (readyToCall != null) { - localListenersPool.set(null); + + // We are reusing a previously created ArrayList for this localArray + ArrayList localArrayList = readyListenersPool.get(); + if (localArrayList == null) { + localArrayList = new ArrayList<>(); + readyListenersPool.set(localArrayList); + } else { + localArrayList.clear(); } + synchronized (readyListeners) { this.ready = ready; if (ready) { final int size = this.readyListeners.size(); - if (readyToCall != null) { - readyToCall.ensureCapacity(size); - } + localArrayList.ensureCapacity(size); try { for (int i = 0; i < size; i++) { final ReadyListener readyListener = readyListeners.get(i); if (readyListener == null) { break; } - if (readyToCall == null) { - readyToCall = new ArrayList<>(size); - } - readyToCall.add(readyListener); + localArrayList.add(readyListener); } } finally { readyListeners.clear(); } } } - if (readyToCall != null) { - try { - readyToCall.forEach(readyListener -> { - try { - readyListener.readyForWriting(); - } catch (Throwable logOnly) { - ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); - } - }); - } catch (Throwable t) { - ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t); - } finally { - readyToCall.clear(); - if (localListenersPool.get() != null) { - localListenersPool.set(readyToCall); + try { + localArrayList.forEach(readyListener -> { + try { + readyListener.readyForWriting(); + } catch (Throwable logOnly) { + ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); } - } + }); + } catch (Throwable t) { + ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t); + } finally { + localArrayList.clear(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 3c3cac4098c..a3aec161915 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -18,7 +18,6 @@ import java.io.File; import java.util.Collection; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessageListener; @@ -89,14 +88,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener /** * Write message to page if we are paging. * - * @param readLock a read lock from the storage manager. This is an encapsulation violation made - * to keep the code less complex. If give {@code null} the method will throw a - * {@link NullPointerException} * @return {@code true} if we are paging and have handled the data, {@code false} if the data * needs to be sent to the journal * @throws NullPointerException if {@code readLock} is null */ - boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception; + boolean page(Message message, Transaction tx, RouteContextList listCtx) throws Exception; Page createPage(int page) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 13b713c9b9c..0af5768dd4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -897,8 +896,7 @@ public boolean checkReleasedMemory() { @Override public boolean page(Message message, final Transaction tx, - RouteContextList listCtx, - final ReadLock managerLock) throws Exception { + RouteContextList listCtx) throws Exception { if (!running) { return false; @@ -941,62 +939,53 @@ public boolean page(Message message, lock.readLock().unlock(); } - if (managerLock != null) { - managerLock.lock(); - } - try { - lock.writeLock().lock(); + lock.writeLock().lock(); - try { - if (!paging) { - return false; - } + try { + if (!paging) { + return false; + } - final long transactionID = tx == null ? -1 : tx.getID(); - PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID); + final long transactionID = tx == null ? -1 : tx.getID(); + PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID); - if (message.isLargeMessage()) { - ((LargeServerMessage) message).setPaged(); - } + if (message.isLargeMessage()) { + ((LargeServerMessage) message).setPaged(); + } - int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD; + int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD; + currentPageSize += bytesToWrite; + if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) { + // Make sure nothing is currently validating or using currentPage + openNewPage(); currentPageSize += bytesToWrite; - if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) { - // Make sure nothing is currently validating or using currentPage - openNewPage(); - currentPageSize += bytesToWrite; - } - - if (tx != null) { - installPageTransaction(tx, listCtx); - } + } - // the apply counter will make sure we write a record on journal - // especially on the case for non transactional sends and paging - // doing this will give us a possibility of recovering the page counters - long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0; - final Page page = currentPage; - applyPageCounters(tx, page, listCtx, persistentSize); + if (tx != null) { + installPageTransaction(tx, listCtx); + } - page.write(pagedMessage); + // the apply counter will make sure we write a record on journal + // especially on the case for non transactional sends and paging + // doing this will give us a possibility of recovering the page counters + long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0; + final Page page = currentPage; + applyPageCounters(tx, page, listCtx, persistentSize); - if (tx == null && syncNonTransactional && message.isDurable()) { - sync(); - } + page.write(pagedMessage); - if (logger.isTraceEnabled()) { - logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId()); - } + if (tx == null && syncNonTransactional && message.isDurable()) { + sync(); + } - return true; - } finally { - lock.writeLock().unlock(); + if (logger.isTraceEnabled()) { + logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId()); } + + return true; } finally { - if (managerLock != null) { - managerLock.unlock(); - } + lock.writeLock().unlock(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index ee747a6296e..8c759065416 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -436,11 +436,6 @@ void startReplication(ReplicationManager replicationManager, /** * Write message to page if we are paging. - *

- * This is primarily a {@link PagingStore} call, but as with any other call writing persistent - * data, it must go through here. Both for the sake of replication, and also to ensure that it - * takes the locks (storage manager and pagingStore) in the right order. Avoiding thus the - * creation of dead-locks. * * @return {@code true} if we are paging and have handled the data, {@code false} if the data * needs to be sent to the journal diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 1c5532ea733..1656f989259 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -174,12 +174,20 @@ public static JournalContent getType(byte type) { protected final ScheduledExecutorService scheduledExecutorService; - protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true); + protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(false); // I would rather cache the Closeable instance here.. // I never know when the JRE decides to create a new instance on every call. // So I'm playing safe here. That's all - protected final ArtemisCloseable unlockCloseable = storageManagerLock.readLock()::unlock; + protected final ArtemisCloseable unlockCloseable = this::unlockCloseable; + protected static final ArtemisCloseable dummyCloseable = () -> { }; + + private static final ThreadLocal reentrant = ThreadLocal.withInitial(() -> false); + + private void unlockCloseable() { + storageManagerLock.readLock().unlock(); + reentrant.set(false); + } protected Journal messageJournal; @@ -395,6 +403,12 @@ public void storeReference(final long queueID, final long messageID, final boole @Override public ArtemisCloseable closeableReadLock() { + if (reentrant.get()) { + return dummyCloseable; + } + + reentrant.set(true); + CriticalCloseable measure = measureCritical(CRITICAL_STORE); storageManagerLock.readLock().lock(); @@ -415,20 +429,6 @@ public ArtemisCloseable closeableReadLock() { } } - /** - * for internal use and testsuite, don't use it outside of tests - */ - public void writeLock() { - storageManagerLock.writeLock().lock(); - } - - /** - * for internal use and testsuite, don't use it outside of tests - */ - public void writeUnlock() { - storageManagerLock.writeLock().unlock(); - } - @Override public void storeAcknowledge(final long queueID, final long messageID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { @@ -2150,17 +2150,9 @@ protected static PersistentAddressBindingEncoding newAddressBindingEncoding(long @Override public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception { - /** - * Exposing the read-lock here is an encapsulation violation done in order to keep the code - * simpler. The alternative would be to add a second method, say 'verifyPaging', to - * PagingStore. - *

- * Adding this second method would also be more surprise prone as it would require a certain - * calling order. - *

- * The reasoning is that exposing the lock is more explicit and therefore `less bad`. - */ - return store.page(msg, tx, listCtx, storageManagerLock.readLock()); + try (ArtemisCloseable closeable = closeableReadLock()) { + return store.page(msg, tx, listCtx); + } } private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 2528e02fc0a..51e4f54ce3d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -642,18 +642,8 @@ public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception { - /** - * Exposing the read-lock here is an encapsulation violation done in order to keep the code - * simpler. The alternative would be to add a second method, say 'verifyPaging', to - * PagingStore. - *

- * Adding this second method would also be more surprise prone as it would require a certain - * calling order. - *

- * The reasoning is that exposing the lock is more explicit and therefore `less bad`. - */ if (store != null) { - return store.page(msg, tx, listCtx, null); + return store.page(msg, tx, listCtx); } else { return false; } diff --git a/pom.xml b/pom.xml index 5e976fef006..090a0c114db 100644 --- a/pom.xml +++ b/pom.xml @@ -1131,7 +1131,7 @@ --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED -XDcompilePolicy=simple - -Xplugin:ErrorProne -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.* + -Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.* @@ -1165,7 +1165,7 @@ --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED -XDcompilePolicy=simple - -Xplugin:ErrorProne -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.* + -Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.* -J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED -J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED -J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java index 0c2cba8d28e..7b96d71085e 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java @@ -198,7 +198,7 @@ private static CoreMessage createMessage(final long id) throws Exception { final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(ADDRESS, queue); - pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock); + pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)); return msg; } diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 4923a74cfe1..efbefc314cc 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -23,7 +23,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Message; @@ -354,8 +353,7 @@ public void ioSync() throws Exception { @Override public boolean page(Message message, Transaction tx, - RouteContextList listCtx, - ReentrantReadWriteLock.ReadLock readLock) throws Exception { + RouteContextList listCtx) throws Exception { return false; } diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java index 2f393380329..df9d5567201 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java @@ -408,7 +408,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); PagedReference readMessage = iterator.next(); @@ -441,7 +441,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); } PagedReference readMessage = iterator.next(); @@ -471,7 +471,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); } PagedReference readMessage = iterator.next(); @@ -556,7 +556,7 @@ public void run() { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); } if (tx != null) { @@ -775,7 +775,7 @@ private int addMessages(final int start, final int numMessages, final int messag msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); } return pageStore.getNumberOfPages(); @@ -872,7 +872,7 @@ private Transaction pgMessages(StorageManager storage, Message msg = new CoreMessage(storage.generateID(), buffer.writerIndex()); msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); msg.putIntProperty("key", i); - pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock); + pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)); } return txImpl; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java index bf7cc83b222..1097514772a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java @@ -66,11 +66,11 @@ public void testPagingManager() throws Exception { ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10)); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); + Assert.assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); store.startPaging(); - Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); + Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); Page page = store.depage(); @@ -89,7 +89,7 @@ public void testPagingManager() throws Exception { Assert.assertNull(store.depage()); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)); + Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 0e43558dab1..640825ac663 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -176,7 +176,7 @@ public void testStore() throws Exception { Assert.assertTrue(storeImpl.isPaging()); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock)); + Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); Assert.assertEquals(1, storeImpl.getNumberOfPages()); @@ -219,7 +219,7 @@ public void testDepageOnCurrentPage() throws Exception { Message msg = createMessage(i, storeImpl, destination, buffer); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock)); + Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); } @@ -286,7 +286,7 @@ public void testRemoveInTheMiddle() throws Exception { msg.putIntProperty("page", page); final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(fakeQueue.getName(), fakeQueue); - Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock)); + Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); if (i > 0 && i % 10 == 0) { storeImpl.forceAnotherPage(); page++; @@ -372,7 +372,7 @@ public void testRemoveCurrentPage() throws Exception { msg.putIntProperty("page", page); final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(fakeQueue.getName(), fakeQueue); - Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock)); + Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); if (i > 0 && i % 10 == 0) { storeImpl.forceAnotherPage(); page++; @@ -474,7 +474,7 @@ public void testDepageMultiplePages() throws Exception { Message msg = createMessage(i, store, destination, buffer); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); + Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); } Assert.assertEquals(2, store.getNumberOfPages()); @@ -509,7 +509,7 @@ public void testDepageMultiplePages() throws Exception { Message msg = createMessage(1, store, destination, buffers.get(0)); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); + Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); Page newPage = store.depage(); @@ -529,14 +529,14 @@ public void testDepageMultiplePages() throws Exception { { final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)); + Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); } store.startPaging(); { final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - Assert.assertTrue(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)); + Assert.assertTrue(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); } Page page = store.depage(); @@ -619,7 +619,7 @@ public void run() { // Just using the internal API to remove it from the page file system Message msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5)); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - if (storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock)) { + if (storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()))) { buffers.put(id, msg); } else { break; @@ -736,7 +736,7 @@ public void run() { storeImpl2.forceAnotherPage(); final RoutingContextImpl ctx = new RoutingContextImpl(null); - storeImpl2.page(lastMsg, ctx.getTransaction(), ctx.getContextListing(storeImpl2.getStoreName()), lock); + storeImpl2.page(lastMsg, ctx.getTransaction(), ctx.getContextListing(storeImpl2.getStoreName())); buffers2.put(lastMessageId, lastMsg); Page lastPage = null; @@ -858,7 +858,7 @@ public void run() { msg.putLongProperty("count", i); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - while (!store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)) { + while (!store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))) { store.startPaging(); } @@ -1282,7 +1282,7 @@ protected void writePageMessage(final PagingStore storeImpl, msg.putLongProperty("count", id); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock); + storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName())); } private CoreMessage createMessage(final long id,