Skip to content

Commit

Permalink
NO-JIRA clarify & verify web socket support for MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Mar 6, 2020
1 parent ead80ea commit 8c25911
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 12 deletions.
Expand Up @@ -47,8 +47,6 @@

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

private static final String WEBSOCKET_PATH = "/stomp";

private HttpRequest httpRequest;
private WebSocketServerHandshaker handshaker;
private List<String> supportedProtocols;
Expand Down Expand Up @@ -142,7 +140,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

private String getWebSocketLocation(HttpRequest req) {
return "ws://" + req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;
return "ws://" + req.headers().get(HttpHeaderNames.HOST);
}

public HttpRequest getHttpRequest() {
Expand Down
Expand Up @@ -62,8 +62,8 @@ public interface ProtocolManager<P extends BaseInterceptor> {
void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);

/**
* A list of the IANA websocket subprotocol identifiers supported by this protocol manager. These are used
* during the websocket subprotocol handshake.
* A list of the IANA websocket subprotocol identifiers (https://www.iana.org/assignments/websocket/websocket.xhtml)
* supported by this protocol manager. These are used during the websocket subprotocol handshake.
*
* @return A list of subprotocol ids
*/
Expand Down
18 changes: 17 additions & 1 deletion docs/user-manual/en/amqp.md
Expand Up @@ -155,4 +155,20 @@ This contains a real example for configuring amqpIdleTimeout:

```xml
<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300;directDeliver=false;batchDelay=10</acceptor>
```
```

## Web Sockets

Apache ActiveMQ Artemis also supports AMQP over [Web
Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web
browsers which support Web Sockets can send and receive AMQP messages.

AMQP over Web Sockets is supported via a normal AMQP acceptor:

```xml
<acceptor name="amqp-ws-acceptor">tcp://localhost:5672?protocols=AMQP</acceptor>
```

With this configuration, Apache ActiveMQ Artemis will accept AMQP connections
over Web Sockets on the port `5672`. Web browsers can then connect to
`ws://<server>:5672` using a Web Socket to send and receive AMQP messages.
5 changes: 3 additions & 2 deletions docs/user-manual/en/configuring-transports.md
Expand Up @@ -108,9 +108,10 @@ We believe this caters for the vast majority of transport requirements.

Apache ActiveMQ Artemis supports using a single port for all protocols, Apache
ActiveMQ Artemis will automatically detect which protocol is being used CORE,
AMQP, STOMP or OPENWIRE and use the appropriate Apache ActiveMQ Artemis
AMQP, STOMP, MQTT or OPENWIRE and use the appropriate Apache ActiveMQ Artemis
handler. It will also detect whether protocols such as HTTP or Web Sockets are
being used and also use the appropriate decoders
being used and also use the appropriate decoders. Web Sockets are supported for
AMQP, STOMP, and MQTT.

It is possible to limit which protocols are supported by using the `protocols`
parameter on the Acceptor like so:
Expand Down
15 changes: 15 additions & 0 deletions docs/user-manual/en/mqtt.md
Expand Up @@ -135,3 +135,18 @@ There are 2 types of wild cards in MQTT:
Matches a single level in the address hierarchy. For example `/uk/+/stores`
would match `/uk/newcastle/stores` but not `/uk/cities/newcastle/stores`.

## Web Sockets

Apache ActiveMQ Artemis also supports MQTT over [Web
Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web
browsers which support Web Sockets can send and receive MQTT messages.

MQTT over Web Sockets is supported via a normal MQTT acceptor:

```xml
<acceptor name="mqtt-ws-acceptor">tcp://localhost:1883?protocols=MQTT</acceptor>
```

With this configuration, Apache ActiveMQ Artemis will accept MQTT connections
over Web Sockets on the port `1883`. Web browsers can then connect to
`ws://<server>:1883` using a Web Socket to send and receive MQTT messages.
2 changes: 1 addition & 1 deletion docs/user-manual/en/stomp.md
Expand Up @@ -285,7 +285,7 @@ the same as the default value of

## Web Sockets

Apache ActiveMQ Artemis also support STOMP over [Web
Apache ActiveMQ Artemis also supports STOMP over [Web
Sockets](https://html.spec.whatwg.org/multipage/web-sockets.html). Modern web
browsers which support Web Sockets can send and receive STOMP messages.

Expand Down
2 changes: 1 addition & 1 deletion tests/integration-tests/pom.xml
Expand Up @@ -29,7 +29,7 @@

<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
<paho.client.mqttv3.version>1.1.0</paho.client.mqttv3.version>
<paho.client.mqttv3.version>1.2.2</paho.client.mqttv3.version>
</properties>

<dependencies>
Expand Down
Expand Up @@ -56,7 +56,6 @@ public static class AmqpFlowControlFailDispositionTests extends JMSClientTestSup
@Parameterized.Parameter(2)
public String expectedMessage;


@Parameterized.Parameters(name = "useModified={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
Expand Down
Expand Up @@ -17,6 +17,8 @@

package org.apache.activemq.artemis.tests.integration.mqtt.imported;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -30,11 +32,25 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class PahoMQTTTest extends MQTTTestSupport {

private static MQTTLogger LOG = MQTTLogger.LOGGER;

@Parameterized.Parameters(name = "protocol={0}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][] {{"tcp"}, {"ws"}});
}

public String protocol;

public PahoMQTTTest(String protocol) {
this.protocol = protocol;
}

@Test(timeout = 300000)
public void testLotsOfClients() throws Exception {

Expand Down Expand Up @@ -146,7 +162,7 @@ public void deliveryComplete(IMqttDeliveryToken token) {
}

private MqttClient createPahoClient(String clientId) throws MqttException {
return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
return new MqttClient(protocol + "://localhost:" + getPort(), clientId, new MemoryPersistence());
}

}

0 comments on commit 8c25911

Please sign in to comment.