Skip to content

Commit

Permalink
ARTEMIS-3384 Fix bridge duplicate messages after reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
brusdev committed Jul 14, 2021
1 parent 18e9dee commit 1855890
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private synchronized void addToCache(final ByteArray holder, final Transaction t
}

if (instantAdd) {
tx.addOperation(new AddDuplicateIDOperation(holder, false));
addToCacheInMemory(holder);
} else {
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private synchronized void addToCache(final ByteArray holder,
}

if (instantAdd) {
tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
addToCacheInMemory(holder, recordID);
} else {
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@
package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
Expand All @@ -39,12 +45,18 @@
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
Expand Down Expand Up @@ -399,6 +411,127 @@ public void testReconnectSameNode() throws Exception {
assertNoMoreConnections();
}

// Fail bridge and reconnect same node, no backup specified
@Test
public void testReconnectSameNodeAfterDelivery() throws Exception {
server0 = createActiveMQServer(0, isNetty(), server0Params);

TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");

server0.getConfiguration().setConnectorConfigurations(connectors);
server1.getConfiguration().setConnectorConfigurations(connectors);

BridgeConfiguration bridgeConfiguration = createBridgeConfig();

List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

QueueConfiguration queueConfig0 = new QueueConfiguration(queueName).setAddress(testAddress);
List<QueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigs(queueConfigs0);

QueueConfiguration queueConfig1 = new QueueConfiguration(queueName).setAddress(forwardAddress);
List<QueueConfiguration> queueConfigs1 = new ArrayList<>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigs(queueConfigs1);

startServers();

locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(server0tc, server1tc));
ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
session0 = csf0.createSession(false, true, true);

ClientSessionFactory csf1 = locator.createSessionFactory(server1tc);
session1 = csf1.createSession(false, true, true);

ClientProducer prod0 = session0.createProducer(testAddress);

ClientConsumer cons1 = session1.createConsumer(queueName);

session1.start();

final ManagementService managementService = server0.getManagementService();
QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queueName);
assertEquals(0, coreQueueControl.getDeliveringCount());

final int numMessages = NUM_MESSAGES;

SimpleString propKey = new SimpleString("propkey");

CyclicBarrier routingBarrier = new CyclicBarrier(2);
CountDownLatch deliveryBeforeFailureLatch = new CountDownLatch(numMessages);
CountDownLatch deliveryAfterFailureLatch = new CountDownLatch(2 * numMessages);
List<Message> sendingMessages = Collections.synchronizedList(new ArrayList<>());
Map<Integer, ClientMessage> clientMessages = new ConcurrentHashMap<>();

server0.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
@Override
public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
ActiveMQServerPlugin.super.afterDeliverBridge(bridge, ref, status);

deliveryBeforeFailureLatch.countDown();
deliveryAfterFailureLatch.countDown();
}
});


server1.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
@Override
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
sendingMessages.add(message);
try {
// Simulate CPU load until bridge delivery after failure
deliveryAfterFailureLatch.await();
} catch (InterruptedException e) {
log.debug(e);
}
}

@Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
if (sendingMessages.contains(message)) {
try {
// Force duplicateID atomicVerify of messages delivered again by the bridge after failure
// before routing messages delivered by bridge before failure
routingBarrier.await();
} catch (InterruptedException e) {
log.debug(e);
} catch (BrokenBarrierException e) {
log.debug(e);
}
}
}
});

for (int i = 0; i < numMessages; i++) {
ClientMessage message = session0.createMessage(false);
message.putIntProperty(propKey, i);

prod0.send(message);
}

deliveryBeforeFailureLatch.await();

assertEquals(numMessages, coreQueueControl.getDeliveringCount());

// Now we will simulate a failure of the bridge connection between server0 and server1
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
assertNotNull(bridge);
RemotingConnection forwardingConnection = getForwardingConnection(bridge);
forwardingConnection.fail(new ActiveMQNotConnectedException());

for (int i = 0; i < numMessages; i++) {
ClientMessage r1 = cons1.receive(1500);
assertNotNull(r1);
assertNull(clientMessages.putIfAbsent(r1.getIntProperty(propKey), r1));
}
closeServers();

assertNoMoreConnections();
}

// We test that we can pause more than client failure check period (to prompt the pinger to failing)
// before reconnecting
@Test
Expand Down

0 comments on commit 1855890

Please sign in to comment.