From 205533fe4024f2c2b8ef9d2b219683e65a1c7dd4 Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Wed, 13 Dec 2017 11:16:07 +0100 Subject: [PATCH] CAMEL-12005: Add websocket support to camel-undertow --- components/camel-undertow/pom.xml | 8 +- .../src/main/docs/undertow-component.adoc | 15 +- .../undertow/DefaultUndertowHost.java | 22 +- .../undertow/HttpHandlerRegistrationInfo.java | 27 +- .../component/undertow/UndertowComponent.java | 29 +- .../component/undertow/UndertowConstants.java | 79 +++ .../component/undertow/UndertowConsumer.java | 98 +++- .../component/undertow/UndertowEndpoint.java | 86 ++++ .../component/undertow/UndertowHost.java | 23 +- .../component/undertow/UndertowProducer.java | 180 ++++--- .../undertow/handlers/CamelMethodHandler.java | 124 +++-- .../undertow/handlers/CamelRootHandler.java | 40 +- .../handlers/CamelWebSocketHandler.java | 372 ++++++++++++++ .../handlers/ExtendedWebSocketCallback.java | 33 ++ .../UndertowConsumerUnregisterTest.java | 8 +- .../handlers/CamelRootHandlerTest.java | 81 ++++ .../component/undertow/ws/TestClient.java | 158 ++++++ .../ws/UndertowWsConsumerRouteTest.java | 454 ++++++++++++++++++ .../UndertowWsProducerRouteRestartTest.java | 120 +++++ .../ws/UndertowWsProducerRouteTest.java | 96 ++++ .../undertow/ws/UndertowWsTwoRoutesTest.java | 138 ++++++ ...utesToSameEndpointSendToAllHeaderTest.java | 102 ++++ ...UndertowWsTwoRoutesToSameEndpointTest.java | 100 ++++ .../undertow/ws/UndertowWssRouteTest.java | 174 +++++++ 24 files changed, 2374 insertions(+), 193 deletions(-) create mode 100644 components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java create mode 100644 components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java create mode 100644 components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/ExtendedWebSocketCallback.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/handlers/CamelRootHandlerTest.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/TestClient.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsProducerRouteRestartTest.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsProducerRouteTest.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesTest.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesToSameEndpointSendToAllHeaderTest.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesToSameEndpointTest.java create mode 100644 components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWssRouteTest.java diff --git a/components/camel-undertow/pom.xml b/components/camel-undertow/pom.xml index 449cddcc933cb..847f683f67dd4 100644 --- a/components/camel-undertow/pom.xml +++ b/components/camel-undertow/pom.xml @@ -87,11 +87,17 @@ camel-swagger-java test + + org.asynchttpclient + async-http-client + ${ahc-version} + test + junit junit test - + org.apache.logging.log4j log4j-api diff --git a/components/camel-undertow/src/main/docs/undertow-component.adoc b/components/camel-undertow/src/main/docs/undertow-component.adoc index b81aeab7e77bc..945d26c016b91 100644 --- a/components/camel-undertow/src/main/docs/undertow-component.adoc +++ b/components/camel-undertow/src/main/docs/undertow-component.adoc @@ -8,6 +8,10 @@ That is, the Undertow component behaves as a simple Web server. + Undertow can also be used as a http client which mean you can also use it with Camel as a producer. +Since Camel version 2.21, the *undertow* component also supports WebSocket +connections and can thus serve as a drop-in replacement for Camel websocket +component or atmosphere-websocket component. + Maven users will need to add the following dependency to their `pom.xml` for this component: @@ -26,6 +30,9 @@ for this component: [source,java] ------------------------------------------------------- undertow:http://hostname[:port][/resourceUri][?options] +undertow:https://hostname[:port][/resourceUri][?options] +undertow:ws://hostname[:port][/resourceUri][?options] +undertow:wss://hostname[:port][/resourceUri][?options] ------------------------------------------------------- You can append query options to the URI in the following format, @@ -76,21 +83,25 @@ with the following path and query parameters: | *httpURI* | *Required* The url of the HTTP endpoint to use. | | URI |=== -==== Query Parameters (17 parameters): +==== Query Parameters (21 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean +| *fireWebSocketChannelEvents* (consumer) | if true the consumer will post notifications to the route when a new WebSocket peer connects disconnects etc. See UndertowConstants.EVENT_TYPE and EventType. | false | boolean | *httpMethodRestrict* (consumer) | Used to only allow consuming if the HttpMethod matches such as GET/POST/PUT etc. Multiple methods can be specified separated by comma. | | String | *matchOnUriPrefix* (consumer) | Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found. | false | Boolean | *optionsEnabled* (consumer) | Specifies whether to enable HTTP OPTIONS for this Servlet consumer. By default OPTIONS is turned off. | false | boolean +| *useStreaming* (consumer) | if true text and binary messages coming through a WebSocket will be wrapped as java.io.Reader and java.io.InputStream respectively before they are passed to an Exchange; otherwise they will be passed as String and byte respectively. | false | boolean | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *cookieHandler* (producer) | Configure a cookie handler to maintain a HTTP session | | CookieHandler | *keepAlive* (producer) | Setting to ensure socket is not closed due to inactivity | true | Boolean | *options* (producer) | Sets additional channel options. The options that can be used are defined in org.xnio.Options. To configure from endpoint uri then prefix each option with option. such as option.close-abort=true&option.send-buffer=8192 | | Map | *reuseAddresses* (producer) | Setting to facilitate socket multiplexing | true | Boolean +| *sendTimeout* (producer) | Timeout in milliseconds when sending to a websocket channel. The default timeout is 30000 (30 seconds). | 30000 | Integer +| *sendToAll* (producer) | To send to all websocket subscribers. Can be used to configure on endpoint level instead of having to use the UndertowConstants.SEND_TO_ALL header on the message. | | Boolean | *tcpNoDelay* (producer) | Setting to improve TCP protocol performance | true | Boolean | *throwExceptionOnFailure* (producer) | Option to disable throwing the HttpOperationFailedException in case of failed responses from the remote server. This allows you to get all responses regardless of the HTTP status code. | true | Boolean | *transferException* (producer) | If enabled and an Exchange failed processing on the consumer side and if the caused Exception was send back serialized in the response as a application/x-java-serialized-object content type. On the producer side the exception will be deserialized and thrown as is instead of the HttpOperationFailedException. The caused exception is required to be serialized. This is by default turned off. If you enable this then be aware that Java will deserialize the incoming data from the request to Java and that can be a potential security risk. | false | Boolean @@ -106,7 +117,7 @@ with the following path and query parameters: ### Message Headers Camel uses the same message headers as the link:http.html[HTTP] -component. +component. From Camel 2.2, it also uses `Exchange.HTTP_CHUNKED,CamelHttpChunked` header to turn on or turn off the chuched encoding on the camel-undertow consumer. diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHost.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHost.java index 4ef6126c0eddc..1c8cd3119bc9b 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHost.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/DefaultUndertowHost.java @@ -17,12 +17,15 @@ package org.apache.camel.component.undertow; import java.net.URI; +import java.util.function.Supplier; import io.undertow.Undertow; import io.undertow.UndertowOptions; import io.undertow.server.HttpHandler; +import io.undertow.websockets.WebSocketProtocolHandshakeHandler; import org.apache.camel.component.undertow.handlers.CamelRootHandler; +import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler; import org.apache.camel.component.undertow.handlers.NotFoundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,9 +36,9 @@ public class DefaultUndertowHost implements UndertowHost { private static final Logger LOG = LoggerFactory.getLogger(DefaultUndertowHost.class); - private UndertowHostKey key; - private UndertowHostOptions options; - private CamelRootHandler rootHandler; + private final UndertowHostKey key; + private final UndertowHostOptions options; + private final CamelRootHandler rootHandler; private Undertow undertow; private String hostString; @@ -55,7 +58,7 @@ public void validateEndpointURI(URI httpURI) { } @Override - public synchronized void registerHandler(HttpHandlerRegistrationInfo registrationInfo, HttpHandler handler) { + public synchronized HttpHandler registerHandler(HttpHandlerRegistrationInfo registrationInfo, HttpHandler handler) { if (undertow == null) { Undertow.Builder builder = Undertow.builder(); if (key.getSslContext() != null) { @@ -103,11 +106,7 @@ public synchronized void registerHandler(HttpHandlerRegistrationInfo registratio throw e; } } - - String path = registrationInfo.getUri().getPath(); - String methods = registrationInfo.getMethodRestrict(); - boolean prefixMatch = registrationInfo.isMatchOnUriPrefix(); - rootHandler.add(path, methods != null ? methods.split(",") : null, prefixMatch, handler); + return rootHandler.add(registrationInfo.getUri().getPath(), registrationInfo.getMethodRestrict(), registrationInfo.isMatchOnUriPrefix(), handler); } @Override @@ -116,10 +115,7 @@ public synchronized void unregisterHandler(HttpHandlerRegistrationInfo registrat return; } - String path = registrationInfo.getUri().getPath(); - String methods = registrationInfo.getMethodRestrict(); - boolean prefixMatch = registrationInfo.isMatchOnUriPrefix(); - rootHandler.remove(path, methods != null ? methods.split(",") : null, prefixMatch); + rootHandler.remove(registrationInfo.getUri().getPath(), registrationInfo.getMethodRestrict(), registrationInfo.isMatchOnUriPrefix()); if (rootHandler.isEmpty()) { LOG.info("Stopping Undertow server on {}://{}:{}", key.getSslContext() != null ? "https" : "http", key.getHost(), key.getPort()); diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/HttpHandlerRegistrationInfo.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/HttpHandlerRegistrationInfo.java index 85a18743db64e..ce93093c00e62 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/HttpHandlerRegistrationInfo.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/HttpHandlerRegistrationInfo.java @@ -20,31 +20,32 @@ public class HttpHandlerRegistrationInfo { - private Boolean matchOnUriPrefix; - private String methodRestrict; - private URI uri; + private final Boolean matchOnUriPrefix; + private final String methodRestrict; + private final URI uri; - public String getMethodRestrict() { - return methodRestrict; + public HttpHandlerRegistrationInfo(URI uri, String methodRestrict, Boolean matchOnUriPrefix) { + super(); + this.matchOnUriPrefix = matchOnUriPrefix; + this.methodRestrict = methodRestrict; + this.uri = uri; } - public void setMethodRestrict(String methodRestrict) { - this.methodRestrict = methodRestrict; + public String getMethodRestrict() { + return methodRestrict; } public URI getUri() { return uri; } - public void setUri(URI uri) { - this.uri = uri; - } - public Boolean isMatchOnUriPrefix() { return matchOnUriPrefix; } - public void setMatchOnUriPrefix(Boolean matchOnUriPrefix) { - this.matchOnUriPrefix = matchOnUriPrefix; + @Override + public String toString() { + return uri + "?matchOnUriPrefix=" + matchOnUriPrefix + "&methodRestrict=" + methodRestrict; } + } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java index 8fa093e616741..0075a17020f62 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java @@ -23,6 +23,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.net.ssl.SSLContext; + +import io.undertow.server.HttpHandler; + import org.apache.camel.CamelContext; import org.apache.camel.ComponentVerifier; import org.apache.camel.Consumer; @@ -299,23 +303,20 @@ protected void doStart() throws Exception { } } - public void registerConsumer(UndertowConsumer consumer) { - URI uri = consumer.getEndpoint().getHttpURI(); - UndertowHostKey key = new UndertowHostKey(uri.getHost(), uri.getPort(), consumer.getEndpoint().getSslContext()); - UndertowHost host = undertowRegistry.get(key); - if (host == null) { - host = createUndertowHost(key); - undertowRegistry.put(key, host); - } + public HttpHandler registerEndpoint(HttpHandlerRegistrationInfo registrationInfo, SSLContext sslContext, HttpHandler handler) { + final URI uri = registrationInfo.getUri(); + final UndertowHostKey key = new UndertowHostKey(uri.getHost(), uri.getPort(), sslContext); + final UndertowHost host = undertowRegistry.computeIfAbsent(key, k -> createUndertowHost(k)); + host.validateEndpointURI(uri); - host.registerHandler(consumer.getHttpHandlerRegistrationInfo(), consumer.getHttpHandler()); + return host.registerHandler(registrationInfo, handler); } - public void unregisterConsumer(UndertowConsumer consumer) { - URI uri = consumer.getEndpoint().getHttpURI(); - UndertowHostKey key = new UndertowHostKey(uri.getHost(), uri.getPort(), consumer.getEndpoint().getSslContext()); - UndertowHost host = undertowRegistry.get(key); - host.unregisterHandler(consumer.getHttpHandlerRegistrationInfo()); + public void unregisterEndpoint(HttpHandlerRegistrationInfo registrationInfo, SSLContext sslContext) { + final URI uri = registrationInfo.getUri(); + final UndertowHostKey key = new UndertowHostKey(uri.getHost(), uri.getPort(), sslContext); + final UndertowHost host = undertowRegistry.get(key); + host.unregisterHandler(registrationInfo); } protected UndertowHost createUndertowHost(UndertowHostKey key) { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java new file mode 100644 index 0000000000000..f7a784ca7277c --- /dev/null +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow; + +public final class UndertowConstants { + + public static final String CONNECTION_KEY = "websocket.connectionKey"; + public static final String CONNECTION_KEY_LIST = "websocket.connectionKey.list"; + public static final String SEND_TO_ALL = "websocket.sendToAll"; + public static final String EVENT_TYPE = "websocket.eventType"; + public static final String EVENT_TYPE_ENUM = "websocket.eventTypeEnum"; + + /** + * WebSocket peers related events the {@link UndertowConsumer} sends to the Camel route. + */ + public enum EventType { + /** + * A new peer has connected. + */ + ONOPEN(1), + + /** + * A peer has disconnected. + */ + ONCLOSE(0), + + /** + * Unused in Undertow component. Kept for compatibility with Camel websocket component. + */ + ONERROR(-1); + private final int code; + + EventType(int code) { + this.code = code; + } + + /** + * @return a numeric identifier of this {@link EventType}. Kept for compatibility with Camel websocket + * component. + */ + public int getCode() { + return code; + } + + public static EventType ofCode(int code) { + switch (code) { + case 1: + return ONOPEN; + case 0: + return ONCLOSE; + case -1: + return ONERROR; + default: + throw new IllegalArgumentException("Cannot find an " + EventType.class.getName() + " for code " + code); + } + } + } + + public static final String WS_PROTOCOL = "ws"; + public static final String WSS_PROTOCOL = "wss"; + + private UndertowConstants() { + }; + +} diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java index f57286d3c7782..8abc793adf31b 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java @@ -28,9 +28,15 @@ import io.undertow.util.Methods; import io.undertow.util.MimeMappings; import io.undertow.util.StatusCodes; +import io.undertow.websockets.core.WebSocketChannel; + +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.TypeConverter; +import org.apache.camel.component.undertow.UndertowConstants.EventType; +import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler; import org.apache.camel.impl.DefaultConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +47,7 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler { private static final Logger LOG = LoggerFactory.getLogger(UndertowConsumer.class); - - private HttpHandlerRegistrationInfo registrationInfo; + private CamelWebSocketHandler webSocketHandler; public UndertowConsumer(UndertowEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -56,32 +61,30 @@ public UndertowEndpoint getEndpoint() { @Override protected void doStart() throws Exception { super.doStart(); - getEndpoint().getComponent().registerConsumer(this); + final UndertowEndpoint endpoint = getEndpoint(); + if (endpoint.isWebSocket()) { + /* + * note that the new CamelWebSocketHandler() we pass to registerEndpoint() does not necessarily have to be + * the same instance that is returned from there + */ + this.webSocketHandler = (CamelWebSocketHandler) endpoint.getComponent().registerEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext(), new CamelWebSocketHandler()); + this.webSocketHandler.setConsumer(this); + } else { + // allow for HTTP 1.1 continue + endpoint.getComponent().registerEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext(), Handlers.httpContinueRead( + // wrap with EagerFormParsingHandler to enable undertow form parsers + new EagerFormParsingHandler().setNext(UndertowConsumer.this))); + } } @Override protected void doStop() throws Exception { super.doStop(); - getEndpoint().getComponent().unregisterConsumer(this); - } - - public HttpHandlerRegistrationInfo getHttpHandlerRegistrationInfo() { - if (registrationInfo == null) { - UndertowEndpoint endpoint = getEndpoint(); - - registrationInfo = new HttpHandlerRegistrationInfo(); - registrationInfo.setUri(endpoint.getHttpURI()); - registrationInfo.setMethodRestrict(endpoint.getHttpMethodRestrict()); - registrationInfo.setMatchOnUriPrefix(endpoint.getMatchOnUriPrefix()); + if (this.webSocketHandler != null) { + this.webSocketHandler.setConsumer(null); } - return registrationInfo; - } - - public HttpHandler getHttpHandler() { - // allow for HTTP 1.1 continue - return Handlers.httpContinueRead( - // wrap with EagerFormParsingHandler to enable undertow form parsers - new EagerFormParsingHandler().setNext(this)); + UndertowEndpoint endpoint = getEndpoint(); + endpoint .getComponent().unregisterEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext()); } @Override @@ -141,6 +144,57 @@ public void handleRequest(HttpServerExchange httpExchange) throws Exception { httpExchange.getResponseSender().close(); } + /** + * Create an {@link Exchange} from the associated {@link UndertowEndpoint} and set the {@code in} {@link Message}'s + * body to the given {@code message} and {@link UndertowConstants#CONNECTION_KEY} header to the given + * {@code connectionKey}. + * + * @param connectionKey an identifier of {@link WebSocketChannel} through which the {@code message} was received + * @param message the message received via the {@link WebSocketChannel} + */ + public void sendMessage(final String connectionKey, final Object message) { + + final Exchange exchange = getEndpoint().createExchange(); + + // set header and body + exchange.getIn().setHeader(UndertowConstants.CONNECTION_KEY, connectionKey); + exchange.getIn().setBody(message); + + // send exchange using the async routing engine + getAsyncProcessor().process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, + exchange.getException()); + } + } + }); + } + + /** + * Send a notification related a WebSocket peer. + * + * @param connectionKey of WebSocket peer + * @param eventType the type of the event + */ + public void sendEventNotification(String connectionKey, EventType eventType) { + final Exchange exchange = getEndpoint().createExchange(); + + final Message in = exchange.getIn(); + in.setHeader(UndertowConstants.CONNECTION_KEY, connectionKey); + in.setHeader(UndertowConstants.EVENT_TYPE, eventType.getCode()); + in.setHeader(UndertowConstants.EVENT_TYPE_ENUM, eventType); + + // send exchange using the async routing engine + getAsyncProcessor().process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + } + }); + } + private Object getResponseBody(HttpServerExchange httpExchange, Exchange camelExchange) throws IOException { Object result; if (camelExchange.hasOut()) { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java index 35ba81a49d3e1..81403be0dd9ce 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java @@ -31,6 +31,8 @@ import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.undertow.UndertowConstants.EventType; +import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler; import org.apache.camel.http.common.cookie.CookieHandler; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.HeaderFilterStrategy; @@ -57,6 +59,9 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, private UndertowComponent component; private SSLContext sslContext; private OptionMap optionMap; + private HttpHandlerRegistrationInfo registrationInfo; + private CamelWebSocketHandler webSocketHttpHandler; + private boolean isWebSocket; @UriPath @Metadata(required = "true") private URI httpURI; @@ -87,6 +92,14 @@ public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, private boolean optionsEnabled; @UriParam(label = "producer") private CookieHandler cookieHandler; + @UriParam(label = "producer,websocket") + private Boolean sendToAll; + @UriParam(label = "producer,websocket", defaultValue = "30000") + private Integer sendTimeout = 30000; + @UriParam(label = "consumer,websocket", defaultValue = "false") + private boolean useStreaming; + @UriParam(label = "consumer,websocket", defaultValue = "false") + private boolean fireWebSocketChannelEvents; public UndertowEndpoint(String uri, UndertowComponent component) throws URISyntaxException { super(uri, component); @@ -308,10 +321,62 @@ public void setCookieHandler(CookieHandler cookieHandler) { this.cookieHandler = cookieHandler; } + public Boolean getSendToAll() { + return sendToAll; + } + + /** + * To send to all websocket subscribers. Can be used to configure on endpoint level, instead of having to use the + * {@code UndertowConstants.SEND_TO_ALL} header on the message. + */ + public void setSendToAll(Boolean sendToAll) { + this.sendToAll = sendToAll; + } + + public Integer getSendTimeout() { + return sendTimeout; + } + + /** + * Timeout in milliseconds when sending to a websocket channel. + * The default timeout is 30000 (30 seconds). + */ + public void setSendTimeout(Integer sendTimeout) { + this.sendTimeout = sendTimeout; + } + + public boolean isUseStreaming() { + return useStreaming; + } + + /** + * if {@code true}, text and binary messages coming through a WebSocket will be wrapped as java.io.Reader and + * java.io.InputStream respectively before they are passed to an {@link Exchange}; otherwise they will be passed as + * String and byte[] respectively. + */ + public void setUseStreaming(boolean useStreaming) { + this.useStreaming = useStreaming; + } + + public boolean isFireWebSocketChannelEvents() { + return fireWebSocketChannelEvents; + } + + /** + * if {@code true}, the consumer will post notifications to the route when a new WebSocket peer connects, + * disconnects, etc. See {@code UndertowConstants.EVENT_TYPE} and {@link EventType}. + */ + public void setFireWebSocketChannelEvents(boolean fireWebSocketChannelEvents) { + this.fireWebSocketChannelEvents = fireWebSocketChannelEvents; + } + @Override protected void doStart() throws Exception { super.doStart(); + final String scheme = httpURI.getScheme(); + this.isWebSocket = UndertowConstants.WS_PROTOCOL.equalsIgnoreCase(scheme) || UndertowConstants.WSS_PROTOCOL.equalsIgnoreCase(scheme); + if (sslContextParameters != null) { sslContext = sslContextParameters.createSSLContext(getCamelContext()); } @@ -367,4 +432,25 @@ protected void doStart() throws Exception { } } + /** + * @return {@code true} if {@link #getHttpURI()}'s scheme is {@code ws} or {@code wss} + */ + public boolean isWebSocket() { + return isWebSocket; + } + + public HttpHandlerRegistrationInfo getHttpHandlerRegistrationInfo() { + if (registrationInfo == null) { + registrationInfo = new HttpHandlerRegistrationInfo(getHttpURI(), getHttpMethodRestrict(), getMatchOnUriPrefix()); + } + return registrationInfo; + } + + public CamelWebSocketHandler getWebSocketHttpHandler() { + if (webSocketHttpHandler == null) { + webSocketHttpHandler = new CamelWebSocketHandler(); + } + return webSocketHttpHandler; + } + } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowHost.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowHost.java index 1609ab6b17adc..16f45788ad5ec 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowHost.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowHost.java @@ -20,6 +20,8 @@ import io.undertow.server.HttpHandler; +import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler; + /** * An undertow host abstraction * @@ -32,12 +34,27 @@ public interface UndertowHost { void validateEndpointURI(URI httpURI); /** - * Register a handler with the given {@link HttpHandlerRegistrationInfo} + * Register a handler with the given {@link HttpHandlerRegistrationInfo}. Note that for some kinds of handlers (most + * notably {@link CamelWebSocketHandler}), it is legal to call this method multiple times with equal + * {@link HttpHandlerRegistrationInfo} and {@link HttpHandler}. In such cases the returned {@link HttpHandler} may + * differ from the passed {@link HttpHandler} and the returned instance is the effectively registered one for the + * given {@link HttpHandlerRegistrationInfo}. + * + * @param registrationInfo + * the {@link HttpHandlerRegistrationInfo} related to {@code handler} + * @param handler + * the {@link HttpHandler} to register + * @return the given {@code handler} or a different {@link HttpHandler} that has been registered with the given + * {@link HttpHandlerRegistrationInfo} earlier. */ - void registerHandler(HttpHandlerRegistrationInfo registrationInfo, HttpHandler handler); + HttpHandler registerHandler(HttpHandlerRegistrationInfo registrationInfo, HttpHandler handler); /** - * Unregister a handler with the given {@link HttpHandlerRegistrationInfo} + * Unregister a handler with the given {@link HttpHandlerRegistrationInfo}. Note that if + * {@link #registerHandler(HttpHandlerRegistrationInfo, HttpHandler)} was successfully invoked multiple times for an + * equivalent {@link HttpHandlerRegistrationInfo} then {@link #unregisterHandler(HttpHandlerRegistrationInfo)} must + * be called the same number of times to unregister the associated handler completely. */ void unregisterHandler(HttpHandlerRegistrationInfo registrationInfo); + } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java index dfa986f4df797..b78e75809f0f9 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java @@ -17,6 +17,8 @@ package org.apache.camel.component.undertow; import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -24,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; + import javax.net.ssl.SSLContext; import io.undertow.client.ClientRequest; @@ -33,10 +36,12 @@ import io.undertow.util.HeaderMap; import io.undertow.util.Headers; import io.undertow.util.HttpString; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.TypeConverter; +import org.apache.camel.component.undertow.handlers.CamelWebSocketHandler; import org.apache.camel.http.common.cookie.CookieHandler; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.util.URISupport; @@ -65,6 +70,7 @@ public class UndertowProducer extends DefaultAsyncProducer { private DefaultByteBufferPool pool; private XnioSsl ssl; private XnioWorker worker; + private CamelWebSocketHandler webSocketHandler; public UndertowProducer(final UndertowEndpoint endpoint, final OptionMap options) { super(endpoint); @@ -77,81 +83,135 @@ public UndertowEndpoint getEndpoint() { return endpoint; } + boolean isSendToAll(Message in) { + // header may be null; have to be careful here (and fallback to use sendToAll option configured from endpoint) + Boolean value = in.getHeader(UndertowConstants.SEND_TO_ALL, endpoint.getSendToAll(), Boolean.class); + return value == null ? false : value; + } + @Override public boolean process(final Exchange camelExchange, final AsyncCallback callback) { - final URI uri; - final HttpString method; - try { - final String exchangeUri = UndertowHelper.createURL(camelExchange, getEndpoint()); - uri = UndertowHelper.createURI(camelExchange, exchangeUri, getEndpoint()); - method = UndertowHelper.createMethod(camelExchange, endpoint, camelExchange.getIn().getBody() != null); - } catch (final URISyntaxException e) { - camelExchange.setException(e); - callback.done(true); - return true; - } - - final String pathAndQuery = URISupport.pathAndQueryOf(uri); - - final UndertowHttpBinding undertowHttpBinding = endpoint.getUndertowHttpBinding(); - - final CookieHandler cookieHandler = endpoint.getCookieHandler(); - final Map> cookieHeaders; - if (cookieHandler != null) { + if (endpoint.isWebSocket()) { + return processWebSocket(camelExchange, callback); + } else { + /* not a WebSocket */ + final URI uri; + final HttpString method; try { - cookieHeaders = cookieHandler.loadCookies(camelExchange, uri); - } catch (final IOException e) { + final String exchangeUri = UndertowHelper.createURL(camelExchange, getEndpoint()); + uri = UndertowHelper.createURI(camelExchange, exchangeUri, getEndpoint()); + method = UndertowHelper.createMethod(camelExchange, endpoint, camelExchange.getIn().getBody() != null); + } catch (final URISyntaxException e) { camelExchange.setException(e); callback.done(true); return true; } - } else { - cookieHeaders = Collections.emptyMap(); - } - final ClientRequest request = new ClientRequest(); - request.setMethod(method); - request.setPath(pathAndQuery); + final String pathAndQuery = URISupport.pathAndQueryOf(uri); + + final UndertowHttpBinding undertowHttpBinding = endpoint.getUndertowHttpBinding(); + + final CookieHandler cookieHandler = endpoint.getCookieHandler(); + final Map> cookieHeaders; + if (cookieHandler != null) { + try { + cookieHeaders = cookieHandler.loadCookies(camelExchange, uri); + } catch (final IOException e) { + camelExchange.setException(e); + callback.done(true); + return true; + } + } else { + cookieHeaders = Collections.emptyMap(); + } - final HeaderMap requestHeaders = request.getRequestHeaders(); + final ClientRequest request = new ClientRequest(); + request.setMethod(method); + request.setPath(pathAndQuery); - // Set the Host header - final Message message = camelExchange.getIn(); - final String host = message.getHeader(Headers.HOST_STRING, String.class); - requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(() -> uri.getAuthority())); + final HeaderMap requestHeaders = request.getRequestHeaders(); - final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn()); + // Set the Host header + final Message message = camelExchange.getIn(); + final String host = message.getHeader(Headers.HOST_STRING, String.class); + requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(() -> uri.getAuthority())); - final TypeConverter tc = endpoint.getCamelContext().getTypeConverter(); - final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body); + final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn()); - // As tryConvertTo is used to convert the body, we should do null check - // or the call bodyAsByte.remaining() may throw an NPE - if (body != null && bodyAsByte != null) { - requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining()); - } + final TypeConverter tc = endpoint.getCamelContext().getTypeConverter(); + final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body); - for (final Map.Entry> entry : cookieHeaders.entrySet()) { - requestHeaders.putAll(HttpString.tryFromString(entry.getKey()), entry.getValue()); - } + // As tryConvertTo is used to convert the body, we should do null check + // or the call bodyAsByte.remaining() may throw an NPE + if (body != null && bodyAsByte != null) { + requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining()); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Executing http {} method: {}", method, pathAndQuery); + for (final Map.Entry> entry : cookieHeaders.entrySet()) { + requestHeaders.putAll(HttpString.tryFromString(entry.getKey()), entry.getValue()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Executing http {} method: {}", method, pathAndQuery); + } + + final UndertowClientCallback clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(), + request, bodyAsByte); + + // when connect succeeds or fails UndertowClientCallback will + // get notified on a I/O thread run by Xnio worker. The writing + // of request and reading of response is performed also in the + // callback + client.connect(clientCallback, uri, worker, ssl, pool, options); + + // the call above will proceed on Xnio I/O thread we will + // notify the exchange asynchronously when the HTTP exchange + // ends with success or failure from UndertowClientCallback + return false; } - final UndertowClientCallback clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(), - request, bodyAsByte); + } - // when connect succeeds or fails UndertowClientCallback will - // get notified on a I/O thread run by Xnio worker. The writing - // of request and reading of response is performed also in the - // callback - client.connect(clientCallback, uri, worker, ssl, pool, options); + private boolean processWebSocket(final Exchange camelExchange, final AsyncCallback camelCallback) { + final Message in = camelExchange.getIn(); + try { + Object message = in.getBody(); + if (!(message instanceof String || message instanceof byte[] || message instanceof Reader + || message instanceof InputStream)) { + message = in.getBody(String.class); + } - // the call above will proceed on Xnio I/O thread we will - // notify the exchange asynchronously when the HTTP exchange - // ends with success or failure from UndertowClientCallback - return false; + if (message != null) { + final int timeout = endpoint.getSendTimeout(); + if (isSendToAll(in)) { + return webSocketHandler.send(peer -> true, message, timeout, camelExchange, camelCallback); + } + final List connectionKeys = in.getHeader(UndertowConstants.CONNECTION_KEY_LIST, List.class); + if (connectionKeys != null) { + return webSocketHandler.send( + peer -> connectionKeys.contains(peer.getAttribute(UndertowConstants.CONNECTION_KEY)), message, + timeout, camelExchange, camelCallback); + } + final String connectionKey = in.getHeader(UndertowConstants.CONNECTION_KEY, String.class); + if (connectionKey != null) { + return webSocketHandler.send( + peer -> connectionKey.equals(peer.getAttribute(UndertowConstants.CONNECTION_KEY)), message, + timeout, camelExchange, camelCallback); + } + throw new IllegalStateException( + String.format("Cannot process message which has none of the headers %s, %s or %s set: %s", + UndertowConstants.SEND_TO_ALL, UndertowConstants.CONNECTION_KEY_LIST, + UndertowConstants.CONNECTION_KEY, in)); + } else { + /* nothing to do for a null body */ + camelCallback.done(true); + return true; + } + } catch (Exception e) { + camelExchange.setException(e); + camelCallback.done(true); + return true; + } } @Override @@ -171,6 +231,10 @@ protected void doStart() throws Exception { client = UndertowClient.getInstance(); + if (endpoint.isWebSocket()) { + this.webSocketHandler = (CamelWebSocketHandler) endpoint.getComponent().registerEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext(), new CamelWebSocketHandler()); + } + LOG.debug("Created worker: {} with options: {}", worker, options); } @@ -178,6 +242,10 @@ protected void doStart() throws Exception { protected void doStop() throws Exception { super.doStop(); + if (endpoint.isWebSocket()) { + endpoint.getComponent().unregisterEndpoint(endpoint.getHttpHandlerRegistrationInfo(), endpoint.getSslContext()); + } + if (worker != null && !worker.isShutdown()) { LOG.debug("Shutting down worker: {}", worker); worker.shutdown(); diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelMethodHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelMethodHandler.java index ed7eb93a22884..21af810b4e638 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelMethodHandler.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelMethodHandler.java @@ -16,76 +16,130 @@ */ package org.apache.camel.component.undertow.handlers; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; import io.undertow.util.MimeMappings; import io.undertow.util.StatusCodes; + import org.apache.camel.component.undertow.ExchangeHeaders; /** * A HttpHandler build a mapping between HTTP methods and handlers and dispatch requests along the map. */ public class CamelMethodHandler implements HttpHandler { - private Map methodMap = new HashMap(); - private HttpHandler defaultHandler; + /** + * A key to use for handlers with no method specified + */ + private static final String DEFAULT_HANDLER_KEY = ""; + private static final String[] DEFAULT_METHODS; + static { + DEFAULT_METHODS = new String[] {DEFAULT_HANDLER_KEY}; + } + + private final Map methodMap = new ConcurrentHashMap<>(); private String handlerString; + CamelMethodHandler() { + } + @Override public void handleRequest(HttpServerExchange exchange) throws Exception { - HttpHandler handler = methodMap.get(exchange.getRequestMethod().toString()); - if (handler != null) { + HttpHandler handler = null; + /* No need to lock methodMap for read access in this method */ + MethodEntry entry = methodMap.get(exchange.getRequestMethod().toString()); + if (entry != null && (handler = entry.handler) != null) { handler.handleRequest(exchange); - } else if (defaultHandler != null) { - defaultHandler.handleRequest(exchange); } else { - exchange.setStatusCode(StatusCodes.METHOD_NOT_ALLOWED); - exchange.getResponseHeaders().put(ExchangeHeaders.CONTENT_TYPE, MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt")); - exchange.getResponseHeaders().put(ExchangeHeaders.CONTENT_LENGTH, 0); - exchange.endExchange(); + entry = methodMap.get(DEFAULT_HANDLER_KEY); + if (entry != null && (handler = entry.handler) != null) { + handler.handleRequest(exchange); + } else { + exchange.setStatusCode(StatusCodes.METHOD_NOT_ALLOWED); + exchange.getResponseHeaders().put(ExchangeHeaders.CONTENT_TYPE, MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt")); + exchange.getResponseHeaders().put(ExchangeHeaders.CONTENT_LENGTH, 0); + exchange.endExchange(); + } } } - public synchronized void add(String[] methods, HttpHandler handler) { - Map adding = new HashMap(); - for (String method : methods) { - adding.put(method, handler); + public HttpHandler add(String methods, HttpHandler handler) { + HttpHandler result = null; + synchronized (methodMap) { // we lock on methodMap to get a reliable sum of refCounts in remove(String) + for (String method : splitMethods(methods)) { + MethodEntry en = methodMap.computeIfAbsent(method, m -> new MethodEntry()); + result = en.addRef(handler, method); + } } - methodMap.putAll(adding); handlerString = null; + return result; } - public synchronized void remove(String[] methods) { - for (String method : methods) { - methodMap.remove(method); + + public boolean remove(String methods) { + boolean result; + synchronized (methodMap) { // we lock on methodMap to get a reliable sum of refCounts + for (String method : splitMethods(methods)) { + final MethodEntry en = methodMap.get(method); + if (en != null) { + en.removeRef(); + } + } + result = methodMap.values().stream().mapToInt(en -> en.refCount).sum() == 0; } handlerString = null; + return result; } - public synchronized void addDefault(HttpHandler handler) { - if (defaultHandler != null) { - throw new IllegalArgumentException(String.format( - "Duplicate default handler: '%s', '%s'", defaultHandler, handler)); + public String toString() { + if (handlerString == null) { + handlerString = "CamelMethodHandler[" + methodMap + "]"; } - defaultHandler = handler; - handlerString = null; + return handlerString; } - public synchronized void removeDefault() { - defaultHandler = null; - handlerString = null; + private String[] splitMethods(String methods) { + String[] result = methods != null ? methods.split(",") : DEFAULT_METHODS; + return result.length == 0 ? DEFAULT_METHODS : result; } - public boolean isEmpty() { - return defaultHandler == null && methodMap.isEmpty(); - } + static class MethodEntry { - public String toString() { - if (handlerString == null) { - handlerString = "CamelMethodHandler[default=" + defaultHandler + ", " + methodMap + "]"; + /** + * The number of references pointing to {@link #handler} + */ + private int refCount; + private HttpHandler handler; + + MethodEntry() { } - return handlerString; + + public HttpHandler addRef(HttpHandler handler, String method) { + if (this.handler == null) { + this.handler = handler; + refCount++; + return handler; + } else if ("OPTIONS".equals(method) || CamelWebSocketHandler.class == this.handler.getClass() && CamelWebSocketHandler.class == handler.getClass()) { + refCount++; + return this.handler; + } else { + throw new IllegalArgumentException(String.format( + "Duplicate handler for %s method: '%s', '%s'", method, this.handler, handler)); + } + } + + public void removeRef() { + if (--refCount == 0) { + this.handler = null; + } + } + + @Override + public String toString() { + return handler == null ? "null" : handler.toString(); + } + } } diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelRootHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelRootHandler.java index 311b919c9a9d5..3261331425dfe 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelRootHandler.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelRootHandler.java @@ -35,7 +35,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception { pathHandler.handleRequest(exchange); } - public synchronized void add(String path, String[] methods, boolean prefixMatch, HttpHandler handler) { + public synchronized HttpHandler add(String path, String methods, boolean prefixMatch, HttpHandler handler) { String basePath = getBasePath(path); HttpHandler basePathHandler = pathHandler.getHandler(basePath); @@ -44,7 +44,7 @@ public synchronized void add(String path, String[] methods, boolean prefixMatch, // Adding a handler for the template path String relativePath = path.substring(basePath.length()); if (basePathHandler instanceof CamelPathTemplateHandler) { - CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler)basePathHandler; + CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; targetHandler = templateHandler.get(relativePath); if (targetHandler == null) { targetHandler = new CamelMethodHandler(); @@ -54,7 +54,7 @@ public synchronized void add(String path, String[] methods, boolean prefixMatch, CamelPathTemplateHandler templateHandler; if (basePathHandler instanceof CamelMethodHandler) { // A static path handler is already set for the base path. Use it as a default handler - templateHandler = new CamelPathTemplateHandler((CamelMethodHandler)basePathHandler); + templateHandler = new CamelPathTemplateHandler((CamelMethodHandler) basePathHandler); } else if (basePathHandler == null) { templateHandler = new CamelPathTemplateHandler(new CamelMethodHandler()); } else { @@ -68,7 +68,7 @@ public synchronized void add(String path, String[] methods, boolean prefixMatch, } else { // Adding a handler for the static path if (basePathHandler instanceof CamelPathTemplateHandler) { - CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler)basePathHandler; + CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler) basePathHandler; if (!prefixMatch) { targetHandler = templateHandler.getDefault(); } else { @@ -76,7 +76,7 @@ public synchronized void add(String path, String[] methods, boolean prefixMatch, } } else { if (basePathHandler instanceof CamelMethodHandler) { - targetHandler = (CamelMethodHandler)basePathHandler; + targetHandler = (CamelMethodHandler) basePathHandler; } else if (basePathHandler == null) { targetHandler = new CamelMethodHandler(); if (prefixMatch) { @@ -89,15 +89,10 @@ public synchronized void add(String path, String[] methods, boolean prefixMatch, } } } - - if (methods != null && methods.length != 0) { - targetHandler.add(methods, handler); - } else { - targetHandler.addDefault(handler); - } + return targetHandler.add(methods, handler); } - public synchronized void remove(String path, String[] methods, boolean prefixMatch) { + public synchronized void remove(String path, String methods, boolean prefixMatch) { String basePath = getBasePath(path); HttpHandler basePathHandler = pathHandler.getHandler(basePath); if (basePathHandler == null) { @@ -109,12 +104,7 @@ public synchronized void remove(String path, String[] methods, boolean prefixMat String relativePath = path.substring(basePath.length()); CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler)basePathHandler; CamelMethodHandler targetHandler = templateHandler.get(relativePath); - if (methods != null && methods.length != 0) { - targetHandler.remove(methods); - } else { - targetHandler.removeDefault(); - } - if (targetHandler.isEmpty()) { + if (targetHandler.remove(methods)) { templateHandler.remove(relativePath); if (templateHandler.isEmpty()) { pathHandler.removePrefixPath(basePath); @@ -127,12 +117,7 @@ public synchronized void remove(String path, String[] methods, boolean prefixMat String relativePath = path.substring(basePath.length()); CamelPathTemplateHandler templateHandler = (CamelPathTemplateHandler)basePathHandler; CamelMethodHandler targetHandler = templateHandler.getDefault(); - if (methods != null && methods.length != 0) { - targetHandler.remove(methods); - } else { - targetHandler.removeDefault(); - } - if (targetHandler.isEmpty()) { + if (targetHandler.remove(methods)) { templateHandler.remove(relativePath); if (templateHandler.isEmpty()) { pathHandler.removePrefixPath(basePath); @@ -140,12 +125,7 @@ public synchronized void remove(String path, String[] methods, boolean prefixMat } } else { CamelMethodHandler targetHandler = (CamelMethodHandler)basePathHandler; - if (methods != null && methods.length != 0) { - targetHandler.remove(methods); - } else { - targetHandler.removeDefault(); - } - if (targetHandler.isEmpty()) { + if (targetHandler.remove(methods)) { if (prefixMatch) { pathHandler.removePrefixPath(basePath); } else { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java new file mode 100644 index 0000000000000..c307665c12b05 --- /dev/null +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.handlers; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import io.undertow.Handlers; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.websockets.WebSocketConnectionCallback; +import io.undertow.websockets.WebSocketProtocolHandshakeHandler; +import io.undertow.websockets.core.AbstractReceiveListener; +import io.undertow.websockets.core.BufferedBinaryMessage; +import io.undertow.websockets.core.BufferedTextMessage; +import io.undertow.websockets.core.WebSocketChannel; +import io.undertow.websockets.core.WebSockets; +import io.undertow.websockets.spi.WebSocketHttpExchange; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.undertow.UndertowConstants; +import org.apache.camel.component.undertow.UndertowConstants.EventType; +import org.apache.camel.component.undertow.UndertowConsumer; +import org.apache.camel.component.undertow.UndertowProducer; +import org.apache.camel.converter.IOConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xnio.ChannelListener; +import org.xnio.Pooled; + +/** + * An {@link HttpHandler} that delegates to {@link WebSocketProtocolHandshakeHandler} and provides some wiring to + * connect {@link UndertowComsumer} with {@link UndertowProducer}. + */ +public class CamelWebSocketHandler implements HttpHandler { + private static final Logger LOG = LoggerFactory.getLogger(CamelWebSocketHandler.class); + + private final UndertowWebSocketConnectionCallback callback; + + private UndertowConsumer consumer; + + private final Object consumerLock = new Object(); + + private final WebSocketProtocolHandshakeHandler delegate; + + private final ChannelListener closeListener; + + private final UndertowReceiveListener receiveListener; + + public CamelWebSocketHandler() { + super(); + this.receiveListener = new UndertowReceiveListener(); + this.callback = new UndertowWebSocketConnectionCallback(); + this.closeListener = new ChannelListener() { + @Override + public void handleEvent(WebSocketChannel channel) { + sendEventNotificationIfNeeded((String) channel.getAttribute(UndertowConstants.CONNECTION_KEY), + EventType.ONCLOSE); + } + }; + this.delegate = Handlers.websocket(callback); + } + + /** + * Send the given {@code message} to the given {@code channel} and report the outcome to the given {@code callback} + * within the given {@code timeoutMillis}. + * + * @param channel + * the channel to sent the {@code message} to + * @param message + * the message to send + * @param callback + * where to report the outcome + * @param timeoutMillis + * the timeout in milliseconds + * @throws IOException + */ + private static void send(WebSocketChannel channel, Object message, ExtendedWebSocketCallback callback, + long timeoutMillis) throws IOException { + if (channel.isOpen()) { + if (message instanceof String) { + WebSockets.sendText((String) message, channel, callback); + } else if (message instanceof byte[]) { + ByteBuffer buffer = ByteBuffer.wrap((byte[]) message); + WebSockets.sendBinary(buffer, channel, callback, timeoutMillis); + } else if (message instanceof Reader) { + Reader r = (Reader) message; + WebSockets.sendText(IOConverter.toString(r), channel, callback); + } else if (message instanceof InputStream) { + InputStream in = (InputStream) message; + ByteBuffer buffer = ByteBuffer.wrap(IOConverter.toBytes(in)); + WebSockets.sendBinary(buffer, channel, callback, timeoutMillis); + } else { + throw new RuntimeCamelException( + "Unexpected type of message " + message.getClass().getName() + "; expected String, byte[], " + + Reader.class.getName() + " or " + InputStream.class.getName()); + } + } else { + callback.closedBeforeSent(channel); + } + } + + /** {@inheritDoc} */ + @Override + public void handleRequest(HttpServerExchange exchange) throws Exception { + this.delegate.handleRequest(exchange); + } + + /** + * Send the given {@code message} to one or more channels selected using the given {@code peerFilter} within the + * given {@code timeout} and report the outcome to the given {@code camelExchange} and {@code camelCallback}. + * + * @param peerFilter + * a {@link Predicate} to apply to the set of peers obtained via {@link #delegate}'s + * {@link WebSocketProtocolHandshakeHandler#getPeerConnections()} + * @param message + * the message to send + * @param camelExchange to notify about the outcome + * @param camelCallback to notify about the outcome + * @param timeout + * in milliseconds + * @return {@code true} if the execution finished synchronously or {@code false} otherwise + * @throws IOException + */ + public boolean send(Predicate peerFilter, Object message, final int timeout, + final Exchange camelExchange, final AsyncCallback camelCallback) throws IOException { + List targetPeers = delegate.getPeerConnections().stream().filter(peerFilter).collect(Collectors.toList()); + if (targetPeers.isEmpty()) { + camelCallback.done(true); + return true; + } else { + /* There are some peers to send the message to */ + MultiCallback wsCallback = new MultiCallback(targetPeers, camelCallback, camelExchange); + for (WebSocketChannel peer : targetPeers) { + send(peer, message, wsCallback, timeout); + } + return false; + } + } + + /** + * @param consumer the {@link UndertowConsumer} to set + */ + public void setConsumer(UndertowConsumer consumer) { + synchronized (consumerLock) { + if (consumer != null && this.consumer != null) { + throw new IllegalStateException("Cannot call " + getClass().getName() + + ".setConsumer(UndertowConsumer) with a non-null consumer before unsetting it via setConsumer(null)"); + } + this.consumer = consumer; + } + } + + void sendEventNotificationIfNeeded(String connectionKey, EventType eventType) { + synchronized (consumerLock) { + synchronized (consumerLock) { + if (consumer != null) { + if (consumer.getEndpoint().isFireWebSocketChannelEvents()) { + consumer.sendEventNotification(connectionKey, eventType); + } + } else { + LOG.debug("No consumer to handle a peer {} event type {}", connectionKey, eventType); + } + } + } + } + + /** + * A {@link ExtendedWebSocketCallback} able to track sending one message to multiple peers. + */ + static class MultiCallback implements ExtendedWebSocketCallback { + private final AsyncCallback camelCallback; + private final Exchange camelExchange; + + private Map errors; + private final Object lock = new Object(); + /** + * Initially, this set contains all peers where we plan to send the message. Then the peers are removed one by + * one as we are notified via {@link #complete(WebSocketChannel, Void)} or + * {@link #onError(WebSocketChannel, Void, Throwable)}. This set being empty signals that all peers have + * finished sending the message. + */ + private final Set peers; + + public MultiCallback(Collection peers, AsyncCallback camelCallback, Exchange camelExchange) { + super(); + this.camelCallback = camelCallback; + this.camelExchange = camelExchange; + synchronized (lock) { + this.peers = new HashSet<>(peers); + } + } + + @Override + public void closedBeforeSent(WebSocketChannel channel) { + synchronized (lock) { + peers.remove(channel); + if (peers.isEmpty()) { + finish(); + } + } + } + + @Override + public void complete(WebSocketChannel channel, Void context) { + synchronized (lock) { + peers.remove(channel); + if (peers.isEmpty()) { + finish(); + } + } + } + + /** + * {@link #finish()} should be called only inside a synchronized(lock) { ... } block to prevent + * concurrent access to {@link #errors}. + */ + private void finish() { + if (errors != null && !errors.isEmpty()) { + if (errors.size() == 1) { + final Entry en = errors.entrySet().iterator().next(); + final String msg = "Delivery to the WebSocket peer " + en.getKey() + " channels has failed"; + camelExchange.setException(new CamelExchangeException(msg, camelExchange, en.getValue())); + } else { + final StringBuilder msg = new StringBuilder( + "Delivery to the following WebSocket peer channels has failed: "); + for (Entry en : errors.entrySet()) { + msg.append("\n ").append(en.getKey()).append(en.getValue().getMessage()); + } + camelExchange.setException(new CamelExchangeException(msg.toString(), camelExchange)); + } + } + camelCallback.done(false); + } + + @Override + public void onError(WebSocketChannel channel, Void context, Throwable throwable) { + synchronized (lock) { + peers.remove(channel); + final String connectionKey = (String) channel.getAttribute(UndertowConstants.CONNECTION_KEY); + if (connectionKey == null) { + throw new RuntimeCamelException(UndertowConstants.CONNECTION_KEY + " attribute not found on " + + WebSocketChannel.class.getSimpleName() + " " + channel); + } + if (errors == null) { + errors = new HashMap<>(); + } + errors.put(connectionKey, throwable); + if (peers.isEmpty()) { + finish(); + } + } + } + + } + + /** + * A {@link ChannelListener} that forwards the messages received over the WebSocket to + * {@link CamelWebSocketHandler#consumer}. + */ + class UndertowReceiveListener extends AbstractReceiveListener { + + protected void onFullBinaryMessage(final WebSocketChannel channel, BufferedBinaryMessage message) + throws IOException { + LOG.debug("onFullBinaryMessage()"); + final String connectionKey = (String) channel.getAttribute(UndertowConstants.CONNECTION_KEY); + if (connectionKey == null) { + throw new RuntimeCamelException(UndertowConstants.CONNECTION_KEY + " attribute not found on " + + WebSocketChannel.class.getSimpleName() + " " + channel); + } + final Pooled data = message.getData(); + try { + final ByteBuffer[] buffers = data.getResource(); + int len = 0; + for (ByteBuffer buffer : buffers) { + len += buffer.remaining(); + } + byte[] bytes = new byte[len]; + int offset = 0; + for (ByteBuffer buffer : buffers) { + int increment = buffer.remaining(); + buffer.get(bytes, offset, increment); + offset += increment; + } + synchronized (consumerLock) { + if (consumer != null) { + final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new ByteArrayInputStream(bytes) : bytes; + consumer.sendMessage(connectionKey, outMsg); + } else { + LOG.debug("No consumer to handle message received: {}", message); + } + } + } finally { + data.free(); + } + } + + @Override + protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) { + final String text = message.getData(); + LOG.debug("onFullTextMessage(): {}", text); + final String connectionKey = (String) channel.getAttribute(UndertowConstants.CONNECTION_KEY); + if (connectionKey == null) { + throw new RuntimeCamelException(UndertowConstants.CONNECTION_KEY + " attribute not found on " + + WebSocketChannel.class.getSimpleName() + " " + channel); + } + synchronized (consumerLock) { + if (consumer != null) { + final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new StringReader(text) : text; + consumer.sendMessage(connectionKey, outMsg); + } else { + LOG.debug("No consumer to handle message received: {}", message); + } + } + } + + } + + /** + * Sets the {@link UndertowReceiveListener} to the given channel on connect. + */ + class UndertowWebSocketConnectionCallback implements WebSocketConnectionCallback { + + public UndertowWebSocketConnectionCallback() { + super(); + } + + @Override + public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) { + LOG.trace("onConnect {}", exchange); + final String connectionKey = UUID.randomUUID().toString(); + channel.setAttribute(UndertowConstants.CONNECTION_KEY, connectionKey); + channel.getReceiveSetter().set(receiveListener); + channel.addCloseTask(closeListener); + sendEventNotificationIfNeeded(connectionKey, EventType.ONOPEN); + channel.resumeReceives(); + } + + } + +} diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/ExtendedWebSocketCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/ExtendedWebSocketCallback.java new file mode 100644 index 0000000000000..6502ffcd79efe --- /dev/null +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/ExtendedWebSocketCallback.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.handlers; + +import io.undertow.websockets.core.WebSocketCallback; +import io.undertow.websockets.core.WebSocketChannel; + +/** + * An extension of {@link WebSocketCallback} that adds a notification for the situation when a {@link WebSocketChannel} + * is closed before any message could be sent to it. + */ +public interface ExtendedWebSocketCallback extends WebSocketCallback { + + /** + * @param channel the channel that was closed before any messages could be sent to it + */ + void closedBeforeSent(WebSocketChannel channel); + +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowConsumerUnregisterTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowConsumerUnregisterTest.java index ad064517b2562..5a696aaab2b08 100644 --- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowConsumerUnregisterTest.java +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowConsumerUnregisterTest.java @@ -34,8 +34,8 @@ public void testUnregisterUndertowConsumersForPort() throws Exception { UndertowConsumer consumerFoo = (UndertowConsumer) context.getRoute("route-foo").getConsumer(); UndertowConsumer consumerBar = (UndertowConsumer) context.getRoute("route-bar").getConsumer(); - component.unregisterConsumer(consumerFoo); - component.unregisterConsumer(consumerBar); + component.unregisterEndpoint(consumerFoo.getEndpoint().getHttpHandlerRegistrationInfo(), consumerFoo.getEndpoint().getSslContext()); + component.unregisterEndpoint(consumerBar.getEndpoint().getHttpHandlerRegistrationInfo(), consumerBar.getEndpoint().getSslContext()); try { template.requestBody("undertow:http://localhost:{{port}}/foo", null, String.class); @@ -67,8 +67,8 @@ public void process(Exchange exchange) { UndertowComponent component = context.getComponent("undertow", UndertowComponent.class); UndertowConsumer consumerFoo = (UndertowConsumer) context.getRoute("route-foo").getConsumer(); - component.unregisterConsumer(consumerFoo); - + component.unregisterEndpoint(consumerFoo.getEndpoint().getHttpHandlerRegistrationInfo(), consumerFoo.getEndpoint().getSslContext()); + ret = template.request("undertow:http://localhost:{{port}}/foo", sender); Assert.assertEquals(404, ret.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); Assert.assertEquals("No matching path found", ret.getOut().getBody(String.class)); diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/handlers/CamelRootHandlerTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/handlers/CamelRootHandlerTest.java new file mode 100644 index 0000000000000..d02e9b055f391 --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/handlers/CamelRootHandlerTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.handlers; + +import io.undertow.server.HttpHandler; +import io.undertow.server.handlers.RedirectHandler; + +import org.junit.Assert; +import org.junit.Test; + +public class CamelRootHandlerTest { + + private static final HttpHandler DEFAULT_HANDLER = new NotFoundHandler(); + + @Test + public void httpAndWsUnssupportedForTheSamePath() { + + final CamelRootHandler root = new CamelRootHandler(DEFAULT_HANDLER); + + final RedirectHandler httpHandler = new RedirectHandler("http://whereever"); + + Assert.assertTrue(root.isEmpty()); + root.add("/app1", null, false, httpHandler); + Assert.assertFalse(root.isEmpty()); + + try { + root.add("/app1", null, false, new CamelWebSocketHandler()); + Assert.fail(IllegalArgumentException.class.getName() + " expected"); + } catch (IllegalArgumentException expected) { + } + + root.remove("/app1", null, false); + + Assert.assertTrue(root.isEmpty()); + + /* now the other way round: register wsHandler and try to register httpHandler for the same path */ + root.add("/app2", null, false, new CamelWebSocketHandler()); + try { + root.add("/app2", null, false, httpHandler); + Assert.fail(IllegalArgumentException.class.getName() + " expected"); + } catch (IllegalArgumentException expected) { + } + + } + + @Test + public void countWsHandlerInstances() { + + final CamelRootHandler root = new CamelRootHandler(DEFAULT_HANDLER); + Assert.assertTrue(root.isEmpty()); + + root.add("/app1", null, false, new CamelWebSocketHandler()); + Assert.assertFalse(root.isEmpty()); + + /* registering twice must work */ + root.add("/app1", null, false, new CamelWebSocketHandler()); + Assert.assertFalse(root.isEmpty()); + + /* we have to remove twice for the root to become empty */ + root.remove("/app1", null, false); + Assert.assertFalse(root.isEmpty()); + root.remove("/app1", null, false); + Assert.assertTrue(root.isEmpty()); + + } + +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/TestClient.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/TestClient.java new file mode 100644 index 0000000000000..b2d5a89a7b535 --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/TestClient.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.ws; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncHttpClientConfig; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketByteListener; +import org.asynchttpclient.ws.WebSocketTextListener; +import org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestClient { + private static final Logger LOG = LoggerFactory.getLogger(TestClient.class); + + private List received; + private CountDownLatch latch; + private AsyncHttpClient client; + private WebSocket websocket; + private String url; + + public TestClient(String url, AsyncHttpClientConfig conf) { + this(url, conf, 1); + } + + public TestClient(String url, int count) { + this(url, null, count); + } + + public TestClient(String url) { + this(url, null, 1); + } + + public TestClient(String url, AsyncHttpClientConfig conf, int count) { + this.received = new ArrayList(); + this.latch = new CountDownLatch(count); + this.client = conf == null ? new DefaultAsyncHttpClient() : new DefaultAsyncHttpClient(conf); + this.url = url; + } + + public void connect() throws InterruptedException, ExecutionException, IOException { + websocket = client.prepareGet(url).execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new TestWebSocketListener()).build()).get(); + } + + public void sendTextMessage(String message) { + websocket.sendMessage(message); + } + + public void sendBytesMessage(byte[] message) { + websocket.sendMessage(message); + } + + public boolean await(int secs) throws InterruptedException { + return latch.await(secs, TimeUnit.SECONDS); + } + + public void reset(int count) { + latch = new CountDownLatch(count); + received.clear(); + } + + public List getReceived() { + return received; + } + + public List getReceived(Class cls) { + List list = new ArrayList(); + for (Object o : received) { + list.add(getValue(o, cls)); + } + return list; + } + + @SuppressWarnings("unchecked") + private static T getValue(Object o, Class cls) { + if (cls.isInstance(o)) { + return (T)o; + } else if (cls == String.class) { + if (o instanceof byte[]) { + return (T)new String((byte[])o); + } else { + return (T)o.toString(); + } + } else if (cls == byte[].class) { + if (o instanceof String) { + return (T)((String)o).getBytes(); + } + } + return null; + } + + public void close() throws IOException { + websocket.close(); + client.close(); + } + + private class TestWebSocketListener implements WebSocketTextListener, WebSocketByteListener { + + @Override + public void onOpen(WebSocket websocket) { + LOG.info("[ws] opened"); + } + + @Override + public void onClose(WebSocket websocket) { + LOG.info("[ws] closed"); + } + + @Override + public void onError(Throwable t) { + LOG.error("[ws] error", t); + } + + @Override + public void onMessage(byte[] message) { + received.add(message); + LOG.info("[ws] received bytes --> " + Arrays.toString(message)); + latch.countDown(); + } + + + @Override + public void onMessage(String message) { + received.add(message); + LOG.info("[ws] received --> " + message); + latch.countDown(); + } + + + + } +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java new file mode 100644 index 0000000000000..b1df0a47d01f1 --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java @@ -0,0 +1,454 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.ws; + +import java.io.InputStream; +import java.io.Reader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.undertow.BaseUndertowTest; +import org.apache.camel.component.undertow.UndertowConstants; +import org.apache.camel.component.undertow.UndertowConstants.EventType; +import org.apache.camel.converter.IOConverter; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.ws.DefaultWebSocketListener; +import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.junit.Assert; +import org.junit.Test; + +public class UndertowWsConsumerRouteTest extends BaseUndertowTest { + + private static final String CONNECTED_PREFIX = "connected "; + private static final String BROADCAST_MESSAGE_PREFIX = "broadcast "; + + @Test + public void wsClientSingleText() throws Exception { + AsyncHttpClient c = new DefaultAsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/app1") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new DefaultWebSocketListener() { + + @Override + public void onMessage(String message) { + System.out.println("got message " + message); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + }).build()).get(); + + MockEndpoint result = getMockEndpoint("mock:result1"); + result.expectedBodiesReceived("Test"); + + websocket.sendMessage("Test"); + + result.await(60, TimeUnit.SECONDS); + result.assertIsSatisfied(); + + websocket.close(); + c.close(); + } + + @Test + public void wsClientSingleTextStreaming() throws Exception { + AsyncHttpClient c = new DefaultAsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/app2") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new DefaultWebSocketListener() { + + @Override + public void onMessage(String message) { + System.out.println("got message " + message); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + }).build()).get(); + + MockEndpoint result = getMockEndpoint("mock:result2"); + result.expectedMessageCount(1); + + websocket.sendMessage("Test"); + + result.await(60, TimeUnit.SECONDS); + List exchanges = result.getReceivedExchanges(); + Assert.assertEquals(1, exchanges.size()); + Object body = result.getReceivedExchanges().get(0).getIn().getBody(); + Assert.assertTrue("body is " + body.getClass().getName(), body instanceof Reader); + Reader r = (Reader) body; + Assert.assertEquals("Test", IOConverter.toString(r)); + + websocket.close(); + c.close(); + } + + @Test + public void wsClientSingleBytes() throws Exception { + AsyncHttpClient c = new DefaultAsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/app1") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new DefaultWebSocketListener() { + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onMessage(byte[] message) { + System.out.println("got byte[] message"); + } + }).build()).get(); + + MockEndpoint result = getMockEndpoint("mock:result1"); + final byte[] testmessage = "Test".getBytes("utf-8"); + result.expectedBodiesReceived(testmessage); + + websocket.sendMessage(testmessage); + + result.assertIsSatisfied(); + + websocket.close(); + c.close(); + } + + @Test + public void wsClientSingleBytesStreaming() throws Exception { + AsyncHttpClient c = new DefaultAsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/app2") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new DefaultWebSocketListener() { + + @Override + public void onMessage(byte[] message) { + System.out.println("got message " + message); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + }).build()).get(); + + MockEndpoint result = getMockEndpoint("mock:result2"); + result.expectedMessageCount(1); + + final byte[] testmessage = "Test".getBytes("utf-8"); + websocket.sendMessage(testmessage); + + result.await(60, TimeUnit.SECONDS); + List exchanges = result.getReceivedExchanges(); + Assert.assertEquals(1, exchanges.size()); + Object body = result.getReceivedExchanges().get(0).getIn().getBody(); + Assert.assertTrue("body is " + body.getClass().getName(), body instanceof InputStream); + InputStream in = (InputStream) body; + Assert.assertArrayEquals(testmessage, IOConverter.toBytes(in)); + + websocket.close(); + c.close(); + } + + @Test + public void wsClientMultipleText() throws Exception { + AsyncHttpClient c1 = new DefaultAsyncHttpClient(); + + WebSocket websocket1 = c1.prepareGet("ws://localhost:" + getPort() + "/app1") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new DefaultWebSocketListener() { + + @Override + public void onMessage(String message) { + System.out.println("got message " + message); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + }).build()).get(); + AsyncHttpClient c2 = new DefaultAsyncHttpClient(); + + WebSocket websocket2 = c2.prepareGet("ws://localhost:" + getPort() + "/app1") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new DefaultWebSocketListener() { + + @Override + public void onMessage(String message) { + System.out.println("got message " + message); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + }).build()).get(); + + MockEndpoint result = getMockEndpoint("mock:result1"); + result.expectedMessageCount(2); + + websocket1.sendMessage("Test1"); + websocket2.sendMessage("Test2"); + + result.await(60, TimeUnit.SECONDS); + result.assertIsSatisfied(); + List exchanges = result.getReceivedExchanges(); + Set actual = new HashSet<>(); + actual.add(exchanges.get(0).getIn().getBody(String.class)); + actual.add(exchanges.get(1).getIn().getBody(String.class)); + Assert.assertEquals(new HashSet(Arrays.asList("Test1", "Test2")), actual); + + websocket1.close(); + websocket2.close(); + c1.close(); + c2.close(); + } + + @Test + public void echo() throws Exception { + TestClient wsclient1 = new TestClient("ws://localhost:" + getPort() + "/app3", 2); + wsclient1.connect(); + + wsclient1.sendTextMessage("Test1"); + wsclient1.sendTextMessage("Test2"); + + Assert.assertTrue(wsclient1.await(10)); + + Assert.assertEquals(Arrays.asList("Test1", "Test2"), wsclient1.getReceived(String.class)); + + wsclient1.close(); + } + + @Test + public void echoMulti() throws Exception { + TestClient wsclient1 = new TestClient("ws://localhost:" + getPort() + "/app3", 1); + TestClient wsclient2 = new TestClient("ws://localhost:" + getPort() + "/app3", 1); + wsclient1.connect(); + wsclient2.connect(); + + wsclient1.sendTextMessage("Gambas"); + wsclient2.sendTextMessage("Calamares"); + + Assert.assertTrue(wsclient1.await(10)); + Assert.assertTrue(wsclient2.await(10)); + + Assert.assertEquals(Arrays.asList("Gambas"), wsclient1.getReceived(String.class)); + Assert.assertEquals(Arrays.asList("Calamares"), wsclient2.getReceived(String.class)); + + wsclient1.close(); + wsclient2.close(); + } + + @Test + public void sendToAll() throws Exception { + TestClient wsclient1 = new TestClient("ws://localhost:" + getPort() + "/app4", 2); + TestClient wsclient2 = new TestClient("ws://localhost:" + getPort() + "/app4", 2); + wsclient1.connect(); + wsclient2.connect(); + + wsclient1.sendTextMessage("Gambas"); + wsclient2.sendTextMessage("Calamares"); + + Assert.assertTrue(wsclient1.await(10)); + Assert.assertTrue(wsclient2.await(10)); + + List received1 = wsclient1.getReceived(String.class); + Assert.assertEquals(2, received1.size()); + + Assert.assertTrue(received1.contains("Gambas")); + Assert.assertTrue(received1.contains("Calamares")); + + List received2 = wsclient2.getReceived(String.class); + Assert.assertEquals(2, received2.size()); + Assert.assertTrue(received2.contains("Gambas")); + Assert.assertTrue(received2.contains("Calamares")); + + wsclient1.close(); + wsclient2.close(); + } + + @Test + public void fireWebSocketChannelEvents() throws Exception { + + MockEndpoint result = getMockEndpoint("mock:result5"); + result.expectedMessageCount(6); + + TestClient wsclient1 = new TestClient("ws://localhost:" + getPort() + "/app5", 2); + TestClient wsclient2 = new TestClient("ws://localhost:" + getPort() + "/app5", 2); + wsclient1.connect(); + wsclient2.connect(); + + wsclient1.sendTextMessage("Gambas"); + wsclient2.sendTextMessage("Calamares"); + + wsclient1.close(); + wsclient2.close(); + + result.await(60, TimeUnit.SECONDS); + + final List exchanges = result.getReceivedExchanges(); + final Map> connections = new HashMap<>(); + for (Exchange exchange : exchanges) { + final Message in = exchange.getIn(); + final String key = (String) in.getHeader(UndertowConstants.CONNECTION_KEY); + Assert.assertNotNull(key); + List messages = connections.get(key); + if (messages == null) { + messages = new ArrayList(); + connections.put(key, messages); + } + String body = in.getBody(String.class); + if (body != null) { + messages.add(body); + } else { + messages.add(in.getHeader(UndertowConstants.EVENT_TYPE_ENUM, EventType.class).name()); + } + } + + final List expected1 = Arrays.asList(EventType.ONOPEN.name(), "Gambas", EventType.ONCLOSE.name()); + final List expected2 = Arrays.asList(EventType.ONOPEN.name(), "Calamares", EventType.ONCLOSE.name()); + + Assert.assertEquals(2, connections.size()); + final Iterator> it = connections.values().iterator(); + final List actual1 = it.next(); + Assert.assertTrue("actual " + actual1, actual1.equals(expected1) || actual1.equals(expected2)); + final List actual2 = it.next(); + Assert.assertTrue("actual " + actual2, actual2.equals(expected1) || actual2.equals(expected2)); + + } + + @Test + public void connectionKeyList() throws Exception { + + TestClient wsclient1 = new TestClient("ws://localhost:" + getPort() + "/app6", 1); + TestClient wsclient2 = new TestClient("ws://localhost:" + getPort() + "/app6", 1); + TestClient wsclient3 = new TestClient("ws://localhost:" + getPort() + "/app6", 1); + wsclient1.connect(); + wsclient2.connect(); + wsclient3.connect(); + + wsclient1.await(10); + final String connectionKey1 = assertConnected(wsclient1); + Assert.assertNotNull(connectionKey1); + wsclient2.await(10); + final String connectionKey2 = assertConnected(wsclient2); + wsclient3.await(10); + final String connectionKey3 = assertConnected(wsclient3); + + wsclient1.reset(1); + wsclient2.reset(1); + wsclient3.reset(1); + final String broadcastMsg = BROADCAST_MESSAGE_PREFIX + connectionKey2 + " " + connectionKey3; + wsclient1.sendTextMessage(broadcastMsg); // this one should go to wsclient2 and wsclient3 + wsclient1.sendTextMessage("private"); // this one should go to wsclient1 only + + wsclient2.await(10); + Assert.assertEquals(broadcastMsg, wsclient2.getReceived(String.class).get(0)); + wsclient3.await(10); + Assert.assertEquals(broadcastMsg, wsclient3.getReceived(String.class).get(0)); + wsclient1.await(10); + Assert.assertEquals("private", wsclient1.getReceived(String.class).get(0)); + + wsclient1.close(); + wsclient2.close(); + wsclient3.close(); + + } + + private String assertConnected(TestClient wsclient1) { + final String msg0 = wsclient1.getReceived(String.class).get(0); + Assert.assertTrue("'" + msg0 + "' should start with '" + CONNECTED_PREFIX + "'", + msg0.startsWith(CONNECTED_PREFIX)); + return msg0.substring(CONNECTED_PREFIX.length()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + final int port = getPort(); + from("undertow:ws://localhost:" + port + "/app1") + .log(">>> Message received from WebSocket Client : ${body}").to("mock:result1"); + + from("undertow:ws://localhost:" + port + "/app2?useStreaming=true").to("mock:result2"); + + /* echo */ + from("undertow:ws://localhost:" + port + "/app3").to("undertow:ws://localhost:" + port + "/app3"); + + /* sendToAll */ + from("undertow:ws://localhost:" + port + "/app4") // + .to("undertow:ws://localhost:" + port + "/app4?sendToAll=true"); + + /* fireWebSocketChannelEvents */ + from("undertow:ws://localhost:" + port + "/app5?fireWebSocketChannelEvents=true") // + .to("mock:result5") // + .to("undertow:ws://localhost:" + port + "/app5"); + + /* fireWebSocketChannelEvents */ + from("undertow:ws://localhost:" + port + "/app6?fireWebSocketChannelEvents=true") // + .process(new Processor() { + private final Set connectionKeys = new LinkedHashSet<>(); + + public void process(final Exchange exchange) throws Exception { + final Message in = exchange.getIn(); + final String connectionKey = in.getHeader(UndertowConstants.CONNECTION_KEY, + String.class); + final EventType eventType = in.getHeader(UndertowConstants.EVENT_TYPE_ENUM, + EventType.class); + final String body = in.getBody(String.class); + if (eventType == EventType.ONOPEN) { + connectionKeys.add(connectionKey); + in.setBody(CONNECTED_PREFIX + connectionKey); + } else if (eventType == EventType.ONCLOSE) { + connectionKeys.remove(connectionKey); + } else if (body != null) { + if (body.startsWith(BROADCAST_MESSAGE_PREFIX)) { + List keys = Arrays + .asList(body.substring(BROADCAST_MESSAGE_PREFIX.length()).split(" ")); + in.setHeader(UndertowConstants.CONNECTION_KEY_LIST, keys); + } + } + } + })// + .to("undertow:ws://localhost:" + port + "/app6"); + } + }; + } + +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsProducerRouteRestartTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsProducerRouteRestartTest.java new file mode 100644 index 0000000000000..3f45f827fc0b2 --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsProducerRouteRestartTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.ws; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.undertow.BaseUndertowTest; +import org.apache.camel.component.undertow.UndertowConstants; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketTextListener; +import org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.junit.Test; + +public class UndertowWsProducerRouteRestartTest extends BaseUndertowTest { + + private static final String ROUTE_ID = UndertowWsProducerRouteRestartTest.class.getSimpleName(); + + @Produce(uri = "direct:shop") + private ProducerTemplate producer; + + @Test + public void testWSSuspendResumeRoute() throws Exception { + context.suspendRoute(ROUTE_ID); + context.resumeRoute(ROUTE_ID); + doTestWSHttpCall(); + } + + @Test + public void testWSStopStartRoute() throws Exception { + context.stopRoute(ROUTE_ID); + context.startRoute(ROUTE_ID); + doTestWSHttpCall(); + } + + @Test + public void testWSRemoveAddRoute() throws Exception { + context.removeRoute(ROUTE_ID); + context.addRoutes(createRouteBuilder()); + context.startRoute(ROUTE_ID); + doTestWSHttpCall(); + } + + private void doTestWSHttpCall() throws Exception { + final List received = new ArrayList(); + final CountDownLatch latch = new CountDownLatch(1); + + AsyncHttpClient c = new DefaultAsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/shop") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + // Send message to the direct endpoint + producer.sendBodyAndHeader("Beer on stock at Apache Mall", UndertowConstants.SEND_TO_ALL, "true"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(1, received.size()); + Object r = received.get(0); + assertTrue(r instanceof String); + assertEquals("Beer on stock at Apache Mall", r); + + websocket.close(); + c.close(); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:shop") // + .id(ROUTE_ID) // + .log(">>> Message received from Shopping center : ${body}") // + .to("undertow:ws://localhost:" + getPort() + "/shop"); + } + }; + } +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsProducerRouteTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsProducerRouteTest.java new file mode 100644 index 0000000000000..f089813f6ba9d --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsProducerRouteTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.ws; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.undertow.BaseUndertowTest; +import org.apache.camel.component.undertow.UndertowConstants; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketTextListener; +import org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.junit.Test; + +public class UndertowWsProducerRouteTest extends BaseUndertowTest { + + @Produce(uri = "direct:shop") + private ProducerTemplate producer; + + @Test + public void testWSHttpCall() throws Exception { + + final CountDownLatch latch = new CountDownLatch(1); + AsyncHttpClient c = new DefaultAsyncHttpClient(); + final List received = Collections.synchronizedList(new ArrayList()); + + WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/shop") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new WebSocketTextListener() { + + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + // Send message to the direct endpoint + producer.sendBodyAndHeader("Beer on stock at Apache Mall", UndertowConstants.SEND_TO_ALL, "true"); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(1, received.size()); + Object r = received.get(0); + assertTrue(r instanceof String); + assertEquals("Beer on stock at Apache Mall", r); + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:shop").log(">>> Message received from Shopping center : ${body}") + .to("undertow:ws://localhost:" + getPort() + "/shop"); + } + }; + } +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesTest.java new file mode 100644 index 0000000000000..cf091c89dd3d3 --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.ws; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.undertow.BaseUndertowTest; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketTextListener; +import org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.junit.Test; + +public class UndertowWsTwoRoutesTest extends BaseUndertowTest { + + @Test + public void testWSHttpCallEcho() throws Exception { + + // We call the route WebSocket BAR + { + final List received = new ArrayList(); + final CountDownLatch latch = new CountDownLatch(1); + final AsyncHttpClient c = new DefaultAsyncHttpClient(); + final WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/bar").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + websocket.sendMessage("Beer"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(1, received.size()); + assertEquals("The bar has Beer", received.get(0)); + + websocket.close(); + c.close(); + } + + + // We call the route WebSocket PUB + { + final List received = new ArrayList(); + final CountDownLatch latch = new CountDownLatch(1); + final AsyncHttpClient c = new DefaultAsyncHttpClient(); + final WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/pub").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + websocket.sendMessage("wine"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(1, received.size()); + assertEquals("The pub has wine", received.get(0)); + + websocket.close(); + c.close(); + } + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + int port = getPort(); + from("undertow:ws://localhost:" + port + "/bar") + .log(">>> Message received from BAR WebSocket Client : ${body}") + .transform().simple("The bar has ${body}") + .to("undertow:ws://localhost:" + port + "/bar"); + + from("undertow:ws://localhost:" + port + "/pub") + .log(">>> Message received from PUB WebSocket Client : ${body}") + .transform().simple("The pub has ${body}") + .to("undertow:ws://localhost:" + port + "/pub"); + } + }; + } +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesToSameEndpointSendToAllHeaderTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesToSameEndpointSendToAllHeaderTest.java new file mode 100644 index 0000000000000..3b67a8a878e9e --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesToSameEndpointSendToAllHeaderTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.ws; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.undertow.BaseUndertowTest; +import org.apache.camel.component.undertow.UndertowConstants; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketTextListener; +import org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.junit.Test; + +public class UndertowWsTwoRoutesToSameEndpointSendToAllHeaderTest extends BaseUndertowTest { + + + @Test + public void testWSHttpCallEcho() throws Exception { + + // We call the route WebSocket BAR + final List received = new ArrayList(); + final CountDownLatch latch = new CountDownLatch(2); + + DefaultAsyncHttpClient c = new DefaultAsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/bar").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + websocket.sendMessage("Beer"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(2, received.size()); + + //Cannot guarantee the order in which messages are received + assertTrue(received.contains("The bar has Beer")); + assertTrue(received.contains("Broadcasting to Bar")); + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + final int port = getPort(); + from("undertow:ws://localhost:" + port + "/bar") + .log(">>> Message received from BAR WebSocket Client : ${body}") + .transform().simple("The bar has ${body}") + .to("undertow:ws://localhost:" + port + "/bar"); + + from("timer://foo?fixedRate=true&period=12000") + //Use a period which is longer then the latch await time + .setBody(constant("Broadcasting to Bar")) + .log(">>> Broadcasting message to Bar WebSocket Client") + .setHeader(UndertowConstants.SEND_TO_ALL, constant(true)) + .to("undertow:ws://localhost:" + port + "/bar"); + } + }; + } +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesToSameEndpointTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesToSameEndpointTest.java new file mode 100644 index 0000000000000..f6e1ed0a226a0 --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsTwoRoutesToSameEndpointTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.ws; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.undertow.BaseUndertowTest; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketTextListener; +import org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.junit.Test; + +public class UndertowWsTwoRoutesToSameEndpointTest extends BaseUndertowTest { + + + @Test + public void testWSHttpCallEcho() throws Exception { + + // We call the route WebSocket BAR + final List received = new ArrayList(); + final CountDownLatch latch = new CountDownLatch(2); + + DefaultAsyncHttpClient c = new DefaultAsyncHttpClient(); + + WebSocket websocket = c.prepareGet("ws://localhost:" + getPort() + "/bar").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + websocket.sendMessage("Beer"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertEquals(2, received.size()); + + //Cannot guarantee the order in which messages are received + assertTrue(received.contains("The bar has Beer")); + assertTrue(received.contains("Broadcasting to Bar")); + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + final int port = getPort(); + from("undertow:ws://localhost:" + port + "/bar") + .log(">>> Message received from BAR WebSocket Client : ${body}") + .transform().simple("The bar has ${body}") + .to("undertow:ws://localhost:" + port + "/bar"); + + from("timer://foo?fixedRate=true&period=12000") + //Use a period which is longer then the latch await time + .setBody(constant("Broadcasting to Bar")) + .log(">>> Broadcasting message to Bar WebSocket Client") + .to("undertow:ws://localhost:" + port + "/bar?sendToAll=true"); + } + }; + } +} diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWssRouteTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWssRouteTest.java new file mode 100644 index 0000000000000..bd4152d6b151f --- /dev/null +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWssRouteTest.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow.ws; + +import java.io.IOException; +import java.net.URL; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.SSLContextParametersAware; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.undertow.BaseUndertowTest; +import org.apache.camel.component.undertow.UndertowHttpsSpringTest; +import org.apache.camel.util.jsse.KeyManagersParameters; +import org.apache.camel.util.jsse.KeyStoreParameters; +import org.apache.camel.util.jsse.SSLContextParameters; +import org.apache.camel.util.jsse.SSLContextServerParameters; +import org.apache.camel.util.jsse.TrustManagersParameters; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncHttpClientConfig; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketTextListener; +import org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class UndertowWssRouteTest extends BaseUndertowTest { + + @BeforeClass + public static void setUpJaas() throws Exception { + URL trustStoreUrl = UndertowHttpsSpringTest.class.getClassLoader().getResource("ssl/keystore.jks"); + System.setProperty("javax.net.ssl.trustStore", trustStoreUrl.toURI().getPath()); + } + + @AfterClass + public static void tearDownJaas() throws Exception { + System.clearProperty("java.security.auth.login.config"); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + KeyStoreParameters ksp = new KeyStoreParameters(); + ksp.setResource("ssl/keystore.jks"); + ksp.setPassword("password"); + + KeyManagersParameters kmp = new KeyManagersParameters(); + kmp.setKeyPassword("password"); + kmp.setKeyStore(ksp); + + TrustManagersParameters tmp = new TrustManagersParameters(); + tmp.setKeyStore(ksp); + + // NOTE: Needed since the client uses a loose trust configuration when no ssl context + // is provided. We turn on WANT client-auth to prefer using authentication + SSLContextServerParameters scsp = new SSLContextServerParameters(); + + SSLContextParameters sslContextParameters = new SSLContextParameters(); + sslContextParameters.setKeyManagers(kmp); + sslContextParameters.setTrustManagers(tmp); + sslContextParameters.setServerParameters(scsp); + context.setSSLContextParameters(sslContextParameters); + + ((SSLContextParametersAware) context.getComponent("undertow")).setUseGlobalSslContextParameters(true); + return context; + } + + protected AsyncHttpClient createAsyncHttpSSLClient() throws IOException, GeneralSecurityException { + + AsyncHttpClient c; + AsyncHttpClientConfig config; + + DefaultAsyncHttpClientConfig.Builder builder = + new DefaultAsyncHttpClientConfig.Builder(); + + SslContext sslContext = SslContextBuilder + .forClient() + .sslProvider(SslProvider.JDK) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + builder.setSslContext(sslContext); + builder.setAcceptAnyCertificate(true); + config = builder.build(); + c = new DefaultAsyncHttpClient(config); + + return c; + } + + @Test + public void testWSHttpCall() throws Exception { + final List received = new ArrayList(); + final CountDownLatch latch = new CountDownLatch(10); + + AsyncHttpClient c = createAsyncHttpSSLClient(); + WebSocket websocket = c.prepareGet("wss://localhost:" + getPort() + "/test").execute( + new WebSocketUpgradeHandler.Builder() + .addWebSocketListener(new WebSocketTextListener() { + @Override + public void onMessage(String message) { + received.add(message); + log.info("received --> " + message); + latch.countDown(); + } + + @Override + public void onOpen(WebSocket websocket) { + } + + @Override + public void onClose(WebSocket websocket) { + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + }).build()).get(); + + getMockEndpoint("mock:client").expectedBodiesReceived("Hello from WS client"); + + websocket.sendMessage("Hello from WS client"); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + assertMockEndpointsSatisfied(); + + assertEquals(10, received.size()); + for (int i = 0; i < 10; i++) { + assertEquals(">> Welcome on board!", received.get(i)); + } + + websocket.close(); + c.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("undertow:ws://localhost:" + getPort() + "/test") + .log(">>> Message received from WebSocket Client : ${body}") + .to("mock:client") + .loop(10) + .setBody().constant(">> Welcome on board!") + .to("undertow:ws://localhost:" + getPort() + "/test"); + } + }; + } +}