Skip to content

Commit

Permalink
ARTEMIS-4670 Slow processing with Large Messages and JDBC
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Mar 7, 2024
1 parent c83ed89 commit cd8d63b
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,12 @@ public void write(EncodingSupport bytes, boolean sync) throws Exception {
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
if (callback == null) {
SimpleWaitIOCallback waitIOCallback = new SimpleWaitIOCallback();
final SimpleWaitIOCallback waitIOCallback = sync ? new SimpleWaitIOCallback() : null;
try {
scheduleWrite(bytes, waitIOCallback);
waitIOCallback.waitCompletion();
if (waitIOCallback != null) {
waitIOCallback.waitCompletion();
}
} catch (Exception e) {
waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error writing to JDBC file.");
fileFactory.onIOError(e, "Failed to write to file.", this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipB
byte[] bufferToWrite;
if (bytesRead <= 0) {
break;
} else if (bytesRead == bufferBytes.length && this.storageManager instanceof JournalStorageManager && !((JournalStorageManager) this.storageManager).isReplicated()) {
} else if ((bytesRead == bufferBytes.length && this.storageManager instanceof JournalStorageManager && !((JournalStorageManager) this.storageManager).isReplicated() &&
!(this.storageManager instanceof JDBCJournalStorageManager))) {
// ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
// otherwise there could be another thread still using the buffer on a
// replication.
Expand All @@ -374,6 +375,7 @@ public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipB
break;
}
}
newMessage.releaseResources(true, false);
} finally {
cloneFile.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.tests.db.largeMessages;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.tests.db.common.Database;
import org.apache.activemq.artemis.tests.db.common.ParameterDBTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.activemq.artemis.utils.TestParameters.testProperty;

public class RealServerDatabaseLargeMessageTest extends ParameterDBTestBase {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final String TEST_NAME = "LMDB";

private static final int MESSAGE_SIZE = Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "300000"));

private static final int PRODUCERS = 50;

private static final int MESSAGES_PER_PRODUCER = 2;

private ExecutorService executorService;

Process serverProcess;

@Parameterized.Parameters(name = "db={0}")
public static Collection<Object[]> parameters() {
return convertParameters(Database.selectedList());
}


@Before
public void before() throws Exception {
serverProcess = startServer(database.getName(), 0, 60_000);
executorService = Executors.newFixedThreadPool(PRODUCERS + 1); // there will be one consumer
runAfter(executorService::shutdownNow);
}

@Test
public void testLargeMessage() throws Exception {
testLargeMessage("CORE");
testLargeMessage("AMQP");
testLargeMessage("OPENWIRE");
}

public void testLargeMessage(String protocol) throws Exception {
logger.info("testLargeMessage({})", protocol);
final String queueName = "QUEUE_" + RandomUtil.randomString() + "_" + protocol + "_" + database;

ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");

AtomicInteger errors = new AtomicInteger(0);
CyclicBarrier startflag = new CyclicBarrier(PRODUCERS);
CountDownLatch done = new CountDownLatch(PRODUCERS + 1);

byte[] messageLoad = RandomUtil.randomBytes(MESSAGE_SIZE);

for (int i = 0; i < PRODUCERS; i++) {
executorService.execute(() -> {
try {
try (Connection connection = connectionFactory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);

// align all producers right before start sending
startflag.await(5, TimeUnit.SECONDS);

for (int messageI = 0; messageI < MESSAGES_PER_PRODUCER; messageI++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageLoad);
producer.send(message);
session.commit();
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
done.countDown();
}
});
}

executorService.execute(() -> {
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
for (int messageI = 0; messageI < PRODUCERS * MESSAGES_PER_PRODUCER; messageI++) {
BytesMessage message = (BytesMessage) consumer.receive(10_000);
Assert.assertNotNull(message);
logger.debug("Received message");
Assert.assertEquals(messageLoad.length, message.getBodyLength());
byte[] messageRead = new byte[(int)message.getBodyLength()];
message.readBytes(messageRead);
Assert.assertArrayEquals(messageLoad, messageRead);
if (messageI % 5 == 0) {
session.commit();
}
}
session.commit();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
done.countDown();
}
});

Assert.assertTrue(done.await(120, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2607,6 +2607,7 @@ public void testLargeMessageBodySize() throws Exception {
for (int i = 0; i < largeMessageSize; i++) {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
fileMessage.releaseResources(true, false);

Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());

Expand Down Expand Up @@ -2725,7 +2726,7 @@ public void testSendServerMessageMetrics() throws Exception {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);

fileMessage.releaseResources(false, false);
fileMessage.releaseResources(true, false);

prod.send(fileMessage);

Expand Down

0 comments on commit cd8d63b

Please sign in to comment.