Skip to content

Commit

Permalink
ARTEMIS-1268 Fix LargeMessages over STOMP
Browse files Browse the repository at this point in the history
  • Loading branch information
mtaylor committed Jul 10, 2017
1 parent cc04b8f commit d8be0ef
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 10 deletions.
Expand Up @@ -389,7 +389,7 @@ public void sendInternalLarge(CoreMessage message, boolean direct) throws Except
long id = storageManager.generateID();
LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);

byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET];
byte[] bytes = new byte[message.getBodyBuffer().readableBytes()];
message.getBodyBuffer().readBytes(bytes);

largeMessage.addBytes(bytes);
Expand Down
Expand Up @@ -299,8 +299,6 @@ public StompFrame createMessageFrame(ICoreMessage serverMessage,

ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();

int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();

int size = buffer.writerIndex();

byte[] data = new byte[size];
Expand Down
Expand Up @@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.stomp;

import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
Expand All @@ -26,13 +32,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -254,6 +253,33 @@ public void sendSTOMPReceiveMQTT() throws Exception {
clientProvider.disconnect();
}

@Test
public void testSendReceiveLargeMessage() throws Exception {
String address = "testLargeMessageAddress";
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(address), RoutingType.ANYCAST, SimpleString.toSimpleString(address), null, true, false);

// STOMP default is UTF-8 == 1 byte per char.
int largeMessageStringSize = 10 * 1024 * 1024; // 10MB
StringBuilder b = new StringBuilder(largeMessageStringSize);
for (int i = 0; i < largeMessageStringSize; i++) {
b.append('t');
}
String payload = b.toString();

// Set up STOMP subscription
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, address, true);

// Send Large Message
System.out.println("Sending Message Size: " + largeMessageStringSize);
send(conn, address, null, payload);

// Receive STOMP Message
ClientStompFrame frame = conn.receiveFrame();
System.out.println(frame.getBody().length());
assertTrue(frame.getBody().equals(payload));
}

@Test
public void sendMQTTReceiveSTOMP() throws Exception {
String payload = "This is a test message";
Expand Down
Expand Up @@ -137,6 +137,7 @@ public void setUp() throws Exception {
connection.start();
}


/**
* @return
* @throws Exception
Expand Down Expand Up @@ -168,6 +169,8 @@ protected JMSServerManager createServer() throws Exception {
config.setOutgoingInterceptorClassNames(getOutgoingInterceptors());
}

config.setPersistenceEnabled(true);

ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));

if (isSecurityEnabled()) {
Expand Down

0 comments on commit d8be0ef

Please sign in to comment.