Skip to content

Commit

Permalink
ARTEMIS-4314 support queue federation batchOnCapacity via consumerWin…
Browse files Browse the repository at this point in the history
…dowSize=0
  • Loading branch information
gtully committed Jun 16, 2023
1 parent d00b9ad commit a8b4ee1
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
Expand Down Expand Up @@ -133,6 +134,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private volatile boolean ackIndividually;

private final ClassLoader contextClassLoader;
private volatile boolean manualFlowManagement;

public ClientConsumerImpl(final ClientSessionInternal session,
final ConsumerContext consumerContext,
Expand Down Expand Up @@ -406,6 +408,13 @@ public Thread getCurrentThread() {
return receiverThread;
}

@Override
public ClientConsumer setManualFlowMessageHandler(final MessageHandler theHandler) throws ActiveMQException {
checkClosed();
this.handler = theHandler;
this.manualFlowManagement = true;
return this;
}

// Must be synchronized since messages may be arriving while handler is being set and might otherwise end
// up not queueing enough executors - so messages get stranded
Expand Down Expand Up @@ -849,7 +858,8 @@ private void startSlowConsumer() {
}
}

private void resetIfSlowConsumer() {
@Override
public void resetIfSlowConsumer() {
if (clientWindowSize == 0) {
sendCredits(0);

Expand Down Expand Up @@ -1041,6 +1051,9 @@ private void safeRestoreContextClassLoader(final ClassLoader originalClassLoader
* @throws ActiveMQException
*/
private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
if (manualFlowManagement) {
return;
}
// Chunk messages will execute the flow control while receiving the chunks
if (message.getFlowControlSize() != 0) {
// on large messages we should discount 1 on the first packets as we need continuity until the last packet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.utils.FutureLatch;

public interface ClientConsumerInternal extends ClientConsumer {
Expand Down Expand Up @@ -76,4 +77,8 @@ public interface ClientConsumerInternal extends ClientConsumer {
ClientSession.QueueQuery getQueueInfo();

long getForceDeliveryCount();

ClientConsumer setManualFlowMessageHandler(MessageHandler theHandler) throws ActiveMQException;

void resetIfSlowConsumer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ default int retryMessages(Filter filter, Integer expectedHits) throws Exception

boolean hasMatchingConsumer(Message message);

long getPendingMessageCount();

Collection<Consumer> getConsumers();

Map<SimpleString, Consumer> getGroups();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.federation;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -25,20 +26,23 @@
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData;

public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {
Expand All @@ -60,7 +64,9 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi

private ClientSessionFactoryInternal clientSessionFactory;
private ClientSession clientSession;
private ClientConsumer clientConsumer;
private ClientConsumerInternal clientConsumer;
private final AtomicInteger pendingPullCredit = new AtomicInteger();
private QueueHandle queueHandle;

public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {
this.federation = federation;
Expand Down Expand Up @@ -135,9 +141,16 @@ private synchronized void connect() throws Exception {
if (clientSessionCallback != null) {
clientSessionCallback.callback(clientSession);
}
if (clientSession.queueQuery(key.getQueueName()).isExists()) {
this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
this.clientConsumer.setMessageHandler(this);
ClientSession.QueueQuery queryResult = clientSession.queueQuery(key.getQueueName());
if (queryResult.isExists()) {
this.clientConsumer = (ClientConsumerInternal) clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
if (this.clientConsumer.getClientWindowSize() == 0) {
this.clientConsumer.setManualFlowMessageHandler(this);
queueHandle = createQueueHandle(server, queryResult);
scheduleCreditOnEmpty(0, queueHandle);
} else {
this.clientConsumer.setMessageHandler(this);
}
} else {
throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
}
Expand All @@ -155,6 +168,75 @@ private synchronized void connect() throws Exception {
}
}

interface QueueHandle {
long getMessageCount();
int getCreditWindow();

Executor getExecutor();
}

private QueueHandle createQueueHandle(ActiveMQServer server, ClientSession.QueueQuery queryResult) {
final Queue queue = server.locateQueue(queryResult.getName());
int creditWindow = DEFAULT_CONSUMER_WINDOW_SIZE;

final Integer defaultConsumerWindowSize = queryResult.getDefaultConsumerWindowSize();
if (defaultConsumerWindowSize != null) {
creditWindow = defaultConsumerWindowSize.intValue();
if (creditWindow <= 0) {
creditWindow = DEFAULT_CONSUMER_WINDOW_SIZE;
logger.trace("{} override non positive queue consumerWindowSize with {}.", this, creditWindow);
}
}

final int finalCreditWindow = creditWindow;
return new QueueHandle() {
@Override
public long getMessageCount() {
return queue.getPendingMessageCount();
}

@Override
public int getCreditWindow() {
return finalCreditWindow;
}

@Override
public Executor getExecutor() {
return queue.getExecutor();
}
};
}

private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
scheduledExecutorService.schedule(() -> {
// use queue executor to sync on message count metric
handle.getExecutor().execute(() -> {
if (clientConsumer != null) {
if (0L == handle.getMessageCount()) {
flow(handle.getCreditWindow());
pendingPullCredit.set(handle.getCreditWindow());
} else {
if (0 == delay) {
clientConsumer.resetIfSlowConsumer();
pendingPullCredit.set(0);
}
scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
}
}
});
}, delay, TimeUnit.SECONDS);
}

private void flow(int creditWindow) {
try {
if (this.clientConsumer != null) {
this.clientConsumer.flowControl(creditWindow, false);
}
} catch (ActiveMQException ignored) {
logger.trace("{} failed to flowControl with credit {}.", this, creditWindow, ignored);
}
}

@Override
public synchronized void close() {
if (started) {
Expand Down Expand Up @@ -225,6 +307,13 @@ public void onMessage(ClientMessage clientMessage) {
}
clientMessage.acknowledge();

if (pendingPullCredit.get() > 0) {
final int delta = ((ClientMessageInternal) clientMessage).getFlowControlSize();
if (pendingPullCredit.addAndGet(-delta) < 0) {
scheduleCreditOnEmpty(0, queueHandle);
}
}

if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1621,7 +1621,8 @@ public long getCreatedTimestamp() {
return createdTimestamp;
}

public long getMessageCountForRing() {
@Override
public long getPendingMessageCount() {
return (long) pendingMetrics.getMessageCount();
}

Expand Down Expand Up @@ -4610,7 +4611,7 @@ private void enforceRing(boolean head) {
private void enforceRing(MessageReference refToAck, boolean scheduling, boolean head) {
int adjustment = head ? 1 : 0;

if (getMessageCountForRing() + adjustment > ringSize) {
if (getPendingMessageCount() + adjustment > ringSize) {
refToAck = refToAck == null ? messageReferences.poll() : refToAck;

if (refToAck != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,11 @@ public boolean hasMatchingConsumer(Message message) {
return false;
}

@Override
public long getPendingMessageCount() {
return 0;
}

@Override
public Collection<Consumer> getConsumers() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,11 @@ public boolean hasMatchingConsumer(Message message) {
return false;
}

@Override
public long getPendingMessageCount() {
return 0;
}

@Override
public Collection<Consumer> getConsumers() {
return null;
Expand Down
4 changes: 4 additions & 0 deletions docs/user-manual/en/federation-queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ e.g. as many messages as possible are consumed from the same broker as they were
Here for such a migration with blue/green or canary moving a number of consumers on the same queue, you may want to set the `priority-adjustment` to 0, or even a positive value, so message would actively flow to the federated queue.


* Dual Federation - potential for messages to flip-flop between clusters.
If the backlog on your queues exceeds the available local credit across consumers, any lower priority federation consumer becomes a candidate for dispatch and messages will be federated. Eventually all messages may migrate and the scenario can repeat on the other cluster. Applying a rate limit to the connector url can help mitigate but this could have an adverse effect on migration when there are no local consumers.
To better support this use case, it is possible to configure the consumerWindowSize to zero on the referenced connector URI: ```tcp://<host>:<port>?consumerWindowSize=0```. This will cause the federation consumer to pull messages in batches only when the local queue has excess capacity. This means that federation won't ever drain more messaces than it can handle, such that messages would flip-flop. The batch size is derived from the relevant address settings defaultConsumerWindowSize.

## Configuring Queue Federation

Federation is configured in `broker.xml`.
Expand Down

0 comments on commit a8b4ee1

Please sign in to comment.