Skip to content

Commit

Permalink
ARTEMIS-4193 Large Message Files orphaned after server killed
Browse files Browse the repository at this point in the history
This fix scans the journal and paging store for existing large messages. We will remove any large-message files that do not have a corresponding record in the journals or in paging.
  • Loading branch information
clebertsuconic committed Mar 9, 2023
1 parent 7a0bf52 commit 257dd86
Show file tree
Hide file tree
Showing 33 changed files with 766 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,26 +459,6 @@ public LargeBody getLargeBody() {
return largeBody;
}

// Delegates to the underlying LargeBody: clears its pending record ID.
@Override
public void clearPendingRecordID() {
largeBody.clearPendingRecordID();
}

// Delegates to the underlying LargeBody: true when a pending record ID is set.
@Override
public boolean hasPendingRecord() {
return largeBody.hasPendingRecord();
}

// Delegates to the underlying LargeBody: stores the pending record ID.
@Override
public void setPendingRecordID(long pendingRecordID) {
largeBody.setPendingRecordID(pendingRecordID);
}

// Delegates to the underlying LargeBody: returns the pending record ID.
@Override
public long getPendingRecordID() {
return largeBody.getPendingRecordID();
}

@Override
protected void releaseComplete() {
largeBody.releaseComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4648,7 +4648,7 @@ public boolean isEmbeddedWebServerStarted() {
/**
 * Rebuilds all page counters and blocks until the rebuild completes.
 * Note: the diff artifact left both the pre-change call {@code rebuildCounters()}
 * and the post-change call; only the post-change call is kept here.
 */
public void rebuildPageCounters() throws Exception {
// managementLock will guarantee there's only one management operation being called
try (AutoCloseable lock = server.managementLock()) {
// null: no stored-large-message set to reconcile for a management-triggered rebuild
Future<Object> task = server.getPagingManager().rebuildCounters(null);
task.get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.paging;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

Expand Down Expand Up @@ -85,6 +86,11 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository

void injectMonitor(FileStoreMonitor monitor) throws Exception;

/**
 * Executes a runnable inside the PagingManager's executor.
 * This default implementation throws; implementations backed by an
 * executor are expected to override it.
 */
default void execute(Runnable runnable) {
throw new UnsupportedOperationException("not implemented");
}

/**
* Lock the manager. This method should not be called during normal PagingManager usage.
*/
Expand Down Expand Up @@ -164,7 +170,7 @@ default long getMaxMessages() {
/**
 * Rebuilds all page counters for destinations that are paging in the background.
 *
 * @param storedLargeMessages large-message IDs found on disk; IDs with a
 *        corresponding paged record are removed from this set during the
 *        rebuild. May be {@code null}, in which case no reconciliation
 *        against large-message files is performed.
 * @return a Future that completes when the rebuild finishes; this default
 *         implementation returns {@code null}.
 */
default Future<Object> rebuildCounters(Set<Long> storedLargeMessages) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.function.BiConsumer;

/** This class copies the current data from the subscriptions and counts messages while the server is already active
Expand All @@ -49,14 +50,16 @@ public class PageCounterRebuildManager implements Runnable {
private long limitPageId;
private int limitMessageNr;
private LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
private final Set<Long> storedLargeMessages;


/**
 * @param store                the paging store whose counters are rebuilt
 * @param transactions         IDs of in-flight page transactions, copied by the caller
 * @param storedLargeMessages  large-message IDs found on disk, or {@code null};
 *                             when non-null, IDs seen while rebuilding are removed
 *                             from this set. (The stale pre-change constructor
 *                             signature left by the diff has been dropped.)
 */
public PageCounterRebuildManager(PagingStore store, LongHashSet transactions, Set<Long> storedLargeMessages) {
// we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
initialize(store);
this.pgStore = store;
this.sm = store.getStorageManager();
this.transactions = transactions;
// may be null — the rebuild loop null-checks before touching it
this.storedLargeMessages = storedLargeMessages;
}
/** This method copies the acked records from the subscription into a separate data structure,
 * so we can count data while we consolidate at the end. */
Expand Down Expand Up @@ -216,6 +219,12 @@ public void rebuild() throws Exception {
try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
while (iter.hasNext()) {
PagedMessage msg = iter.next();
if (storedLargeMessages != null && msg.getMessage().isLargeMessage()) {
if (logger.isDebugEnabled()) {
logger.debug("removing storedLargeMessage {}", msg.getMessage().getMessageID());
}
storedLargeMessages.remove(msg.getMessage().getMessageID());
}
if (limitPageId == pgid) {
if (msg.getMessageNumber() >= limitMessageNr) {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ protected void checkMemoryRelease() {
}
}

// Runs the given task on this paging manager's executor (implements PagingManager#execute).
@Override
public void execute(Runnable run) {
managerExecutor.execute(run);
}

@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
pagingStoreFactory.injectMonitor(monitor);
Expand Down Expand Up @@ -569,13 +574,13 @@ public void forEachTransaction(BiConsumer<Long, PageTransactionInfo> transaction
}

@Override
public Future<Object> rebuildCounters() {
public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) {
LongHashSet transactionsSet = new LongHashSet();
transactions.forEach((txId, tx) -> {
transactionsSet.add(txId);
});
stores.forEach((address, pgStore) -> {
PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet);
PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet, storedLargeMessages);
logger.debug("Setting destination {} to rebuild counters", address);
managerExecutor.execute(rebuildManager);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ default long getMaxRecordSize() {
return Long.MAX_VALUE;
}

// NOTE(review): name suggests implementations populate {@code files} with the
// IDs of large-message files found in the storage folder — confirm against the
// journal implementation. This default is a deliberate no-op.
default void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception {
}

// Default: no journal file factory available; journal-backed implementations override this.
default SequentialFileFactory getJournalSequentialFileFactory() {
return null;
}
Expand Down Expand Up @@ -281,12 +284,24 @@ default SequentialFile createFileForLargeMessage(long messageID, boolean durable

void deletePageTransactional(long recordID) throws Exception;

/**
 * Backwards-compatible overload: delegates to the full variant, passing
 * {@code null} for the set of large-message files found on disk, so no
 * reconciliation against on-disk large-message files is performed.
 */
default JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception {
return loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, null, pendingNonTXPageCounter, journalLoader);
}

/**
 * Loads the message journal.
 *
 * @param largeMessagesInFolder large-message IDs found on disk, or {@code null};
 *        implementations remove from this set any ID that has a corresponding
 *        journal record, leaving behind the orphaned files.
 */
JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
Set<Long> largeMessagesInFolder,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -925,6 +924,7 @@ public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
Map<Long, QueueBindingInfo> queueInfos,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
final Set<Pair<Long, Long>> pendingLargeMessages,
final Set<Long> storedLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
final JournalLoader journalLoader) throws Exception {
SparseArrayLinkedList<RecordInfo> records = new SparseArrayLinkedList<>();
Expand Down Expand Up @@ -995,6 +995,10 @@ final class MutableLong {

messages.put(record.id, largeMessage.toMessage());

if (storedLargeMessages != null) {
storedLargeMessages.remove(largeMessage.getMessageID());
}

largeMessages.add(largeMessage);

break;
Expand All @@ -1007,6 +1011,10 @@ final class MutableLong {

Message message = decodeMessage(pools, buff);

if (message.isLargeMessage() && storedLargeMessages != null) {
storedLargeMessages.remove(message.getMessageID());
}

messages.put(record.id, message);

break;
Expand Down Expand Up @@ -1272,13 +1280,18 @@ final class MutableLong {

journalLoader.handleAddMessage(queueMap);

loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, journalLoader);
loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, storedLargeMessages, journalLoader);

for (PageSubscription sub : pageSubscriptions.values()) {
sub.getCounter().processReload();
}

for (LargeServerMessage msg : largeMessages) {
if (storedLargeMessages != null && storedLargeMessages.remove(msg.getMessageID())) {
if (logger.isDebugEnabled()) {
logger.debug("Large message in folder removed on {}", msg.getMessageID());
}
}
if (msg.toMessage().getRefCount() == 0 && msg.toMessage().getDurableCount() == 0) {
ActiveMQServerLogger.LOGGER.largeMessageWithNoRef(msg.getMessageID());
msg.toMessage().usageDown();
Expand Down Expand Up @@ -1753,23 +1766,6 @@ public Journal getBindingsJournal() {
return bindingsJournal;
}

// Confirms the pending journal record of a large message, then clears the
// pending ID so the confirmation is not repeated. Synchronized on the message
// so the check-confirm-clear sequence is atomic per message.
// (Removed by this commit.)
protected void confirmLargeMessage(final LargeServerMessage largeServerMessage) {
synchronized (largeServerMessage) {
if (largeServerMessage.getPendingRecordID() >= 0) {
try {
confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
largeServerMessage.clearPendingRecordID();
} catch (ActiveMQShutdownException e) {
// this may happen, this is asynchronous as all that would happen is we missed the update
// since the update was missed, next restart this operation will be retried
logger.debug(e.getMessage(), e);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
}

protected abstract LargeServerMessage parseLargeMessage(ActiveMQBuffer buff) throws Exception;

private void loadPreparedTransactions(final PostOffice postOffice,
Expand All @@ -1780,13 +1776,14 @@ private void loadPreparedTransactions(final PostOffice postOffice,
final BiConsumer<PreparedTransactionInfo, Throwable> failedTransactionCallback,
final Map<Long, PageSubscription> pageSubscriptions,
final Set<Pair<Long, Long>> pendingLargeMessages,
final Set<Long> storedLargeMessages,
JournalLoader journalLoader) throws Exception {
// recover prepared transactions
final CoreMessageObjectPools pools = new CoreMessageObjectPools();

for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
try {
loadSinglePreparedTransaction(postOffice, pagingManager, resourceManager, queueInfos, pageSubscriptions, pendingLargeMessages, journalLoader, pools, preparedTransaction);
loadSinglePreparedTransaction(postOffice, pagingManager, resourceManager, queueInfos, pageSubscriptions, pendingLargeMessages, storedLargeMessages, journalLoader, pools, preparedTransaction);
} catch (Throwable e) {
if (failedTransactionCallback != null) {
failedTransactionCallback.accept(preparedTransaction, e);
Expand All @@ -1803,6 +1800,7 @@ private void loadSinglePreparedTransaction(PostOffice postOffice,
Map<Long, QueueBindingInfo> queueInfos,
Map<Long, PageSubscription> pageSubscriptions,
Set<Pair<Long, Long>> pendingLargeMessages,
final Set<Long> storedLargeMessages,
JournalLoader journalLoader,
CoreMessageObjectPools pools,
PreparedTransactionInfo preparedTransaction) throws Exception {
Expand All @@ -1829,6 +1827,11 @@ private void loadSinglePreparedTransaction(PostOffice postOffice,

switch (recordType) {
case JournalRecordIds.ADD_LARGE_MESSAGE: {
if (storedLargeMessages != null && storedLargeMessages.remove(record.id)) {
if (logger.isDebugEnabled()) {
logger.debug("PreparedTX/AddLargeMessage load removing stored large message {}", record.id);
}
}
messages.put(record.id, parseLargeMessage(buff).toMessage());

break;
Expand All @@ -1839,6 +1842,9 @@ private void loadSinglePreparedTransaction(PostOffice postOffice,
}
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = decodeMessage(pools, buff);
if (storedLargeMessages != null && message.isLargeMessage() && storedLargeMessages.remove(record.id)) {
logger.debug("PreparedTX/AddMessgeProtocol load removing stored large message {}", record.id);
}

messages.put(record.id, message);

Expand Down

0 comments on commit 257dd86

Please sign in to comment.