Skip to content

Commit

Permalink
ARTEMIS-3848 High CPU usage on ReadWriteLocks
Browse files Browse the repository at this point in the history
This is caused by too many entires on the HashMap for ThreadLocals.
Also: I'm reviewing some readlock usage on the StorageManager to simplify things a little bit.
  • Loading branch information
clebertsuconic committed May 25, 2022
1 parent 3ea18a8 commit 80dd821
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.activemq.artemis.utils.actors;

import java.util.concurrent.atomic.AtomicInteger;

/**
* This abstract class will encapsulate
* ThreadLocals to determine when a class is a handler.
Expand All @@ -25,27 +27,18 @@
*/
public abstract class HandlerBase {

//marker instance used to recognize if a thread is performing a packet handling
private static final Object DUMMY = Boolean.TRUE;

// this cannot be static as the Actor will be used within another executor. For that reason
// each instance will have its own ThreadLocal.
// ... a thread that has its thread-local map populated with DUMMY while performing a handler
private final ThreadLocal<Object> inHandler = new ThreadLocal<>();
private static final ThreadLocal<AtomicInteger> inHandler = ThreadLocal.withInitial(() -> new AtomicInteger(0));

protected void enter() {
assert inHandler.get() == null : "should be null";
inHandler.set(DUMMY);
protected static void enter() {
inHandler.get().incrementAndGet();
}

public boolean inHandler() {
final Object dummy = inHandler.get();
return dummy != null;
public static boolean inHandler() {
return inHandler.get().get() > 0;
}

protected void leave() {
assert inHandler.get() != null : "marker not set";
inHandler.set(null);
protected static void leave() {
inHandler.get().decrementAndGet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
Expand Down Expand Up @@ -62,7 +61,7 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again.
*/
private final List<ReadyListener> readyListeners = new ArrayList<>();
private final FastThreadLocal<ArrayList<ReadyListener>> localListenersPool = new FastThreadLocal<>();
private static final ThreadLocal<ArrayList<ReadyListener>> readyListenersPool = new ThreadLocal<>();

private final boolean batchingEnabled;

Expand Down Expand Up @@ -138,51 +137,47 @@ public boolean isOpen() {

@Override
public final void fireReady(final boolean ready) {
ArrayList<ReadyListener> readyToCall = localListenersPool.get();
if (readyToCall != null) {
localListenersPool.set(null);

// We are reusing a previously created ArrayList for this localArray
ArrayList<ReadyListener> localArrayList = readyListenersPool.get();
if (localArrayList == null) {
localArrayList = new ArrayList<>();
readyListenersPool.set(localArrayList);
} else {
localArrayList.clear();
}

synchronized (readyListeners) {
this.ready = ready;

if (ready) {
final int size = this.readyListeners.size();
if (readyToCall != null) {
readyToCall.ensureCapacity(size);
}
localArrayList.ensureCapacity(size);
try {
for (int i = 0; i < size; i++) {
final ReadyListener readyListener = readyListeners.get(i);
if (readyListener == null) {
break;
}
if (readyToCall == null) {
readyToCall = new ArrayList<>(size);
}
readyToCall.add(readyListener);
localArrayList.add(readyListener);
}
} finally {
readyListeners.clear();
}
}
}
if (readyToCall != null) {
try {
readyToCall.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
readyToCall.clear();
if (localListenersPool.get() != null) {
localListenersPool.set(readyToCall);
try {
localArrayList.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
localArrayList.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.File;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
Expand Down Expand Up @@ -96,7 +95,7 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
* needs to be sent to the journal
* @throws NullPointerException if {@code readLock} is null
*/
boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
boolean page(Message message, Transaction tx, RouteContextList listCtx) throws Exception;

Page createPage(int page) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -897,8 +896,7 @@ public boolean checkReleasedMemory() {
@Override
public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx,
final ReadLock managerLock) throws Exception {
RouteContextList listCtx) throws Exception {

if (!running) {
return false;
Expand Down Expand Up @@ -941,62 +939,53 @@ public boolean page(Message message,
lock.readLock().unlock();
}

if (managerLock != null) {
managerLock.lock();
}
try {
lock.writeLock().lock();
lock.writeLock().lock();

try {
if (!paging) {
return false;
}
try {
if (!paging) {
return false;
}

final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);

if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
}
if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
}

int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;

currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
}

if (tx != null) {
installPageTransaction(tx, listCtx);
}
}

// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);
if (tx != null) {
installPageTransaction(tx, listCtx);
}

page.write(pagedMessage);
// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);

if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}
page.write(pagedMessage);

if (logger.isTraceEnabled()) {
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}
if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}

return true;
} finally {
lock.writeLock().unlock();
if (logger.isTraceEnabled()) {
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}

return true;
} finally {
if (managerLock != null) {
managerLock.unlock();
}
lock.writeLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,20 @@ public static JournalContent getType(byte type) {

protected final ScheduledExecutorService scheduledExecutorService;

protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(false);

// I would rather cache the Closeable instance here..
// I never know when the JRE decides to create a new instance on every call.
// So I'm playing safe here. That's all
protected final ArtemisCloseable unlockCloseable = storageManagerLock.readLock()::unlock;
protected final ArtemisCloseable unlockCloseable = this::unlockCloseable;
protected static final ArtemisCloseable dummyCloseable = () -> { };

private static final ThreadLocal<Boolean> reentrant = ThreadLocal.withInitial(() -> false);

private void unlockCloseable() {
storageManagerLock.readLock().unlock();
reentrant.set(false);
}

protected Journal messageJournal;

Expand Down Expand Up @@ -395,6 +403,12 @@ public void storeReference(final long queueID, final long messageID, final boole

@Override
public ArtemisCloseable closeableReadLock() {
if (reentrant.get()) {
return dummyCloseable;
}

reentrant.set(true);

CriticalCloseable measure = measureCritical(CRITICAL_STORE);
storageManagerLock.readLock().lock();

Expand All @@ -415,20 +429,6 @@ public ArtemisCloseable closeableReadLock() {
}
}

/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeLock() {
storageManagerLock.writeLock().lock();
}

/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeUnlock() {
storageManagerLock.writeLock().unlock();
}

@Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
Expand Down Expand Up @@ -2160,7 +2160,9 @@ public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteCo
* <p>
* The reasoning is that exposing the lock is more explicit and therefore `less bad`.
*/
return store.page(msg, tx, listCtx, storageManagerLock.readLock());
try (ArtemisCloseable closeable = closeableReadLock()) {
return store.page(msg, tx, listCtx);
}
}

private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ public boolean addToPage(PagingStore store,
* The reasoning is that exposing the lock is more explicit and therefore `less bad`.
*/
if (store != null) {
return store.page(msg, tx, listCtx, null);
return store.page(msg, tx, listCtx);
} else {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
</compilerArgs>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private static CoreMessage createMessage(final long id) throws Exception {

final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(ADDRESS, queue);
pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock);
pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS));

return msg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
Expand Down Expand Up @@ -354,8 +353,7 @@ public void ioSync() throws Exception {
@Override
public boolean page(Message message,
Transaction tx,
RouteContextList listCtx,
ReentrantReadWriteLock.ReadLock readLock) throws Exception {
RouteContextList listCtx) throws Exception {
return false;
}

Expand Down
Loading

0 comments on commit 80dd821

Please sign in to comment.