Skip to content
Permalink
Browse files
ARTEMIS-3807 Simplify Redistributor avoid races, and fix ProtocolMess…
…ageLoadBalancingTest

The control existing in Redistributor is not needed as the Queue::deliver will already have a control on re-scheduling the loop and avoid holding references for too long.
  • Loading branch information
clebertsuconic committed Apr 29, 2022
1 parent 1a27382 commit 48cd586ac59440c99d1bde9477b82c4b8c44a16b
Showing 5 changed files with 49 additions and 157 deletions.
@@ -18,15 +18,12 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -44,14 +41,8 @@ public class Redistributor implements Consumer {

private final PostOffice postOffice;

private final Executor executor;

private final int batchSize;

private final Queue queue;

private int count;

private final long sequentialID;

// a Flush executor here is happening inside another executor.
@@ -61,20 +52,14 @@ public class Redistributor implements Consumer {

public Redistributor(final Queue queue,
final StorageManager storageManager,
final PostOffice postOffice,
final Executor executor,
final int batchSize) {
final PostOffice postOffice) {
this.queue = queue;

this.sequentialID = storageManager.generateID();

this.storageManager = storageManager;

this.postOffice = postOffice;

this.executor = executor;

this.batchSize = batchSize;
}

@Override
@@ -103,39 +88,17 @@ public void disconnect() {
}

public synchronized void start() {
active = true;
this.active = true;
}

public synchronized void stop() throws Exception {
active = false;

boolean ok = flushExecutor();

if (!ok) {
ActiveMQServerLogger.LOGGER.errorStoppingRedistributor();
}
this.active = false;
}

public synchronized void close() {
boolean ok = flushExecutor();

if (!ok) {
throw new IllegalStateException("Timed out waiting for executor to complete");
}

active = false;
}

private boolean flushExecutor() {
try {
boolean ok = pendingRuns.await(10000);
return ok;
} catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.failedToFlushExecutor(e);
return false;
}
}

@Override
public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
if (!active) {
@@ -154,41 +117,9 @@ public synchronized HandleStatus handle(final MessageReference reference) throws
return HandleStatus.BUSY;
}

if (!reference.getMessage().isLargeMessage()) {

postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);

ackRedistribution(reference, tx);
} else {
active = false;
executor.execute(new Runnable() {
@Override
public void run() {
try {

postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);

ackRedistribution(reference, tx);

synchronized (Redistributor.this) {
active = true;
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);

count++;

queue.deliverAsync();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRedistributing(e, toManagementString(), reference.getMessageID());
try {
tx.rollback();
} catch (Exception e2) {
// Nothing much we can do now
ActiveMQServerLogger.LOGGER.failedToRollback(e2);
}
}
}
});
}
ackRedistribution(reference, tx);

return HandleStatus.HANDLED;
}
@@ -198,68 +129,12 @@ public void proceedDeliver(MessageReference ref) {
// no op
}

private void internalExecute(final Runnable runnable) {
pendingRuns.countUp();
executor.execute(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} finally {
pendingRuns.countDown();
}
}
});
}

private void ackRedistribution(final MessageReference reference, final Transaction tx) throws Exception {
reference.handled();

queue.acknowledge(tx, reference);

tx.commit();

storageManager.afterCompleteOperations(new IOCallback() {

@Override
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.ioErrorRedistributing(errorCode, errorMessage);
}

@Override
public void done() {
execPrompter();
}
});
}

private void execPrompter() {
count++;

// We use >= as the large message redistribution will set count to max_int
// so we are use the prompter will get called
if (count >= batchSize) {
// We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
// long time in the case there are many messages in the queue
active = false;

executor.execute(new Prompter());

count = 0;
}

}

private class Prompter implements Runnable {

@Override
public void run() {
synchronized (Redistributor.this) {
active = true;

queue.deliverAsync();
}
}
}

/* (non-Javadoc)
@@ -141,8 +141,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private static final AtomicLongFieldUpdater<QueueImpl> consumerRemovedTimestampUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "consumerRemovedTimestamp");
private static final AtomicReferenceFieldUpdater<QueueImpl, Filter> filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter");

public static final int REDISTRIBUTOR_BATCH_SIZE = 100;

public static final int NUM_PRIORITIES = 10;

public static final int MAX_DELIVERIES_IN_LOOP = 1000;
@@ -1532,7 +1530,7 @@ public synchronized void addRedistributor(final long delay) {
redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
}
} else {
internalAddRedistributor(executor);
internalAddRedistributor();
}
}

@@ -3257,12 +3255,12 @@ public synchronized MessageReference removeWithSuppliedID(String serverID, long
return messageReferences.removeWithID(serverID, id);
}

private void internalAddRedistributor(final ArtemisExecutor executor) {
private void internalAddRedistributor() {
if (redistributor == null && (consumers.isEmpty() || hasUnMatchedPending)) {
if (logger.isTraceEnabled()) {
logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
}
redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE));
redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice));
redistributor.consumer.start();
consumers.add(redistributor);
hasUnMatchedPending = false;
@@ -4130,7 +4128,7 @@ private class DelayedAddRedistributor implements Runnable {
@Override
public void run() {
synchronized (QueueImpl.this) {
internalAddRedistributor(executor1);
internalAddRedistributor();

clearRedistributorFuture();
}
@@ -147,6 +147,7 @@
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.PortCheckRule;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RunnableEx;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -228,6 +229,31 @@ public abstract class ActiveMQTestBase extends Assert {
// There is a verification about thread leakages. We only fail a single thread when this happens
private static Set<Thread> alreadyFailedThread = new HashSet<>();

private LinkedList<RunnableEx> runAfter;

protected synchronized void runAfter(RunnableEx run) {
Assert.assertNotNull(run);
if (runAfter == null) {
runAfter = new LinkedList();
}
runAfter.add(run);
}

@After
public void runAfter() {
if (runAfter != null) {
runAfter.forEach((r) -> {
try {
r.run();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
}
}



private final Collection<ActiveMQServer> servers = new ArrayList<>();
private final Collection<ServerLocator> locators = new ArrayList<>();
private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<>();

0 comments on commit 48cd586

Please sign in to comment.