Skip to content

Commit

Permalink
[AMQ-9484] Support exporting kahadb messages from a queue with an offset
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrpav committed Apr 24, 2024
1 parent 6e6caf7 commit 61e437e
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public interface MessageStore extends Service {

void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;

void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean keepCurrentNextMessageId) throws Exception;

void dispose(ConnectionContext context);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
delegate.recoverNextMessages(maxReturned, listener);
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean keepCurrentNextMessageId) throws Exception {
delegate.recoverNextMessages(offset, maxReturned, listener, keepCurrentNextMessageId);
}

@Override
public void resetBatching() {
delegate.resetBatching();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,32 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
}
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean keepCurrentNextMessageId) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
int position = 0;
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
if(offset > 0 && offset > position) {
position++;
continue;
}
if (pastLackBatch) {
Object msg = entry.getValue();
lastBatchId = entry.getKey();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId) msg);
} else {
listener.recoverMessage((Message) msg);
}
} else {
pastLackBatch = entry.getKey().equals(lastBatchId);
}
position++;
}
}
}

@Override
public void resetBatching() {
lastBatchId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,19 @@ public boolean recoverMessageReference(String reference) throws Exception {

}

/**
* @param offset
* @param maxReturned
* @param listener
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
* org.apache.activemq.store.MessageRecoveryListener)
*/
@Override
public void recoverNextMessages(int offset, int maxReturned, final MessageRecoveryListener listener, final boolean keepCurrentNextMessageId) throws Exception {
throw new UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) is not supported.");
}

public void trackRollbackAck(Message message) {
synchronized (rolledBackAcks) {
rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,52 @@ public void execute(Transaction tx) throws Exception {
}
}

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener, final boolean keepCurrentNextMessageId) throws Exception {
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
long currentDefaultCursorPosition = sd.orderIndex.cursor.defaultCursorPosition;

try {
if(offset > 0) {
// setBatch does a +1 internally
sd.orderIndex.setBatch(tx, currentDefaultCursorPosition - 1 + offset);
}
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
entry = iterator.next();

if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}

Message msg = loadMessage(entry.getValue().location);
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg);
counter++;
if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
break;
}
}
sd.orderIndex.stoppedIterating();
} finally {
if(offset > 0 && keepCurrentNextMessageId) {
sd.orderIndex.setBatch(tx, currentDefaultCursorPosition - 1);
}
}
}
});
} finally {
indexLock.writeLock().unlock();
}
}

protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
int counter = 0;
String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,43 @@ public void execute(Transaction tx) throws Exception {
}
}

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener, final boolean keepCurrentNextMessageId) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageRecord> entry=null;
int counter = 0;
long currentCursorPosition = cursorPos;
try {
long recoverCursPos = cursorPos;
if(offset > 0) {
recoverCursPos = cursorPos + offset;
}
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, recoverCursPos); iterator.hasNext();) {
entry = iterator.next();

listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
counter++;
if( counter >= maxReturned ) {
break;
}
}
if( entry!=null ) {
cursorPos = entry.getKey()+1;
}
} finally {
if(offset > 0 && keepCurrentNextMessageId) {
cursorPos = currentCursorPosition;
}
}
}
});
}
}

@Override
public void resetBatching() {
cursorPos=0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,11 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
}
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean keepCurrentNextMessageId) throws Exception {

}

@Override
public void setBatch(MessageId message) {
batch.set((Long)message.getFutureOrSequenceLong());
Expand Down
Loading

0 comments on commit 61e437e

Please sign in to comment.