Skip to content

Commit

Permalink
ARTEMIS-4758 Hardening Mirroring
Browse files Browse the repository at this point in the history
This is a list of improvements done as part of this commit / task:

* Page Transactions on mirror target are now optional.

If you had an interrupt mirror while the target destination was paging, duplicate detection would be ineffective unless you used paged transactions
Users can now configure the ack manager retries intervals.
Say you need some time to remove a consumer from a target mirror. The delivering references would prevent acks from happening. You can allow bigger retry intervals and number of retries by tinkiering with ack manager retry parameters.

* AckManager restarted independent of incoming acks

The ackManager was only restarted when new acks were coming in. If you stopped receiving acks on a target server and restarted that server with pending acks, those acks would never be exercised. The AckManager is now restarted as soon as the server is started.
  • Loading branch information
clebertsuconic committed May 2, 2024
1 parent afd7951 commit c523458
Show file tree
Hide file tree
Showing 24 changed files with 670 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,20 @@ public long getPeriod() {
}

public synchronized ActiveMQScheduledComponent setPeriod(long period) {
this.period = period;
restartIfNeeded();
if (this.period != period) {
this.period = period;
restartIfNeeded();
}
return this;
}

public synchronized ActiveMQScheduledComponent setPeriod(long period, TimeUnit unit) {
this.period = period;
this.timeUnit = unit;
restartIfNeeded();
if (unit == null) throw new NullPointerException("unit is required");
if (this.period != period || this.timeUnit != unit) {
this.period = period;
this.timeUnit = unit;
restartIfNeeded();
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,52 @@ public void run() {
}
}


@Test
public void testUpdatePeriod() throws Throwable {
final ReusableLatch latch = new ReusableLatch(1);

final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(10, TimeUnit.MILLISECONDS, true) {
@Override
public void run() {
latch.countDown();
}
};

local.start();

try {
Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));

latch.setCount(1);
local.delay();
Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS));

latch.setCount(1);
Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));

local.setPeriod(TimeUnit.HOURS.toMillis(1), TimeUnit.MILLISECONDS);

latch.setCount(1);
local.delay();
Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));

local.setPeriod(1);
local.delay();
Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS));

local.setPeriod(1, TimeUnit.SECONDS);

latch.setCount(1);
local.delay();

Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
} finally {
local.stop();
local.stop(); // calling stop again should not be an issue.
}
}

@Test
public void testUsingCustomInitialDelay() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,17 @@ public static String getDefaultHapolicyBackupStrategy() {

private static final boolean DEFAULT_MANAGEMENT_MESSAGE_RBAC = false;


// These properties used to defined with this prefix.
// I'm keeping the older property name in an attempt to guarantee compatibility
private static final String FORMER_ACK_RETRY_CLASS_NAME = "org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry";
private static final int DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));;

private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));;

private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;

/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
Expand Down Expand Up @@ -1918,4 +1929,25 @@ public static String getManagementRbacPrefix() {
public static boolean getManagementMessagesRbac() {
return DEFAULT_MANAGEMENT_MESSAGE_RBAC;
}


/** This configures the Mirror Ack Manager number of attempts on queues before trying page acks.
* It is not intended to be configured through the XML.
* The default value here is 5. */
public static int getMirrorAckManagerMinQueueAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS;
}

public static int getMirrorAckManagerMaxPageAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS;
}

public static int getMirrorAckManagerRetryDelay() {
return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
}

public static boolean getDefaultMirrorPageTransaction() {
return DEFAULT_MIRROR_PAGE_TRANSACTION;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* K = Key
* V = Value
* C = Context
* */
public class JournalHashMap<K, V, C> implements Map<K, V> {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public String getModuleName() {
@Override
public void loadProtocolServices(ActiveMQServer server, List<ActiveMQComponent> services) {
try {
AckManager ackManager = AckManagerProvider.getManager(server, false);
AckManager ackManager = AckManagerProvider.getManager(server);
services.add(ackManager);
server.registerRecordsLoader(ackManager::reload);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ public void onError(int errorCode, String errorMessage) {
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}

snfQueue.deliverAsync();
}

private void syncDone(MessageReference reference) {
Expand Down Expand Up @@ -516,6 +518,7 @@ public void postAcknowledge(MessageReference ref, AckReason reason) throws Excep
postACKInternalMessage(ref);
return;
}
snfQueue.deliverAsync();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
Expand Down Expand Up @@ -161,6 +162,8 @@ public void onError(int errorCode, String errorMessage) {

final ActiveMQServer server;

final Configuration configuration;

DuplicateIDCache lruduplicateIDCache;
String lruDuplicateIDKey;

Expand All @@ -183,6 +186,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
this.basicController = new BasicMirrorController(server);
this.basicController.setLink(receiver);
this.server = server;
this.configuration = server.getConfiguration();
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
}
Expand Down Expand Up @@ -389,8 +393,8 @@ public boolean postAcknowledge(String queue,
}

if (logger.isTraceEnabled()) {
logger.trace("Server {} with queue = {} being acked for {} coming from {} targetQueue = {}",
server.getIdentity(), queue, messageID, messageID, targetQueue);
logger.trace("Server {} with queue = {} being acked for {} from {} targetQueue = {} reason = {}",
server.getIdentity(), queue, messageID, ackMessage, targetQueue, reason);
}

performAck(nodeID, targetQueue, messageID, ackMessage, reason);
Expand All @@ -407,7 +411,7 @@ private void performAck(String nodeID,
}

if (ackManager == null) {
ackManager = AckManagerProvider.getManager(server, true);
ackManager = AckManagerProvider.getManager(server);
}

ackManager.ack(nodeID, targetQueue, messageID, reason, true);
Expand Down Expand Up @@ -473,7 +477,7 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
message.setAddress(internalAddress);
}

final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAsync(true);
final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAllowPageTransaction(configuration.isMirrorPageTransaction()).setAsync(true);
transaction.addOperation(messageCompletionAck.tx);
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
Expand Down
Loading

0 comments on commit c523458

Please sign in to comment.