Skip to content

Commit

Permalink
ARTEMIS-2785 test Netty direct ByteBuf memory leak due to compression
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored and clebertsuconic committed Jun 1, 2020
1 parent 7bda412 commit 6db63ac
Showing 1 changed file with 45 additions and 0 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
Expand All @@ -38,8 +39,12 @@
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;

import javax.management.openmbean.CompositeData;

/**
Expand Down Expand Up @@ -129,6 +134,46 @@ public void testLargeMessageCompressionNotCompressedAndBrowsed() throws Exceptio
validateNoFilesOnLargeDir();
}

@Test
public void testNoDirectByteBufLeaksOnLargeMessageCompression() throws Exception {
Assume.assumeThat(PlatformDependent.usedDirectMemory(), not(equalTo(Long.valueOf(-1))));
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);

ActiveMQServer server = createServer(true, isNetty());

server.start();

ClientSessionFactory sf = createSessionFactory(locator);

ClientSession session = addClientSession(sf.createSession(false, false, false));

session.createQueue(new QueueConfiguration(ADDRESS).setAddress(ADDRESS).setDurable(false).setTemporary(true));

ClientProducer producer = session.createProducer(ADDRESS);

Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);

producer.send(clientFile);

session.commit();

session.start();

ClientConsumer consumer = session.createConsumer(ADDRESS);
final long usedDirectMemoryBeforeReceive = PlatformDependent.usedDirectMemory();
ClientMessage msg1 = consumer.receive(1000);
Assert.assertNotNull(msg1);
final long usedDirectMemoryAfterReceive = PlatformDependent.usedDirectMemory();
Assert.assertEquals("large message compression is leaking some Netty direct ByteBuff",
usedDirectMemoryBeforeReceive, usedDirectMemoryAfterReceive);
msg1.acknowledge();
session.commit();

consumer.close();

session.close();
}

@Test
public void testLargeMessageCompression() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
Expand Down

0 comments on commit 6db63ac

Please sign in to comment.