Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Followup: websockets proxy support #381

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,30 @@ public class ConnectionHandler extends BaseHandler {

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);

private final AmqpConnection messagingFactory;
private final AmqpConnection amqpConnection;

protected ConnectionHandler(final AmqpConnection messagingFactory) {
protected ConnectionHandler(final AmqpConnection amqpConnection) {

add(new Handshaker());
this.messagingFactory = messagingFactory;
this.amqpConnection = amqpConnection;
}

static ConnectionHandler create(TransportType transportType, AmqpConnection messagingFactory) {
static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) {
switch (transportType) {
case AMQP_WEB_SOCKETS:
if (WebSocketProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) {
return new WebSocketProxyConnectionHandler(messagingFactory);
if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) {
return new WebSocketProxyConnectionHandler(amqpConnection);
} else {
return new WebSocketConnectionHandler(messagingFactory);
return new WebSocketConnectionHandler(amqpConnection);
}
case AMQP:
default:
return new ConnectionHandler(messagingFactory);
return new ConnectionHandler(amqpConnection);
}
}

protected AmqpConnection getMessagingFactory() {
return this.messagingFactory;
protected AmqpConnection getAmqpConnection() {
return this.amqpConnection;
}

private static SslDomain makeDomain(SslDomain.Mode mode) {
Expand All @@ -68,7 +68,7 @@ private static SslDomain makeDomain(SslDomain.Mode mode) {
public void onConnectionInit(Event event) {

final Connection connection = event.getConnection();
final String hostName = new StringBuilder(this.messagingFactory.getHostName())
final String hostName = new StringBuilder(this.amqpConnection.getHostName())
.append(":")
.append(String.valueOf(this.getProtocolPort()))
.toString();
Expand Down Expand Up @@ -107,16 +107,16 @@ protected void notifyTransportErrors(final Event event) {
* for ex: in case of proxy server - this could be proxy ip address
* @return host name
*/
public String getOutboundSocketHostName() {
return messagingFactory.getHostName();
public String getRemoteHostName() {
return amqpConnection.getHostName();
}

/**
* port used to create socket.
* for ex: in case of talking to event hubs service via proxy - use proxy port
* @return port
*/
protected int getOutboundSocketPort() {
protected int getRemotePort() {
return this.getProtocolPort();
}

Expand Down Expand Up @@ -168,7 +168,7 @@ public void onTransportError(Event event) {
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
// if the remote-peer abruptly closes the connection without issuing close frame
// issue one
this.messagingFactory.onConnectionError(condition);
this.amqpConnection.onConnectionError(condition);
}

// onTransportError event is not handled by the global IO Handler for cleanup
Expand All @@ -191,7 +191,7 @@ public void onTransportClosed(Event event) {
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
// if the remote-peer abruptly closes the connection without issuing close frame
// issue one
this.messagingFactory.onConnectionError(condition);
this.amqpConnection.onConnectionError(condition);
}
}

Expand All @@ -202,7 +202,7 @@ public void onConnectionRemoteOpen(Event event) {
TRACE_LOGGER.info("onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + ", " + event.getConnection().getRemoteContainer() + "]");
}

this.messagingFactory.onOpenComplete(null);
this.amqpConnection.onOpenComplete(null);
}

@Override
Expand Down Expand Up @@ -240,6 +240,6 @@ public void onConnectionRemoteClose(Event event) {
: "]"));
}

this.messagingFactory.onConnectionError(error);
this.amqpConnection.onConnectionError(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ public void onEvent() {

private void drainPendingReceives(final Exception exception) {
WorkItem<Collection<Message>> workItem;
final boolean shouldReturnNull = (exception == null || exception instanceof TimeoutException);
final boolean shouldReturnNull = (exception == null
|| (exception instanceof EventHubException && ((EventHubException) exception).getIsTransient()));

while ((workItem = this.pendingReceives.poll()) != null) {
final CompletableFuture<Collection<Message>> future = workItem.getWork();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.TimeoutException;
import com.microsoft.azure.eventhubs.TransportType;

/**
* Abstracts all amqp related details and exposes AmqpConnection object
Expand Down Expand Up @@ -173,8 +172,8 @@ public void onReactorInit(Event e) {

final Reactor r = e.getReactor();
connection = r.connectionToHost(
connectionHandler.getOutboundSocketHostName(),
connectionHandler.getOutboundSocketPort(),
connectionHandler.getRemoteHostName(),
connectionHandler.getRemotePort(),
connectionHandler);
}
});
Expand Down Expand Up @@ -221,8 +220,8 @@ public Session getSession(final String path, final Consumer<Session> onRemoteSes

if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
this.connection = this.getReactor().connectionToHost(
this.connectionHandler.getOutboundSocketHostName(),
this.connectionHandler.getOutboundSocketPort(),
this.connectionHandler.getRemoteHostName(),
this.connectionHandler.getRemotePort(),
this.connectionHandler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
public class WebSocketConnectionHandler extends ConnectionHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class);

public WebSocketConnectionHandler(AmqpConnection messagingFactory) {
super(messagingFactory);
public WebSocketConnectionHandler(AmqpConnection amqpConnection) {
super(amqpConnection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public static Boolean shouldUseProxy(final String hostName) {
return isProxyAddressLegal(proxies);
}

public WebSocketProxyConnectionHandler(AmqpConnection messagingFactory) {
super(messagingFactory);
public WebSocketProxyConnectionHandler(AmqpConnection amqpConnection) {
super(amqpConnection);
}

@Override
Expand Down Expand Up @@ -102,28 +102,28 @@ protected void notifyTransportErrors(final Event event) {

final IOException ioException = reconstructIOException(errorCondition);
proxySelector.connectFailed(
createURIFromHostNamePort(this.getMessagingFactory().getHostName(), this.getProtocolPort()),
createURIFromHostNamePort(this.getAmqpConnection().getHostName(), this.getProtocolPort()),
new InetSocketAddress(hostNameParts[0], port),
ioException);
}

@Override
public String getOutboundSocketHostName() {
public String getRemoteHostName() {
final InetSocketAddress socketAddress = getProxyAddress();
return socketAddress.getHostString();
}

@Override
public int getOutboundSocketPort() {
public int getRemotePort() {
final InetSocketAddress socketAddress = getProxyAddress();
return socketAddress.getPort();
}

private Map<String, String> getAuthorizationHeader() {
final PasswordAuthentication authentication = Authenticator.requestPasswordAuthentication(
getOutboundSocketHostName(),
getRemoteHostName(),
null,
getOutboundSocketPort(),
getRemotePort(),
null,
null,
"http",
Expand Down Expand Up @@ -153,7 +153,7 @@ private Map<String, String> getAuthorizationHeader() {

private InetSocketAddress getProxyAddress() {
final URI serviceUri = createURIFromHostNamePort(
this.getMessagingFactory().getHostName(),
this.getAmqpConnection().getHostName(),
this.getProtocolPort());
final ProxySelector proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.TransportType;
import com.microsoft.azure.eventhubs.impl.ConnectionHandler;
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
Expand All @@ -15,9 +14,7 @@
import com.microsoft.azure.eventhubs.lib.TestContext;

import org.jutils.jproxy.ProxyServer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
Expand Down Expand Up @@ -47,7 +44,7 @@ public void transportTypeAmqpCreatesConnectionWithPort5671() throws Exception {
connectionHandlerField.setAccessible(true);
final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory);

final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort");
final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort");
outboundSocketPort.setAccessible(true);

final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort");
Expand Down Expand Up @@ -76,7 +73,7 @@ public void transportTypeAmqpWebSocketsCreatesConnectionWithPort443() throws Exc
connectionHandlerField.setAccessible(true);
final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory);

final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort");
final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort");
outboundSocketPort.setAccessible(true);

final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort");
Expand Down Expand Up @@ -127,7 +124,7 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
connectionHandlerField.setAccessible(true);
final ConnectionHandler connectionHandler = (ConnectionHandler) connectionHandlerField.get(underlyingFactory);

final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getOutboundSocketPort");
final Method outboundSocketPort = ConnectionHandler.class.getDeclaredMethod("getRemotePort");
outboundSocketPort.setAccessible(true);

final Method protocolPort = ConnectionHandler.class.getDeclaredMethod("getProtocolPort");
Expand Down