Skip to content

Commit

Permalink
ARTEMIS-4171 Messages leaking thorugh AMQP Delivery
Browse files Browse the repository at this point in the history
there are two leaks here:

* QueueImpl::delivery might create a new iterator if a delivery happens right after a consumer was removed, and that iterator might belog to a consumer that was already closed
             as a result of that, the iterator may leak messages and hold references until a reboot is done. I have seen scenarios where messages would not be dleivered because of this.

* ProtonTransaction holding references: the last transaction might hold messages in the memory longer than expected. In tests I have performed the messages were accumulating in memory. and I cleared it here.
  • Loading branch information
clebertsuconic committed Feb 28, 2023
1 parent 9e524d9 commit 8078dd0
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ public int size() {

@Override
public LinkedListIterator<E> iterator() {
if (logger.isTraceEnabled()) {
logger.trace("Creating new iterator at", new Exception("trace location"));
}
return new Iterator();
}

Expand Down Expand Up @@ -434,6 +437,9 @@ private synchronized void resize(int newSize) {
}

private synchronized void removeIter(Iterator iter) {
if (logger.isTraceEnabled()) {
logger.trace("Removing iterator at", new Exception("trace location"));
}
for (int i = 0; i < numIters; i++) {
if (iter == iters[i]) {
iters[i] = null;
Expand All @@ -449,8 +455,10 @@ private synchronized void removeIter(Iterator iter) {
if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters == iters.length / 2) {
resize(numIters);
}

nextIndex--;
if (nextIndex < iters.length) {
iters[nextIndex] = null;
}

return;
}
Expand Down Expand Up @@ -515,8 +523,7 @@ public String toString() {
}
}

private class Iterator implements LinkedListIterator<E> {

public class Iterator implements LinkedListIterator<E> {
Node<E> last;

Node<E> current = head.next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,14 +743,9 @@ public void onRemoteClose(Link link) throws Exception {
}
}

/// we have to perform the link.close after the linkContext.close is finished.
// linkeContext.close will perform a few executions on the netty loop,
// this has to come next
runLater(() -> {
link.close();
link.free();
flush();
});
link.close();
link.free();
flush();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
OperationContext oldContext = sessionSPI.recoverContext();

try {
Message message = ((MessageReference) delivery.getContext()).getMessage();
MessageReference reference = (MessageReference) delivery.getContext();
Message message = reference != null ? reference.getMessage() : null;

DeliveryState remoteState = delivery.getRemoteState();

if (remoteState != null && remoteState.getType() == DeliveryStateType.Accepted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,38 @@ public class ProtonTransactionImpl extends TransactionImpl {
deliveries have been settled. We also need to ensure we are settling on the correct link. Hence why we keep a ref
to the ProtonServerSenderContext here.
*/
private final Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> deliveries = new HashMap<>();
final Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>> deliveries = new HashMap<>();

private boolean discharged;

private static class TXOperations extends TransactionOperationAbstract {
final ProtonTransactionImpl protonTransaction;
final AMQPConnectionContext connection;

TXOperations(AMQPConnectionContext connection, ProtonTransactionImpl tx) {
this.protonTransaction = tx;
this.connection = connection;
}

@Override
public void afterCommit(Transaction tx) {
super.afterCommit(tx);
connection.runNow(() -> {
// Settle all unsettled deliveries if commit is successful
for (Pair<Delivery, ProtonServerSenderContext> p : protonTransaction.deliveries.values()) {
if (!p.getA().isSettled())
p.getB().settle(p.getA());
}
connection.flush();
protonTransaction.deliveries.forEach((a, b) -> b.getA().setContext(null));
protonTransaction.deliveries.clear();
});
}
}

public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
super(xid, storageManager, timeoutSeconds);
addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
super.afterCommit(tx);
connection.runNow(() -> {
// Settle all unsettled deliveries if commit is successful
for (Pair<Delivery, ProtonServerSenderContext> p : deliveries.values()) {
if (!p.getA().isSettled())
p.getB().settle(p.getA());
}
connection.flush();
});
}
});
addOperation(new TXOperations(connection, this));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ default boolean supportsDirectDelivery() {
default void promptDelivery() {
}

default boolean isClosed() {
return false;
}

/**
* This will proceed with the actual delivery.
* Notice that handle should hold a readLock and proceedDelivery should release the readLock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ public void removed(T t) {

@Override
public boolean remove(T t) {
iterator.removed(t);
boolean result = consumers.remove(t);
if (result) {
iterator.removed(t);
iterator.update(consumers.resettableIterator());
if (consumers.isEmpty()) {
reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,11 @@ public String debug() {
boolean foundRef = false;

synchronized (this) {
Iterator<MessageReference> iter = messageReferences.iterator();
while (iter.hasNext()) {
foundRef = true;
out.println("reference = " + iter.next());
try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) {
while (iter.hasNext()) {
foundRef = true;
out.println("reference = " + iter.next());
}
}
}

Expand Down Expand Up @@ -1483,7 +1484,7 @@ public void removeConsumer(final Consumer consumer) {
logger.debug("Removing consumer {}", consumer);

try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
synchronized (this) {
synchronized (QueueImpl.this) {

boolean consumerRemoved = false;
for (ConsumerHolder holder : consumers) {
Expand Down Expand Up @@ -3060,7 +3061,7 @@ private boolean deliver() {
MessageReference ref;
Consumer handledconsumer = null;

synchronized (this) {
synchronized (QueueImpl.this) {

if (queueDestroyed) {
if (messageReferences.size() == 0) {
Expand Down Expand Up @@ -3094,6 +3095,14 @@ private boolean deliver() {
Consumer consumer = holder.consumer;
Consumer groupConsumer = null;

// we remove the consumerHolder when the Consumer is closed
// however the QueueConsumerIterator may hold a reference until the reset is called, which
// could happen a little later.
if (consumer.isClosed()) {
deliverAsync(true);
return false;
}

if (holder.iter == null) {
holder.iter = messageReferences.iterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ public String debug() {

private boolean isClosed = false;

@Override
public boolean isClosed() {
return isClosed;
}

ServerConsumerMetrics metrics = new ServerConsumerMetrics();


Expand Down Expand Up @@ -618,11 +623,14 @@ public synchronized void close(final boolean failed) throws Exception {
server.callBrokerConsumerPlugins(plugin -> plugin.afterCloseConsumer(this, failed));
}

protocolContext = null;
messageQueue.getExecutor().execute(() -> {
protocolContext = null;

callback = null;
callback = null;

session = null;
});

session = null;
}

private void addLingerRefs() throws Exception {
Expand Down Expand Up @@ -1116,7 +1124,7 @@ public AtomicInteger getAvailableCredits() {
*/
@Override
public String toString() {
return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ", binding=" + binding + "]";
return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ", binding=" + binding + ", closed=" + isClosed + "]";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,23 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import java.lang.invoke.MethodHandles;

import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerStatus;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -81,7 +85,7 @@ public void validateServer() throws Exception {
@Override
@Before
public void setUp() throws Exception {
server = createServer(true, createDefaultConfig(1, true));
server = createServer(false, createDefaultConfig(1, true));
server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2);
server.start();
}
Expand All @@ -102,6 +106,9 @@ public void testOpenWire() throws Exception {
}

private void doTest(String protocol) throws Exception {
CheckLeak checkLeak = new CheckLeak();
// Some protocols may create ServerConsumers
int originalConsumers = checkLeak.getAllObjects(ServerConsumerImpl.class).length;
int REPEATS = 100;
int MESSAGES = 20;
basicMemoryAsserts();
Expand Down Expand Up @@ -143,12 +150,17 @@ private void doTest(String protocol) throws Exception {
targetProducer.send(m);
}
Assert.assertNull(sourceConsumer.receiveNoWait());
consumerSession.commit();

Wait.assertTrue(() -> validateClosedConsumers(checkLeak));
}
consumerSession.commit();
}
}
}

assertMemory(new CheckLeak(), 0, ServerConsumerImpl.class.getName());


// this is just to drain the messages
try (Connection targetConnection = cf.createConnection(); Connection consumerConnection = cf.createConnection()) {
Session targetSession = targetConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Expand All @@ -160,6 +172,9 @@ private void doTest(String protocol) throws Exception {
}

Assert.assertNull(consumer.receiveNoWait());
assertMemory(new CheckLeak(), 0, DeliveryImpl.class.getName());
Wait.assertTrue(() -> validateClosedConsumers(checkLeak));
consumer = null;
}

Queue sourceQueue = server.locateQueue("source");
Expand All @@ -173,6 +188,65 @@ private void doTest(String protocol) throws Exception {
}

basicMemoryAsserts();
}

@Test
public void testCheckIteratorsAMQP() throws Exception {
testCheckIterators("AMQP");
}

@Test
public void testCheckIteratorsOpenWire() throws Exception {
testCheckIterators("OPENWIRE");
}

@Test
public void testCheckIteratorsCORE() throws Exception {
testCheckIterators("CORE");
}

public void testCheckIterators(String protocol) throws Exception {
CheckLeak checkLeak = new CheckLeak();

String queueName = getName();

Queue queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));

ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
for (int i = 0; i < 10; i++) {
Connection connection = cf.createConnection();
connection.start();
for (int j = 0; j < 10; j++) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(queueName));
producer.send(session.createTextMessage("test"));
session.commit();
session.close();
}

for (int j = 0; j < 10; j++) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
consumer.receiveNoWait(); // it doesn't matter if it received or not, just doing something in the queue to kick the iterators
session.commit();
}
connection.close();

assertMemory(checkLeak, 0, 1, 1, ServerConsumerImpl.class.getName());
assertMemory(checkLeak, 0, 2, 1, LinkedListImpl.Iterator.class.getName());
}
}


private boolean validateClosedConsumers(CheckLeak checkLeak) throws Exception {
Object[] objecs = checkLeak.getAllObjects(ServerConsumerImpl.class);
for (Object obj : objecs) {
ServerConsumerImpl consumer = (ServerConsumerImpl) obj;
if (consumer.isClosed()) {
logger.info("References to closedConsumer {}\n{}", consumer, checkLeak.exploreObjectReferences(3, 1, true, consumer));
return false;
}
}
return true;
}
}

0 comments on commit 8078dd0

Please sign in to comment.