Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-3384 Fix bridge duplicate messages after reconnection #3652

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ public void load(List<Pair<byte[], Long>> ids) throws Exception {

@Override
public void deleteFromCache(byte[] duplicateID) {
deleteFromCache(new ByteArray(duplicateID));
}

private void deleteFromCache(final ByteArray duplicateID) {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("deleting id = %s", describeID(duplicateID));
LOGGER.tracef("deleting id = %s", describeID(duplicateID.bytes));
}

ByteArray bah = new ByteArray(duplicateID);

Integer posUsed = cache.remove(bah);
Integer posUsed = cache.remove(duplicateID);

if (posUsed != null) {
ByteArray id;
Expand All @@ -90,10 +92,10 @@ public void deleteFromCache(byte[] duplicateID) {
final int index = posUsed.intValue();
id = ids.get(index);

if (id.equals(bah)) {
if (id.equals(duplicateID)) {
ids.set(index, null);
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID));
LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID.bytes));
}
}
}
Expand Down Expand Up @@ -158,6 +160,7 @@ private synchronized void addToCache(final ByteArray holder, final Transaction t
}

if (instantAdd) {
addToCacheInMemory(holder);
tx.addOperation(new AddDuplicateIDOperation(holder, false));
} else {
// For a tx, it's important that the entry is not added to the cache until commit
Expand Down Expand Up @@ -262,9 +265,9 @@ public void afterCommit(final Transaction tx) {
}

@Override
public void beforeCommit(Transaction tx) throws Exception {
public void beforeRollback(Transaction tx) throws Exception {
if (!afterCommit) {
process();
deleteFromCache(id);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,24 +143,26 @@ public synchronized void load(final List<Pair<byte[], Long>> ids) throws Excepti

@Override
public void deleteFromCache(byte[] duplicateID) throws Exception {
deleteFromCache(new ByteArray(duplicateID));
}

private void deleteFromCache(final ByteArray duplicateID) throws Exception {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("deleting id = %s", describeID(duplicateID));
LOGGER.tracef("deleting id = %s", describeID(duplicateID.bytes));
}

final ByteArray bah = new ByteArray(duplicateID);

final Integer posUsed = cache.remove(bah);
final Integer posUsed = cache.remove(duplicateID);

if (posUsed != null) {
synchronized (this) {
final ObjLongPair<ByteArray> id = ids.get(posUsed.intValue());

if (id.getA().equals(bah)) {
if (id.getA().equals(duplicateID)) {
final long recordID = id.getB();
id.setA(null);
id.setB(NIL);
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB()));
LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID.bytes, id.getB()));
}
storageManager.deleteDuplicateID(recordID);
}
Expand Down Expand Up @@ -240,6 +242,7 @@ private synchronized void addToCache(final ByteArray holder,
}

if (instantAdd) {
addToCacheInMemory(holder, recordID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it.. wouldn't this be either in TX or not?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes but it isn't executed by TX if instantAdd = true

tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
} else {
// For a tx, it's important that the entry is not added to the cache until commit
Expand Down Expand Up @@ -379,9 +382,9 @@ public void afterCommit(final Transaction tx) {
}

@Override
public void beforeCommit(Transaction tx) throws Exception {
public void beforeRollback(Transaction tx) throws Exception {
if (!afterCommit) {
process();
deleteFromCache(holder);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@
package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
Expand All @@ -39,12 +45,18 @@
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
Expand Down Expand Up @@ -399,6 +411,127 @@ public void testReconnectSameNode() throws Exception {
assertNoMoreConnections();
}

// Fail bridge and reconnect same node, no backup specified
@Test
public void testReconnectSameNodeAfterDelivery() throws Exception {
server0 = createActiveMQServer(0, isNetty(), server0Params);

TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");

server0.getConfiguration().setConnectorConfigurations(connectors);
server1.getConfiguration().setConnectorConfigurations(connectors);

BridgeConfiguration bridgeConfiguration = createBridgeConfig();

List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);

QueueConfiguration queueConfig0 = new QueueConfiguration(queueName).setAddress(testAddress);
List<QueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigs(queueConfigs0);

QueueConfiguration queueConfig1 = new QueueConfiguration(queueName).setAddress(forwardAddress);
List<QueueConfiguration> queueConfigs1 = new ArrayList<>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigs(queueConfigs1);

startServers();

locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(server0tc, server1tc));
ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
session0 = csf0.createSession(false, true, true);

ClientSessionFactory csf1 = locator.createSessionFactory(server1tc);
session1 = csf1.createSession(false, true, true);

ClientProducer prod0 = session0.createProducer(testAddress);

ClientConsumer cons1 = session1.createConsumer(queueName);

session1.start();

final ManagementService managementService = server0.getManagementService();
QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queueName);
assertEquals(0, coreQueueControl.getDeliveringCount());

final int numMessages = NUM_MESSAGES;

SimpleString propKey = new SimpleString("propkey");

CyclicBarrier routingBarrier = new CyclicBarrier(2);
CountDownLatch deliveryBeforeFailureLatch = new CountDownLatch(numMessages);
CountDownLatch deliveryAfterFailureLatch = new CountDownLatch(2 * numMessages);
List<Message> sendingMessages = Collections.synchronizedList(new ArrayList<>());
Map<Integer, ClientMessage> clientMessages = new ConcurrentHashMap<>();

server0.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
@Override
public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
ActiveMQServerPlugin.super.afterDeliverBridge(bridge, ref, status);

deliveryBeforeFailureLatch.countDown();
deliveryAfterFailureLatch.countDown();
}
});


server1.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
@Override
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
sendingMessages.add(message);
try {
// Simulate CPU load until bridge delivery after failure
deliveryAfterFailureLatch.await();
} catch (InterruptedException e) {
log.debug(e);
}
}

@Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
if (sendingMessages.contains(message)) {
try {
// Force duplicateID atomicVerify of messages delivered again by the bridge after failure
// before routing messages delivered by bridge before failure
routingBarrier.await();
} catch (InterruptedException e) {
log.debug(e);
} catch (BrokenBarrierException e) {
log.debug(e);
}
}
}
});

for (int i = 0; i < numMessages; i++) {
ClientMessage message = session0.createMessage(false);
message.putIntProperty(propKey, i);

prod0.send(message);
}

deliveryBeforeFailureLatch.await();

assertEquals(numMessages, coreQueueControl.getDeliveringCount());

// Now we will simulate a failure of the bridge connection between server0 and server1
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
assertNotNull(bridge);
RemotingConnection forwardingConnection = getForwardingConnection(bridge);
forwardingConnection.fail(new ActiveMQNotConnectedException());

for (int i = 0; i < numMessages; i++) {
ClientMessage r1 = cons1.receive(1500);
assertNotNull(r1);
assertNull(clientMessages.putIfAbsent(r1.getIntProperty(propKey), r1));
}
closeServers();

assertNoMoreConnections();
}

// We test that we can pause more than client failure check period (to prompt the pinger to failing)
// before reconnecting
@Test
Expand Down