Skip to content

Commit

Permalink
ARTEMIS-4171 potential large message file leak
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Feb 16, 2023
1 parent c123a29 commit 82fc429
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,12 @@ private void sendContinuations(final int packetSize,
LargeServerMessage message = currentLargeMessage;
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, producers.get(senderID), false);
try {
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), null, false, producers.get(senderID), false);
} catch (Exception e) {
message.deleteFile();
throw e;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.ByteBuffer;
Expand All @@ -39,6 +40,7 @@

import com.sun.management.UnixOperatingSystemMXBean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
Expand All @@ -62,7 +64,9 @@
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
Expand All @@ -77,7 +81,6 @@
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;

public class LargeMessageTest extends LargeMessageTestBase {

Expand Down Expand Up @@ -326,6 +329,52 @@ public void testDeleteOnNoBinding() throws Exception {
validateNoFilesOnLargeDir();
}

@Test
public void testFileRemovalOnFailure() throws Exception {
final AtomicBoolean throwException = new AtomicBoolean(false);
final String queueName = RandomUtil.randomString();
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);

ActiveMQServer server = createServer(true, isNetty(), storeType);

server.start();

server.registerBrokerPlugin(new ActiveMQServerMessagePlugin() {
@Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
if (throwException.get()) {
throw new ActiveMQException();
}
}
});

server.createQueue(new QueueConfiguration(queueName));

ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));

ClientSession session = addClientSession(sf.createSession(false, true, false));

ClientProducer producer = session.createProducer(queueName);

Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);

try {
throwException.set(true);
producer.send(clientFile);
fail("Should have thrown an exception here");
} catch (Exception e) {
// expected exception from plugin
} finally {
throwException.set(false);
}

assertEquals(0, server.locateQueue(queueName).getMessageCount());

session.close();

validateNoFilesOnLargeDir();
}


@Test
public void testPendingRecord() throws Exception {
Expand Down

0 comments on commit 82fc429

Please sign in to comment.