Skip to content

Commit

Permalink
Bug: 482432 - WebSocket handles frames over 125 byte incorrectly
Browse files Browse the repository at this point in the history
Fixed WebSocketFrame and added test to make sure that this works.

Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
  • Loading branch information
jpwsutton committed Nov 24, 2015
1 parent b2ff952 commit 513a77a
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
Expand Up @@ -13,8 +13,11 @@

package org.eclipse.paho.client.mqttv3.test;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -183,6 +186,62 @@ public void testWebSocketPubSub() throws Exception {
}
}
}

/**
* Tests Websocker support for packets over 16KB
* Prompted by Bug: 482432
* https://bugs.eclipse.org/bugs/show_bug.cgi?id=482432
* This test connects to a broker via WebSockets, subscribes
* to a topic, publishes a large payload to it and checks
* that it recieves the same payload.
* @throws Exception
*/
@Test
public void largePayloadTest() throws Exception{
// Generate large byte array;
byte[] largeByteArray = new byte[32000];
new Random().nextBytes(largeByteArray);
String methodName = Utility.getMethodName();
LoggingUtilities.banner(log, cclass, methodName);

IMqttClient client = null;
try {
String topicStr = "topic_largeFile_01";
String clientId = methodName;
client = clientFactory.createMqttClient(serverURI, clientId);

log.info("Assigning callback...");
MessageListener listener = new MessageListener();
client.setCallback(listener);

log.info("Connecting... serverURI:" + serverURI + ", ClientId:" + clientId);
client.connect();

log.info("Subscribing to..." + topicStr);
client.subscribe(topicStr);

log.info("Publishing to..." + topicStr);
MqttTopic topic = client.getTopic(topicStr);
MqttMessage message = new MqttMessage(largeByteArray);
topic.publish(message);

log.info("Checking msg");
MqttMessage msg = listener.getNextMessage();
Assert.assertNotNull(msg);
Assert.assertTrue(Arrays.equals(largeByteArray, msg.getPayload()));
log.info("Disconnecting...");
client.disconnect();
log.info("Disconnected...");
} catch (Exception e){
e.printStackTrace();
} finally {
if (client != null) {
log.info("Close...");
client.close();
}
}

}



Expand Down Expand Up @@ -211,7 +270,8 @@ public MqttMessage getNextMessage() {
synchronized (messages) {
if (messages.size() == 0) {
try {
messages.wait(1000);
// Wait a bit longer than usual because of the largePayloadTest
messages.wait(10000);
}
catch (InterruptedException e) {
// empty
Expand Down
Expand Up @@ -144,7 +144,10 @@ public WebSocketFrame(InputStream input) throws IOException {
}

// Decode the payload length
while (--byteCount > 0){
if(byteCount > 0){
payloadLength = 0;
}
while (--byteCount >= 0){
maskLengthByte = (byte) input.read();
payloadLength |= (maskLengthByte & 0xFF) << (8 * byteCount);
}
Expand All @@ -157,7 +160,15 @@ public WebSocketFrame(InputStream input) throws IOException {
}

this.payload = new byte[payloadLength];
input.read(this.payload,0,payloadLength);
int offsetIndex = 0;
int tempLength = payloadLength;
int bytesRead = 0;
while (offsetIndex != payloadLength){
bytesRead = input.read(this.payload,offsetIndex,tempLength);
offsetIndex += bytesRead;
tempLength -= bytesRead;
}


// Demask if needed
if(masked)
Expand Down

0 comments on commit 513a77a

Please sign in to comment.