Skip to content

Commit

Permalink
ARTEMIS-4163 Fixing openwire race while chunkSend is happening
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Feb 10, 2023
1 parent 446df6d commit 69e21a0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -548,12 +548,16 @@ public void physicalSend(Command command) throws IOException {
final int bufferSize = bytes.length;
final int maxChunkSize = protocolManager.getOpenwireMaxPacketChunkSize();

if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
chunkSend(bytes, bufferSize, maxChunkSize);
} else {
final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize);
buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
transportConnection.write(buffer, false, false);
// We can't let any other packet to sneak in while chunkSend is happening.
// otherwise we may get wrong packts delivered
synchronized (transportConnection) {
if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
chunkSend(bytes, bufferSize, maxChunkSize);
} else {
final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize);
buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
transportConnection.write(buffer, false, false);
}
}
bufferSent();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -190,7 +192,7 @@ public void testFastLargeMessageProducerDropOnPaging() throws Exception {

@Test
public void testSendReceiveLargeMessageTX() throws Exception {
int NUMBER_OF_MESSAGES = 400;
int NUMBER_OF_MESSAGES = 1000;
int TX_SIZE = 100;

ExecutorService executorService = Executors.newFixedThreadPool(1);
Expand Down Expand Up @@ -220,8 +222,11 @@ public void testSendReceiveLargeMessageTX() throws Exception {
Queue queue = session.createQueue(lmAddress.toString());
MessageConsumer consumer = session.createConsumer(queue);
for (int received = 0; received < NUMBER_OF_MESSAGES; received++) {
TextMessage m = (TextMessage) consumer.receive(5000);
assertEquals(largeString, m.getText());
Message m = consumer.receive(5000);
Assert.assertNotNull(m);
if (m instanceof TextMessage) {
assertEquals(largeString, ((TextMessage) m).getText());
}
if (received > 0 && received % TX_SIZE == 0) {
logger.info("Received {} messages", received);
session.commit();
Expand All @@ -247,7 +252,14 @@ public void testSendReceiveLargeMessageTX() throws Exception {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

for (int sent = 0; sent < NUMBER_OF_MESSAGES; sent++) {
TextMessage message = session.createTextMessage(largeString);
Message message;
if (sent % 2 == 0) {
message = session.createTextMessage(largeString);
} else {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(largeString.getBytes(StandardCharsets.UTF_8));
message = bytesMessage;
}
producer.send(message);
if (sent > 0 && sent % TX_SIZE == 0) {
logger.info("Sent {} messages", sent);
Expand Down

0 comments on commit 69e21a0

Please sign in to comment.