Skip to content

Commit

Permalink
followup: websockets proxy support (#381)
Browse files Browse the repository at this point in the history
  • Loading branch information
SreeramGarlapati committed Sep 21, 2018
1 parent b385527 commit a2959c0
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,30 @@ public class ConnectionHandler extends BaseHandler {

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);

private final AmqpConnection messagingFactory;
private final AmqpConnection amqpConnection;

protected ConnectionHandler(final AmqpConnection messagingFactory) {
protected ConnectionHandler(final AmqpConnection amqpConnection) {

add(new Handshaker());
this.messagingFactory = messagingFactory;
this.amqpConnection = amqpConnection;
}

static ConnectionHandler create(TransportType transportType, AmqpConnection messagingFactory) {
static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) {
switch (transportType) {
case AMQP_WEB_SOCKETS:
if (WebSocketProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) {
return new WebSocketProxyConnectionHandler(messagingFactory);
if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) {
return new WebSocketProxyConnectionHandler(amqpConnection);
} else {
return new WebSocketConnectionHandler(messagingFactory);
return new WebSocketConnectionHandler(amqpConnection);
}
case AMQP:
default:
return new ConnectionHandler(messagingFactory);
return new ConnectionHandler(amqpConnection);
}
}

protected AmqpConnection getMessagingFactory() {
return this.messagingFactory;
protected AmqpConnection getAmqpConnection() {
return this.amqpConnection;
}

private static SslDomain makeDomain(SslDomain.Mode mode) {
Expand All @@ -68,7 +68,7 @@ private static SslDomain makeDomain(SslDomain.Mode mode) {
public void onConnectionInit(Event event) {

final Connection connection = event.getConnection();
final String hostName = new StringBuilder(this.messagingFactory.getHostName())
final String hostName = new StringBuilder(this.amqpConnection.getHostName())
.append(":")
.append(String.valueOf(this.getProtocolPort()))
.toString();
Expand Down Expand Up @@ -107,16 +107,16 @@ protected void notifyTransportErrors(final Event event) {
* for ex: in case of proxy server - this could be proxy ip address
* @return host name
*/
public String getOutboundSocketHostName() {
return messagingFactory.getHostName();
public String getRemoteHostName() {
return amqpConnection.getHostName();
}

/**
* port used to create socket.
* for ex: in case of talking to event hubs service via proxy - use proxy port
* @return port
*/
protected int getOutboundSocketPort() {
protected int getRemotePort() {
return this.getProtocolPort();
}

Expand Down Expand Up @@ -168,7 +168,7 @@ public void onTransportError(Event event) {
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
// if the remote-peer abruptly closes the connection without issuing close frame
// issue one
this.messagingFactory.onConnectionError(condition);
this.amqpConnection.onConnectionError(condition);
}

// onTransportError event is not handled by the global IO Handler for cleanup
Expand All @@ -191,7 +191,7 @@ public void onTransportClosed(Event event) {
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
// if the remote-peer abruptly closes the connection without issuing close frame
// issue one
this.messagingFactory.onConnectionError(condition);
this.amqpConnection.onConnectionError(condition);
}
}

Expand All @@ -202,7 +202,7 @@ public void onConnectionRemoteOpen(Event event) {
TRACE_LOGGER.info("onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + ", " + event.getConnection().getRemoteContainer() + "]");
}

this.messagingFactory.onOpenComplete(null);
this.amqpConnection.onOpenComplete(null);
}

@Override
Expand Down Expand Up @@ -240,6 +240,6 @@ public void onConnectionRemoteClose(Event event) {
: "]"));
}

this.messagingFactory.onConnectionError(error);
this.amqpConnection.onConnectionError(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ public void onEvent() {

private void drainPendingReceives(final Exception exception) {
WorkItem<Collection<Message>> workItem;
final boolean shouldReturnNull = (exception == null || exception instanceof TimeoutException);
final boolean shouldReturnNull = (exception == null
|| (exception instanceof EventHubException && ((EventHubException) exception).getIsTransient()));

while ((workItem = this.pendingReceives.poll()) != null) {
final CompletableFuture<Collection<Message>> future = workItem.getWork();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.TimeoutException;
import com.microsoft.azure.eventhubs.TransportType;

/**
* Abstracts all amqp related details and exposes AmqpConnection object
Expand Down Expand Up @@ -173,8 +172,8 @@ public void onReactorInit(Event e) {

final Reactor r = e.getReactor();
connection = r.connectionToHost(
connectionHandler.getOutboundSocketHostName(),
connectionHandler.getOutboundSocketPort(),
connectionHandler.getRemoteHostName(),
connectionHandler.getRemotePort(),
connectionHandler);
}
});
Expand Down Expand Up @@ -221,8 +220,8 @@ public Session getSession(final String path, final Consumer<Session> onRemoteSes

if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
this.connection = this.getReactor().connectionToHost(
this.connectionHandler.getOutboundSocketHostName(),
this.connectionHandler.getOutboundSocketPort(),
this.connectionHandler.getRemoteHostName(),
this.connectionHandler.getRemotePort(),
this.connectionHandler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
public class WebSocketConnectionHandler extends ConnectionHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class);

public WebSocketConnectionHandler(AmqpConnection messagingFactory) {
super(messagingFactory);
public WebSocketConnectionHandler(AmqpConnection amqpConnection) {
super(amqpConnection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public static Boolean shouldUseProxy(final String hostName) {
return isProxyAddressLegal(proxies);
}

public WebSocketProxyConnectionHandler(AmqpConnection messagingFactory) {
super(messagingFactory);
public WebSocketProxyConnectionHandler(AmqpConnection amqpConnection) {
super(amqpConnection);
}

@Override
Expand Down Expand Up @@ -102,28 +102,28 @@ protected void notifyTransportErrors(final Event event) {

final IOException ioException = reconstructIOException(errorCondition);
proxySelector.connectFailed(
createURIFromHostNamePort(this.getMessagingFactory().getHostName(), this.getProtocolPort()),
createURIFromHostNamePort(this.getAmqpConnection().getHostName(), this.getProtocolPort()),
new InetSocketAddress(hostNameParts[0], port),
ioException);
}

@Override
public String getOutboundSocketHostName() {
public String getRemoteHostName() {
final InetSocketAddress socketAddress = getProxyAddress();
return socketAddress.getHostString();
}

@Override
public int getOutboundSocketPort() {
public int getRemotePort() {
final InetSocketAddress socketAddress = getProxyAddress();
return socketAddress.getPort();
}

private Map<String, String> getAuthorizationHeader() {
final PasswordAuthentication authentication = Authenticator.requestPasswordAuthentication(
getOutboundSocketHostName(),
getRemoteHostName(),
null,
getOutboundSocketPort(),
getRemotePort(),
null,
null,
"http",
Expand Down Expand Up @@ -153,7 +153,7 @@ private Map<String, String> getAuthorizationHeader() {

private InetSocketAddress getProxyAddress() {
final URI serviceUri = createURIFromHostNamePort(
this.getMessagingFactory().getHostName(),
this.getAmqpConnection().getHostName(),
this.getProtocolPort());
final ProxySelector proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.TransportType;
import com.microsoft.azure.eventhubs.impl.ConnectionHandler;
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
Expand All @@ -15,9 +14,7 @@
import com.microsoft.azure.eventhubs.lib.TestContext;

import org.jutils.jproxy.ProxyServer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
Expand Down Expand Up @@ -47,7 +44,7 @@ public void transportTypeAmqpCreatesConnectionWithPort5671() throws Exception {
connectionHandlerField.setAccessible(true);
final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory);

final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort");
final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort");
outboundSocketPort.setAccessible(true);

final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort");
Expand Down Expand Up @@ -76,7 +73,7 @@ public void transportTypeAmqpWebSocketsCreatesConnectionWithPort443() throws Exc
connectionHandlerField.setAccessible(true);
final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory);

final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort");
final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort");
outboundSocketPort.setAccessible(true);

final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort");
Expand Down Expand Up @@ -127,7 +124,7 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
connectionHandlerField.setAccessible(true);
final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory);

final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort");
final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort");
outboundSocketPort.setAccessible(true);

final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort");
Expand Down

0 comments on commit a2959c0

Please sign in to comment.