From 09f5243006c00ec18cbc67c5990901398954bf5b Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 20 Sep 2018 21:21:49 -0700 Subject: [PATCH 1/2] WebSockets Proxy support (#378) --- ConsumingEvents.md | 4 +- PublishingEvents.md | 2 +- azure-eventhubs-eph/pom.xml | 2 +- azure-eventhubs-extensions/pom.xml | 2 +- azure-eventhubs/pom.xml | 2 +- .../azure/eventhubs/impl/AmqpConnection.java | 7 + .../azure/eventhubs/impl/AmqpErrorCode.java | 4 + .../azure/eventhubs/impl/ClientConstants.java | 3 +- .../eventhubs/impl/ConnectionHandler.java | 61 +++++- .../eventhubs/impl/EventHubClientImpl.java | 1 + .../eventhubs/impl/MessagingFactory.java | 16 +- .../impl/WebSocketConnectionHandler.java | 14 +- .../impl/WebSocketProxyConnectionHandler.java | 194 ++++++++++++++++++ .../connstrbuilder/TransportTypeTest.java | 148 +++++++++++++ .../SendLargeMessageTest.java | 4 +- .../eventhubs/proxy/ProxyReceiveTest.java | 82 ++++++++ .../eventhubs/proxy/ProxySelectorTest.java | 67 ++++++ .../proxy/ProxySendLargeMessageTest.java | 79 +++++++ .../azure/eventhubs/proxy/ProxySendTest.java | 91 ++++++++ pom.xml | 19 +- readme.md | 2 +- 21 files changed, 773 insertions(+), 31 deletions(-) create mode 100644 azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java create mode 100644 azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java create mode 100644 azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxyReceiveTest.java create mode 100644 azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySelectorTest.java create mode 100644 azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySendLargeMessageTest.java create mode 100644 azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySendTest.java diff --git a/ConsumingEvents.md b/ConsumingEvents.md index eb7a4b0a8..b2bca6e35 100644 --- a/ConsumingEvents.md +++ b/ConsumingEvents.md @@ -26,7 +26,7 @@ following dependency declaration inside of your Maven project file: com.microsoft.azure azure-eventhubs - 1.1.0 + 1.2.0 ``` @@ -199,7 +199,7 @@ an outdated offset. Azure Event Hubs requires using the AMQP 1.0 protocol for consuming events. AMQP 1.0 is a TCP based protocol. For Azure Event Hubs, all traffic *must* be protected using TLS (SSL) and is using -TCP port 5672. For the WebSocket binding of AMQP, traffic flows via port 443. +TCP port 5671. For the WebSocket binding of AMQP, traffic flows via port 443. ## Connection Strings diff --git a/PublishingEvents.md b/PublishingEvents.md index f81f84976..e79916e01 100644 --- a/PublishingEvents.md +++ b/PublishingEvents.md @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file: com.microsoft.azure azure-eventhubs - 1.1.0 + 1.2.0 ``` diff --git a/azure-eventhubs-eph/pom.xml b/azure-eventhubs-eph/pom.xml index dc6f2f6d0..7b192c7a9 100644 --- a/azure-eventhubs-eph/pom.xml +++ b/azure-eventhubs-eph/pom.xml @@ -4,7 +4,7 @@ com.microsoft.azure azure-eventhubs-clients - 1.1.0 + 1.2.0 2.0.1 diff --git a/azure-eventhubs-extensions/pom.xml b/azure-eventhubs-extensions/pom.xml index 36aa35ffa..6dd907426 100644 --- a/azure-eventhubs-extensions/pom.xml +++ b/azure-eventhubs-extensions/pom.xml @@ -7,7 +7,7 @@ com.microsoft.azure azure-eventhubs-clients - 1.1.0 + 1.2.0 4.0.0 diff --git a/azure-eventhubs/pom.xml b/azure-eventhubs/pom.xml index ebcd67deb..3992685b4 100644 --- a/azure-eventhubs/pom.xml +++ b/azure-eventhubs/pom.xml @@ -4,7 +4,7 @@ com.microsoft.azure azure-eventhubs-clients - 1.1.0 + 1.2.0 4.0.0 diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConnection.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConnection.java index 0219ab7f5..8713accb3 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConnection.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConnection.java @@ -8,6 +8,13 @@ import org.apache.qpid.proton.engine.Link; public interface AmqpConnection { + + /** + * Host name intended to be used on Amqp Connection Open frame + * @return host name + */ + String getHostName(); + void onOpenComplete(Exception exception); void onConnectionError(ErrorCondition error); diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpErrorCode.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpErrorCode.java index 1d1579e20..79cd3af28 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpErrorCode.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpErrorCode.java @@ -23,4 +23,8 @@ public final class AmqpErrorCode { // connection errors public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced"); + + // proton library introduced this amqpsymbol in their code-base to communicate IOExceptions + // while performing operations on SocketChannel (in IOHandler.java) + public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol("proton:io"); } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java index 3737d04f7..69ff8bb50 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java @@ -38,7 +38,7 @@ public final class ClientConstants { public final static String NO_RETRY = "NoRetry"; public final static String DEFAULT_RETRY = "Default"; public final static String PRODUCT_NAME = "MSJavaClient"; - public final static String CURRENT_JAVACLIENT_VERSION = "1.1.0"; + public final static String CURRENT_JAVACLIENT_VERSION = "1.2.0"; public static final String PLATFORM_INFO = getPlatformInfo(); public static final String FRAMEWORK_INFO = getFrameworkInfo(); public static final String CBS_ADDRESS = "$cbs"; @@ -74,6 +74,7 @@ public final class ClientConstants { public static final Symbol LAST_ENQUEUED_TIME_UTC = Symbol.valueOf(MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC); public static final String AMQP_REQUEST_FAILED_ERROR = "status-code: %s, status-description: %s"; public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s"; + public static final String HTTPS_URI_FORMAT = "https://%s:%s"; public static final int MAX_RECEIVER_NAME_LENGTH = 64; /** diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java index cae891d0b..8bf8fd282 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java @@ -4,6 +4,7 @@ */ package com.microsoft.azure.eventhubs.impl; +import com.microsoft.azure.eventhubs.TransportType; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -29,12 +30,30 @@ public class ConnectionHandler extends BaseHandler { private final AmqpConnection messagingFactory; - public ConnectionHandler(final AmqpConnection messagingFactory) { + protected ConnectionHandler(final AmqpConnection messagingFactory) { add(new Handshaker()); this.messagingFactory = messagingFactory; } + static ConnectionHandler create(TransportType transportType, AmqpConnection messagingFactory) { + switch (transportType) { + case AMQP_WEB_SOCKETS: + if (WebSocketProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) { + return new WebSocketProxyConnectionHandler(messagingFactory); + } else { + return new WebSocketConnectionHandler(messagingFactory); + } + case AMQP: + default: + return new ConnectionHandler(messagingFactory); + } + } + + protected AmqpConnection getMessagingFactory() { + return this.messagingFactory; + } + private static SslDomain makeDomain(SslDomain.Mode mode) { final SslDomain domain = Proton.sslDomain(); @@ -49,7 +68,10 @@ private static SslDomain makeDomain(SslDomain.Mode mode) { public void onConnectionInit(Event event) { final Connection connection = event.getConnection(); - final String hostName = event.getReactor().getConnectionAddress(connection); + final String hostName = new StringBuilder(this.messagingFactory.getHostName()) + .append(":") + .append(String.valueOf(this.getProtocolPort())) + .toString(); connection.setHostname(hostName); connection.setContainer(StringUtil.getRandomString()); @@ -72,9 +94,37 @@ public void onConnectionInit(Event event) { } protected void addTransportLayers(final Event event, final TransportInternal transport) { + final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT); + transport.ssl(domain); + } + + protected void notifyTransportErrors(final Event event) { + // no-op } - protected int getPort() { + /** + * HostName to be used for socket creation. + * for ex: in case of proxy server - this could be proxy ip address + * @return host name + */ + public String getOutboundSocketHostName() { + return messagingFactory.getHostName(); + } + + /** + * port used to create socket. + * for ex: in case of talking to event hubs service via proxy - use proxy port + * @return port + */ + protected int getOutboundSocketPort() { + return this.getProtocolPort(); + } + + /** + * Port used on connection open frame + * @return port + */ + protected int getProtocolPort() { return ClientConstants.AMQPS_PORT; } @@ -88,9 +138,6 @@ public void onConnectionBound(Event event) { final Transport transport = event.getTransport(); this.addTransportLayers(event, (TransportInternal) transport); - - final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT); - transport.ssl(domain); } @Override @@ -126,6 +173,8 @@ public void onTransportError(Event event) { // onTransportError event is not handled by the global IO Handler for cleanup transport.unbind(); + + this.notifyTransportErrors(event); } @Override diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 8bd65d0d3..271db4e13 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -41,6 +41,7 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl * It will be truncated to 128 characters */ public static String USER_AGENT = null; + private final String eventHubName; private final Object senderCreateSync; private volatile boolean isSenderCreateStarted; diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index d25e4ae4f..ee8222c3e 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -82,10 +82,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti this.retryPolicy = retryPolicy; this.registeredLinks = new LinkedList<>(); this.reactorLock = new Object(); - this.connectionHandler = - builder.getTransportType() == TransportType.AMQP - ? new ConnectionHandler(this) - : new WebSocketConnectionHandler(this); + this.connectionHandler = ConnectionHandler.create(builder.getTransportType(), this); this.cbsChannelCreateLock = new Object(); this.mgmtChannelCreateLock = new Object(); this.tokenProvider = builder.getSharedAccessSignature() == null @@ -146,6 +143,7 @@ public void run() { return messagingFactory.open; } + @Override public String getHostName() { return this.hostName; } @@ -174,7 +172,10 @@ public void onReactorInit(Event e) { super.onReactorInit(e); final Reactor r = e.getReactor(); - connection = r.connectionToHost(hostName, connectionHandler.getPort(), connectionHandler); + connection = r.connectionToHost( + connectionHandler.getOutboundSocketHostName(), + connectionHandler.getOutboundSocketPort(), + connectionHandler); } }); } @@ -219,7 +220,10 @@ public Session getSession(final String path, final Consumer onRemoteSes } if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { - this.connection = this.getReactor().connectionToHost(this.hostName, this.connectionHandler.getPort(), this.connectionHandler); + this.connection = this.getReactor().connectionToHost( + this.connectionHandler.getOutboundSocketHostName(), + this.connectionHandler.getOutboundSocketPort(), + this.connectionHandler); } final Session session = this.connection.session(); diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java index 2a519cf53..9b8f7615f 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java @@ -11,7 +11,7 @@ import org.slf4j.LoggerFactory; public class WebSocketConnectionHandler extends ConnectionHandler { - private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class); + private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class); public WebSocketConnectionHandler(AmqpConnection messagingFactory) { super(messagingFactory); @@ -19,11 +19,13 @@ public WebSocketConnectionHandler(AmqpConnection messagingFactory) { @Override protected void addTransportLayers(final Event event, final TransportInternal transport) { + final String hostName = event.getConnection().getHostname(); + final WebSocketImpl webSocket = new WebSocketImpl(); webSocket.configure( - event.getConnection().getHostname(), + hostName, "/$servicebus/websocket", - null, + "", 0, "AMQPWSB10", null, @@ -32,12 +34,14 @@ protected void addTransportLayers(final Event event, final TransportInternal tra transport.addTransportLayer(webSocket); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + event.getConnection().getHostname() +"]"); + TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName +"]"); } + + super.addTransportLayers(event, transport); } @Override - protected int getPort() { + protected int getProtocolPort() { return ClientConstants.HTTPS_PORT; } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java new file mode 100644 index 000000000..db916dcfb --- /dev/null +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java @@ -0,0 +1,194 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.impl; + +import com.microsoft.azure.proton.transport.proxy.ProxyHandler; +import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl; +import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl; + +import org.apache.qpid.proton.amqp.transport.ConnectionError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.impl.TransportInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.Authenticator; +import java.net.InetSocketAddress; +import java.net.PasswordAuthentication; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.URI; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class WebSocketProxyConnectionHandler extends WebSocketConnectionHandler { + private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketProxyConnectionHandler.class); + private final String proxySelectorModifiedError = "ProxySelector has been modified."; + + public static Boolean shouldUseProxy(final String hostName) { + final URI uri = createURIFromHostNamePort(hostName, ClientConstants.HTTPS_PORT); + final ProxySelector proxySelector = ProxySelector.getDefault(); + if (proxySelector == null) { + return false; + } + + final List proxies = proxySelector.select(uri); + return isProxyAddressLegal(proxies); + } + + public WebSocketProxyConnectionHandler(AmqpConnection messagingFactory) { + super(messagingFactory); + } + + @Override + protected void addTransportLayers(final Event event, final TransportInternal transport) { + super.addTransportLayers(event, transport); + + final ProxyImpl proxy = new ProxyImpl(); + + // host name used to create proxy connect request + // after creating the socket to proxy + final String hostName = event.getConnection().getHostname(); + final ProxyHandler proxyHandler = new ProxyHandlerImpl(); + final Map proxyHeader = getAuthorizationHeader(); + proxy.configure(hostName, proxyHeader, proxyHandler, transport); + + transport.addTransportLayer(proxy); + + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info("addProxyHandshake: hostname[" + hostName +"]"); + } + } + + @Override + protected void notifyTransportErrors(final Event event) { + final Transport transport = event.getTransport(); + final Connection connection = event.getConnection(); + if (connection == null || transport == null) { + return; + } + + final ErrorCondition errorCondition = transport.getCondition(); + final String hostName = event.getReactor().getConnectionAddress(connection); + final ProxySelector proxySelector = ProxySelector.getDefault(); + + if (errorCondition == null + || !(errorCondition.getCondition().equals(ConnectionError.FRAMING_ERROR) + || errorCondition.getCondition().equals(AmqpErrorCode.PROTON_IO_ERROR)) + || proxySelector == null + || StringUtil.isNullOrEmpty(hostName)) { + return; + } + + final String[] hostNameParts = hostName.split(":"); + if (hostNameParts.length != 2) { + return; + } + + int port; + try { + port = Integer.parseInt(hostNameParts[1]); + } catch (NumberFormatException ignore){ + return; + } + + final IOException ioException = reconstructIOException(errorCondition); + proxySelector.connectFailed( + createURIFromHostNamePort(this.getMessagingFactory().getHostName(), this.getProtocolPort()), + new InetSocketAddress(hostNameParts[0], port), + ioException); + } + + @Override + public String getOutboundSocketHostName() { + final InetSocketAddress socketAddress = getProxyAddress(); + return socketAddress.getHostString(); + } + + @Override + public int getOutboundSocketPort() { + final InetSocketAddress socketAddress = getProxyAddress(); + return socketAddress.getPort(); + } + + private Map getAuthorizationHeader() { + final PasswordAuthentication authentication = Authenticator.requestPasswordAuthentication( + getOutboundSocketHostName(), + null, + getOutboundSocketPort(), + null, + null, + "http", + null, + Authenticator.RequestorType.PROXY); + if (authentication == null) { + return null; + } + + final String proxyUserName = authentication.getUserName(); + final String proxyPassword = authentication.getPassword() != null + ? new String(authentication.getPassword()) + : null; + if (StringUtil.isNullOrEmpty(proxyUserName) + || StringUtil.isNullOrEmpty(proxyPassword)) { + return null; + } + + final HashMap proxyAuthorizationHeader = new HashMap<>(); + // https://tools.ietf.org/html/rfc7617 + final String usernamePasswordPair = proxyUserName + ":" + proxyPassword; + proxyAuthorizationHeader.put( + "Proxy-Authorization", + "Basic " + Base64.getEncoder().encodeToString(usernamePasswordPair.getBytes())); + return proxyAuthorizationHeader; + } + + private InetSocketAddress getProxyAddress() { + final URI serviceUri = createURIFromHostNamePort( + this.getMessagingFactory().getHostName(), + this.getProtocolPort()); + final ProxySelector proxySelector = ProxySelector.getDefault(); + if (proxySelector == null) { + throw new IllegalStateException(proxySelectorModifiedError); + } + + final List proxies = proxySelector.select(serviceUri); + if (!isProxyAddressLegal(proxies)) { + throw new IllegalStateException(proxySelectorModifiedError); + } + + final Proxy proxy = proxies.get(0); + return (InetSocketAddress) proxy.address(); + } + + private static URI createURIFromHostNamePort(final String hostName, final int port) { + return URI.create(String.format(ClientConstants.HTTPS_URI_FORMAT, hostName, port)); + } + + private static boolean isProxyAddressLegal(final List proxies) { + // we look only at the first proxy in the list + // if the proxy can be translated to InetSocketAddress + // only then - can we parse it to hostName and Port + // which is required by qpid-proton-j library reactor.connectToHost() API + return proxies != null + && !proxies.isEmpty() + && proxies.get(0).type() == Proxy.Type.HTTP + && proxies.get(0).address() != null + && proxies.get(0).address() instanceof InetSocketAddress; + } + + private static IOException reconstructIOException(ErrorCondition errorCondition) { + // since proton library communicates all errors based on amqp-error-condition + // it swallows the IOException and translates it to proton-io errorCode + // we reconstruct the IOException in this case - but, callstack is lost + return new IOException(errorCondition.getDescription()); + } +} diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java new file mode 100644 index 000000000..cec469fae --- /dev/null +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.connstrbuilder; + +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.TransportType; +import com.microsoft.azure.eventhubs.impl.ConnectionHandler; +import com.microsoft.azure.eventhubs.impl.EventHubClientImpl; +import com.microsoft.azure.eventhubs.impl.MessagingFactory; +import com.microsoft.azure.eventhubs.lib.ApiTestBase; +import com.microsoft.azure.eventhubs.lib.TestContext; + +import org.jutils.jproxy.ProxyServer; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.*; +import java.util.LinkedList; +import java.util.List; + +public class TransportTypeTest extends ApiTestBase { + + public volatile boolean isProxySelectorInvoked = false; + + @Test + public void transportTypeAmqpCreatesConnectionWithPort5671() throws Exception { + ConnectionStringBuilder builder = new ConnectionStringBuilder(TestContext.getConnectionString().toString()); + builder.setTransportType(TransportType.AMQP); + + EventHubClient ehClient = EventHubClient.createSync(builder.toString(), TestContext.EXECUTOR_SERVICE); + try { + EventHubClientImpl eventHubClientImpl = (EventHubClientImpl) ehClient; + final Field factoryField = EventHubClientImpl.class.getDeclaredField("underlyingFactory"); + factoryField.setAccessible(true); + final MessagingFactory underlyingFactory = (MessagingFactory) factoryField.get(eventHubClientImpl); + + final Field connectionHandlerField = MessagingFactory.class.getDeclaredField("connectionHandler"); + connectionHandlerField.setAccessible(true); + final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); + + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + outboundSocketPort.setAccessible(true); + + final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort"); + protocolPort.setAccessible(true); + + Assert.assertEquals(5671, outboundSocketPort.invoke(connectionHandler)); + Assert.assertEquals(5671, protocolPort.invoke(connectionHandler)); + } finally { + ehClient.closeSync(); + } + } + + @Test + public void transportTypeAmqpWebSocketsCreatesConnectionWithPort443() throws Exception { + ConnectionStringBuilder builder = new ConnectionStringBuilder(TestContext.getConnectionString().toString()); + builder.setTransportType(TransportType.AMQP_WEB_SOCKETS); + + EventHubClient ehClient = EventHubClient.createSync(builder.toString(), TestContext.EXECUTOR_SERVICE); + try { + EventHubClientImpl eventHubClientImpl = (EventHubClientImpl) ehClient; + final Field factoryField = EventHubClientImpl.class.getDeclaredField("underlyingFactory"); + factoryField.setAccessible(true); + final MessagingFactory underlyingFactory = (MessagingFactory) factoryField.get(eventHubClientImpl); + + final Field connectionHandlerField = MessagingFactory.class.getDeclaredField("connectionHandler"); + connectionHandlerField.setAccessible(true); + final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); + + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + outboundSocketPort.setAccessible(true); + + final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort"); + protocolPort.setAccessible(true); + + Assert.assertEquals(443, outboundSocketPort.invoke(connectionHandler)); + Assert.assertEquals(443, protocolPort.invoke(connectionHandler)); + } finally { + ehClient.closeSync(); + } + } + + @Test + public void transportTypeAmqpWebSocketsWithProxyCreatesConnectionWithCorrectPorts() throws Exception { + int proxyPort = 8899; + ProxyServer proxyServer = ProxyServer.create("localhost", proxyPort); + proxyServer.start(throwable -> {}); + + ProxySelector defaultProxySelector = ProxySelector.getDefault(); + this.isProxySelectorInvoked = false; + try { + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + LinkedList proxies = new LinkedList<>(); + proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", proxyPort))); + isProxySelectorInvoked = true; + return proxies; + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { + // no-op + } + }); + + ConnectionStringBuilder builder = new ConnectionStringBuilder(TestContext.getConnectionString().toString()); + builder.setTransportType(TransportType.AMQP_WEB_SOCKETS); + + EventHubClient ehClient = EventHubClient.createSync(builder.toString(), TestContext.EXECUTOR_SERVICE); + try { + EventHubClientImpl eventHubClientImpl = (EventHubClientImpl) ehClient; + final Field factoryField = EventHubClientImpl.class.getDeclaredField("underlyingFactory"); + factoryField.setAccessible(true); + final MessagingFactory underlyingFactory = (MessagingFactory) factoryField.get(eventHubClientImpl); + + final Field connectionHandlerField = MessagingFactory.class.getDeclaredField("connectionHandler"); + connectionHandlerField.setAccessible(true); + final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); + + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + outboundSocketPort.setAccessible(true); + + final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort"); + protocolPort.setAccessible(true); + + Assert.assertEquals(proxyPort, outboundSocketPort.invoke(connectionHandler)); + Assert.assertEquals(443, protocolPort.invoke(connectionHandler)); + + Assert.assertTrue(isProxySelectorInvoked); + } finally { + ehClient.closeSync(); + ProxySelector.setDefault(defaultProxySelector); + } + } finally { + proxyServer.stop(); + } + } +} \ No newline at end of file diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java index 6ef1fd482..7dc2f4b80 100644 --- a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java @@ -32,10 +32,10 @@ public static void initialize() throws Exception { public static void initializeEventHubClients(ConnectionStringBuilder connStr) throws Exception { - ehClient = EventHubClient.create(connStr.toString(), TestContext.EXECUTOR_SERVICE).get(); + ehClient = EventHubClient.createSync(connStr.toString(), TestContext.EXECUTOR_SERVICE); sender = ehClient.createPartitionSender(partitionId).get(); - receiverHub = EventHubClient.create(connStr.toString(), TestContext.EXECUTOR_SERVICE).get(); + receiverHub = EventHubClient.createSync(connStr.toString(), TestContext.EXECUTOR_SERVICE); receiver = receiverHub.createReceiver(TestContext.getConsumerGroupName(), partitionId, EventPosition.fromEnqueuedTime(Instant.now())).get(); } diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxyReceiveTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxyReceiveTest.java new file mode 100644 index 000000000..52551cc89 --- /dev/null +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxyReceiveTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.proxy; + +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.TransportType; +import com.microsoft.azure.eventhubs.lib.SasTokenTestBase; +import com.microsoft.azure.eventhubs.lib.TestContext; +import com.microsoft.azure.eventhubs.sendrecv.ReceiveTest; +import org.jutils.jproxy.ProxyServer; +import org.junit.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.SocketAddress; +import java.net.URI; +import java.util.LinkedList; +import java.util.List; + +public class ProxyReceiveTest extends SasTokenTestBase { + + private static final int proxyPort = 8899; + private static ProxyServer proxyServer; + private static ReceiveTest receiveTest; + private static ProxySelector defaultProxySelector; + + @BeforeClass + public static void initialize() throws Exception { + proxyServer = ProxyServer.create("localhost", proxyPort); + proxyServer.start(t -> {}); + + defaultProxySelector = ProxySelector.getDefault(); + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + LinkedList proxies = new LinkedList<>(); + proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", proxyPort))); + return proxies; + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { + // no-op + } + }); + + Assert.assertTrue(TestContext.getConnectionString().getSharedAccessSignature() != null + && TestContext.getConnectionString().getSasKey() == null + && TestContext.getConnectionString().getSasKeyName() == null); + + receiveTest = new ReceiveTest(); + ConnectionStringBuilder connectionString = TestContext.getConnectionString(); + connectionString.setTransportType(TransportType.AMQP_WEB_SOCKETS); + ReceiveTest.initializeEventHub(connectionString); + } + + @AfterClass() + public static void cleanup() throws Exception { + ReceiveTest.cleanup(); + + if (proxyServer != null) { + proxyServer.stop(); + } + + ProxySelector.setDefault(defaultProxySelector); + } + + @Test() + public void testReceiverStartOfStreamFilters() throws EventHubException { + receiveTest.testReceiverStartOfStreamFilters(); + } + + @After + public void testCleanup() throws EventHubException { + receiveTest.testCleanup(); + } +} \ No newline at end of file diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySelectorTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySelectorTest.java new file mode 100644 index 000000000..fb2733e5c --- /dev/null +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySelectorTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.proxy; + +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.TransportType; +import com.microsoft.azure.eventhubs.lib.ApiTestBase; +import com.microsoft.azure.eventhubs.lib.TestContext; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.SocketAddress; +import java.net.URI; +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class ProxySelectorTest extends ApiTestBase { + + @Test + public void proxySelectorConnectFailedInvokeTest() throws Exception { + // doesn't start proxy server and verifies that the connectFailed callback is invoked. + int proxyPort = 8899; + final CompletableFuture connectFailedTask = new CompletableFuture<>(); + final ProxySelector defaultProxySelector = ProxySelector.getDefault(); + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + LinkedList proxies = new LinkedList<>(); + proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", proxyPort))); + return proxies; + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { + connectFailedTask.complete(null); + } + }); + + try { + ConnectionStringBuilder builder = new ConnectionStringBuilder(TestContext.getConnectionString().toString()); + builder.setTransportType(TransportType.AMQP_WEB_SOCKETS); + builder.setOperationTimeout(Duration.ofSeconds(10)); + + try { + EventHubClient.createSync(builder.toString(), TestContext.EXECUTOR_SERVICE); + Assert.assertTrue(false); // shouldn't reach here + } catch (EventHubException ex) { + Assert.assertEquals("connection aborted", ex.getMessage()); + } + + connectFailedTask.get(2, TimeUnit.SECONDS); + } finally { + ProxySelector.setDefault(defaultProxySelector); + } + } +} diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySendLargeMessageTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySendLargeMessageTest.java new file mode 100644 index 000000000..5caa1b77b --- /dev/null +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySendLargeMessageTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.proxy; + +import com.microsoft.azure.eventhubs.*; +import com.microsoft.azure.eventhubs.exceptioncontracts.SendLargeMessageTest; +import com.microsoft.azure.eventhubs.lib.ApiTestBase; +import com.microsoft.azure.eventhubs.lib.TestContext; +import org.jutils.jproxy.ProxyServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.net.*; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class ProxySendLargeMessageTest extends ApiTestBase { + private static int proxyPort = 8899; + private static ProxyServer proxyServer; + private static SendLargeMessageTest sendLargeMessageTest; + private static ProxySelector defaultProxySelector; + + @BeforeClass + public static void initialize() throws Exception { + proxyServer = ProxyServer.create("localhost", proxyPort); + proxyServer.start(t -> {}); + + defaultProxySelector = ProxySelector.getDefault(); + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + LinkedList proxies = new LinkedList<>(); + proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", proxyPort))); + return proxies; + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { + // no-op + } + }); + + final ConnectionStringBuilder connectionStringBuilder = TestContext.getConnectionString(); + connectionStringBuilder.setTransportType(TransportType.AMQP_WEB_SOCKETS); + sendLargeMessageTest = new SendLargeMessageTest(); + SendLargeMessageTest.initializeEventHubClients(connectionStringBuilder); + } + + @AfterClass() + public static void cleanup() throws Exception { + SendLargeMessageTest.cleanup(); + + if (proxyServer != null) { + proxyServer.stop(); + } + + ProxySelector.setDefault(defaultProxySelector); + } + + @Test() + public void sendMsgLargerThan64k() throws EventHubException, InterruptedException, ExecutionException, IOException { + sendLargeMessageTest.sendMsgLargerThan64k(); + } + + @Test(expected = PayloadSizeExceededException.class) + public void sendMsgLargerThan256K() throws EventHubException, InterruptedException, ExecutionException, IOException { + sendLargeMessageTest.sendMsgLargerThan256K(); + } + + @Test() + public void sendMsgLargerThan128k() throws EventHubException, InterruptedException, ExecutionException, IOException { + sendLargeMessageTest.sendMsgLargerThan128k(); + } +} \ No newline at end of file diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySendTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySendTest.java new file mode 100644 index 000000000..8c47489df --- /dev/null +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/proxy/ProxySendTest.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.proxy; + +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.TransportType; +import com.microsoft.azure.eventhubs.lib.SasTokenTestBase; +import com.microsoft.azure.eventhubs.lib.TestContext; +import com.microsoft.azure.eventhubs.sendrecv.SendTest; +import org.jutils.jproxy.ProxyServer; +import org.junit.*; + +import java.io.IOException; +import java.net.*; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +public class ProxySendTest extends SasTokenTestBase { + + private static int proxyPort = 8899; + private static ProxyServer proxyServer; + private static SendTest sendTest; + private static ProxySelector defaultProxySelector; + + @BeforeClass + public static void initialize() throws Exception { + proxyServer = ProxyServer.create("localhost", proxyPort); + proxyServer.start(t -> {}); + + defaultProxySelector = ProxySelector.getDefault(); + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + LinkedList proxies = new LinkedList<>(); + proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", proxyPort))); + return proxies; + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { + // no-op + } + }); + + Assert.assertTrue(TestContext.getConnectionString().getSharedAccessSignature() != null + && TestContext.getConnectionString().getSasKey() == null + && TestContext.getConnectionString().getSasKeyName() == null); + + sendTest = new SendTest(); + + ConnectionStringBuilder connectionString = TestContext.getConnectionString(); + connectionString.setTransportType(TransportType.AMQP_WEB_SOCKETS); + SendTest.initializeEventHub(connectionString); + } + + @AfterClass + public static void cleanupClient() throws Exception { + + SendTest.cleanupClient(); + + if (proxyServer != null) { + proxyServer.stop(); + } + + ProxySelector.setDefault(defaultProxySelector); + } + + @Test + public void sendBatchRetainsOrderWithinBatch() throws EventHubException, InterruptedException, ExecutionException, TimeoutException { + + sendTest.sendBatchRetainsOrderWithinBatch(); + } + + @Test + public void sendResultsInSysPropertiesWithPartitionKey() throws EventHubException, InterruptedException, ExecutionException, TimeoutException { + + sendTest.sendResultsInSysPropertiesWithPartitionKey(); + } + + @After + public void cleanup() throws Exception { + + sendTest.cleanup(); + } +} diff --git a/pom.xml b/pom.xml index 006de12e6..dbdcb232a 100644 --- a/pom.xml +++ b/pom.xml @@ -6,14 +6,14 @@ com.microsoft.azure azure-eventhubs-clients - 1.1.0 + 1.2.0 pom https://github.com/Azure/azure-event-hubs - 0.28.1 - 1.0.0 + 0.29.0 + 1.1.0 4.12 1.8.0-alpha2 @@ -34,7 +34,12 @@ - + + + HTTP proxy server in java + http://raw.github.com/SreeramGarlapati/jproxy/master/releases + + org.apache.qpid @@ -57,6 +62,12 @@ ${junit-version} test + + org.jutils.jproxy + jproxy + 0.0.1 + test + diff --git a/readme.md b/readme.md index 9865e9cd8..e90f81349 100644 --- a/readme.md +++ b/readme.md @@ -44,7 +44,7 @@ the required versions of Apache Qpid Proton-J, and the cryptography library BCPK com.microsoft.azure azure-eventhubs - 1.1.0 + 1.2.0 ``` From f8493e54be28a97112e700d6124fe339d2b8274f Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 20 Sep 2018 22:17:19 -0700 Subject: [PATCH 2/2] followup: websockets proxy support (#381) --- .../eventhubs/impl/ConnectionHandler.java | 36 +++++++++---------- .../azure/eventhubs/impl/MessageReceiver.java | 3 +- .../eventhubs/impl/MessagingFactory.java | 9 +++-- .../impl/WebSocketConnectionHandler.java | 4 +-- .../impl/WebSocketProxyConnectionHandler.java | 16 ++++----- .../connstrbuilder/TransportTypeTest.java | 9 ++--- 6 files changed, 37 insertions(+), 40 deletions(-) diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java index 8bf8fd282..1e2c32ce6 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java @@ -28,30 +28,30 @@ public class ConnectionHandler extends BaseHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class); - private final AmqpConnection messagingFactory; + private final AmqpConnection amqpConnection; - protected ConnectionHandler(final AmqpConnection messagingFactory) { + protected ConnectionHandler(final AmqpConnection amqpConnection) { add(new Handshaker()); - this.messagingFactory = messagingFactory; + this.amqpConnection = amqpConnection; } - static ConnectionHandler create(TransportType transportType, AmqpConnection messagingFactory) { + static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) { switch (transportType) { case AMQP_WEB_SOCKETS: - if (WebSocketProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) { - return new WebSocketProxyConnectionHandler(messagingFactory); + if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) { + return new WebSocketProxyConnectionHandler(amqpConnection); } else { - return new WebSocketConnectionHandler(messagingFactory); + return new WebSocketConnectionHandler(amqpConnection); } case AMQP: default: - return new ConnectionHandler(messagingFactory); + return new ConnectionHandler(amqpConnection); } } - protected AmqpConnection getMessagingFactory() { - return this.messagingFactory; + protected AmqpConnection getAmqpConnection() { + return this.amqpConnection; } private static SslDomain makeDomain(SslDomain.Mode mode) { @@ -68,7 +68,7 @@ private static SslDomain makeDomain(SslDomain.Mode mode) { public void onConnectionInit(Event event) { final Connection connection = event.getConnection(); - final String hostName = new StringBuilder(this.messagingFactory.getHostName()) + final String hostName = new StringBuilder(this.amqpConnection.getHostName()) .append(":") .append(String.valueOf(this.getProtocolPort())) .toString(); @@ -107,8 +107,8 @@ protected void notifyTransportErrors(final Event event) { * for ex: in case of proxy server - this could be proxy ip address * @return host name */ - public String getOutboundSocketHostName() { - return messagingFactory.getHostName(); + public String getRemoteHostName() { + return amqpConnection.getHostName(); } /** @@ -116,7 +116,7 @@ public String getOutboundSocketHostName() { * for ex: in case of talking to event hubs service via proxy - use proxy port * @return port */ - protected int getOutboundSocketPort() { + protected int getRemotePort() { return this.getProtocolPort(); } @@ -168,7 +168,7 @@ public void onTransportError(Event event) { if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { // if the remote-peer abruptly closes the connection without issuing close frame // issue one - this.messagingFactory.onConnectionError(condition); + this.amqpConnection.onConnectionError(condition); } // onTransportError event is not handled by the global IO Handler for cleanup @@ -191,7 +191,7 @@ public void onTransportClosed(Event event) { if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { // if the remote-peer abruptly closes the connection without issuing close frame // issue one - this.messagingFactory.onConnectionError(condition); + this.amqpConnection.onConnectionError(condition); } } @@ -202,7 +202,7 @@ public void onConnectionRemoteOpen(Event event) { TRACE_LOGGER.info("onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + ", " + event.getConnection().getRemoteContainer() + "]"); } - this.messagingFactory.onOpenComplete(null); + this.amqpConnection.onOpenComplete(null); } @Override @@ -240,6 +240,6 @@ public void onConnectionRemoteClose(Event event) { : "]")); } - this.messagingFactory.onConnectionError(error); + this.amqpConnection.onConnectionError(error); } } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java index 1dacf357c..ff6d418bc 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java @@ -429,7 +429,8 @@ public void onEvent() { private void drainPendingReceives(final Exception exception) { WorkItem> workItem; - final boolean shouldReturnNull = (exception == null || exception instanceof TimeoutException); + final boolean shouldReturnNull = (exception == null + || (exception instanceof EventHubException && ((EventHubException) exception).getIsTransient())); while ((workItem = this.pendingReceives.poll()) != null) { final CompletableFuture> future = workItem.getWork(); diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index ee8222c3e..52745ba89 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -37,7 +37,6 @@ import com.microsoft.azure.eventhubs.OperationCancelledException; import com.microsoft.azure.eventhubs.RetryPolicy; import com.microsoft.azure.eventhubs.TimeoutException; -import com.microsoft.azure.eventhubs.TransportType; /** * Abstracts all amqp related details and exposes AmqpConnection object @@ -173,8 +172,8 @@ public void onReactorInit(Event e) { final Reactor r = e.getReactor(); connection = r.connectionToHost( - connectionHandler.getOutboundSocketHostName(), - connectionHandler.getOutboundSocketPort(), + connectionHandler.getRemoteHostName(), + connectionHandler.getRemotePort(), connectionHandler); } }); @@ -221,8 +220,8 @@ public Session getSession(final String path, final Consumer onRemoteSes if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { this.connection = this.getReactor().connectionToHost( - this.connectionHandler.getOutboundSocketHostName(), - this.connectionHandler.getOutboundSocketPort(), + this.connectionHandler.getRemoteHostName(), + this.connectionHandler.getRemotePort(), this.connectionHandler); } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java index 9b8f7615f..b8ab9484b 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java @@ -13,8 +13,8 @@ public class WebSocketConnectionHandler extends ConnectionHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class); - public WebSocketConnectionHandler(AmqpConnection messagingFactory) { - super(messagingFactory); + public WebSocketConnectionHandler(AmqpConnection amqpConnection) { + super(amqpConnection); } @Override diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java index db916dcfb..41dc0baa3 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java @@ -44,8 +44,8 @@ public static Boolean shouldUseProxy(final String hostName) { return isProxyAddressLegal(proxies); } - public WebSocketProxyConnectionHandler(AmqpConnection messagingFactory) { - super(messagingFactory); + public WebSocketProxyConnectionHandler(AmqpConnection amqpConnection) { + super(amqpConnection); } @Override @@ -102,28 +102,28 @@ protected void notifyTransportErrors(final Event event) { final IOException ioException = reconstructIOException(errorCondition); proxySelector.connectFailed( - createURIFromHostNamePort(this.getMessagingFactory().getHostName(), this.getProtocolPort()), + createURIFromHostNamePort(this.getAmqpConnection().getHostName(), this.getProtocolPort()), new InetSocketAddress(hostNameParts[0], port), ioException); } @Override - public String getOutboundSocketHostName() { + public String getRemoteHostName() { final InetSocketAddress socketAddress = getProxyAddress(); return socketAddress.getHostString(); } @Override - public int getOutboundSocketPort() { + public int getRemotePort() { final InetSocketAddress socketAddress = getProxyAddress(); return socketAddress.getPort(); } private Map getAuthorizationHeader() { final PasswordAuthentication authentication = Authenticator.requestPasswordAuthentication( - getOutboundSocketHostName(), + getRemoteHostName(), null, - getOutboundSocketPort(), + getRemotePort(), null, null, "http", @@ -153,7 +153,7 @@ private Map getAuthorizationHeader() { private InetSocketAddress getProxyAddress() { final URI serviceUri = createURIFromHostNamePort( - this.getMessagingFactory().getHostName(), + this.getAmqpConnection().getHostName(), this.getProtocolPort()); final ProxySelector proxySelector = ProxySelector.getDefault(); if (proxySelector == null) { diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java index cec469fae..68eb7fb8d 100644 --- a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java @@ -6,7 +6,6 @@ import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventHubClient; -import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.TransportType; import com.microsoft.azure.eventhubs.impl.ConnectionHandler; import com.microsoft.azure.eventhubs.impl.EventHubClientImpl; @@ -15,9 +14,7 @@ import com.microsoft.azure.eventhubs.lib.TestContext; import org.jutils.jproxy.ProxyServer; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -47,7 +44,7 @@ public void transportTypeAmqpCreatesConnectionWithPort5671() throws Exception { connectionHandlerField.setAccessible(true); final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); - final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort"); outboundSocketPort.setAccessible(true); final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort"); @@ -76,7 +73,7 @@ public void transportTypeAmqpWebSocketsCreatesConnectionWithPort443() throws Exc connectionHandlerField.setAccessible(true); final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); - final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort"); outboundSocketPort.setAccessible(true); final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort"); @@ -127,7 +124,7 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { connectionHandlerField.setAccessible(true); final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); - final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort"); outboundSocketPort.setAccessible(true); final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort");