Navigation Menu

Skip to content

Commit

Permalink
ARTEMIS-1650 Improve paged message acknowledge
Browse files Browse the repository at this point in the history
Cache `messageID`, `transactionID` and `isLargeMessage`
in PagedReference, so that when acknowledge, we do not have to
get PagedMessage which may be GCed and cause re-read entire page.
  • Loading branch information
shoukunhuai authored and clebertsuconic committed Feb 8, 2018
1 parent 33b265c commit 822445a
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 4 deletions.
Expand Up @@ -24,4 +24,8 @@ public interface PagedReference extends MessageReference {
PagePosition getPosition();

PagedMessage getPagedMessage();

boolean isLargeMessage();

long getTransactionID();
}
Expand Up @@ -53,6 +53,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>

private Object protocolData;

private final boolean largeMessage;

private final long transactionID;

private final long messageID;

@Override
public Object getProtocolData() {
return protocolData;
Expand Down Expand Up @@ -95,6 +101,9 @@ public PagedReferenceImpl(final PagePosition position,
this.position = position;
this.message = new WeakReference<>(message);
this.subscription = subscription;
this.largeMessage = message.getMessage().isLargeMessage();
this.transactionID = message.getTransactionID();
this.messageID = message.getMessage().getMessageID();
}

@Override
Expand Down Expand Up @@ -256,4 +265,19 @@ public Long getConsumerId() {
return this.consumerId;
}

@Override
public boolean isLargeMessage() {
return largeMessage;
}

@Override
public long getTransactionID() {
return transactionID;
}

@Override
public long getMessageID() {
return messageID;
}

}
Expand Up @@ -849,8 +849,8 @@ private void installTXCallback(final Transaction tx, final PagePosition position
}

private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException {
if (reference.getPagedMessage().getTransactionID() >= 0) {
return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
if (reference.getTransactionID() >= 0) {
return pageStore.getPagingManager().getTransaction(reference.getTransactionID());
} else {
return null;
}
Expand Down
Expand Up @@ -38,6 +38,8 @@ public static MessageReference createReference(Message encode, final Queue queue

Message getMessage();

long getMessageID();

/**
* We define this method aggregation here because on paging we need to hold the original estimate,
* so we need to perform some extra steps on paging.
Expand Down
Expand Up @@ -237,6 +237,11 @@ public Message getMessage() {
return ref.getMessage();
}

@Override
public long getMessageID() {
return getMessage().getMessageID();
}

@Override
public Queue getQueue() {
return ref.getQueue();
Expand Down
Expand Up @@ -146,6 +146,11 @@ public Message getMessage() {
return message;
}

@Override
public long getMessageID() {
return getMessage().getMessageID();
}

@Override
public Queue getQueue() {
return queue;
Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
Expand Down Expand Up @@ -159,7 +160,9 @@ public void afterCommit(final Transaction tx) {

if (pagedMessagesToPostACK != null) {
for (MessageReference refmsg : pagedMessagesToPostACK) {
decrementRefCount(refmsg);
if (((PagedReference) refmsg).isLargeMessage()) {
decrementRefCount(refmsg);
}
}
}
}
Expand Down
Expand Up @@ -867,7 +867,7 @@ public synchronized void acknowledge(Transaction tx, final long messageID) throw

acks++;
}
while (ref.getMessage().getMessageID() != messageID);
while (ref.getMessageID() != messageID);

if (startedTransaction) {
tx.commit();
Expand Down

0 comments on commit 822445a

Please sign in to comment.