Skip to content

Commit

Permalink
ARTEMIS-4670 Slow processing with Large Messages and JDBC
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Mar 7, 2024
1 parent c83ed89 commit b63aaae
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,6 +102,29 @@ protected synchronized void init(Configuration config, IOCriticalErrorListener c
}
}


@Override
public final void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final byte[] bytes) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
OperationContext context = getContext(true);
context.storeLineUp();
file.writeDirect(ByteBuffer.wrap(bytes), false, context);
}
}

@Override
public final void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final ActiveMQBuffer bytes) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
final byte[] bytesCopy = new byte[bytes.readableBytes()];
bytes.readBytes(bytesCopy);
addBytesToLargeMessage(file, messageId, bytesCopy);
}
}

@Override
public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
if (!started) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ public void stopReplication() {
}

@Override
public final void addBytesToLargeMessage(final SequentialFile file,
public void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final ActiveMQBuffer bytes) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
Expand Down Expand Up @@ -859,7 +859,7 @@ private void historyBody(long messageId, EncodingSupport partialBuffer) {
}

@Override
public final void addBytesToLargeMessage(final SequentialFile file,
public void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final byte[] bytes) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipB
byte[] bufferToWrite;
if (bytesRead <= 0) {
break;
} else if (bytesRead == bufferBytes.length && this.storageManager instanceof JournalStorageManager && !((JournalStorageManager) this.storageManager).isReplicated()) {
} else if ((bytesRead == bufferBytes.length && this.storageManager instanceof JournalStorageManager && !((JournalStorageManager) this.storageManager).isReplicated() &&
!(this.storageManager instanceof JDBCJournalStorageManager))) {
// ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
// otherwise there could be another thread still using the buffer on a
// replication.
Expand All @@ -374,6 +375,7 @@ public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipB
break;
}
}
newMessage.releaseResources(true, false);
} finally {
cloneFile.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.tests.db.largeMessages;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.tests.db.common.Database;
import org.apache.activemq.artemis.tests.db.common.ParameterDBTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.activemq.artemis.utils.TestParameters.testProperty;

public class RealServerDatabaseLargeMessageTest extends ParameterDBTestBase {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final String TEST_NAME = "LMDB";

private static final int MESSAGE_SIZE = Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "300000"));

private static final int PRODUCERS = 50;

private static final int MESSAGES_PER_PRODUCER = 2;

private ExecutorService executorService;

Process serverProcess;

@Parameterized.Parameters(name = "db={0}")
public static Collection<Object[]> parameters() {
return convertParameters(Database.selectedList());
}


@Before
public void before() throws Exception {
serverProcess = startServer(database.getName(), 0, 60_000);
executorService = Executors.newFixedThreadPool(PRODUCERS + 1); // there will be one consumer
runAfter(executorService::shutdownNow);
}

@Test
public void testLargeMessage() throws Exception {
testLargeMessage("CORE");
testLargeMessage("AMQP");
testLargeMessage("OPENWIRE");
}

public void testLargeMessage(String protocol) throws Exception {
logger.info("testLargeMessage({})", protocol);
final String queueName = "QUEUE_" + RandomUtil.randomString() + "_" + protocol + "_" + database;

ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");

AtomicInteger errors = new AtomicInteger(0);
CyclicBarrier startflag = new CyclicBarrier(PRODUCERS);
CountDownLatch done = new CountDownLatch(PRODUCERS + 1);

byte[] messageLoad = RandomUtil.randomBytes(MESSAGE_SIZE);

for (int i = 0; i < PRODUCERS; i++) {
executorService.execute(() -> {
try {
try (Connection connection = connectionFactory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);

// align all producers right before start sending
startflag.await(5, TimeUnit.SECONDS);

for (int messageI = 0; messageI < MESSAGES_PER_PRODUCER; messageI++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageLoad);
producer.send(message);
session.commit();
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
done.countDown();
}
});
}

executorService.execute(() -> {
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
for (int messageI = 0; messageI < PRODUCERS * MESSAGES_PER_PRODUCER; messageI++) {
BytesMessage message = (BytesMessage) consumer.receive(10_000);
Assert.assertNotNull(message);
logger.debug("Received message");
Assert.assertEquals(messageLoad.length, message.getBodyLength());
byte[] messageRead = new byte[(int)message.getBodyLength()];
message.readBytes(messageRead);
Assert.assertArrayEquals(messageRead, messageLoad);
if (messageI % 5 == 0) {
session.commit();
}
}
session.commit();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
done.countDown();
}
});

Assert.assertTrue(done.await(120, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2607,6 +2607,7 @@ public void testLargeMessageBodySize() throws Exception {
for (int i = 0; i < largeMessageSize; i++) {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
fileMessage.releaseResources(true, false);

Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());

Expand Down Expand Up @@ -2725,7 +2726,7 @@ public void testSendServerMessageMetrics() throws Exception {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);

fileMessage.releaseResources(false, false);
fileMessage.releaseResources(true, false);

prod.send(fileMessage);

Expand Down

0 comments on commit b63aaae

Please sign in to comment.