Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-3848 High CPU usage on ReadWriteLocks #4098

Merged
merged 2 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,23 @@
*/
public abstract class HandlerBase {

//marker instance used to recognize if a thread is performing a packet handling
private static final Object DUMMY = Boolean.TRUE;
private static class Counter {
int count = 0;
}

// this cannot be static as the Actor will be used within another executor. For that reason
// each instance will have its own ThreadLocal.
// ... a thread that has its thread-local map populated with DUMMY while performing a handler
private final ThreadLocal<Object> inHandler = new ThreadLocal<>();
/** an actor could be used within an OrderedExecutor. So we need this counter to decide if there's a Handler anywhere in the stack trace */
private static final ThreadLocal<Counter> inHandler = ThreadLocal.withInitial(() -> new Counter());

protected void enter() {
assert inHandler.get() == null : "should be null";
inHandler.set(DUMMY);
protected static void enter() {
inHandler.get().count++;
}

public boolean inHandler() {
final Object dummy = inHandler.get();
return dummy != null;
public static boolean inHandler() {
return inHandler.get().count > 0;
}

protected void leave() {
assert inHandler.get() != null : "marker not set";
inHandler.set(null);
protected static void leave() {
inHandler.get().count--;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again.
*/
private final List<ReadyListener> readyListeners = new ArrayList<>();
private final FastThreadLocal<ArrayList<ReadyListener>> localListenersPool = new FastThreadLocal<>();
private static final FastThreadLocal<ArrayList<ReadyListener>> readyListenersPool = new FastThreadLocal<>();

private final boolean batchingEnabled;

Expand Down Expand Up @@ -138,51 +138,47 @@ public boolean isOpen() {

@Override
public final void fireReady(final boolean ready) {
ArrayList<ReadyListener> readyToCall = localListenersPool.get();
if (readyToCall != null) {
localListenersPool.set(null);

// We are reusing a previously created ArrayList for this localArray
ArrayList<ReadyListener> localArrayList = readyListenersPool.get();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@franz1981 I'm changing this code you wrote here. This was also introducing many entires on the ThreadLocal.

if (localArrayList == null) {
localArrayList = new ArrayList<>();
readyListenersPool.set(localArrayList);
} else {
localArrayList.clear();
}

synchronized (readyListeners) {
this.ready = ready;

if (ready) {
final int size = this.readyListeners.size();
if (readyToCall != null) {
readyToCall.ensureCapacity(size);
}
localArrayList.ensureCapacity(size);
try {
for (int i = 0; i < size; i++) {
final ReadyListener readyListener = readyListeners.get(i);
if (readyListener == null) {
break;
}
if (readyToCall == null) {
readyToCall = new ArrayList<>(size);
}
readyToCall.add(readyListener);
localArrayList.add(readyListener);
}
} finally {
readyListeners.clear();
}
}
}
if (readyToCall != null) {
try {
readyToCall.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
readyToCall.clear();
if (localListenersPool.get() != null) {
localListenersPool.set(readyToCall);
try {
localArrayList.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
localArrayList.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.File;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
Expand Down Expand Up @@ -89,14 +88,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
/**
* Write message to page if we are paging.
*
* @param readLock a read lock from the storage manager. This is an encapsulation violation made
* to keep the code less complex. If give {@code null} the method will throw a
* {@link NullPointerException}
* @return {@code true} if we are paging and have handled the data, {@code false} if the data
* needs to be sent to the journal
* @throws NullPointerException if {@code readLock} is null
*/
boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
boolean page(Message message, Transaction tx, RouteContextList listCtx) throws Exception;

Page createPage(int page) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -897,8 +896,7 @@ public boolean checkReleasedMemory() {
@Override
public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx,
final ReadLock managerLock) throws Exception {
RouteContextList listCtx) throws Exception {

if (!running) {
return false;
Expand Down Expand Up @@ -941,62 +939,53 @@ public boolean page(Message message,
lock.readLock().unlock();
}

if (managerLock != null) {
managerLock.lock();
}
try {
lock.writeLock().lock();
lock.writeLock().lock();

try {
if (!paging) {
return false;
}
try {
if (!paging) {
return false;
}

final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);

if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
}
if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
}

int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;

currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
}

if (tx != null) {
installPageTransaction(tx, listCtx);
}
}

// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);
if (tx != null) {
installPageTransaction(tx, listCtx);
}

page.write(pagedMessage);
// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);

if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}
page.write(pagedMessage);

if (logger.isTraceEnabled()) {
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}
if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}

return true;
} finally {
lock.writeLock().unlock();
if (logger.isTraceEnabled()) {
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}

return true;
} finally {
if (managerLock != null) {
managerLock.unlock();
}
lock.writeLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,6 @@ void startReplication(ReplicationManager replicationManager,

/**
* Write message to page if we are paging.
* <p>
* This is primarily a {@link PagingStore} call, but as with any other call writing persistent
* data, it must go through here. Both for the sake of replication, and also to ensure that it
* takes the locks (storage manager and pagingStore) in the right order. Avoiding thus the
* creation of dead-locks.
*
* @return {@code true} if we are paging and have handled the data, {@code false} if the data
* needs to be sent to the journal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,20 @@ public static JournalContent getType(byte type) {

protected final ScheduledExecutorService scheduledExecutorService;

protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(false);
clebertsuconic marked this conversation as resolved.
Show resolved Hide resolved

// I would rather cache the Closeable instance here..
// I never know when the JRE decides to create a new instance on every call.
// So I'm playing safe here. That's all
protected final ArtemisCloseable unlockCloseable = storageManagerLock.readLock()::unlock;
protected final ArtemisCloseable unlockCloseable = this::unlockCloseable;
protected static final ArtemisCloseable dummyCloseable = () -> { };

private static final ThreadLocal<Boolean> reentrant = ThreadLocal.withInitial(() -> false);

private void unlockCloseable() {
storageManagerLock.readLock().unlock();
reentrant.set(false);
}

protected Journal messageJournal;

Expand Down Expand Up @@ -395,6 +403,12 @@ public void storeReference(final long queueID, final long messageID, final boole

@Override
public ArtemisCloseable closeableReadLock() {
if (reentrant.get()) {
return dummyCloseable;
}

reentrant.set(true);

CriticalCloseable measure = measureCritical(CRITICAL_STORE);
storageManagerLock.readLock().lock();

Expand All @@ -415,20 +429,6 @@ public ArtemisCloseable closeableReadLock() {
}
}

/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeLock() {
storageManagerLock.writeLock().lock();
}

/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeUnlock() {
storageManagerLock.writeLock().unlock();
}

@Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
Expand Down Expand Up @@ -2150,17 +2150,9 @@ protected static PersistentAddressBindingEncoding newAddressBindingEncoding(long

@Override
public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception {
/**
* Exposing the read-lock here is an encapsulation violation done in order to keep the code
* simpler. The alternative would be to add a second method, say 'verifyPaging', to
* PagingStore.
* <p>
* Adding this second method would also be more surprise prone as it would require a certain
* calling order.
* <p>
* The reasoning is that exposing the lock is more explicit and therefore `less bad`.
*/
return store.page(msg, tx, listCtx, storageManagerLock.readLock());
try (ArtemisCloseable closeable = closeableReadLock()) {
return store.page(msg, tx, listCtx);
}
clebertsuconic marked this conversation as resolved.
Show resolved Hide resolved
}

private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,18 +642,8 @@ public boolean addToPage(PagingStore store,
Message msg,
Transaction tx,
RouteContextList listCtx) throws Exception {
/**
* Exposing the read-lock here is an encapsulation violation done in order to keep the code
* simpler. The alternative would be to add a second method, say 'verifyPaging', to
* PagingStore.
* <p>
* Adding this second method would also be more surprise prone as it would require a certain
* calling order.
* <p>
* The reasoning is that exposing the lock is more explicit and therefore `less bad`.
*/
if (store != null) {
return store.page(msg, tx, listCtx, null);
return store.page(msg, tx, listCtx);
clebertsuconic marked this conversation as resolved.
Show resolved Hide resolved
} else {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
clebertsuconic marked this conversation as resolved.
Show resolved Hide resolved
</compilerArgs>
</configuration>
</plugin>
Expand Down Expand Up @@ -1165,7 +1165,7 @@
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED</arg>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private static CoreMessage createMessage(final long id) throws Exception {

final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(ADDRESS, queue);
pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock);
pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS));

return msg;
}
Expand Down
Loading