diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java index 7b6cc70d710..bc456e765cd 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java @@ -25,27 +25,23 @@ */ public abstract class HandlerBase { - //marker instance used to recognize if a thread is performing a packet handling - private static final Object DUMMY = Boolean.TRUE; + private static class Counter { + int count = 0; + } - // this cannot be static as the Actor will be used within another executor. For that reason - // each instance will have its own ThreadLocal. - // ... a thread that has its thread-local map populated with DUMMY while performing a handler - private final ThreadLocal inHandler = new ThreadLocal<>(); + /** an actor could be used within an OrderedExecutor. So we need this counter to decide if there's a Handler anywhere in the stack trace */ + private static final ThreadLocal inHandler = ThreadLocal.withInitial(() -> new Counter()); - protected void enter() { - assert inHandler.get() == null : "should be null"; - inHandler.set(DUMMY); + protected static void enter() { + inHandler.get().count++; } - public boolean inHandler() { - final Object dummy = inHandler.get(); - return dummy != null; + public static boolean inHandler() { + return inHandler.get().count > 0; } - protected void leave() { - assert inHandler.get() != null : "marker not set"; - inHandler.set(null); + protected static void leave() { + inHandler.get().count--; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 6d4d5d15d83..0a58e04f8fd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -62,7 +62,7 @@ public class NettyConnection implements Connection { * here for when the connection (or Netty Channel) becomes available again. */ private final List readyListeners = new ArrayList<>(); - private final FastThreadLocal> localListenersPool = new FastThreadLocal<>(); + private static final FastThreadLocal> readyListenersPool = new FastThreadLocal<>(); private final boolean batchingEnabled; @@ -138,51 +138,47 @@ public boolean isOpen() { @Override public final void fireReady(final boolean ready) { - ArrayList readyToCall = localListenersPool.get(); - if (readyToCall != null) { - localListenersPool.set(null); + + // We are reusing a previously created ArrayList for this localArray + ArrayList localArrayList = readyListenersPool.get(); + if (localArrayList == null) { + localArrayList = new ArrayList<>(); + readyListenersPool.set(localArrayList); + } else { + localArrayList.clear(); } + synchronized (readyListeners) { this.ready = ready; if (ready) { final int size = this.readyListeners.size(); - if (readyToCall != null) { - readyToCall.ensureCapacity(size); - } + localArrayList.ensureCapacity(size); try { for (int i = 0; i < size; i++) { final ReadyListener readyListener = readyListeners.get(i); if (readyListener == null) { break; } - if (readyToCall == null) { - readyToCall = new ArrayList<>(size); - } - readyToCall.add(readyListener); + localArrayList.add(readyListener); } } finally { readyListeners.clear(); } } } - if (readyToCall != null) { - try { - readyToCall.forEach(readyListener -> { - try { - readyListener.readyForWriting(); - } catch (Throwable logOnly) { - ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); - } - }); - } catch (Throwable t) { - ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t); - } finally { - readyToCall.clear(); - if (localListenersPool.get() != null) { - localListenersPool.set(readyToCall); + try { + localArrayList.forEach(readyListener -> { + try { + readyListener.readyForWriting(); + } catch (Throwable logOnly) { + ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly); } - } + }); + } catch (Throwable t) { + ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t); + } finally { + localArrayList.clear(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 3c3cac4098c..a3aec161915 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -18,7 +18,6 @@ import java.io.File; import java.util.Collection; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessageListener; @@ -89,14 +88,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener /** * Write message to page if we are paging. * - * @param readLock a read lock from the storage manager. This is an encapsulation violation made - * to keep the code less complex. If give {@code null} the method will throw a - * {@link NullPointerException} * @return {@code true} if we are paging and have handled the data, {@code false} if the data * needs to be sent to the journal * @throws NullPointerException if {@code readLock} is null */ - boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception; + boolean page(Message message, Transaction tx, RouteContextList listCtx) throws Exception; Page createPage(int page) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 13b713c9b9c..0af5768dd4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -897,8 +896,7 @@ public boolean checkReleasedMemory() { @Override public boolean page(Message message, final Transaction tx, - RouteContextList listCtx, - final ReadLock managerLock) throws Exception { + RouteContextList listCtx) throws Exception { if (!running) { return false; @@ -941,62 +939,53 @@ public boolean page(Message message, lock.readLock().unlock(); } - if (managerLock != null) { - managerLock.lock(); - } - try { - lock.writeLock().lock(); + lock.writeLock().lock(); - try { - if (!paging) { - return false; - } + try { + if (!paging) { + return false; + } - final long transactionID = tx == null ? -1 : tx.getID(); - PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID); + final long transactionID = tx == null ? -1 : tx.getID(); + PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID); - if (message.isLargeMessage()) { - ((LargeServerMessage) message).setPaged(); - } + if (message.isLargeMessage()) { + ((LargeServerMessage) message).setPaged(); + } - int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD; + int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD; + currentPageSize += bytesToWrite; + if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) { + // Make sure nothing is currently validating or using currentPage + openNewPage(); currentPageSize += bytesToWrite; - if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) { - // Make sure nothing is currently validating or using currentPage - openNewPage(); - currentPageSize += bytesToWrite; - } - - if (tx != null) { - installPageTransaction(tx, listCtx); - } + } - // the apply counter will make sure we write a record on journal - // especially on the case for non transactional sends and paging - // doing this will give us a possibility of recovering the page counters - long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0; - final Page page = currentPage; - applyPageCounters(tx, page, listCtx, persistentSize); + if (tx != null) { + installPageTransaction(tx, listCtx); + } - page.write(pagedMessage); + // the apply counter will make sure we write a record on journal + // especially on the case for non transactional sends and paging + // doing this will give us a possibility of recovering the page counters + long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0; + final Page page = currentPage; + applyPageCounters(tx, page, listCtx, persistentSize); - if (tx == null && syncNonTransactional && message.isDurable()) { - sync(); - } + page.write(pagedMessage); - if (logger.isTraceEnabled()) { - logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId()); - } + if (tx == null && syncNonTransactional && message.isDurable()) { + sync(); + } - return true; - } finally { - lock.writeLock().unlock(); + if (logger.isTraceEnabled()) { + logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId()); } + + return true; } finally { - if (managerLock != null) { - managerLock.unlock(); - } + lock.writeLock().unlock(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index ee747a6296e..8c759065416 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -436,11 +436,6 @@ void startReplication(ReplicationManager replicationManager, /** * Write message to page if we are paging. - *

- * This is primarily a {@link PagingStore} call, but as with any other call writing persistent - * data, it must go through here. Both for the sake of replication, and also to ensure that it - * takes the locks (storage manager and pagingStore) in the right order. Avoiding thus the - * creation of dead-locks. * * @return {@code true} if we are paging and have handled the data, {@code false} if the data * needs to be sent to the journal diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 1c5532ea733..1656f989259 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -174,12 +174,20 @@ public static JournalContent getType(byte type) { protected final ScheduledExecutorService scheduledExecutorService; - protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true); + protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(false); // I would rather cache the Closeable instance here.. // I never know when the JRE decides to create a new instance on every call. // So I'm playing safe here. That's all - protected final ArtemisCloseable unlockCloseable = storageManagerLock.readLock()::unlock; + protected final ArtemisCloseable unlockCloseable = this::unlockCloseable; + protected static final ArtemisCloseable dummyCloseable = () -> { }; + + private static final ThreadLocal reentrant = ThreadLocal.withInitial(() -> false); + + private void unlockCloseable() { + storageManagerLock.readLock().unlock(); + reentrant.set(false); + } protected Journal messageJournal; @@ -395,6 +403,12 @@ public void storeReference(final long queueID, final long messageID, final boole @Override public ArtemisCloseable closeableReadLock() { + if (reentrant.get()) { + return dummyCloseable; + } + + reentrant.set(true); + CriticalCloseable measure = measureCritical(CRITICAL_STORE); storageManagerLock.readLock().lock(); @@ -415,20 +429,6 @@ public ArtemisCloseable closeableReadLock() { } } - /** - * for internal use and testsuite, don't use it outside of tests - */ - public void writeLock() { - storageManagerLock.writeLock().lock(); - } - - /** - * for internal use and testsuite, don't use it outside of tests - */ - public void writeUnlock() { - storageManagerLock.writeLock().unlock(); - } - @Override public void storeAcknowledge(final long queueID, final long messageID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { @@ -2150,17 +2150,9 @@ protected static PersistentAddressBindingEncoding newAddressBindingEncoding(long @Override public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception { - /** - * Exposing the read-lock here is an encapsulation violation done in order to keep the code - * simpler. The alternative would be to add a second method, say 'verifyPaging', to - * PagingStore. - *

- * Adding this second method would also be more surprise prone as it would require a certain - * calling order. - *

- * The reasoning is that exposing the lock is more explicit and therefore `less bad`. - */ - return store.page(msg, tx, listCtx, storageManagerLock.readLock()); + try (ArtemisCloseable closeable = closeableReadLock()) { + return store.page(msg, tx, listCtx); + } } private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 2528e02fc0a..51e4f54ce3d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -642,18 +642,8 @@ public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception { - /** - * Exposing the read-lock here is an encapsulation violation done in order to keep the code - * simpler. The alternative would be to add a second method, say 'verifyPaging', to - * PagingStore. - *

- * Adding this second method would also be more surprise prone as it would require a certain - * calling order. - *

- * The reasoning is that exposing the lock is more explicit and therefore `less bad`. - */ if (store != null) { - return store.page(msg, tx, listCtx, null); + return store.page(msg, tx, listCtx); } else { return false; } diff --git a/pom.xml b/pom.xml index 5e976fef006..090a0c114db 100644 --- a/pom.xml +++ b/pom.xml @@ -1131,7 +1131,7 @@ --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED -XDcompilePolicy=simple - -Xplugin:ErrorProne -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.* + -Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.* @@ -1165,7 +1165,7 @@ --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED -XDcompilePolicy=simple - -Xplugin:ErrorProne -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.* + -Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.* -J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED -J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED -J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java index 0c2cba8d28e..7b96d71085e 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnCursorIteratorTest.java @@ -198,7 +198,7 @@ private static CoreMessage createMessage(final long id) throws Exception { final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(ADDRESS, queue); - pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock); + pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)); return msg; } diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 4923a74cfe1..efbefc314cc 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -23,7 +23,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Message; @@ -354,8 +353,7 @@ public void ioSync() throws Exception { @Override public boolean page(Message message, Transaction tx, - RouteContextList listCtx, - ReentrantReadWriteLock.ReadLock readLock) throws Exception { + RouteContextList listCtx) throws Exception { return false; } diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java index 2f393380329..df9d5567201 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java @@ -408,7 +408,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); PagedReference readMessage = iterator.next(); @@ -441,7 +441,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); } PagedReference readMessage = iterator.next(); @@ -471,7 +471,7 @@ public void testConsumeLivePage() throws Exception { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); } PagedReference readMessage = iterator.next(); @@ -556,7 +556,7 @@ public void run() { msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); } if (tx != null) { @@ -775,7 +775,7 @@ private int addMessages(final int start, final int numMessages, final int messag msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); - Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock)); + Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS))); } return pageStore.getNumberOfPages(); @@ -872,7 +872,7 @@ private Transaction pgMessages(StorageManager storage, Message msg = new CoreMessage(storage.generateID(), buffer.writerIndex()); msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex()); msg.putIntProperty("key", i); - pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock); + pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)); } return txImpl; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java index bf7cc83b222..1097514772a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java @@ -66,11 +66,11 @@ public void testPagingManager() throws Exception { ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10)); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); + Assert.assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); store.startPaging(); - Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); + Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); Page page = store.depage(); @@ -89,7 +89,7 @@ public void testPagingManager() throws Exception { Assert.assertNull(store.depage()); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)); + Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 0e43558dab1..640825ac663 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -176,7 +176,7 @@ public void testStore() throws Exception { Assert.assertTrue(storeImpl.isPaging()); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock)); + Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); Assert.assertEquals(1, storeImpl.getNumberOfPages()); @@ -219,7 +219,7 @@ public void testDepageOnCurrentPage() throws Exception { Message msg = createMessage(i, storeImpl, destination, buffer); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock)); + Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); } @@ -286,7 +286,7 @@ public void testRemoveInTheMiddle() throws Exception { msg.putIntProperty("page", page); final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(fakeQueue.getName(), fakeQueue); - Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock)); + Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); if (i > 0 && i % 10 == 0) { storeImpl.forceAnotherPage(); page++; @@ -372,7 +372,7 @@ public void testRemoveCurrentPage() throws Exception { msg.putIntProperty("page", page); final RoutingContextImpl ctx = new RoutingContextImpl(null); ctx.addQueue(fakeQueue.getName(), fakeQueue); - Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock)); + Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()))); if (i > 0 && i % 10 == 0) { storeImpl.forceAnotherPage(); page++; @@ -474,7 +474,7 @@ public void testDepageMultiplePages() throws Exception { Message msg = createMessage(i, store, destination, buffer); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); + Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); } Assert.assertEquals(2, store.getNumberOfPages()); @@ -509,7 +509,7 @@ public void testDepageMultiplePages() throws Exception { Message msg = createMessage(1, store, destination, buffers.get(0)); final RoutingContextImpl ctx = new RoutingContextImpl(null); - Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock)); + Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()))); Page newPage = store.depage(); @@ -529,14 +529,14 @@ public void testDepageMultiplePages() throws Exception { { final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)); + Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); } store.startPaging(); { final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - Assert.assertTrue(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)); + Assert.assertTrue(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))); } Page page = store.depage(); @@ -619,7 +619,7 @@ public void run() { // Just using the internal API to remove it from the page file system Message msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5)); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - if (storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock)) { + if (storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()))) { buffers.put(id, msg); } else { break; @@ -736,7 +736,7 @@ public void run() { storeImpl2.forceAnotherPage(); final RoutingContextImpl ctx = new RoutingContextImpl(null); - storeImpl2.page(lastMsg, ctx.getTransaction(), ctx.getContextListing(storeImpl2.getStoreName()), lock); + storeImpl2.page(lastMsg, ctx.getTransaction(), ctx.getContextListing(storeImpl2.getStoreName())); buffers2.put(lastMessageId, lastMsg); Page lastPage = null; @@ -858,7 +858,7 @@ public void run() { msg.putLongProperty("count", i); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - while (!store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)) { + while (!store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))) { store.startPaging(); } @@ -1282,7 +1282,7 @@ protected void writePageMessage(final PagingStore storeImpl, msg.putLongProperty("count", id); final RoutingContextImpl ctx2 = new RoutingContextImpl(null); - storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock); + storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName())); } private CoreMessage createMessage(final long id,