Skip to content

Commit

Permalink
This closes #1394
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jul 11, 2017
2 parents f8554c7 + 905098b commit 83256c5
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 16 deletions.
Expand Up @@ -389,8 +389,9 @@ public void sendInternalLarge(CoreMessage message, boolean direct) throws Except
long id = storageManager.generateID();
LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);

byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET];
message.getBodyBuffer().readBytes(bytes);
ActiveMQBuffer body = message.getReadOnlyBodyBuffer();
byte[] bytes = new byte[body.readableBytes()];
body.readBytes(bytes);

largeMessage.addBytes(bytes);

Expand Down
Expand Up @@ -299,8 +299,6 @@ public StompFrame createMessageFrame(ICoreMessage serverMessage,

ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();

int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();

int size = buffer.writerIndex();

byte[] data = new byte[size];
Expand Down
Expand Up @@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.stomp;

import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
Expand All @@ -26,13 +32,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -159,12 +158,16 @@ public void onMessage(Message arg0) {
.setMaxUsage(0)
.tick();

for (int i = 1; i <= count; i++) {
// Thread.sleep(1);
// log.info(">>> " + i);
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
// Connection should be closed by broker when disk is full and attempt to send
Exception e = null;
try {
for (int i = 1; i <= count; i++) {
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
}
} catch (Exception se) {
e = se;
}

assertNotNull(e);
// It should encounter the exception on logs
AssertionLoggerHandler.findText("AMQ119119");
} finally {
Expand Down Expand Up @@ -254,6 +257,33 @@ public void sendSTOMPReceiveMQTT() throws Exception {
clientProvider.disconnect();
}

@Test
public void testSendReceiveLargeMessage() throws Exception {
String address = "testLargeMessageAddress";
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(address), RoutingType.ANYCAST, SimpleString.toSimpleString(address), null, true, false);

// STOMP default is UTF-8 == 1 byte per char.
int largeMessageStringSize = 10 * 1024 * 1024; // 10MB
StringBuilder b = new StringBuilder(largeMessageStringSize);
for (int i = 0; i < largeMessageStringSize; i++) {
b.append('t');
}
String payload = b.toString();

// Set up STOMP subscription
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, address, true);

// Send Large Message
System.out.println("Sending Message Size: " + largeMessageStringSize);
send(conn, address, null, payload);

// Receive STOMP Message
ClientStompFrame frame = conn.receiveFrame();
System.out.println(frame.getBody().length());
assertTrue(frame.getBody().equals(payload));
}

@Test
public void sendMQTTReceiveSTOMP() throws Exception {
String payload = "This is a test message";
Expand Down
Expand Up @@ -137,6 +137,7 @@ public void setUp() throws Exception {
connection.start();
}


/**
* @return
* @throws Exception
Expand Down Expand Up @@ -168,6 +169,8 @@ protected JMSServerManager createServer() throws Exception {
config.setOutgoingInterceptorClassNames(getOutgoingInterceptors());
}

config.setPersistenceEnabled(true);

ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));

if (isSecurityEnabled()) {
Expand Down

0 comments on commit 83256c5

Please sign in to comment.