From 73de2b1258b74ef911fba7e9adce59e0d689e5dd Mon Sep 17 00:00:00 2001 From: Jonas Waage Date: Fri, 18 May 2018 02:00:10 +0200 Subject: [PATCH] CAMEL-12521: Add websocket remote address to headers --- .../src/main/docs/websocket-component.adoc | 2 ++ .../component/websocket/DefaultWebsocket.java | 10 ++++++++-- .../component/websocket/WebsocketConstants.java | 1 + .../component/websocket/WebsocketConsumer.java | 14 +++++++++++--- .../component/websocket/DefaultWebsocketTest.java | 11 +++++++++-- .../websocket/WebsocketComponentServletTest.java | 12 ++++++++++-- .../component/websocket/WebsocketConsumerTest.java | 9 ++++++--- 7 files changed, 47 insertions(+), 12 deletions(-) diff --git a/components/camel-websocket/src/main/docs/websocket-component.adoc b/components/camel-websocket/src/main/docs/websocket-component.adoc index 82146fb8bfe72..63141f7e1ea0e 100644 --- a/components/camel-websocket/src/main/docs/websocket-component.adoc +++ b/components/camel-websocket/src/main/docs/websocket-component.adoc @@ -130,6 +130,8 @@ messages back to a single/current client, or to all clients. use the `sendToAll` option on the endpoint instead of using this header. |`WebsocketConstants.CONNECTION_KEY` |Sends the message to the client with the given connection key. + +|`WebsocketConstants.REMOTE_ADDRESS` |Remote address of the websocket session. |======================================================================= ### Usage diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java index 05c596d6a7d60..2c176f6aedfdb 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/DefaultWebsocket.java @@ -17,6 +17,7 @@ package org.apache.camel.component.websocket; import java.io.Serializable; +import java.net.InetSocketAddress; import java.util.UUID; import org.eclipse.jetty.websocket.api.Session; @@ -62,7 +63,7 @@ public void onConnect(Session session) { public void onMessage(String message) { LOG.debug("onMessage: {}", message); if (this.consumer != null) { - this.consumer.sendMessage(this.connectionKey, message); + this.consumer.sendMessage(this.connectionKey, message, getRemoteAddress()); } else { LOG.debug("No consumer to handle message received: {}", message); } @@ -75,12 +76,17 @@ public void onMessage(byte[] data, int offset, int length) { if (this.consumer != null) { byte[] message = new byte[length]; System.arraycopy(data, offset, message, 0, length); - this.consumer.sendMessage(this.connectionKey, message); + this.consumer.sendMessage(this.connectionKey, message, getRemoteAddress()); } else { LOG.debug("No consumer to handle message received: byte[]"); } } + private InetSocketAddress getRemoteAddress() { + Session current = session; + return current != null ? current.getRemoteAddress() : null; + } + public Session getSession() { return session; } diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java index bbed2224219b7..1cb3296ad2e07 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConstants.java @@ -23,6 +23,7 @@ public final class WebsocketConstants { public static final String CONNECTION_KEY = "websocket.connectionKey"; public static final String SEND_TO_ALL = "websocket.sendToAll"; + public static final String REMOTE_ADDRESS = "websocket.remoteAddress"; public static final String WS_PROTOCOL = "ws"; public static final String WSS_PROTOCOL = "wss"; diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java index 68953d4eae321..83446065a0c87 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.websocket; +import java.net.InetSocketAddress; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -50,15 +51,22 @@ public String getPath() { return endpoint.getPath(); } - public void sendMessage(final String connectionKey, final String message) { - sendMessage(connectionKey, (Object)message); + public void sendMessage( + final String connectionKey, + final String message, + final InetSocketAddress remote) { + sendMessage(connectionKey, (Object)message, remote); } - public void sendMessage(final String connectionKey, final Object message) { + public void sendMessage( + final String connectionKey, + final Object message, + final InetSocketAddress remote) { final Exchange exchange = getEndpoint().createExchange(); // set header and body + exchange.getIn().setHeader(WebsocketConstants.REMOTE_ADDRESS, remote); exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey); exchange.getIn().setBody(message); diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java index b7a02544e7c12..a06199c240c44 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/DefaultWebsocketTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.websocket; +import java.net.InetSocketAddress; + import org.eclipse.jetty.websocket.api.Session; import org.junit.Before; import org.junit.Test; @@ -29,6 +31,7 @@ import static org.junit.Assert.assertNull; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultWebsocketTest { @@ -36,6 +39,7 @@ public class DefaultWebsocketTest { private static final int CLOSE_CODE = -1; private static final String MESSAGE = "message"; private static final String CONNECTION_KEY = "random-connection-key"; + private static final InetSocketAddress ADDRESS = InetSocketAddress.createUnresolved("127.0.0.1", 12345); @Mock private Session session; @@ -46,10 +50,12 @@ public class DefaultWebsocketTest { private DefaultWebsocket defaultWebsocket; + @Before public void setUp() throws Exception { defaultWebsocket = new DefaultWebsocket(sync, null, consumer); defaultWebsocket.setConnectionKey(CONNECTION_KEY); + when(session.getRemoteAddress()).thenReturn(ADDRESS); } @Test @@ -74,9 +80,10 @@ public void testOnConnect() { @Test public void testOnMessage() { defaultWebsocket.setConnectionKey(CONNECTION_KEY); + defaultWebsocket.setSession(session); defaultWebsocket.onMessage(MESSAGE); InOrder inOrder = inOrder(session, consumer, sync); - inOrder.verify(consumer, times(1)).sendMessage(CONNECTION_KEY, MESSAGE); + inOrder.verify(consumer, times(1)).sendMessage(CONNECTION_KEY, MESSAGE, ADDRESS); inOrder.verifyNoMoreInteractions(); } @@ -86,7 +93,7 @@ public void testOnMessageWithNullConsumer() { defaultWebsocket.setConnectionKey(CONNECTION_KEY); defaultWebsocket.onMessage(MESSAGE); InOrder inOrder = inOrder(session, consumer, sync); - inOrder.verify(consumer, times(0)).sendMessage(CONNECTION_KEY, MESSAGE); + inOrder.verify(consumer, times(0)).sendMessage(CONNECTION_KEY, MESSAGE, ADDRESS); inOrder.verifyNoMoreInteractions(); } diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java index a12410de86257..5871ad4c5a2d4 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentServletTest.java @@ -16,9 +16,11 @@ */ package org.apache.camel.component.websocket; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; +import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.junit.Before; import org.junit.Test; @@ -30,8 +32,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; /** * @@ -42,7 +46,10 @@ public class WebsocketComponentServletTest { private static final String PROTOCOL = "ws"; private static final String MESSAGE = "message"; private static final String CONNECTION_KEY = "random-connection-key"; + private static final InetSocketAddress ADDRESS = InetSocketAddress.createUnresolved("127.0.0.1", 12345); + @Mock + private Session session; @Mock private WebsocketConsumer consumer; @Mock @@ -58,8 +65,8 @@ public class WebsocketComponentServletTest { public void setUp() throws Exception { socketFactory = new HashMap<>(); socketFactory.put("default", new DefaultWebsocketFactory()); - websocketComponentServlet = new WebsocketComponentServlet(sync, null, socketFactory); + when(session.getRemoteAddress()).thenReturn(ADDRESS); } @Test @@ -82,9 +89,10 @@ public void testDoWebSocketConnect() { assertEquals(DefaultWebsocket.class, webSocket.getClass()); DefaultWebsocket defaultWebsocket = webSocket; defaultWebsocket.setConnectionKey(CONNECTION_KEY); + defaultWebsocket.setSession(session); defaultWebsocket.onMessage(MESSAGE); InOrder inOrder = inOrder(consumer, sync, request); - inOrder.verify(consumer, times(1)).sendMessage(CONNECTION_KEY, MESSAGE); + inOrder.verify(consumer, times(1)).sendMessage(CONNECTION_KEY, MESSAGE, ADDRESS); inOrder.verifyNoMoreInteractions(); } diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java index 38a601b39f331..bdf811af40dc0 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketConsumerTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.websocket; +import java.net.InetSocketAddress; + import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; @@ -39,6 +41,7 @@ public class WebsocketConsumerTest { private static final String CONNECTION_KEY = "random-connection-key"; private static final String MESSAGE = "message"; + private static final InetSocketAddress ADDRESS = InetSocketAddress.createUnresolved("127.0.0.1", 12345); @Mock private WebsocketEndpoint endpoint; @@ -65,7 +68,7 @@ public void testSendExchange() throws Exception { when(endpoint.createExchange()).thenReturn(exchange); when(exchange.getIn()).thenReturn(outMessage); - websocketConsumer.sendMessage(CONNECTION_KEY, MESSAGE); + websocketConsumer.sendMessage(CONNECTION_KEY, MESSAGE, ADDRESS); InOrder inOrder = inOrder(endpoint, exceptionHandler, processor, exchange, outMessage); inOrder.verify(endpoint, times(1)).createExchange(); @@ -85,7 +88,7 @@ public void testSendExchangeWithException() throws Exception { doThrow(exception).when(processor).process(exchange); when(exchange.getException()).thenReturn(exception); - websocketConsumer.sendMessage(CONNECTION_KEY, MESSAGE); + websocketConsumer.sendMessage(CONNECTION_KEY, MESSAGE, ADDRESS); InOrder inOrder = inOrder(endpoint, exceptionHandler, processor, exchange, outMessage); inOrder.verify(endpoint, times(1)).createExchange(); @@ -106,7 +109,7 @@ public void testSendExchangeWithExchangeExceptionIsNull() throws Exception { doThrow(exception).when(processor).process(exchange); when(exchange.getException()).thenReturn(null); - websocketConsumer.sendMessage(CONNECTION_KEY, MESSAGE); + websocketConsumer.sendMessage(CONNECTION_KEY, MESSAGE, ADDRESS); InOrder inOrder = inOrder(endpoint, exceptionHandler, processor, exchange, outMessage); inOrder.verify(endpoint, times(1)).createExchange();