Skip to content

Commit

Permalink
ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages
Browse files Browse the repository at this point in the history
(cherry picked from commit 0866a2e)
  • Loading branch information
clebertsuconic committed Nov 28, 2022
1 parent b471392 commit 7e00a70
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,15 @@ private static Message readEncoded(ICoreMessage message, StorageManager storageM
private static Message extractLargeMessage(ICoreMessage message, StorageManager storageManager) {
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(message.getBytesProperty(AMQP_ENCODE_PROPERTY));

return readEncoded(message, storageManager, buffer);
Message largeMessageReturn = readEncoded(message, storageManager, buffer);

if (message instanceof LargeServerMessage && largeMessageReturn instanceof LargeServerMessage) {
LargeServerMessage returnMessage = (LargeServerMessage) largeMessageReturn;
LargeServerMessage sourceMessage = (LargeServerMessage) message;
returnMessage.setPendingRecordID(sourceMessage.getPendingRecordID());
}

return largeMessageReturn;
}

private static boolean checkSignature(final ActiveMQBuffer buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ private void internalTest(boolean useDivert) throws Exception {
}

sendTextMessages(AMQP_PORT + 1, getQueueName(useDivert ? 0 : 1), largeText.toString(), 10);
server.stop();
server.start();
receiveTextMessages(AMQP_PORT, getQueueName(2), largeText.toString(), 10);
if (useDivert) {
// We diverted, so messages were copied, we need to make sure we consume from the original queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";

Process server0Process;
Process server1Process;

@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
cleanupData(SERVER_NAME_1);
server1Process = startServer(SERVER_NAME_0, 0, 30000);
startServer(SERVER_NAME_1, 100, 30000);
server0Process = startServer(SERVER_NAME_0, 0, 30000);
server1Process = startServer(SERVER_NAME_1, 100, 30000);
}

@Test
Expand Down Expand Up @@ -96,5 +97,78 @@ private void internalTestLargeMessge(String protocol) throws Exception {
connection1.close();
connection2.close();
}

@Test
public void testKillWhileSendingLargeCORE() throws Exception {
testKillWhileSendingLarge("CORE");
}

@Test
public void testKillWhileSendingLargeAMQP() throws Exception {
testKillWhileSendingLarge("AMQP");
}

public void testKillWhileSendingLarge(String protocol) throws Exception {

ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61716");
Connection keepConsumerConnection = server2CF.createConnection();
Session keepConsumerSession = keepConsumerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// a consumer that we should keep to induce message redistribution
MessageConsumer keepConsumer = keepConsumerSession.createConsumer(keepConsumerSession.createQueue("testQueue"));

String largeBody;
{
StringBuffer largeBodyBuffer = new StringBuffer();
while (largeBodyBuffer.length() < 1024 * 1024) {
largeBodyBuffer.append("This is large ");
}
largeBody = largeBodyBuffer.toString();
}

int NMESSAGES = 10;

ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection1 = server1CF.createConnection()) {
Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
Queue queue1 = session1.createQueue("testQueue");
MessageProducer producer1 = session1.createProducer(queue1);
for (int i = 0; i < NMESSAGES; i++) {
TextMessage message = session1.createTextMessage(largeBody);
message.setStringProperty("i", Integer.toString(i));
producer1.send(message);

if (i == 5) {
session1.commit();
}
}
session1.commit();
}

keepConsumerConnection.close();
server1Process.destroyForcibly();
server1Process = startServer(SERVER_NAME_1, 100, 0);

for (int i = 0; i < 100; i++) {
// retrying the connection until the server is up
try (Connection willbegone = server2CF.createConnection()) {
break;
} catch (Exception ignored) {
Thread.sleep(100);
}
}

try (Connection connection2 = server2CF.createConnection()) {
Session session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue2 = session2.createQueue("testQueue");
MessageConsumer consumer2 = session2.createConsumer(queue2);
connection2.start();

for (int i = 0; i < NMESSAGES; i++) {
TextMessage message = (TextMessage) consumer2.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(largeBody, message.getText());
}
}
}
}

0 comments on commit 7e00a70

Please sign in to comment.