Skip to content

Commit

Permalink
ARTEMIS-3848 High CPU usage on ReadWriteLocks
Browse files Browse the repository at this point in the history
This is caused by too many entries on the HashMap for ThreadLocals.
Also: I'm reviewing some readlock usage on the StorageManager to simplify things a little bit.
  • Loading branch information
clebertsuconic committed May 25, 2022
1 parent c259325 commit 8782ca2
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,23 @@
*/
public abstract class HandlerBase {

//marker instance used to recognize if a thread is performing a packet handling
private static final Object DUMMY = Boolean.TRUE;
private static class Counter {
int count = 0;
}

// this cannot be static as the Actor will be used within another executor. For that reason
// each instance will have its own ThreadLocal.
// ... a thread that has its thread-local map populated with DUMMY while performing a handler
private final ThreadLocal<Object> inHandler = new ThreadLocal<>();
/** an actor could be used within an OrderedExecutor. So we need this counter to decide if there's a Handler anywhere in the stack trace */
private static final ThreadLocal<Counter> inHandler = ThreadLocal.withInitial(() -> new Counter());

protected void enter() {
assert inHandler.get() == null : "should be null";
inHandler.set(DUMMY);
protected static void enter() {
inHandler.get().count++;
}

public boolean inHandler() {
final Object dummy = inHandler.get();
return dummy != null;
public static boolean inHandler() {
return inHandler.get().count > 0;
}

protected void leave() {
assert inHandler.get() != null : "marker not set";
inHandler.set(null);
protected static void leave() {
inHandler.get().count--;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again.
*/
private final List<ReadyListener> readyListeners = new ArrayList<>();
private final FastThreadLocal<ArrayList<ReadyListener>> localListenersPool = new FastThreadLocal<>();
private static final FastThreadLocal<ArrayList<ReadyListener>> readyListenersPool = new FastThreadLocal<>();

private final boolean batchingEnabled;

Expand Down Expand Up @@ -138,51 +138,47 @@ public boolean isOpen() {

@Override
public final void fireReady(final boolean ready) {
ArrayList<ReadyListener> readyToCall = localListenersPool.get();
if (readyToCall != null) {
localListenersPool.set(null);

// We are reusing a previously created ArrayList for this localArray
ArrayList<ReadyListener> localArrayList = readyListenersPool.get();
if (localArrayList == null) {
localArrayList = new ArrayList<>();
readyListenersPool.set(localArrayList);
} else {
localArrayList.clear();
}

synchronized (readyListeners) {
this.ready = ready;

if (ready) {
final int size = this.readyListeners.size();
if (readyToCall != null) {
readyToCall.ensureCapacity(size);
}
localArrayList.ensureCapacity(size);
try {
for (int i = 0; i < size; i++) {
final ReadyListener readyListener = readyListeners.get(i);
if (readyListener == null) {
break;
}
if (readyToCall == null) {
readyToCall = new ArrayList<>(size);
}
readyToCall.add(readyListener);
localArrayList.add(readyListener);
}
} finally {
readyListeners.clear();
}
}
}
if (readyToCall != null) {
try {
readyToCall.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
readyToCall.clear();
if (localListenersPool.get() != null) {
localListenersPool.set(readyToCall);
try {
localArrayList.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
localArrayList.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.File;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
Expand Down Expand Up @@ -96,7 +95,7 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
* needs to be sent to the journal
* @throws NullPointerException if {@code readLock} is null
*/
boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
boolean page(Message message, Transaction tx, RouteContextList listCtx) throws Exception;

Page createPage(int page) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -897,8 +896,7 @@ public boolean checkReleasedMemory() {
@Override
public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx,
final ReadLock managerLock) throws Exception {
RouteContextList listCtx) throws Exception {

if (!running) {
return false;
Expand Down Expand Up @@ -941,62 +939,53 @@ public boolean page(Message message,
lock.readLock().unlock();
}

if (managerLock != null) {
managerLock.lock();
}
try {
lock.writeLock().lock();
lock.writeLock().lock();

try {
if (!paging) {
return false;
}
try {
if (!paging) {
return false;
}

final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);

if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
}
if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
}

int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;

currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
}

if (tx != null) {
installPageTransaction(tx, listCtx);
}
}

// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);
if (tx != null) {
installPageTransaction(tx, listCtx);
}

page.write(pagedMessage);
// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);

if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}
page.write(pagedMessage);

if (logger.isTraceEnabled()) {
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}
if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}

return true;
} finally {
lock.writeLock().unlock();
if (logger.isTraceEnabled()) {
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}

return true;
} finally {
if (managerLock != null) {
managerLock.unlock();
}
lock.writeLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,6 @@ void startReplication(ReplicationManager replicationManager,

/**
* Write message to page if we are paging.
* <p>
* This is primarily a {@link PagingStore} call, but as with any other call writing persistent
* data, it must go through here. Both for the sake of replication, and also to ensure that it
* takes the locks (storage manager and pagingStore) in the right order. Avoiding thus the
* creation of dead-locks.
*
* @return {@code true} if we are paging and have handled the data, {@code false} if the data
* needs to be sent to the journal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,20 @@ public static JournalContent getType(byte type) {

protected final ScheduledExecutorService scheduledExecutorService;

protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(false);

// I would rather cache the Closeable instance here..
// I never know when the JRE decides to create a new instance on every call.
// So I'm playing safe here. That's all
protected final ArtemisCloseable unlockCloseable = storageManagerLock.readLock()::unlock;
protected final ArtemisCloseable unlockCloseable = this::unlockCloseable;
protected static final ArtemisCloseable dummyCloseable = () -> { };

private static final ThreadLocal<Boolean> reentrant = ThreadLocal.withInitial(() -> false);

private void unlockCloseable() {
storageManagerLock.readLock().unlock();
reentrant.set(false);
}

protected Journal messageJournal;

Expand Down Expand Up @@ -395,6 +403,12 @@ public void storeReference(final long queueID, final long messageID, final boole

@Override
public ArtemisCloseable closeableReadLock() {
if (reentrant.get()) {
return dummyCloseable;
}

reentrant.set(true);

CriticalCloseable measure = measureCritical(CRITICAL_STORE);
storageManagerLock.readLock().lock();

Expand All @@ -415,20 +429,6 @@ public ArtemisCloseable closeableReadLock() {
}
}

/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeLock() {
storageManagerLock.writeLock().lock();
}

/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeUnlock() {
storageManagerLock.writeLock().unlock();
}

@Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
Expand Down Expand Up @@ -2150,17 +2150,9 @@ protected static PersistentAddressBindingEncoding newAddressBindingEncoding(long

@Override
public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception {
/**
* Exposing the read-lock here is an encapsulation violation done in order to keep the code
* simpler. The alternative would be to add a second method, say 'verifyPaging', to
* PagingStore.
* <p>
* Adding this second method would also be more surprise prone as it would require a certain
* calling order.
* <p>
* The reasoning is that exposing the lock is more explicit and therefore `less bad`.
*/
return store.page(msg, tx, listCtx, storageManagerLock.readLock());
try (ArtemisCloseable closeable = closeableReadLock()) {
return store.page(msg, tx, listCtx);
}
}

private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,18 +642,8 @@ public boolean addToPage(PagingStore store,
Message msg,
Transaction tx,
RouteContextList listCtx) throws Exception {
/**
* Exposing the read-lock here is an encapsulation violation done in order to keep the code
* simpler. The alternative would be to add a second method, say 'verifyPaging', to
* PagingStore.
* <p>
* Adding this second method would also be more surprise prone as it would require a certain
* calling order.
* <p>
* The reasoning is that exposing the lock is more explicit and therefore `less bad`.
*/
if (store != null) {
return store.page(msg, tx, listCtx, null);
return store.page(msg, tx, listCtx);
} else {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
</compilerArgs>
</configuration>
</plugin>
Expand Down Expand Up @@ -1165,7 +1165,7 @@
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED</arg>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private static CoreMessage createMessage(final long id) throws Exception {

final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(ADDRESS, queue);
pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock);
pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS));

return msg;
}
Expand Down
Loading

0 comments on commit 8782ca2

Please sign in to comment.