Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.io.ClientConnector;
Expand Down Expand Up @@ -122,6 +123,105 @@ public boolean isSatisified() throws Exception {
}));
}

@Test(timeout = 60000)
public void testMissingStompConnect() throws Exception {
// Send a frame without first sending a CONNECT frame, which is a protocol violation
String message = "SEND\n" + "destination:/queue/" + getTestName() + "\n\n" + "Hello World" + Stomp.NULL;
wsStompConnection.sendRawFrame(message);

String incoming = wsStompConnection.receive(5, TimeUnit.SECONDS);
assertNotNull(incoming);
assertTrue(incoming.startsWith("ERROR"));
assertTrue(incoming.contains("Invalid frame received before CONNECT or STOMP frame: SEND"));

assertTrue("Connection should close", Wait.waitFor(
(Condition) () -> wsStompConnection.isNotConnected()));
}

@Test(timeout = 60000)
public void testNegativeContentLength() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.2\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;

wsStompConnection.sendRawFrame(connectFrame);

String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
assertNotNull(incoming);
assertTrue(incoming.startsWith("CONNECTED"));

String message = "SEND\n" + "destination:/queue/" + getTestName() + "\ncontent-length:-1" + " \n\n" + "body" + Stomp.NULL;
wsStompConnection.sendRawFrame(message);

// Negative content length is a protocol error and should return
// an error and close the connection
incoming = wsStompConnection.receive(5, TimeUnit.SECONDS);
assertNotNull(incoming);
assertTrue(incoming.startsWith("ERROR"));
assertTrue(incoming.contains("Specified content-length may not be negative"));

assertTrue("Connection should close", Wait.waitFor(
(Condition) () -> wsStompConnection.isNotConnected()));
}

@Test(timeout = 60000)
public void testDuplicateConnect() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.2\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;

wsStompConnection.sendRawFrame(connectFrame);

String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
assertNotNull(incoming);
assertTrue(incoming.startsWith("CONNECTED"));

// Sending a second CONNECT frame is not allowed and should error
wsStompConnection.sendRawFrame(connectFrame);

incoming = wsStompConnection.receive(5, TimeUnit.SECONDS);
assertNotNull(incoming);
assertTrue(incoming.startsWith("ERROR"));
assertTrue(incoming.contains("Duplicate CONNECT or STOMP packet received"));

assertTrue("Connection should close", Wait.waitFor(
(Condition) () -> wsStompConnection.isNotConnected()));
}

@Test(timeout = 60000)
public void testInvalidServerResponseReceived() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.2\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;

wsStompConnection.sendRawFrame(connectFrame);

String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
assertNotNull(incoming);
assertTrue(incoming.startsWith("CONNECTED"));

// Sending a server response to the server, which is invalid
String invalidFrame = "RECEIPT\n" + "receipt-id:message-12345\n\n" + Stomp.NULL;
wsStompConnection.sendRawFrame(invalidFrame);
incoming = wsStompConnection.receive(5, TimeUnit.SECONDS);
assertNotNull(incoming);
assertTrue(incoming.startsWith("ERROR"));
assertTrue(incoming.contains("Invalid response frame received from Client: RECEIPT"));

// make sure the connection was closed by the server
assertTrue("Connection should close", Wait.waitFor(
(Condition) () -> wsStompConnection.isNotConnected()));
}

@Test(timeout = 60000)
public void testConnectWithVersionOptions() throws Exception {
String connectFrame = "STOMP\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,32 @@
import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Objects;

import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.DataByteArrayInputStream;

public class StompCodec {

final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'};
TcpTransport transport;
StompWireFormat wireFormat;

AtomicLong frameSize = new AtomicLong();
ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
boolean processedHeaders = false;
String action;
HashMap<String, String> headers;
int contentLength = -1;
int readLength = 0;
int previousByte = -1;
boolean awaitingCommandStart = true;
String version = Stomp.DEFAULT_VERSION;
private final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'};
private final TcpTransport transport;
private final StompWireFormat wireFormat;

private final ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
private boolean processedHeaders = false;
private String action;
private Map<String, String> headers;
private int contentLength = -1;
private int readLength = 0;
private int previousByte = -1;
private boolean awaitingCommandStart = true;

public StompCodec(TcpTransport transport) {
this.transport = transport;
this.wireFormat = (StompWireFormat) transport.getWireFormat();
this.transport = Objects.requireNonNull(transport);
this.wireFormat = (StompWireFormat) Objects.requireNonNull(transport.getWireFormat());
}

public void parse(ByteArrayInputStream input, int readSize) throws Exception {
Expand Down Expand Up @@ -75,12 +72,13 @@ public void parse(ByteArrayInputStream input, int readSize) throws Exception {
DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());

try {
action = wireFormat.parseAction(data, frameSize);
headers = wireFormat.parseHeaders(data, frameSize);
action = wireFormat.parseAction(data);
wireFormat.validateAction(action);
headers = wireFormat.parseHeaders(data);

String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
contentLength = wireFormat.parseContentLength(contentLengthHeader, frameSize);
contentLength = wireFormat.parseContentLength(contentLengthHeader);
} else {
contentLength = -1;
}
Expand All @@ -106,7 +104,7 @@ public void parse(ByteArrayInputStream input, int readSize) throws Exception {
transport.doConsume(errorFrame);
return;
}
if (frameSize.incrementAndGet() > wireFormat.getMaxFrameSize()) {
if (wireFormat.incrementAndGetFrameSize() > wireFormat.getMaxFrameSize()) {
StompFrameError errorFrame = new StompFrameError(new ProtocolException("The maximum frame size was exceeded", true));
errorFrame.setAction(this.action);
transport.doConsume(errorFrame);
Expand Down Expand Up @@ -135,7 +133,7 @@ protected void processCommand() throws Exception {
awaitingCommandStart = true;
currentCommand.reset();
contentLength = -1;
frameSize.set(0);
wireFormat.resetFrame();
}

public static String detectVersion(Map<String, String> headers) throws ProtocolException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public StompFrame receive(long timeOut) throws Exception {
InputStream is = stompSocket.getInputStream();
StompWireFormat wf = new StompWireFormat();
wf.setStompVersion(version);
wf.setServerMode(false);
DataInputStream dis = new DataInputStream(is);
return (StompFrame)wf.unmarshal(dis);
}
Expand Down
Loading
Loading