diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java index 8bf8fd282..1e2c32ce6 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java @@ -28,30 +28,30 @@ public class ConnectionHandler extends BaseHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class); - private final AmqpConnection messagingFactory; + private final AmqpConnection amqpConnection; - protected ConnectionHandler(final AmqpConnection messagingFactory) { + protected ConnectionHandler(final AmqpConnection amqpConnection) { add(new Handshaker()); - this.messagingFactory = messagingFactory; + this.amqpConnection = amqpConnection; } - static ConnectionHandler create(TransportType transportType, AmqpConnection messagingFactory) { + static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) { switch (transportType) { case AMQP_WEB_SOCKETS: - if (WebSocketProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) { - return new WebSocketProxyConnectionHandler(messagingFactory); + if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) { + return new WebSocketProxyConnectionHandler(amqpConnection); } else { - return new WebSocketConnectionHandler(messagingFactory); + return new WebSocketConnectionHandler(amqpConnection); } case AMQP: default: - return new ConnectionHandler(messagingFactory); + return new ConnectionHandler(amqpConnection); } } - protected AmqpConnection getMessagingFactory() { - return this.messagingFactory; + protected AmqpConnection getAmqpConnection() { + return this.amqpConnection; } private static SslDomain makeDomain(SslDomain.Mode mode) { @@ -68,7 +68,7 @@ private static SslDomain makeDomain(SslDomain.Mode mode) { public void onConnectionInit(Event event) { final Connection connection = event.getConnection(); - final String hostName = new StringBuilder(this.messagingFactory.getHostName()) + final String hostName = new StringBuilder(this.amqpConnection.getHostName()) .append(":") .append(String.valueOf(this.getProtocolPort())) .toString(); @@ -107,8 +107,8 @@ protected void notifyTransportErrors(final Event event) { * for ex: in case of proxy server - this could be proxy ip address * @return host name */ - public String getOutboundSocketHostName() { - return messagingFactory.getHostName(); + public String getRemoteHostName() { + return amqpConnection.getHostName(); } /** @@ -116,7 +116,7 @@ public String getOutboundSocketHostName() { * for ex: in case of talking to event hubs service via proxy - use proxy port * @return port */ - protected int getOutboundSocketPort() { + protected int getRemotePort() { return this.getProtocolPort(); } @@ -168,7 +168,7 @@ public void onTransportError(Event event) { if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { // if the remote-peer abruptly closes the connection without issuing close frame // issue one - this.messagingFactory.onConnectionError(condition); + this.amqpConnection.onConnectionError(condition); } // onTransportError event is not handled by the global IO Handler for cleanup @@ -191,7 +191,7 @@ public void onTransportClosed(Event event) { if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { // if the remote-peer abruptly closes the connection without issuing close frame // issue one - this.messagingFactory.onConnectionError(condition); + this.amqpConnection.onConnectionError(condition); } } @@ -202,7 +202,7 @@ public void onConnectionRemoteOpen(Event event) { TRACE_LOGGER.info("onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + ", " + event.getConnection().getRemoteContainer() + "]"); } - this.messagingFactory.onOpenComplete(null); + this.amqpConnection.onOpenComplete(null); } @Override @@ -240,6 +240,6 @@ public void onConnectionRemoteClose(Event event) { : "]")); } - this.messagingFactory.onConnectionError(error); + this.amqpConnection.onConnectionError(error); } } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java index 1dacf357c..ff6d418bc 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java @@ -429,7 +429,8 @@ public void onEvent() { private void drainPendingReceives(final Exception exception) { WorkItem> workItem; - final boolean shouldReturnNull = (exception == null || exception instanceof TimeoutException); + final boolean shouldReturnNull = (exception == null + || (exception instanceof EventHubException && ((EventHubException) exception).getIsTransient())); while ((workItem = this.pendingReceives.poll()) != null) { final CompletableFuture> future = workItem.getWork(); diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index ee8222c3e..52745ba89 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -37,7 +37,6 @@ import com.microsoft.azure.eventhubs.OperationCancelledException; import com.microsoft.azure.eventhubs.RetryPolicy; import com.microsoft.azure.eventhubs.TimeoutException; -import com.microsoft.azure.eventhubs.TransportType; /** * Abstracts all amqp related details and exposes AmqpConnection object @@ -173,8 +172,8 @@ public void onReactorInit(Event e) { final Reactor r = e.getReactor(); connection = r.connectionToHost( - connectionHandler.getOutboundSocketHostName(), - connectionHandler.getOutboundSocketPort(), + connectionHandler.getRemoteHostName(), + connectionHandler.getRemotePort(), connectionHandler); } }); @@ -221,8 +220,8 @@ public Session getSession(final String path, final Consumer onRemoteSes if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { this.connection = this.getReactor().connectionToHost( - this.connectionHandler.getOutboundSocketHostName(), - this.connectionHandler.getOutboundSocketPort(), + this.connectionHandler.getRemoteHostName(), + this.connectionHandler.getRemotePort(), this.connectionHandler); } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java index 9b8f7615f..b8ab9484b 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java @@ -13,8 +13,8 @@ public class WebSocketConnectionHandler extends ConnectionHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class); - public WebSocketConnectionHandler(AmqpConnection messagingFactory) { - super(messagingFactory); + public WebSocketConnectionHandler(AmqpConnection amqpConnection) { + super(amqpConnection); } @Override diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java index db916dcfb..41dc0baa3 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java @@ -44,8 +44,8 @@ public static Boolean shouldUseProxy(final String hostName) { return isProxyAddressLegal(proxies); } - public WebSocketProxyConnectionHandler(AmqpConnection messagingFactory) { - super(messagingFactory); + public WebSocketProxyConnectionHandler(AmqpConnection amqpConnection) { + super(amqpConnection); } @Override @@ -102,28 +102,28 @@ protected void notifyTransportErrors(final Event event) { final IOException ioException = reconstructIOException(errorCondition); proxySelector.connectFailed( - createURIFromHostNamePort(this.getMessagingFactory().getHostName(), this.getProtocolPort()), + createURIFromHostNamePort(this.getAmqpConnection().getHostName(), this.getProtocolPort()), new InetSocketAddress(hostNameParts[0], port), ioException); } @Override - public String getOutboundSocketHostName() { + public String getRemoteHostName() { final InetSocketAddress socketAddress = getProxyAddress(); return socketAddress.getHostString(); } @Override - public int getOutboundSocketPort() { + public int getRemotePort() { final InetSocketAddress socketAddress = getProxyAddress(); return socketAddress.getPort(); } private Map getAuthorizationHeader() { final PasswordAuthentication authentication = Authenticator.requestPasswordAuthentication( - getOutboundSocketHostName(), + getRemoteHostName(), null, - getOutboundSocketPort(), + getRemotePort(), null, null, "http", @@ -153,7 +153,7 @@ private Map getAuthorizationHeader() { private InetSocketAddress getProxyAddress() { final URI serviceUri = createURIFromHostNamePort( - this.getMessagingFactory().getHostName(), + this.getAmqpConnection().getHostName(), this.getProtocolPort()); final ProxySelector proxySelector = ProxySelector.getDefault(); if (proxySelector == null) { diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java index cec469fae..68eb7fb8d 100644 --- a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/connstrbuilder/TransportTypeTest.java @@ -6,7 +6,6 @@ import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventHubClient; -import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.TransportType; import com.microsoft.azure.eventhubs.impl.ConnectionHandler; import com.microsoft.azure.eventhubs.impl.EventHubClientImpl; @@ -15,9 +14,7 @@ import com.microsoft.azure.eventhubs.lib.TestContext; import org.jutils.jproxy.ProxyServer; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -47,7 +44,7 @@ public void transportTypeAmqpCreatesConnectionWithPort5671() throws Exception { connectionHandlerField.setAccessible(true); final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); - final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort"); outboundSocketPort.setAccessible(true); final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort"); @@ -76,7 +73,7 @@ public void transportTypeAmqpWebSocketsCreatesConnectionWithPort443() throws Exc connectionHandlerField.setAccessible(true); final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); - final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort"); outboundSocketPort.setAccessible(true); final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort"); @@ -127,7 +124,7 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { connectionHandlerField.setAccessible(true); final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory); - final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort"); + final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort"); outboundSocketPort.setAccessible(true); final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort");