Skip to content

Commit

Permalink
ARTEMIS-4278 Incorrect Paging Counters with Prepared Transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed May 11, 2023
1 parent 5320bd0 commit bea39f6
Show file tree
Hide file tree
Showing 10 changed files with 576 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public interface PageTransactionInfo extends EncodingSupport {

void setCommitted(boolean committed);

void reloadPrepared(Transaction transaction);

/* When we reload a transaction,
* We may have to add the counters after commit. */
Transaction getPreparedTransaction();

void commit();

void rollback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,25 @@

import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

Expand All @@ -44,18 +48,20 @@ public class PageCounterRebuildManager implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final PagingStore pgStore;
private final PagingManager pagingManager;
private final StorageManager sm;
private final LongHashSet transactions;
private final Map<Long, PageTransactionInfo> transactions;
private boolean paging;
private long limitPageId;
private int limitMessageNr;
private LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
private final Set<Long> storedLargeMessages;


public PageCounterRebuildManager(PagingStore store, LongHashSet transactions, Set<Long> storedLargeMessages) {
public PageCounterRebuildManager(PagingManager pagingManager, PagingStore store, Map<Long, PageTransactionInfo> transactions, Set<Long> storedLargeMessages) {
// we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
initialize(store);
this.pagingManager = pagingManager;
this.pgStore = store;
this.sm = store.getStorageManager();
this.transactions = transactions;
Expand Down Expand Up @@ -241,28 +247,64 @@ public void rebuild() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
}

PageTransactionInfo txInfo = null;

if (msg.getTransactionID() > 0) {
txInfo = transactions.get(msg.getTransactionID());
}

Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();

if (logger.isTraceEnabled()) {
if (logger.isTraceEnabled()) {
logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX);
}
}

for (long queueID : routedQueues) {
boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());

boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID());
// if the pageTransaction is in prepare state, we have to increment the counter after the commit
// notice that there is a check if the commit is done in afterCommit
if (preparedTX != null) {
PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID);
preparedTX.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
// We use the pagingManager executor here, in case the commit happened while the rebuild manager is working
// on that case the increment will wait any pending tasks on that executor to finish before this executor takes effect
pagingManager.execute(() -> {
try {
subscription.getCounter().increment(null, 1, msg.getStoredSize());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
}
});

if (!txOK) {
logger.debug("TX is not ok for {}", msg);
}
} else {
boolean txOK = msg.getTransactionID() <= 0 || transactions == null || txInfo != null;

if (ok && txOK) { // not acked and TX is ok
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
if (!txOK) {
logger.debug("TX is not ok for {}", msg);
}
CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
if (copiedSubscription != null) {
copiedSubscription.empty = false;
copiedSubscription.addUp++;
copiedSubscription.sizeUp += msg.getPersistentSize();
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);

if (ok && txOK) { // not acked and TX is ok
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
if (copiedSubscription != null) {
copiedSubscription.empty = false;
copiedSubscription.addUp++;
copiedSubscription.sizeUp += msg.getPersistentSize();
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,13 @@ public void reloadACK(final PagePosition position) {
public void reloadPreparedACK(final Transaction tx, final PagePosition position) {
deliveredCount.incrementAndGet();
installTXCallback(tx, position);

try {
counter.increment(tx, -1, -position.getPersistentSize());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {

private long transactionID;

private volatile Transaction preparedTX;

private volatile long recordID = -1;

private volatile boolean committed = false;
Expand All @@ -73,6 +75,10 @@ public PageTransactionInfoImpl(final long transactionID) {
public PageTransactionInfoImpl() {
}

@Override
public Transaction getPreparedTransaction() {
return preparedTX;
}

@Override
public long getRecordID() {
Expand Down Expand Up @@ -161,6 +167,7 @@ public synchronized void commit() {
}
committed = true;
lateDeliveries = null;
preparedTX = null;
}

@Override
Expand Down Expand Up @@ -225,13 +232,20 @@ public void setCommitted(final boolean committed) {
this.committed = committed;
}

@Override
public void reloadPrepared(final Transaction tx) {
this.preparedTX = tx;
this.committed = false;
}

@Override
public boolean isRollback() {
return rolledback;
}

@Override
public synchronized void rollback() {
preparedTX = null;
rolledback = true;
committed = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
Expand All @@ -44,7 +45,6 @@
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -577,12 +577,12 @@ public void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transaction

@Override
public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) {
LongHashSet transactionsSet = new LongHashSet();
transactions.forEach((txId, tx) -> {
transactionsSet.add(txId);
});
Map<Long, PageTransactionInfo> transactionsSet = new LongObjectHashMap();
// making a copy
transactions.forEach(transactionsSet::put);
transactionsSet.forEach((a, b) -> System.out.println(a + " = " + b));
stores.forEach((address, pgStore) -> {
PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet, storedLargeMessages);
PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages);
logger.debug("Setting destination {} to rebuild counters", address);
managerExecutor.execute(rebuildManager);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,7 @@ private void loadSinglePreparedTransaction(PostOffice postOffice,
pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
}
} else {
pageTransactionInfo.setCommitted(false);
pageTransactionInfo.reloadPrepared(tx);

tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,9 +1748,9 @@ public long getMessageCount() {
// counted on the pageSubscription as well
long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
if (logger.isTraceEnabled()) {
logger.trace("Queue={}/{} returning getMessageCount returning {}. pendingMetrics.getMessageCount() = {}, getScheduledCount() = {}, pageSubscription.getMessageCount()={}, pageSubscription.getDeliveredCount()={}",
name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(),
pageSubscription.getDeliveredCount());
logger.trace("Queue={}/{} returning getMessageCount \n\treturning {}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, \n\tpageSubscription.getMessageCount()={}, \n\tpageSubscription.getCounter().getValue()={}, \n\tpageSubscription.getDeliveredCount()={}",
name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue(),
pageSubscription.getDeliveredCount());
}
return returnValue;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,21 @@ public void markAsRollbackOnly(final ActiveMQException exception) {

@Override
public synchronized void addOperation(final TransactionOperation operation) {
checkCreateOperations();

operations.add(operation);
// We do this check, because in the case of XA Transactions and paging,
// the commit could happen while the counters are being rebuilt.
// if the state is commited we should execute it right away.
// this is just to avoid a race.
switch (state) {
case COMMITTED:
operation.afterCommit(this);
return;
case ROLLEDBACK:
operation.afterRollback(this);
return;
default:
checkCreateOperations();
operations.add(operation);
}
}

@Override
Expand Down

0 comments on commit bea39f6

Please sign in to comment.