diff --git a/.classpath b/.classpath index dbec77fa966..a168e7640d3 100644 --- a/.classpath +++ b/.classpath @@ -67,6 +67,7 @@ + @@ -101,7 +102,7 @@ - + diff --git a/docs/user-manual/en/examples.xml b/docs/user-manual/en/examples.xml index 69cc40df41f..cc51da6a5fb 100644 --- a/docs/user-manual/en/examples.xml +++ b/docs/user-manual/en/examples.xml @@ -420,6 +420,12 @@ The stomp example shows you how to configure a HornetQ server to send and receive Stomp messages. +
+ Stomp Over Web Sockets + The stomp-websockets example shows you how to configure a + HornetQ server to send and receive Stomp messages directly from Web browsers (provided + they support Web Sockets). +
Symmetric Cluster The symmetric-cluster example demonstrates a symmetric cluster diff --git a/docs/user-manual/en/interoperability.xml b/docs/user-manual/en/interoperability.xml index 1c9ba6c54b6..e461a5e90f2 100644 --- a/docs/user-manual/en/interoperability.xml +++ b/docs/user-manual/en/interoperability.xml @@ -79,10 +79,10 @@ hello queue orders send or subscribe to a JMS Topic by prepending the topic name by jms.topic.. For example to subscribe to the stocks JMS Topic, the Stomp client must send the frame: - SUBSCRIBE - destination:jms.topic.stocks - - ^@ +SUBSCRIBE +destination:jms.topic.stocks + +^@ @@ -108,6 +108,30 @@ producer.send(message);
+ +
+ Stomp Over Web Sockets + HornetQ also support Stomp over Web Sockets. Modern web browser which support Web Sockets can send and receive + Stomp messages from HornetQ. + To enable Stomp over Web Sockets, you must configure a NettyAcceptor with a protocol + parameter set to stomp_ws: + +<acceptor name="stomp-ws-acceptor"> + <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> + <param key="protocol" value="stomp_ws"/> + <param key="port" value="61614"/> +</acceptor> + + With this configuration, HornetQ will accept Stomp connections over Web Sockets on + the port 61614 with the URL path /stomp. + Web browser can then connect to ws://<server>:61614/stomp usin a Web Socket to send and receive Stomp + messages. + A companion JavaScript library to ease client-side development is available from + GitHub (please see + its documentation for a complete description). + The stomp-websockets example shows how to configure HornetQ server to have web browsers and Java + applications exchanges messages on a JMS topic. +
StompConnect diff --git a/examples/jms/stomp-websockets/build.bat b/examples/jms/stomp-websockets/build.bat new file mode 100644 index 00000000000..1f414eb35ba --- /dev/null +++ b/examples/jms/stomp-websockets/build.bat @@ -0,0 +1,13 @@ +@echo off + +set "OVERRIDE_ANT_HOME=..\..\..\tools\ant" + +if exist "..\..\..\src\bin\build.bat" ( + rem running from TRUNK + call ..\..\..\src\bin\build.bat %* +) else ( + rem running from the distro + call ..\..\..\bin\build.bat %* +) + +set "OVERRIDE_ANT_HOME=" diff --git a/examples/jms/stomp-websockets/build.sh b/examples/jms/stomp-websockets/build.sh new file mode 100755 index 00000000000..9c786d408f2 --- /dev/null +++ b/examples/jms/stomp-websockets/build.sh @@ -0,0 +1,15 @@ +#!/bin/sh + +OVERRIDE_ANT_HOME=../../../tools/ant +export OVERRIDE_ANT_HOME + +if [ -f "../../../src/bin/build.sh" ]; then + # running from TRUNK + ../../../src/bin/build.sh "$@" +else + # running from the distro + ../../../bin/build.sh "$@" +fi + + + diff --git a/examples/jms/stomp-websockets/build.xml b/examples/jms/stomp-websockets/build.xml new file mode 100644 index 00000000000..8cd84ec56bf --- /dev/null +++ b/examples/jms/stomp-websockets/build.xml @@ -0,0 +1,28 @@ + + + ]> + + + + + + + + + + + + + \ No newline at end of file diff --git a/examples/jms/stomp-websockets/chat/chat.css b/examples/jms/stomp-websockets/chat/chat.css new file mode 100644 index 00000000000..27b9ddbc3f1 --- /dev/null +++ b/examples/jms/stomp-websockets/chat/chat.css @@ -0,0 +1,99 @@ +* { + margin: 0; + padding: 0; +} + +body { + font-family: 'Helvetica Neue', Helvetica, Verdana, Arial, sans-serif; + padding: 10px; +} + +#disconnect { + display: none; +} +#subscribe { + display: none; +} + +#debug { + background-color: #F0F0F0; + font-size: 12px; + height: 75%; + overflow: auto; + padding: 10px; + position: absolute; + right: 10px; + top: 10px; + width: 250px; + z-index: 100; +} + +#send_form { + bottom: 5px; + position: absolute; + width: 99%; +} + +#send_form #send_form_input { + border: 1px solid #CCC; + font-size: 16px; + height: 20px; + padding: 5px; + width: 98%; +} + +#send_form input[disabled] { + background-color: #EEE; +} + +#messages { + bottom: 25px; + left: 0; + overflow: auto; + padding: 5px; + right: 0; + top: 2em; + z-index: -1; +} + +.message { + width: 95%; +} + +.timestamp { + font-size: 12px; +} + +.me, .nick { + float: left; + width: 100px; +} + +.me { + color: #F99; +} + +.nick { + color: #99F; +} + +.status { + background-color: #DDD; +} + +form dt { + clear:both; + width:19%; + float:left; + text-align:right; +} + +form dd { + float:left; + width:80%; + margin:0 0 0.5em 0.25em; +} + +input { + width: 320px; +} \ No newline at end of file diff --git a/examples/jms/stomp-websockets/chat/chat.js b/examples/jms/stomp-websockets/chat/chat.js new file mode 100644 index 00000000000..bfa3aadbbc3 --- /dev/null +++ b/examples/jms/stomp-websockets/chat/chat.js @@ -0,0 +1,51 @@ +$(document).ready(function(){ + + var client, destination; + + $('#connect_form').submit(function() { + var url = $("#connect_url").val(); + var login = $("#connect_login").val(); + var passcode = $("#connect_passcode").val(); + destination = $("#destination").val(); + + client = Stomp.client(url); + + // this allows to display debug logs directly on the web page + client.debug = function(str) { + $("#debug").append(str + "\n"); + }; + // the client is notified when it is connected to the server. + var onconnect = function(frame) { + debug("connected to Stomp"); + $('#connect').fadeOut({ duration: 'fast' }); + $('#disconnect').fadeIn(); + $('#send_form_input').removeAttr('disabled'); + + client.subscribe(destination, function(message) { + $("#messages").append("

" + message.body + "

\n"); + }); + }; + client.connect(login, passcode, onconnect); + + return false; + }); + + $('#disconnect_form').submit(function() { + client.disconnect(function() { + $('#disconnect').fadeOut({ duration: 'fast' }); + $('#connect').fadeIn(); + $('#send_form_input').addAttr('disabled'); + }); + return false; + }); + + $('#send_form').submit(function() { + var text = $('#send_form_input').val(); + if (text) { + client.send(destination, {foo: 1}, text); + $('#send_form_input').val(""); + } + return false; + }); + +}); \ No newline at end of file diff --git a/examples/jms/stomp-websockets/chat/index.html b/examples/jms/stomp-websockets/chat/index.html new file mode 100644 index 00000000000..b606aa05bb4 --- /dev/null +++ b/examples/jms/stomp-websockets/chat/index.html @@ -0,0 +1,54 @@ + + + + Chat Example Using Stomp Over Web Sockets + + + + + + + + +
+
+
+
+
+
+
+
+
+
+
+
 
+
+
+
+ +

Use the form above to connect to the Stomp server and subscribe to the destination.

+

Once connected, you can send messages to the destination with the text field at the bottom of this page

+
+
+
+ +
+
+

+    
+
+
+ +
+ + + diff --git a/examples/jms/stomp-websockets/chat/stomp.js b/examples/jms/stomp-websockets/chat/stomp.js new file mode 100644 index 00000000000..f47611aaa2c --- /dev/null +++ b/examples/jms/stomp-websockets/chat/stomp.js @@ -0,0 +1,185 @@ +// (c) 2010 Jeff Mesnil -- http://jmesnil.net/ + +(function(window) { + + var Stomp = {}; + + Stomp.frame = function(command, headers, body) { + return { + command: command, + headers: headers, + body: body, + toString: function() { + var out = command + '\n'; + if (headers) { + for (header in headers) { + if(headers.hasOwnProperty(header)) { + out = out + header + ': ' + headers[header] + '\n'; + } + } + } + out = out + '\n'; + if (body) { + out = out + body; + } + return out; + } + } + }; + + trim = function(str) { + return str.replace(/^\s+/g,'').replace(/\s+$/g,''); + }; + + Stomp.unmarshal = function(data) { + var divider = data.search(/\n\n/), + headerLines = data.substring(0, divider).split('\n'), + command = headerLines.shift(), + headers = {}, + body = ''; + + // Parse headers + var line = idx = null; + for (var i = 0; i < headerLines.length; i++) { + line = headerLines[i]; + idx = line.indexOf(':'); + headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1)); + } + + // Parse body, stopping at the first \0 found. + // TODO: Add support for content-length header. + var chr = null; + for (var i = divider + 2; i < data.length; i++) { + chr = data.charAt(i); + if (chr === '\0') { + break; + } + body += chr; + } + + return Stomp.frame(command, headers, body); + }; + + Stomp.marshal = function(command, headers, body) { + return Stomp.frame(command, headers, body).toString() + '\0'; + }; + + Stomp.client = function (url){ + + var that, ws, login, passcode; + var counter = 0; // used to index subscribers + // subscription callbacks indexed by subscriber's ID + var subscriptions = {}; + + debug = function(str) { + if (that.debug) { + that.debug(str); + } + }; + + onmessage = function(evt) { + debug('<<< ' + evt.data); + var frame = Stomp.unmarshal(evt.data); + if (frame.command === "CONNECTED" && that.connectCallback) { + that.connectCallback(frame); + } else if (frame.command === "MESSAGE") { + var onreceive = subscriptions[frame.headers.subscription]; + if (onreceive) { + onreceive(frame); + } + } else if (frame.command === "RECEIPT" && that.onreceipt) { + that.onreceipt(frame); + } else if (frame.command === "ERROR" && that.onerror) { + that.onerror(frame); + } + }; + + transmit = function(command, headers, body) { + var out = Stomp.marshal(command, headers, body); + debug(">>> " + out); + ws.send(out); + }; + + that = {}; + + that.connect = function(login_, passcode_, connectCallback, errorCallback) { + debug("Opening Web Socket..."); + ws = new WebSocket(url); + ws.onmessage = onmessage; + ws.onclose = function() { + var msg = "Whoops! Lost connection to " + url; + debug(msg); + if (errorCallback) { + errorCallback(msg); + } + }; + ws.onopen = function() { + debug('Web Socket Opened...'); + transmit("CONNECT", {login: login, passcode: passcode}); + // connectCallback handler will be called from onmessage when a CONNECTED frame is received + }; + login = login_; + passcode = passcode_; + that.connectCallback = connectCallback; + }; + + that.disconnect = function(disconnectCallback) { + transmit("DISCONNECT"); + ws.close(); + if (disconnectCallback) { + disconnectCallback(); + } + }; + + that.send = function(destination, headers, body) { + var headers = headers || {}; + headers.destination = destination; + transmit("SEND", headers, body); + }; + + that.subscribe = function(destination, callback, headers) { + var headers = headers || {}; + var id = "sub-" + counter++; + headers.destination = destination; + headers.id = id; + subscriptions[id] = callback; + transmit("SUBSCRIBE", headers); + return id; + }; + + that.unsubscribe = function(id, headers) { + var headers = headers || {}; + headers.id = id; + delete subscriptions[id]; + transmit("UNSUBSCRIBE", headers); + }; + + that.begin = function(transaction, headers) { + var headers = headers || {}; + headers.transaction = transaction; + transmit("BEGIN", headers); + }; + + that.commit = function(transaction, headers) { + var headers = headers || {}; + headers.transaction = transaction; + transmit("COMMIT", headers); + }; + + that.abort = function(transaction, headers) { + var headers = headers || {}; + headers.transaction = transaction; + transmit("ABORT", headers); + }; + + that.ack = function(message_id, headers) { + var headers = headers || {}; + headers["message-id"] = message_id; + transmit("ACK", headers); + }; + return that; + }; + + window.Stomp = Stomp; + +})(window); diff --git a/examples/jms/stomp-websockets/readme.html b/examples/jms/stomp-websockets/readme.html new file mode 100644 index 00000000000..9a594f9a9a4 --- /dev/null +++ b/examples/jms/stomp-websockets/readme.html @@ -0,0 +1,73 @@ + + + HornetQ Stomp WebSockets Example + + + + + + + +

Stomp WebSockets Example

+ +

This example shows you how to configure HornetQ to send and receive Stomp messages from modern web browser using Web Sockets.

+ +

At the moment, WebKit and Google Chrome are the only web browsers with Web Sockets support.

+

+ +

Example Setup

+

The example will start a HornetQ server configured with Stomp over Web Sockets and JMS. Web browsers clients and + Java application will exchange message using a JMS Topic.

+ +

Example step-by-step

+

To run the example, you need to start HornetQ server from the bin directory and specify this example's + server configuration:

+
+$ ./run.sh ../examples/jms/stomp-websockets/server0
+...
+[main] 17:45:03,498 INFO [org.hornetq.core.remoting.impl.netty.NettyAcceptor]  Started Netty Acceptor version 3.2.0.BETA1-r2215 localhost:61614 for STOMP_WS protocol
+[main] 17:45:03,505 INFO [org.hornetq.core.server.impl.HornetQServerImpl]  HornetQ Server version 2.1.0.BETA3 (Hungry Hornet, 117) started
+    
+ +

To publish a message to a JMS topic from a Java application, simply type ./build.sh + (or build.bat on windows) from this directory:

+
+$ ./build.sh
+...
+[java] Sent message: message sent from a Java application at Wed Apr 28 17:45:53 CEST 2010
+[java] Received message: message sent from a Java application at Wed Apr 28 17:45:53 CEST 2010
+[java] example complete
+[java] 
+[java] #####################
+[java] ###    SUCCESS!   ###
+[java] #####################
+    
+ +

To subscribe to the topic from your web browser, open the Chat Example from another tab. + The chat example is preconfigured to connect to the HornetQ server with the URL ws://localhost:61614/stomp and subscribe to the JMS Topic (through its core address + jms.topic.chat). +

+

You can open as many Web clients as you want and they will all exchange messages through the topic

+

If you run again the Java application (with ./build.sh), the web clients will also receive its message

+ +

Documentation

+ +

A JavaScript library is used on the browser side to be able to use Stomp Over Web Sockets (please see its documentation + for a complete description). + + + \ No newline at end of file diff --git a/examples/jms/stomp-websockets/server0/client-jndi.properties b/examples/jms/stomp-websockets/server0/client-jndi.properties new file mode 100644 index 00000000000..080524fbb80 --- /dev/null +++ b/examples/jms/stomp-websockets/server0/client-jndi.properties @@ -0,0 +1,3 @@ +java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory +java.naming.provider.url=jnp://localhost:1099 +java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces diff --git a/examples/jms/stomp-websockets/server0/hornetq-beans.xml b/examples/jms/stomp-websockets/server0/hornetq-beans.xml new file mode 100644 index 00000000000..171d3739eb7 --- /dev/null +++ b/examples/jms/stomp-websockets/server0/hornetq-beans.xml @@ -0,0 +1,59 @@ + + + + + + + + + + + + 1099 + localhost + 1098 + localhost + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/jms/stomp-websockets/server0/hornetq-configuration.xml b/examples/jms/stomp-websockets/server0/hornetq-configuration.xml new file mode 100644 index 00000000000..e06536f1bfc --- /dev/null +++ b/examples/jms/stomp-websockets/server0/hornetq-configuration.xml @@ -0,0 +1,42 @@ + + + + + + + org.hornetq.core.remoting.impl.netty.NettyConnectorFactory + + + + + + + + org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory + + + + + org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory + + + + + + + + + + + + + + + + + + + + diff --git a/examples/jms/stomp-websockets/server0/hornetq-jms.xml b/examples/jms/stomp-websockets/server0/hornetq-jms.xml new file mode 100644 index 00000000000..1f114426ecd --- /dev/null +++ b/examples/jms/stomp-websockets/server0/hornetq-jms.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + + + diff --git a/examples/jms/stomp-websockets/server0/hornetq-users.xml b/examples/jms/stomp-websockets/server0/hornetq-users.xml new file mode 100644 index 00000000000..934306c4b5e --- /dev/null +++ b/examples/jms/stomp-websockets/server0/hornetq-users.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/examples/jms/stomp-websockets/server0/jndi.properties b/examples/jms/stomp-websockets/server0/jndi.properties new file mode 100644 index 00000000000..66bc5074707 --- /dev/null +++ b/examples/jms/stomp-websockets/server0/jndi.properties @@ -0,0 +1,2 @@ +java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory +java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces diff --git a/examples/jms/stomp-websockets/server0/logging.properties b/examples/jms/stomp-websockets/server0/logging.properties new file mode 100644 index 00000000000..82b6f7882ba --- /dev/null +++ b/examples/jms/stomp-websockets/server0/logging.properties @@ -0,0 +1,34 @@ +############################################################ +# Default Logging Configuration File +# +# You can use a different file by specifying a filename +# with the java.util.logging.config.file system property. +# For example java -Djava.util.logging.config.file=myfile +############################################################ + +############################################################ +# Global properties +############################################################ + +# "handlers" specifies a comma separated list of log Handler +# classes. These handlers will be installed during VM startup. +# Note that these classes must be on the system classpath. +# By default we only configure a ConsoleHandler, which will only +# show messages at the INFO and above levels. +handlers=java.util.logging.ConsoleHandler,java.util.logging.FileHandler +java.util.logging.ConsoleHandler.formatter=org.hornetq.integration.logging.HornetQLoggerFormatter +java.util.logging.FileHandler.level=INFO +java.util.logging.FileHandler.formatter=org.hornetq.integration.logging.HornetQLoggerFormatter +java.util.logging.FileHandler.pattern=../logs/hornetq.log +# Default global logging level. +# This specifies which kinds of events are logged across +# all loggers. For any given facility this global level +# can be overriden by a facility specific level +# Note that the ConsoleHandler also has a separate level +# setting to limit messages printed to the console. +.level= INFO + +############################################################ +# Handler specific properties. +# Describes specific configuration info for Handlers. +############################################################ diff --git a/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java b/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java new file mode 100644 index 00000000000..b59b8004eaa --- /dev/null +++ b/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java @@ -0,0 +1,89 @@ +/* + * Copyright 2010 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.jms.example; + +import java.util.Date; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.naming.InitialContext; + +import org.hornetq.common.example.HornetQExample; + +/** + * An example where a client will send a JMS message to a Topic. + * Browser clients connected using Web Sockets will be able to receive the message. + * + * @author Jeff Mesnil + */ +public class StompWebSocketExample extends HornetQExample +{ + public static void main(final String[] args) + { + new StompWebSocketExample().run(args); + } + + @Override + public boolean runExample() throws Exception + { + Connection connection = null; + InitialContext initialContext = null; + try + { + initialContext = getContext(0); + Topic topic = (Topic)initialContext.lookup("/topic/chat"); + ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory"); + connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(topic); + MessageConsumer consumer = session.createConsumer(topic); + + // use JMS bytes message with UTF-8 String to send a text to Stomp clients + String text = "message sent from a Java application at " + new Date(); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(text.getBytes("UTF-8")); + System.out.println("Sent message: " + text); + + producer.send(message); + + connection.start(); + + message = (BytesMessage)consumer.receive(); + byte[] data = new byte[1024]; + int size = message.readBytes(data); + text = new String(data, 0, size, "UTF-8"); + System.out.println("Received message: " + text); + + return true; + } + finally + { + if (connection != null) + { + connection.close(); + } + + if (initialContext != null) + { + initialContext.close(); + } + } + } +} \ No newline at end of file diff --git a/src/main/org/hornetq/core/protocol/stomp/StompFrame.java b/src/main/org/hornetq/core/protocol/stomp/StompFrame.java index 27ee24258d5..ca1fb40ed65 100644 --- a/src/main/org/hornetq/core/protocol/stomp/StompFrame.java +++ b/src/main/org/hornetq/core/protocol/stomp/StompFrame.java @@ -18,6 +18,7 @@ package org.hornetq.core.protocol.stomp; import java.util.Map; +import java.util.Map.Entry; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQBuffers; @@ -82,6 +83,18 @@ public String toString() { return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=" + content.length + "]"; } + + public String asString() + { + String out = command + '\n'; + for (Entry header : headers.entrySet()) + { + out += header.getKey() + ": " + header.getValue() + '\n'; + } + out += '\n'; + out += new String(content); + return out; + } public HornetQBuffer toHornetQBuffer() throws Exception { diff --git a/src/main/org/hornetq/core/protocol/stomp/WebSocketServerHandler.java b/src/main/org/hornetq/core/protocol/stomp/WebSocketServerHandler.java new file mode 100644 index 00000000000..568a0e7ae45 --- /dev/null +++ b/src/main/org/hornetq/core/protocol/stomp/WebSocketServerHandler.java @@ -0,0 +1,135 @@ +/* + * Copyright 2010 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.hornetq.core.protocol.stomp; + +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.jboss.netty.handler.codec.http.websocket.WebSocketFrame; +import org.jboss.netty.handler.codec.http.websocket.WebSocketFrameDecoder; +import org.jboss.netty.util.CharsetUtil; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (trustin@gmail.com) + * + * @version $Rev$, $Date$ + */ +public class WebSocketServerHandler extends SimpleChannelUpstreamHandler { + + private static final String WEBSOCKET_PATH = "/stomp"; + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + Object msg = e.getMessage(); + if (msg instanceof HttpRequest) { + handleHttpRequest(ctx, (HttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + handleWebSocketFrame(ctx, (WebSocketFrame) msg); + } + } + + private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) { + // Allow only GET methods. + if (req.getMethod() != HttpMethod.GET) { + sendHttpResponse( + ctx, req, new DefaultHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN)); + return; + } + + // Serve the WebSocket handshake request. + if (req.getUri().equals(WEBSOCKET_PATH) && + HttpHeaders.Values.UPGRADE.equalsIgnoreCase(req.getHeader(HttpHeaders.Names.CONNECTION)) && + HttpHeaders.Values.WEBSOCKET.equalsIgnoreCase(req.getHeader(HttpHeaders.Names.UPGRADE))) { + + // Create the WebSocket handshake response. + HttpResponse res = new DefaultHttpResponse( + HttpVersion.HTTP_1_1, + new HttpResponseStatus(101, "Web Socket Protocol Handshake")); + res.addHeader(HttpHeaders.Names.UPGRADE, HttpHeaders.Values.WEBSOCKET); + res.addHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE); + res.addHeader(HttpHeaders.Names.WEBSOCKET_ORIGIN, req.getHeader(HttpHeaders.Names.ORIGIN)); + res.addHeader(HttpHeaders.Names.WEBSOCKET_LOCATION, getWebSocketLocation(req)); + String protocol = req.getHeader(HttpHeaders.Names.WEBSOCKET_PROTOCOL); + if (protocol != null) { + res.addHeader(HttpHeaders.Names.WEBSOCKET_PROTOCOL, protocol); + } + + // Upgrade the connection and send the handshake response. + ChannelPipeline p = ctx.getChannel().getPipeline(); + p.remove("http-aggregator"); + p.replace("http-decoder", "ws-decoder", new WebSocketFrameDecoder()); + + ctx.getChannel().write(res); + + p.replace("http-encoder", "ws-encoder", new WebSocketStompFrameEncoder()); + return; + } + + // Send an error page otherwise. + sendHttpResponse( + ctx, req, new DefaultHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN)); + } + + private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { + // Send the uppercased string back. + Channels.fireMessageReceived(ctx, frame.getBinaryData()); + } + + private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { + // Generate an error page if response status code is not OK (200). + if (res.getStatus().getCode() != 200) { + res.setContent( + ChannelBuffers.copiedBuffer( + res.getStatus().toString(), CharsetUtil.UTF_8)); + res.setHeader( + HttpHeaders.Names.CONTENT_LENGTH, + Integer.toString(res.getContent().readableBytes())); + } + + // Send the response and close the connection if necessary. + ChannelFuture f = ctx.getChannel().write(res); + if (!HttpHeaders.isKeepAlive(req) || res.getStatus().getCode() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + e.getCause().printStackTrace(); + e.getChannel().close(); + } + + private String getWebSocketLocation(HttpRequest req) { + return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH; + } +} \ No newline at end of file diff --git a/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java b/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java new file mode 100644 index 00000000000..c1265d10855 --- /dev/null +++ b/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java @@ -0,0 +1,72 @@ +/* + * Copyright 2010 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.hornetq.core.protocol.stomp; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.core.buffers.impl.ChannelBufferWrapper; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.http.websocket.DefaultWebSocketFrame; +import org.jboss.netty.handler.codec.http.websocket.WebSocketFrame; +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; + +/** + * Encodes a {@link WebSocketFrame} into a {@link ChannelBuffer}. + *

+ * For the detailed instruction on adding add Web Socket support to your HTTP +* server, take a look into the WebSocketServer example located in the + * {@code org.jboss.netty.example.http.websocket} package. + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Mike Heath (mheath@apache.org) + * @author Trustin Lee (trustin@gmail.com) + * @version $Rev: 2019 $, $Date: 2010-01-09 21:00:24 +0900 (Sat, 09 Jan 2010) $ + */ +public class WebSocketStompFrameEncoder extends OneToOneEncoder +{ + + private final StompFrameDecoder decoder = new StompFrameDecoder(); + + @Override + protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception + { + + if (msg instanceof ChannelBuffer) + { + // this is ugly and slow! + // we have to go ChannelBuffer -> HornetQBuffer -> StompFrame -> String -> WebSocketFrame + // since HornetQ protocol SPI requires to return HornetQBuffer to the transport + HornetQBuffer buffer = new ChannelBufferWrapper((ChannelBuffer)msg); + StompFrame frame = decoder.decode(buffer); + if (frame != null) + { + WebSocketFrame wsFrame = new DefaultWebSocketFrame(frame.asString()); + + // Text frame + ChannelBuffer data = wsFrame.getBinaryData(); + ChannelBuffer encoded = channel.getConfig().getBufferFactory().getBuffer(data.order(), + data.readableBytes() + 2); + encoded.writeByte((byte)wsFrame.getType()); + encoded.writeBytes(data, data.readableBytes()); + encoded.writeByte((byte)0xFF); + return encoded; + + } + } + return msg; + } +} \ No newline at end of file diff --git a/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java b/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java index 7a4729d7e77..ebbeaca6797 100644 --- a/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java +++ b/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java @@ -35,6 +35,7 @@ import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.logging.Logger; +import org.hornetq.core.protocol.stomp.WebSocketServerHandler; import org.hornetq.core.remoting.impl.ssl.SSLSupport; import org.hornetq.core.server.management.Notification; import org.hornetq.core.server.management.NotificationService; @@ -57,6 +58,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.DefaultChannelPipeline; import org.jboss.netty.channel.StaticChannelPipeline; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; @@ -65,6 +67,7 @@ import org.jboss.netty.channel.local.LocalAddress; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory; +import org.jboss.netty.handler.codec.http.HttpChunkAggregator; import org.jboss.netty.handler.codec.http.HttpRequestDecoder; import org.jboss.netty.handler.codec.http.HttpResponseEncoder; import org.jboss.netty.handler.ssl.SslHandler; @@ -320,9 +323,15 @@ else if (useNio) ChannelPipelineFactory factory = new ChannelPipelineFactory() { + /** + * we use named handlers so that the web socket server handler can + * replace the http encode/decoder after the http handshake. + * + * @see WebSocketServerHandler#handleHttpRequest(ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest) + */ public ChannelPipeline getPipeline() throws Exception { - List handlers = new ArrayList(); + ChannelPipeline pipeline = new DefaultChannelPipeline(); if (sslEnabled) { @@ -332,31 +341,37 @@ public ChannelPipeline getPipeline() throws Exception SslHandler handler = new SslHandler(engine); - handlers.add(handler); + pipeline.addLast("ssl", handler); } if (httpEnabled) { - handlers.add(new HttpRequestDecoder()); + pipeline.addLast("http-decoder", new HttpRequestDecoder()); - handlers.add(new HttpResponseEncoder()); + pipeline.addLast("http-encoder", new HttpResponseEncoder()); - handlers.add(new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime)); + pipeline.addLast("http-handler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime)); } if (protocol == ProtocolType.CORE) { // Core protocol uses it's own optimised decoder - handlers.add(new HornetQFrameDecoder2()); + pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder2()); + } + else if (protocol == ProtocolType.STOMP_WS) + { + pipeline.addLast("http-decoder", new HttpRequestDecoder()); + pipeline.addLast("http-aggregator", new HttpChunkAggregator(65536)); + pipeline.addLast("http-encoder", new HttpResponseEncoder()); + pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder(decoder)); + pipeline.addLast("websocket-handler", new WebSocketServerHandler()); } else { - handlers.add(new HornetQFrameDecoder(decoder)); + pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder(decoder)); } - handlers.add(new HornetQServerChannelHandler(channelGroup, handler, new Listener())); - - ChannelPipeline pipeline = new StaticChannelPipeline(handlers.toArray(new ChannelHandler[handlers.size()])); + pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener())); return pipeline; } @@ -411,7 +426,7 @@ public ChannelPipeline getPipeline() throws Exception batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS); } - NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID + " " + host + ":" + port); + NettyAcceptor.log.info("Started Netty Acceptor version " + Version.ID + " " + host + ":" + port + " for " + protocol + " protocol"); } private void startServerChannels() diff --git a/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java b/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java index e461c6d13c1..a3817b1f59b 100644 --- a/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java +++ b/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java @@ -130,8 +130,10 @@ public RemotingServiceImpl(final Configuration config, this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server, interceptors)); + // difference between Stomp and Stomp over Web Sockets is handled in NettyAcceptor.getPipeline() this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server, interceptors)); + this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server, interceptors)); } // RemotingService implementation ------------------------------- diff --git a/src/main/org/hornetq/spi/core/protocol/ProtocolType.java b/src/main/org/hornetq/spi/core/protocol/ProtocolType.java index 450d87865e6..a7076a192bb 100644 --- a/src/main/org/hornetq/spi/core/protocol/ProtocolType.java +++ b/src/main/org/hornetq/spi/core/protocol/ProtocolType.java @@ -22,5 +22,5 @@ */ public enum ProtocolType { - CORE, STOMP, AMQP, AARDVARK; + CORE, STOMP, STOMP_WS, AMQP, AARDVARK; } diff --git a/tests/src/org/hornetq/tests/integration/stomp/StompWebSocketTest.java b/tests/src/org/hornetq/tests/integration/stomp/StompWebSocketTest.java new file mode 100644 index 00000000000..914eb962585 --- /dev/null +++ b/tests/src/org/hornetq/tests/integration/stomp/StompWebSocketTest.java @@ -0,0 +1,93 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.hornetq.tests.integration.stomp; + +import java.util.HashMap; +import java.util.Map; + +import junit.framework.TestCase; + +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.CoreQueueConfiguration; +import org.hornetq.core.config.impl.ConfigurationImpl; +import org.hornetq.core.logging.Logger; +import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory; +import org.hornetq.core.remoting.impl.netty.TransportConstants; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServers; +import org.hornetq.jms.server.JMSServerManager; +import org.hornetq.jms.server.config.JMSConfiguration; +import org.hornetq.jms.server.config.impl.JMSConfigurationImpl; +import org.hornetq.jms.server.impl.JMSServerManagerImpl; +import org.hornetq.spi.core.protocol.ProtocolType; + +public class StompWebSocketTest extends TestCase { + private static final transient Logger log = Logger.getLogger(StompWebSocketTest.class); + private JMSServerManager server; + + /** + * to test the Stomp over Web Sockets protocol, + * uncomment the sleep call and run the stomp-websockets Javascript test suite + * from http://github.com/jmesnil/stomp-websocket + */ + public void testConnect() throws Exception { + //Thread.sleep(10000000); + } + + // Implementation methods + //------------------------------------------------------------------------- + protected void setUp() throws Exception { + server = createServer(); + server.start(); + } + + /** + * @return + * @throws Exception + */ + private JMSServerManager createServer() throws Exception + { + Configuration config = new ConfigurationImpl(); + config.setSecurityEnabled(false); + config.setPersistenceEnabled(false); + + Map params = new HashMap(); + params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP_WS.toString()); + params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT + 1); + TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); + config.getAcceptorConfigurations().add(stompTransport); + config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + config.getQueueConfigurations().add(new CoreQueueConfiguration(getQueueName(), getQueueName(), null, false)); + HornetQServer hornetQServer = HornetQServers.newHornetQServer(config); + + JMSConfiguration jmsConfig = new JMSConfigurationImpl(); + server = new JMSServerManagerImpl(hornetQServer, jmsConfig); + server.setContext(null); + return server; + } + + protected void tearDown() throws Exception { + server.stop(); + } + + protected String getQueueName() { + return "/queue/test"; + } +}