From 53f427a6f1182c7a66da67d31c7fe9866c7e0028 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Mon, 10 Jul 2017 16:38:37 +0100 Subject: [PATCH] ARTEMIS-1268 Fix LargeMessages over STOMP --- .../core/protocol/stomp/StompSession.java | 5 +- .../stomp/VersionedStompFrameHandler.java | 2 - .../tests/integration/stomp/StompTest.java | 54 ++++++++++++++----- .../integration/stomp/StompTestBase.java | 3 ++ 4 files changed, 48 insertions(+), 16 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 797a9668d33..03b5757bb47 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -389,8 +389,9 @@ public void sendInternalLarge(CoreMessage message, boolean direct) throws Except long id = storageManager.generateID(); LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message); - byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET]; - message.getBodyBuffer().readBytes(bytes); + ActiveMQBuffer body = message.getReadOnlyBodyBuffer(); + byte[] bytes = new byte[body.readableBytes()]; + body.readBytes(bytes); largeMessage.addBytes(bytes); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index c5fc8f1ad51..df6d9b08777 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -299,8 +299,6 @@ public StompFrame createMessageFrame(ICoreMessage serverMessage, ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer(); - int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition(); - int size = buffer.writerIndex(); byte[] data = new byte[size]; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index f023cfb25c8..c2f19648d8f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.stomp; +import javax.jms.BytesMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.util.HashSet; @@ -26,13 +32,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.jms.BytesMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -159,12 +158,16 @@ public void onMessage(Message arg0) { .setMaxUsage(0) .tick(); - for (int i = 1; i <= count; i++) { - // Thread.sleep(1); - // log.info(">>> " + i); - send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!"); + // Connection should be closed by broker when disk is full and attempt to send + Exception e = null; + try { + for (int i = 1; i <= count; i++) { + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!"); + } + } catch (Exception se) { + e = se; } - + assertNotNull(e); // It should encounter the exception on logs AssertionLoggerHandler.findText("AMQ119119"); } finally { @@ -254,6 +257,33 @@ public void sendSTOMPReceiveMQTT() throws Exception { clientProvider.disconnect(); } + @Test + public void testSendReceiveLargeMessage() throws Exception { + String address = "testLargeMessageAddress"; + server.getActiveMQServer().createQueue(SimpleString.toSimpleString(address), RoutingType.ANYCAST, SimpleString.toSimpleString(address), null, true, false); + + // STOMP default is UTF-8 == 1 byte per char. + int largeMessageStringSize = 10 * 1024 * 1024; // 10MB + StringBuilder b = new StringBuilder(largeMessageStringSize); + for (int i = 0; i < largeMessageStringSize; i++) { + b.append('t'); + } + String payload = b.toString(); + + // Set up STOMP subscription + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, address, true); + + // Send Large Message + System.out.println("Sending Message Size: " + largeMessageStringSize); + send(conn, address, null, payload); + + // Receive STOMP Message + ClientStompFrame frame = conn.receiveFrame(); + System.out.println(frame.getBody().length()); + assertTrue(frame.getBody().equals(payload)); + } + @Test public void sendMQTTReceiveSTOMP() throws Exception { String payload = "This is a test message"; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java index f885659194d..4e848576b2a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java @@ -137,6 +137,7 @@ public void setUp() throws Exception { connection.start(); } + /** * @return * @throws Exception @@ -168,6 +169,8 @@ protected JMSServerManager createServer() throws Exception { config.setOutgoingInterceptorClassNames(getOutgoingInterceptors()); } + config.setPersistenceEnabled(true); + ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); if (isSecurityEnabled()) {