Skip to content

Commit

Permalink
ARTEMIS-4453 Lots of addresses breaks cluster bridge flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Oct 11, 2023
1 parent 6dc225c commit ab6f0a3
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public synchronized int decrementRefCount() {
return --refCount;
}

@Override
public abstract int getBalance();

protected void checkCredits(final int credits) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public void receiveCredits(int credits) {

}


@Override
public void receiveFailCredits(final int credits) {
super.receiveFailCredits(credits);
Expand All @@ -85,16 +84,4 @@ public void receiveFailCredits(final int credits) {
}
callback.onCreditsFail(this);
}

@Override
public void releaseOutstanding() {
synchronized (this) {
balance = 0;
callback.onCreditsFlow(true, this);
if (logger.isDebugEnabled()) {
logger.debug("releaseOutstanding credits, balance={}, callback={}", balance, callback.getClass());
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
Expand All @@ -37,10 +40,16 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana

private ClientProducerFlowCallback callback;

public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
private final ScheduledExecutorService scheduledThreadPool;

private ScheduledFuture future;

public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize, ScheduledExecutorService scheduledThreadPool) {
this.session = session;

this.windowSize = windowSize;

this.scheduledThreadPool = scheduledThreadPool;
}


Expand Down Expand Up @@ -146,6 +155,10 @@ public synchronized void close() {
producerCredits.clear();

unReferencedCredits.clear();

if (future != null) {
future.cancel(false);
}
}

@Override
Expand All @@ -162,26 +175,30 @@ private void addToUnReferencedCache(final SimpleString address, final ClientProd
unReferencedCredits.put(address, credits);

if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
// Remove the oldest entry

Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();

Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();

iter.remove();

removeEntry(oldest.getKey(), oldest.getValue());
// if we've exceeded our limit then try to clean up
if (future == null) {
future = scheduledThreadPool.scheduleWithFixedDelay(() -> {
synchronized (this) {
Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<SimpleString, ClientProducerCredits> entry = iter.next();
if (entry.getValue().getBalance() == 0) {
iter.remove();
producerCredits.remove(entry.getKey());
entry.getValue().close();
}
}
}
}, 0, 30, TimeUnit.SECONDS);
}
} else {
// if we're below our limit make sure we're not trying to clean up
if (future != null) {
future.cancel(false);
}
}
}

private void removeEntry(final SimpleString address, final ClientProducerCredits credits) {
producerCredits.remove(address);

credits.releaseOutstanding();

credits.close();
}

static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits {

static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
Expand Down Expand Up @@ -225,12 +242,13 @@ public int decrementRefCount() {
}

@Override
public void releaseOutstanding() {
public SimpleString getAddress() {
return SimpleString.toSimpleString("");
}

@Override
public SimpleString getAddress() {
return SimpleString.toSimpleString("");
public int getBalance() {
return 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface ClientProducerCredits {

int decrementRefCount();

void releaseOutstanding();

SimpleString getAddress();

int getBalance();
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,6 @@ public void receiveCredits(final int credits) {
semaphore.release(credits);
}


@Override
public synchronized void releaseOutstanding() {
semaphore.drainPermits();
}

@Override
public int getBalance() {
return semaphore.availablePermits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ private ClientSession createSessionInternal(final String rawUsername,

SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);

ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), scheduledThreadPool);

synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
Expand Down Expand Up @@ -193,7 +194,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final Executor executor,
final Executor confirmationExecutor,
final Executor flowControlExecutor,
final Executor closeExecutor) throws ActiveMQException {
final Executor closeExecutor,
final ScheduledExecutorService scheduledThreadPool) throws ActiveMQException {
this.sessionFactory = sessionFactory;

this.name = name;
Expand Down Expand Up @@ -246,7 +248,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi

this.groupID = groupID;

producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize);
producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize, scheduledThreadPool);

this.sessionContext = sessionContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
import java.util.Collection;
import java.util.List;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
Expand All @@ -34,6 +39,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SimpleSymmetricClusterTest extends ClusterTestBase {

Expand Down Expand Up @@ -127,6 +136,78 @@ public void testSimple() throws Exception {

}

@Test
public void testWildcardFlowControl() throws Exception {
final int PRODUCER_COUNT = 3000;
final int ITERATIONS = 4;
final CountDownLatch consumerLatch = new CountDownLatch(PRODUCER_COUNT * ITERATIONS);
final CountDownLatch producerLatch = new CountDownLatch(PRODUCER_COUNT * ITERATIONS);

setupServer(0, true, isNetty());
setupServer(1, true, isNetty());
servers[0].getConfiguration().setWildcardRoutingEnabled(true);
servers[1].getConfiguration().setWildcardRoutingEnabled(true);

setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);

startServers(0, 1);

waitForTopology(servers[0], 2);
waitForTopology(servers[1], 2);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());

logger.info("Creating " + PRODUCER_COUNT + " multicast addresses on node 1...");
for (int i = 0; i < PRODUCER_COUNT; i++) {
createAddressInfo(1, "queues." + i, RoutingType.MULTICAST, -1, false);
}
logger.info("Addresses created.");

createQueue(0, "queues.#", "queue", null, false, RoutingType.MULTICAST);

logger.info("Creating consumer on node 0");
addConsumer(0, 0, "queue", null);

consumers[0].getConsumer().setMessageHandler(message -> {
logger.debug("Received: " + message);
consumerLatch.countDown();
});

waitForBindings(0, "queues.#", 1, 1, true);

logger.info("Creating " + PRODUCER_COUNT + " producers...");
ClientProducer[] producers = new ClientProducer[PRODUCER_COUNT];
ClientSession[] sessions = new ClientSession[PRODUCER_COUNT];
for (int i = 0; i < PRODUCER_COUNT; i++) {
ClientSessionFactory sf = sfs[1];
sessions[i] = addClientSession(sf.createSession(true, true, 0));
producers[i] = addClientProducer(sessions[i].createProducer("queues." + i));
}

ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_COUNT);
runAfter(executorService::shutdownNow);
for (int i = 0; i < PRODUCER_COUNT; i++) {
final ClientProducer producer = producers[i];
final ClientMessage message = sessions[i].createMessage(true);
executorService.submit(() -> {
for (int j = 0; j < ITERATIONS; j++) {
try {
producer.send(message);
producerLatch.countDown();
logger.debug("Sent message");
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
producerLatch.await(30, TimeUnit.SECONDS);
logger.info("Waiting for messages on node 0");
assertTrue(consumerLatch.await(30, TimeUnit.SECONDS));
}

@Test
public void testSimpleRestartClusterConnection() throws Exception {
setupServer(0, true, isNetty());
Expand Down

0 comments on commit ab6f0a3

Please sign in to comment.