Skip to content

Commit

Permalink
ARTEMIS-4558 Idempotent Mirrored ACKs
Browse files Browse the repository at this point in the history
Mirror acks should be performed atomically with the storage of the source ACK. Both the send of the ack and the recording of the ack should be part of the same transaction (in case of transactional).

We are also adding support on transactions for an afterWired callback for the proper plug of OperationContext sync.
  • Loading branch information
clebertsuconic committed Jan 16, 2024
1 parent 1887b3f commit bc7e463
Show file tree
Hide file tree
Showing 7 changed files with 737 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr");
public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id");
public static final SimpleString BROKER_ID_SIMPLE_STRING = SimpleString.toSimpleString(BROKER_ID.toString());

// Events:
public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress");
Expand Down Expand Up @@ -154,7 +156,7 @@ public void addAddress(AddressInfo addressInfo) throws Exception {

if (addQueues) {
Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, null, addressInfo.toJSON());
route(server, message);
routeMirrorCommand(server, message);
}
}

Expand All @@ -170,7 +172,7 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
}
if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
route(server, message);
routeMirrorCommand(server, message);
}
}

Expand All @@ -193,7 +195,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}
if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
route(server, message);
routeMirrorCommand(server, message);
}
}

Expand All @@ -213,15 +215,34 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti

if (deleteQueues) {
Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
route(server, message);
routeMirrorCommand(server, message);
}
}

private boolean invalidTarget(MirrorController controller, Message message) {
if (controller == null) {
return false;
}
String remoteID = getRemoteMirrorId();
if (remoteID == null) {
// This is to avoid a reflection (Miror sendin messages back to itself) from a small period of time one node reconnects but not the opposite direction.
Object localRemoteID = message.getAnnotation(BROKER_ID_SIMPLE_STRING);
if (localRemoteID != null) {
remoteID = String.valueOf(localRemoteID);
logger.debug("Remote link is not initialized yet, setting remoteID from message as {}", remoteID);
}
}
return sameNode(remoteID, controller.getRemoteMirrorId());
}

private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}

private boolean ignoreAddress(SimpleString address) {
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
return true;
}
return !addressFilter.match(address);
}

Expand All @@ -238,7 +259,7 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
return;
}

if (invalidTarget(context.getMirrorSource())) {
if (invalidTarget(context.getMirrorSource(), message)) {
logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
return;
}
Expand Down Expand Up @@ -444,13 +465,14 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
MirrorACKOperation operation = getAckOperation(tx);
// notice the operationContext.replicationLineUp is done on beforeCommit as part of the TX
operation.addMessage(messageCommand, ref);
routeMirrorCommand(server, messageCommand, tx);
} else {
server.getStorageManager().afterStoreOperations(new IOCallback() {
@Override
public void done() {
try {
logger.debug("preAcknowledge::afterStoreOperation for messageReference {}", ref);
route(server, messageCommand);
routeMirrorCommand(server, messageCommand);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
Expand All @@ -469,7 +491,7 @@ private MirrorACKOperation getAckOperation(Transaction tx) {
logger.trace("getAckOperation::setting operation on transaction {}", tx);
ackOperation = new MirrorACKOperation(server);
tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, ackOperation);
tx.afterStore(ackOperation);
tx.afterWired(ackOperation);
}

return ackOperation;
Expand All @@ -490,7 +512,7 @@ private MirrorSendOperation getSendOperation(Transaction tx) {
return sendOperation;
}

private static class MirrorACKOperation extends TransactionOperationAbstract {
private static class MirrorACKOperation implements Runnable {

final ActiveMQServer server;

Expand All @@ -511,47 +533,19 @@ public void addMessage(Message message, MessageReference ref) {
}

@Override
public void beforeCommit(Transaction tx) {
logger.debug("MirrorACKOperation::beforeCommit processing {}", acks);
acks.forEach(this::doBeforeCommit);
public void run() {
logger.debug("MirrorACKOperation::wired processing {}", acks);
acks.forEach(this::doWired);
}

// callback to be used on forEach
private void doBeforeCommit(Message ack, MessageReference ref) {
private void doWired(Message ack, MessageReference ref) {
OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class);
if (context != null) {
context.replicationLineUp();
}
}

@Override
public void afterCommit(Transaction tx) {
logger.debug("MirrorACKOperation::afterCommit processing {}", acks);
acks.forEach(this::doAfterCommit);
}

// callback to be used on forEach
private void doAfterCommit(Message ack, MessageReference ref) {
try {
route(server, ack);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
ref.getMessage().usageDown();
}

@Override
public void afterRollback(Transaction tx) {
acks.forEach(this::doAfterRollback);
}

// callback to be used on forEach
private void doAfterRollback(Message ack, MessageReference ref) {
OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class);
if (context != null) {
context.replicationDone();
}
}

}

Expand Down Expand Up @@ -609,10 +603,17 @@ private Message createMessage(SimpleString address, SimpleString queue, Object e
return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body, ackReason);
}

public static void route(ActiveMQServer server, Message message) throws Exception {
public static void routeMirrorCommand(ActiveMQServer server, Message message) throws Exception {
routeMirrorCommand(server, message, null);
}

public static void routeMirrorCommand(ActiveMQServer server, Message message, Transaction tx) throws Exception {
message.setMessageID(server.getStorageManager().generateID());
RoutingContext ctx = mirrorControlRouting.get();
ctx.clear().setMirrorOption(MirrorOption.disabled);
// it is important to use local only at the source to avoid having the message strictly load balancing
// to other nodes if the SNF queue has the same name as the one on this node.
ctx.clear().setMirrorOption(MirrorOption.disabled).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY).setTransaction(tx);
logger.debug("SetTX {}", tx);
server.getPostOffice().route(message, ctx, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,9 @@ private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMes
targetQueue.expire(reference, null, false);
break;
default:
targetQueue.acknowledge(null, reference, reason, null, false);
TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true);
targetQueue.acknowledge(transaction, reference, reason, null, false);
transaction.commit();
break;
}
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
Expand All @@ -470,7 +472,7 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

String internalMirrorID = (String) deliveryAnnotations.getValue().get(BROKER_ID);
if (internalMirrorID == null) {
internalMirrorID = getRemoteMirrorId(); // not pasisng the ID means the data was generated on the remote broker
internalMirrorID = getRemoteMirrorId(); // not passing the ID means the data was generated on the remote broker
}
Long internalIDLong = (Long) deliveryAnnotations.getValue().get(INTERNAL_ID);
String internalAddress = (String) deliveryAnnotations.getValue().get(INTERNAL_DESTINATION);
Expand Down Expand Up @@ -516,7 +518,7 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
message.setAddress(internalAddress);
}

final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager());
final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAsync(true);
transaction.addOperation(messageCompletionAck.tx);
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
Expand Down Expand Up @@ -588,7 +590,9 @@ public boolean getAsBoolean() {
if (reference == null) {
return false;
} else {
targetQueue.acknowledge(null, reference, AckReason.NORMAL, null, false);
TransactionImpl tx = new TransactionImpl(server.getStorageManager()).setAsync(true);
targetQueue.acknowledge(tx, reference, AckReason.NORMAL, null, false);
tx.commit();
OperationContextImpl.getContext().executeOnCompletion(operation);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1306,5 +1306,69 @@ private void multiCastReplicaTest(boolean largeMessage,
}


@Test
public void testSimpleReplicaTX() throws Exception {

String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID();
server.setIdentity("targetServer");
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("server_2");
server_2.getConfiguration().setName("thisone");

AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
replica.setName("theReplica");
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.getConfiguration().setName("server_2");

int NUMBER_OF_MESSAGES = 10;

server_2.start();
Wait.assertTrue(server_2::isStarted);

// We create the address to avoid auto delete on the queue
server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));

ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));

for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage(getText(true, i));
message.setIntProperty("i", i);
producer.send(message);
}
session.commit();

Queue queueOnServer1 = locateQueue(server, getQueueName());
Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF());
Assert.assertNotNull(snfreplica);

Wait.assertEquals(0, snfreplica::getMessageCount);

Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount, 2000);
Queue queueOnServer2 = locateQueue(server_2, getQueueName());
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);

MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
connection.start();

for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message m = consumer.receive(1000);
Assert.assertNotNull(m);
}
session.commit();

Wait.assertEquals(0, snfreplica::getMessageCount);
Wait.assertEquals(0, queueOnServer1::getMessageCount);
Wait.assertEquals(0, queueOnServer2::getMessageCount);
}



}

0 comments on commit bc7e463

Please sign in to comment.