diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index ed51ce8881d..791da179073 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -206,7 +206,19 @@ public synchronized LinkedList pageInList(int maxItems) { * @throws Exception */ @Override - public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { + public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { + // Discarding expired message should be done outside of synchronized section (deadlock, see AMQ-5785) + List expiredMessages = new ArrayList<>(); + boolean isExpiration = tryAddMessageLastInternal(node, maxWaitTime, expiredMessages); + for (MessageReference expiredMessage : expiredMessages) { + discardExpiredMessage(expiredMessage); + } + return isExpiration; + } + + private synchronized boolean tryAddMessageLastInternal( + MessageReference node, long maxWaitTime, List expiredMessages + ) { if (!node.isExpired()) { try { regionDestination = (Destination) node.getMessage().getRegionDestination(); @@ -220,7 +232,7 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai } if (!hasSpace()) { if (isDiskListEmpty()) { - expireOldMessages(); + expiredMessages.addAll(expireOldMessages()); if (hasSpace()) { memoryList.addMessageLast(node); node.incrementReferenceCount(); @@ -242,7 +254,7 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai throw new RuntimeException(e); } } else { - discardExpiredMessage(node); + expiredMessages.add(node); } //message expired return true; @@ -254,7 +266,16 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai * @param node */ @Override - public synchronized void addMessageFirst(MessageReference node) { + public void addMessageFirst(MessageReference node) { + // Discarding expired message should be done outside of synchronized section (deadlock, see AMQ-5785) + List expiredMessages = addMessageFirstInternal(node); + for (MessageReference expiredMessage : expiredMessages) { + discardExpiredMessage(expiredMessage); + } + } + + private synchronized List addMessageFirstInternal(MessageReference node) { + List expiredMessages = new ArrayList<>(); if (!node.isExpired()) { try { regionDestination = (Destination) node.getMessage().getRegionDestination(); @@ -263,16 +284,16 @@ public synchronized void addMessageFirst(MessageReference node) { memoryList.addMessageFirst(node); node.incrementReferenceCount(); setCacheEnabled(true); - return; + return expiredMessages; } } if (!hasSpace()) { if (isDiskListEmpty()) { - expireOldMessages(); + expiredMessages = expireOldMessages(); if (hasSpace()) { memoryList.addMessageFirst(node); node.incrementReferenceCount(); - return; + return expiredMessages; } else { flushToDisk(); } @@ -289,8 +310,9 @@ public synchronized void addMessageFirst(MessageReference node) { throw new RuntimeException(e); } } else { - discardExpiredMessage(node); + expiredMessages.add(node); } + return expiredMessages; } /** diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FullDestinationMemoryMessageExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FullDestinationMemoryMessageExpirationTest.java new file mode 100644 index 00000000000..170bee45644 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FullDestinationMemoryMessageExpirationTest.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.jms.Connection; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.DeliveryMode; +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FullDestinationMemoryMessageExpirationTest { + private static final Logger LOG = LoggerFactory.getLogger(FullDestinationMemoryMessageExpirationTest.class); + + private static final long DESTINATION_MEMORY_LIMIT = 1024 * 1024; // 1 MB destination memory limit + private static final long BROKER_MEMORY_LIMIT = 64 * DESTINATION_MEMORY_LIMIT; // Broker memory limit has to be bigger than destination memory limit + private static final long BROKER_TEMP_USAGE_LIMIT = 64 * DESTINATION_MEMORY_LIMIT; + private static final String BROKER_DATA_DIRECTORY = "target/test-classes/" + + FullDestinationMemoryMessageExpirationTest.class.getName().replace('.', '/') + + "-activemq-data"; + private static final String BROKER_URL = "vm://" + BrokerService.DEFAULT_BROKER_NAME; + private static final String QUEUE_NAME = "NON_PERSISTENT_TEST"; + private static final String MESSAGE_ID_PROPERTY_NAME = "MessageId"; + + private BrokerService brokerService; + + @Before + public void setUp() throws Exception { + // Delete AMQ data directory + FileUtils.deleteDirectory(new File(BROKER_DATA_DIRECTORY)); + + // Configure/Start Broker + brokerService = new BrokerService(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setProducerFlowControl(false); + defaultEntry.setMemoryLimit(DESTINATION_MEMORY_LIMIT); + defaultEntry.setExpireMessagesPeriod(0); // Disable background message expiration process + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); + brokerService.setUseJmx(false); + brokerService.setDataDirectory(BROKER_DATA_DIRECTORY); + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(new File(brokerService.getBrokerDataDirectory(),"KahaDB")); + brokerService.setPersistenceAdapter(persistenceAdapter); + brokerService.getSystemUsage().getTempUsage().setLimit(BROKER_TEMP_USAGE_LIMIT); + brokerService.getSystemUsage().getMemoryUsage().setLimit(BROKER_MEMORY_LIMIT); + brokerService.start(); + } + + @After + public void tearDown() throws Exception { + try { + // Stop Broker + if(brokerService != null) + brokerService.stop(); + } finally { + // Delete AMQ data directory + FileUtils.deleteDirectory(new File(BROKER_DATA_DIRECTORY)); + } + } + + @Test + public void destinationMemoryFullMessageExpirationTest() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); + + // Producer + final long messageTtl = 50L; + Producer producer = new Producer(connectionFactory, QUEUE_NAME, messageTtl, 512); + Thread produderThread = new Thread(producer, "Producer"); + produderThread.start(); + + // Wait till temporary storage is used + while(getTempPercentUsage(QUEUE_NAME) <= 0) { + Thread.sleep(20); + } + + // Consumer + Consumer consumer = new Consumer(connectionFactory, QUEUE_NAME); + Thread consumerThread = new Thread(consumer, "Consumer"); + consumerThread.start(); + + // Stop Producer after at least one message is received + while (consumer.getReceivedMessages().size() <= 0) { + Thread.sleep(20); + } + producer.stop(); + produderThread.join(); + assertNull(producer.getException()); + LOG.info(String.format( + "Producer: SentMessageCount=%d, ExpirationCount=%d, QueueSize=%d (MemoryPercentageUsage=%d%%, TempPercentUsage=%d%%)", + producer.getSentMessages(), + getExpirationCount(QUEUE_NAME), + getQueueSize(QUEUE_NAME), + getMemoryPercentUsage(QUEUE_NAME), + getTempPercentUsage(QUEUE_NAME) + )); + + + // Wait till received messages + expiration messages = sentMessages (or timeout expired) + final long sentMessagesCount = producer.getSentMessages(); + final long deadlineMs = System.currentTimeMillis() + (60 * 1000); + long processedMessagesCount; + do { + processedMessagesCount = consumer.getReceivedMessages().size() + getExpirationCount(QUEUE_NAME); + } while(processedMessagesCount < sentMessagesCount && System.currentTimeMillis() < deadlineMs); + + + // Stop Consumer + consumer.stop(); + consumerThread.join(); + assertNull(consumer.getException()); + LOG.info(String.format( + "Consumer: ReceivedMessageCount=%d, ExpirationCount=%d, QueueSize=%d (SentMessageCount=%d, MemoryPercentageUsage=%d%%, TempPercentUsage=%d%%)", + consumer.getReceivedMessages().size(), + getExpirationCount(QUEUE_NAME), + getQueueSize(QUEUE_NAME), + producer.getSentMessages(), + getMemoryPercentUsage(QUEUE_NAME), + getTempPercentUsage(QUEUE_NAME) + )); + assertEquals(0, getQueueSize(QUEUE_NAME)); + assertEquals(sentMessagesCount, processedMessagesCount); + } + + private long getQueueSize(String queueName) throws Exception { + org.apache.activemq.broker.region.Destination destination = brokerService.getDestination( + new ActiveMQQueue(queueName) + ); + return destination.getDestinationStatistics().getMessages().getCount(); + } + + private long getExpirationCount(String queueName) throws Exception { + org.apache.activemq.broker.region.Destination destination = brokerService.getDestination( + new ActiveMQQueue(queueName) + ); + return destination.getDestinationStatistics().getExpired().getCount(); + } + + private int getMemoryPercentUsage(String queueName) throws Exception { + org.apache.activemq.broker.region.Destination destination = brokerService.getDestination( + new ActiveMQQueue(queueName) + ); + return destination.getMemoryUsage().getPercentUsage(); + } + + private int getTempPercentUsage(String queueName) throws Exception { + org.apache.activemq.broker.region.Destination destination = brokerService.getDestination( + new ActiveMQQueue(queueName) + ); + return destination.getTempUsage().getPercentUsage(); + } + + private class Producer implements Runnable { + private static final String MESSAGE_DATA = "012346789"; + + private final AtomicLong messageId = new AtomicLong(); + private final AtomicReference exception = new AtomicReference<>(); + private final String queueName; + private final long messageTtl; + private final int messageBodySize; + private final Connection con; + private volatile boolean isStopped = false; + + public Producer( + ConnectionFactory connectionFactory, String queueName, long messageTtl, int messageBodySize + ) throws JMSException { + this.queueName = queueName; + this.messageTtl = messageTtl; + this.messageBodySize = messageBodySize; + this.con = connectionFactory.createConnection(); + } + + @Override + public void run() { + try { + while (!isStopped) { + Message message = sendMessage( + con, queueName, messageId.incrementAndGet(), messageTtl, messageBodySize + ); + logMessageSent(message); + } + } catch(Exception e) { + exception.set(e); + } finally { + try { + close(); + } catch(Exception e) { + exception.set(e); + } + } + } + + public void stop() { + isStopped = true; + } + + public long getSentMessages() { + return messageId.get(); + } + + public Exception getException() { + return exception.get(); + } + + private Message sendMessage( + Connection con, String queueName, long messageId, long messageTTL, int messageBodySize + ) throws Exception { + TextMessage message; + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { + Destination destination = new ActiveMQQueue(queueName); + MessageProducer producer = session.createProducer(destination); + try { + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + if (messageTTL > 0 ) { + producer.setTimeToLive(messageTTL); + } + message = session.createTextMessage(createMessageBody(messageBodySize)); + message.setStringProperty(MESSAGE_ID_PROPERTY_NAME, String.valueOf(messageId)); + producer.send(message); + } finally { + producer.close(); + } + } finally { + session.close(); + } + return message; + } + + private String createMessageBody(int size) { + StringBuilder sb = new StringBuilder(size); + int messageDataLength = MESSAGE_DATA.length(); + for (int i = 0; i < size; i++) { + sb.append(MESSAGE_DATA.charAt(i % messageDataLength)); + } + return sb.toString(); + } + + private void logMessageSent(Message message) throws Exception { + LOG.debug(String.format( + "MessageId %s sent (Count=%d), MemoryPercentageUsage=%d%%, TempPercentUsage=%d%%, ExpirationCount=%d, QueueSize=%d", + message.getStringProperty(MESSAGE_ID_PROPERTY_NAME), + messageId.get(), + getMemoryPercentUsage(queueName), + getTempPercentUsage(queueName), + getExpirationCount(queueName), + getQueueSize(queueName) + )); + } + + private void close() throws JMSException { + con.close(); + } + + } + + private class Consumer implements Runnable { + private final List receivedMessages = new ArrayList<>(); + private final AtomicReference exception = new AtomicReference<>(); + private final String queueName; + private final Connection con; + private volatile boolean isStopped = false; + + public Consumer(ConnectionFactory connectionFactory, String queueName) throws JMSException { + this.queueName = queueName; + this.con = connectionFactory.createConnection(); + this.con.start(); + } + + @Override + public void run() { + try { + while (!isStopped) { + Message message = receiveMessage(con, queueName); + if (message != null) { + addReceivedMessage(message); + } + logMessageReceived(message); + } + } catch(Exception e) { + exception.set(e); + } finally { + try { + close(); + } catch(Exception e) { + exception.set(e); + } + } + } + + public void stop() { + isStopped = true; + } + + public List getReceivedMessages() { + synchronized(this) { + return new ArrayList<>(receivedMessages); + } + } + + public Exception getException() { + return exception.get(); + } + + private Message receiveMessage(Connection con, String queueName) throws Exception { + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { + Destination destination = new ActiveMQQueue(queueName); + MessageConsumer consumer = session.createConsumer(destination); + try { + return consumer.receive(1000); + } finally { + consumer.close(); + } + } finally { + session.close(); + } + } + + private void logMessageReceived(Message message) throws Exception { + int memoryPercentUsage = getMemoryPercentUsage(queueName); + int tempPercentUsage = getTempPercentUsage(queueName); + long expirationCount = getExpirationCount(queueName); + long queueSize = getQueueSize(queueName); + if(message != null) { + LOG.debug(String.format( + "MessageId %s received (Count=%d), MemoryPercentageUsage=%d%%, TempPercentUsage=%d%%, ExpirationCount=%d, QueueSize=%d", + message.getStringProperty(MESSAGE_ID_PROPERTY_NAME), + receivedMessages.size(), + memoryPercentUsage, + tempPercentUsage, + expirationCount, + queueSize + )); + } else { + LOG.debug(String.format( + "Message wasn't receive, MemoryPercentageUsage=%d%%, TempPercentUsage=%d%%, ExpirationCount=%d, QueueSize=%d", + memoryPercentUsage, + tempPercentUsage, + expirationCount, + queueSize + )); + } + } + + private void addReceivedMessage(Message message) { + synchronized(receivedMessages) { + receivedMessages.add(message); + } + } + + private void close() throws JMSException { + con.close(); + } + + } + +}