Skip to content

Commit

Permalink
CAMEL-12005: Add websocket support to camel-undertow
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga authored and oscerd committed Dec 13, 2017
1 parent 6f1da10 commit 205533f
Show file tree
Hide file tree
Showing 24 changed files with 2,374 additions and 193 deletions.
8 changes: 7 additions & 1 deletion components/camel-undertow/pom.xml
Expand Up @@ -87,11 +87,17 @@
<artifactId>camel-swagger-java</artifactId> <artifactId>camel-swagger-java</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>${ahc-version}</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId> <artifactId>log4j-api</artifactId>
Expand Down
15 changes: 13 additions & 2 deletions components/camel-undertow/src/main/docs/undertow-component.adoc
Expand Up @@ -8,6 +8,10 @@ That is, the Undertow component behaves as a simple Web server. +
Undertow can also be used as a http client which mean you can also use Undertow can also be used as a http client which mean you can also use
it with Camel as a producer. it with Camel as a producer.


Since Camel version 2.21, the *undertow* component also supports WebSocket
connections and can thus serve as a drop-in replacement for Camel websocket
component or atmosphere-websocket component.

Maven users will need to add the following dependency to their `pom.xml` Maven users will need to add the following dependency to their `pom.xml`
for this component: for this component:


Expand All @@ -26,6 +30,9 @@ for this component:
[source,java] [source,java]
------------------------------------------------------- -------------------------------------------------------
undertow:http://hostname[:port][/resourceUri][?options] undertow:http://hostname[:port][/resourceUri][?options]
undertow:https://hostname[:port][/resourceUri][?options]
undertow:ws://hostname[:port][/resourceUri][?options]
undertow:wss://hostname[:port][/resourceUri][?options]
------------------------------------------------------- -------------------------------------------------------


You can append query options to the URI in the following format, You can append query options to the URI in the following format,
Expand Down Expand Up @@ -76,21 +83,25 @@ with the following path and query parameters:
| *httpURI* | *Required* The url of the HTTP endpoint to use. | | URI | *httpURI* | *Required* The url of the HTTP endpoint to use. | | URI
|=== |===


==== Query Parameters (17 parameters): ==== Query Parameters (21 parameters):


[width="100%",cols="2,5,^1,2",options="header"] [width="100%",cols="2,5,^1,2",options="header"]
|=== |===
| Name | Description | Default | Type | Name | Description | Default | Type
| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
| *fireWebSocketChannelEvents* (consumer) | if true the consumer will post notifications to the route when a new WebSocket peer connects disconnects etc. See UndertowConstants.EVENT_TYPE and EventType. | false | boolean
| *httpMethodRestrict* (consumer) | Used to only allow consuming if the HttpMethod matches such as GET/POST/PUT etc. Multiple methods can be specified separated by comma. | | String | *httpMethodRestrict* (consumer) | Used to only allow consuming if the HttpMethod matches such as GET/POST/PUT etc. Multiple methods can be specified separated by comma. | | String
| *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found. | false | Boolean | *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found. | false | Boolean
| *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for this Servlet consumer. By default OPTIONS is turned off. | false | boolean | *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for this Servlet consumer. By default OPTIONS is turned off. | false | boolean
| *useStreaming* (consumer) | if true text and binary messages coming through a WebSocket will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean
| *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern
| *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP session | | CookieHandler | *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP session | | CookieHandler
| *keepAlive* (producer) | Setting to ensure socket is not closed due to inactivity | true | Boolean | *keepAlive* (producer) | Setting to ensure socket is not closed due to inactivity | true | Boolean
| *options* (producer) | Sets additional channel options. The options that can be used are defined in org.xnio.Options. To configure from endpoint uri then prefix each option with option. such as option.close-abort=true&option.send-buffer=8192 | | Map | *options* (producer) | Sets additional channel options. The options that can be used are defined in org.xnio.Options. To configure from endpoint uri then prefix each option with option. such as option.close-abort=true&option.send-buffer=8192 | | Map
| *reuseAddresses* (producer) | Setting to facilitate socket multiplexing | true | Boolean | *reuseAddresses* (producer) | Setting to facilitate socket multiplexing | true | Boolean
| *sendTimeout* (producer) | Timeout in milliseconds when sending to a websocket channel. The default timeout is 30000 (30 seconds). | 30000 | Integer
| *sendToAll* (producer) | To send to all websocket subscribers. Can be used to configure on endpoint level instead of having to use the UndertowConstants.SEND_TO_ALL header on the message. | | Boolean
| *tcpNoDelay* (producer) | Setting to improve TCP protocol performance | true | Boolean | *tcpNoDelay* (producer) | Setting to improve TCP protocol performance | true | Boolean
| *throwExceptionOnFailure* (producer) | Option to disable throwing the HttpOperationFailedException in case of failed responses from the remote server. This allows you to get all responses regardless of the HTTP status code. | true | Boolean | *throwExceptionOnFailure* (producer) | Option to disable throwing the HttpOperationFailedException in case of failed responses from the remote server. This allows you to get all responses regardless of the HTTP status code. | true | Boolean
| *transferException* (producer) | If enabled and an Exchange failed processing on the consumer side and if the caused Exception was send back serialized in the response as a application/x-java-serialized-object content type. On the producer side the exception will be deserialized and thrown as is instead of the HttpOperationFailedException. The caused exception is required to be serialized. This is by default turned off. If you enable this then be aware that Java will deserialize the incoming data from the request to Java and that can be a potential security risk. | false | Boolean | *transferException* (producer) | If enabled and an Exchange failed processing on the consumer side and if the caused Exception was send back serialized in the response as a application/x-java-serialized-object content type. On the producer side the exception will be deserialized and thrown as is instead of the HttpOperationFailedException. The caused exception is required to be serialized. This is by default turned off. If you enable this then be aware that Java will deserialize the incoming data from the request to Java and that can be a potential security risk. | false | Boolean
Expand All @@ -106,7 +117,7 @@ with the following path and query parameters:
### Message Headers ### Message Headers


Camel uses the same message headers as the link:http.html[HTTP] Camel uses the same message headers as the link:http.html[HTTP]
component. component.
From Camel 2.2, it also uses From Camel 2.2, it also uses
`Exchange.HTTP_CHUNKED,CamelHttpChunked` header to turn on or turn off `Exchange.HTTP_CHUNKED,CamelHttpChunked` header to turn on or turn off
the chuched encoding on the camel-undertow consumer. the chuched encoding on the camel-undertow consumer.
Expand Down
Expand Up @@ -17,12 +17,15 @@
package org.apache.camel.component.undertow; package org.apache.camel.component.undertow;


import java.net.URI; import java.net.URI;
import java.util.function.Supplier;


import io.undertow.Undertow; import io.undertow.Undertow;
import io.undertow.UndertowOptions; import io.undertow.UndertowOptions;
import io.undertow.server.HttpHandler; import io.undertow.server.HttpHandler;
import io.undertow.websockets.WebSocketProtocolHandshakeHandler;


import org.apache.camel.component.undertow.handlers.CamelRootHandler; import org.apache.camel.component.undertow.handlers.CamelRootHandler;
import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler;
import org.apache.camel.component.undertow.handlers.NotFoundHandler; import org.apache.camel.component.undertow.handlers.NotFoundHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -33,9 +36,9 @@
public class DefaultUndertowHost implements UndertowHost { public class DefaultUndertowHost implements UndertowHost {
private static final Logger LOG = LoggerFactory.getLogger(DefaultUndertowHost.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultUndertowHost.class);


private UndertowHostKey key; private final UndertowHostKey key;
private UndertowHostOptions options; private final UndertowHostOptions options;
private CamelRootHandler rootHandler; private final CamelRootHandler rootHandler;
private Undertow undertow; private Undertow undertow;
private String hostString; private String hostString;


Expand All @@ -55,7 +58,7 @@ public void validateEndpointURI(URI httpURI) {
} }


@Override @Override
public synchronized void registerHandler(HttpHandlerRegistrationInfo registrationInfo, HttpHandler handler) { public synchronized HttpHandler registerHandler(HttpHandlerRegistrationInfo registrationInfo, HttpHandler handler) {
if (undertow == null) { if (undertow == null) {
Undertow.Builder builder = Undertow.builder(); Undertow.Builder builder = Undertow.builder();
if (key.getSslContext() != null) { if (key.getSslContext() != null) {
Expand Down Expand Up @@ -103,11 +106,7 @@ public synchronized void registerHandler(HttpHandlerRegistrationInfo registratio
throw e; throw e;
} }
} }

return rootHandler.add(registrationInfo.getUri().getPath(), registrationInfo.getMethodRestrict(), registrationInfo.isMatchOnUriPrefix(), handler);
String path = registrationInfo.getUri().getPath();
String methods = registrationInfo.getMethodRestrict();
boolean prefixMatch = registrationInfo.isMatchOnUriPrefix();
rootHandler.add(path, methods != null ? methods.split(",") : null, prefixMatch, handler);
} }


@Override @Override
Expand All @@ -116,10 +115,7 @@ public synchronized void unregisterHandler(HttpHandlerRegistrationInfo registrat
return; return;
} }


String path = registrationInfo.getUri().getPath(); rootHandler.remove(registrationInfo.getUri().getPath(), registrationInfo.getMethodRestrict(), registrationInfo.isMatchOnUriPrefix());
String methods = registrationInfo.getMethodRestrict();
boolean prefixMatch = registrationInfo.isMatchOnUriPrefix();
rootHandler.remove(path, methods != null ? methods.split(",") : null, prefixMatch);


if (rootHandler.isEmpty()) { if (rootHandler.isEmpty()) {
LOG.info("Stopping Undertow server on {}://{}:{}", key.getSslContext() != null ? "https" : "http", key.getHost(), key.getPort()); LOG.info("Stopping Undertow server on {}://{}:{}", key.getSslContext() != null ? "https" : "http", key.getHost(), key.getPort());
Expand Down
Expand Up @@ -20,31 +20,32 @@


public class HttpHandlerRegistrationInfo { public class HttpHandlerRegistrationInfo {


private Boolean matchOnUriPrefix; private final Boolean matchOnUriPrefix;
private String methodRestrict; private final String methodRestrict;
private URI uri; private final URI uri;


public String getMethodRestrict() { public HttpHandlerRegistrationInfo(URI uri, String methodRestrict, Boolean matchOnUriPrefix) {
return methodRestrict; super();
this.matchOnUriPrefix = matchOnUriPrefix;
this.methodRestrict = methodRestrict;
this.uri = uri;
} }


public void setMethodRestrict(String methodRestrict) { public String getMethodRestrict() {
this.methodRestrict = methodRestrict; return methodRestrict;
} }


public URI getUri() { public URI getUri() {
return uri; return uri;
} }


public void setUri(URI uri) {
this.uri = uri;
}

public Boolean isMatchOnUriPrefix() { public Boolean isMatchOnUriPrefix() {
return matchOnUriPrefix; return matchOnUriPrefix;
} }


public void setMatchOnUriPrefix(Boolean matchOnUriPrefix) { @Override
this.matchOnUriPrefix = matchOnUriPrefix; public String toString() {
return uri + "?matchOnUriPrefix=" + matchOnUriPrefix + "&methodRestrict=" + methodRestrict;
} }

} }
Expand Up @@ -23,6 +23,10 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;


import javax.net.ssl.SSLContext;

import io.undertow.server.HttpHandler;

import org.apache.camel.CamelContext; import org.apache.camel.CamelContext;
import org.apache.camel.ComponentVerifier; import org.apache.camel.ComponentVerifier;
import org.apache.camel.Consumer; import org.apache.camel.Consumer;
Expand Down Expand Up @@ -299,23 +303,20 @@ protected void doStart() throws Exception {
} }
} }


public void registerConsumer(UndertowConsumer consumer) { public HttpHandler registerEndpoint(HttpHandlerRegistrationInfo registrationInfo, SSLContext sslContext, HttpHandler handler) {
URI uri = consumer.getEndpoint().getHttpURI(); final URI uri = registrationInfo.getUri();
UndertowHostKey key = new UndertowHostKey(uri.getHost(), uri.getPort(), consumer.getEndpoint().getSslContext()); final UndertowHostKey key = new UndertowHostKey(uri.getHost(), uri.getPort(), sslContext);
UndertowHost host = undertowRegistry.get(key); final UndertowHost host = undertowRegistry.computeIfAbsent(key, k -> createUndertowHost(k));
if (host == null) {
host = createUndertowHost(key);
undertowRegistry.put(key, host);
}
host.validateEndpointURI(uri); host.validateEndpointURI(uri);
host.registerHandler(consumer.getHttpHandlerRegistrationInfo(), consumer.getHttpHandler()); return host.registerHandler(registrationInfo, handler);
} }


public void unregisterConsumer(UndertowConsumer consumer) { public void unregisterEndpoint(HttpHandlerRegistrationInfo registrationInfo, SSLContext sslContext) {
URI uri = consumer.getEndpoint().getHttpURI(); final URI uri = registrationInfo.getUri();
UndertowHostKey key = new UndertowHostKey(uri.getHost(), uri.getPort(), consumer.getEndpoint().getSslContext()); final UndertowHostKey key = new UndertowHostKey(uri.getHost(), uri.getPort(), sslContext);
UndertowHost host = undertowRegistry.get(key); final UndertowHost host = undertowRegistry.get(key);
host.unregisterHandler(consumer.getHttpHandlerRegistrationInfo()); host.unregisterHandler(registrationInfo);
} }


protected UndertowHost createUndertowHost(UndertowHostKey key) { protected UndertowHost createUndertowHost(UndertowHostKey key) {
Expand Down
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.undertow;

public final class UndertowConstants {

public static final String CONNECTION_KEY = "websocket.connectionKey";
public static final String CONNECTION_KEY_LIST = "websocket.connectionKey.list";
public static final String SEND_TO_ALL = "websocket.sendToAll";
public static final String EVENT_TYPE = "websocket.eventType";
public static final String EVENT_TYPE_ENUM = "websocket.eventTypeEnum";

/**
* WebSocket peers related events the {@link UndertowConsumer} sends to the Camel route.
*/
public enum EventType {
/**
* A new peer has connected.
*/
ONOPEN(1),

/**
* A peer has disconnected.
*/
ONCLOSE(0),

/**
* Unused in Undertow component. Kept for compatibility with Camel websocket component.
*/
ONERROR(-1);
private final int code;

EventType(int code) {
this.code = code;
}

/**
* @return a numeric identifier of this {@link EventType}. Kept for compatibility with Camel websocket
* component.
*/
public int getCode() {
return code;
}

public static EventType ofCode(int code) {
switch (code) {
case 1:
return ONOPEN;
case 0:
return ONCLOSE;
case -1:
return ONERROR;
default:
throw new IllegalArgumentException("Cannot find an " + EventType.class.getName() + " for code " + code);
}
}
}

public static final String WS_PROTOCOL = "ws";
public static final String WSS_PROTOCOL = "wss";

private UndertowConstants() {
};

}

0 comments on commit 205533f

Please sign in to comment.