Skip to content

Commit

Permalink
WebSockets Proxy support (#378)
Browse files Browse the repository at this point in the history
  • Loading branch information
SreeramGarlapati committed Sep 21, 2018
1 parent 9a96b08 commit 09f5243
Show file tree
Hide file tree
Showing 21 changed files with 773 additions and 31 deletions.
4 changes: 2 additions & 2 deletions ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>
```

Expand Down Expand Up @@ -199,7 +199,7 @@ an outdated offset.
Azure Event Hubs requires using the AMQP 1.0 protocol for consuming events.

AMQP 1.0 is a TCP based protocol. For Azure Event Hubs, all traffic *must* be protected using TLS (SSL) and is using
TCP port 5672. For the WebSocket binding of AMQP, traffic flows via port 443.
TCP port 5671. For the WebSocket binding of AMQP, traffic flows via port 443.

## Connection Strings

Expand Down
2 changes: 1 addition & 1 deletion PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<version>2.0.1</version>
Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
import org.apache.qpid.proton.engine.Link;

public interface AmqpConnection {

/**
* Host name intended to be used on Amqp Connection Open frame
* @return host name
*/
String getHostName();

void onOpenComplete(Exception exception);

void onConnectionError(ErrorCondition error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ public final class AmqpErrorCode {

// connection errors
public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced");

// proton library introduced this amqpsymbol in their code-base to communicate IOExceptions
// while performing operations on SocketChannel (in IOHandler.java)
public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol("proton:io");
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class ClientConstants {
public final static String NO_RETRY = "NoRetry";
public final static String DEFAULT_RETRY = "Default";
public final static String PRODUCT_NAME = "MSJavaClient";
public final static String CURRENT_JAVACLIENT_VERSION = "1.1.0";
public final static String CURRENT_JAVACLIENT_VERSION = "1.2.0";
public static final String PLATFORM_INFO = getPlatformInfo();
public static final String FRAMEWORK_INFO = getFrameworkInfo();
public static final String CBS_ADDRESS = "$cbs";
Expand Down Expand Up @@ -74,6 +74,7 @@ public final class ClientConstants {
public static final Symbol LAST_ENQUEUED_TIME_UTC = Symbol.valueOf(MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC);
public static final String AMQP_REQUEST_FAILED_ERROR = "status-code: %s, status-description: %s";
public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
public static final String HTTPS_URI_FORMAT = "https://%s:%s";
public static final int MAX_RECEIVER_NAME_LENGTH = 64;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.TransportType;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
Expand All @@ -29,12 +30,30 @@ public class ConnectionHandler extends BaseHandler {

private final AmqpConnection messagingFactory;

public ConnectionHandler(final AmqpConnection messagingFactory) {
protected ConnectionHandler(final AmqpConnection messagingFactory) {

add(new Handshaker());
this.messagingFactory = messagingFactory;
}

static ConnectionHandler create(TransportType transportType, AmqpConnection messagingFactory) {
switch (transportType) {
case AMQP_WEB_SOCKETS:
if (WebSocketProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) {
return new WebSocketProxyConnectionHandler(messagingFactory);
} else {
return new WebSocketConnectionHandler(messagingFactory);
}
case AMQP:
default:
return new ConnectionHandler(messagingFactory);
}
}

protected AmqpConnection getMessagingFactory() {
return this.messagingFactory;
}

private static SslDomain makeDomain(SslDomain.Mode mode) {

final SslDomain domain = Proton.sslDomain();
Expand All @@ -49,7 +68,10 @@ private static SslDomain makeDomain(SslDomain.Mode mode) {
public void onConnectionInit(Event event) {

final Connection connection = event.getConnection();
final String hostName = event.getReactor().getConnectionAddress(connection);
final String hostName = new StringBuilder(this.messagingFactory.getHostName())
.append(":")
.append(String.valueOf(this.getProtocolPort()))
.toString();

connection.setHostname(hostName);
connection.setContainer(StringUtil.getRandomString());
Expand All @@ -72,9 +94,37 @@ public void onConnectionInit(Event event) {
}

protected void addTransportLayers(final Event event, final TransportInternal transport) {
final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
transport.ssl(domain);
}

protected void notifyTransportErrors(final Event event) {
// no-op
}

protected int getPort() {
/**
* HostName to be used for socket creation.
* for ex: in case of proxy server - this could be proxy ip address
* @return host name
*/
public String getOutboundSocketHostName() {
return messagingFactory.getHostName();
}

/**
* port used to create socket.
* for ex: in case of talking to event hubs service via proxy - use proxy port
* @return port
*/
protected int getOutboundSocketPort() {
return this.getProtocolPort();
}

/**
* Port used on connection open frame
* @return port
*/
protected int getProtocolPort() {
return ClientConstants.AMQPS_PORT;
}

Expand All @@ -88,9 +138,6 @@ public void onConnectionBound(Event event) {
final Transport transport = event.getTransport();

this.addTransportLayers(event, (TransportInternal) transport);

final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
transport.ssl(domain);
}

@Override
Expand Down Expand Up @@ -126,6 +173,8 @@ public void onTransportError(Event event) {

// onTransportError event is not handled by the global IO Handler for cleanup
transport.unbind();

this.notifyTransportErrors(event);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl
* It will be truncated to 128 characters
*/
public static String USER_AGENT = null;

private final String eventHubName;
private final Object senderCreateSync;
private volatile boolean isSenderCreateStarted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
this.retryPolicy = retryPolicy;
this.registeredLinks = new LinkedList<>();
this.reactorLock = new Object();
this.connectionHandler =
builder.getTransportType() == TransportType.AMQP
? new ConnectionHandler(this)
: new WebSocketConnectionHandler(this);
this.connectionHandler = ConnectionHandler.create(builder.getTransportType(), this);
this.cbsChannelCreateLock = new Object();
this.mgmtChannelCreateLock = new Object();
this.tokenProvider = builder.getSharedAccessSignature() == null
Expand Down Expand Up @@ -146,6 +143,7 @@ public void run() {
return messagingFactory.open;
}

@Override
public String getHostName() {
return this.hostName;
}
Expand Down Expand Up @@ -174,7 +172,10 @@ public void onReactorInit(Event e) {
super.onReactorInit(e);

final Reactor r = e.getReactor();
connection = r.connectionToHost(hostName, connectionHandler.getPort(), connectionHandler);
connection = r.connectionToHost(
connectionHandler.getOutboundSocketHostName(),
connectionHandler.getOutboundSocketPort(),
connectionHandler);
}
});
}
Expand Down Expand Up @@ -219,7 +220,10 @@ public Session getSession(final String path, final Consumer<Session> onRemoteSes
}

if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
this.connection = this.getReactor().connectionToHost(this.hostName, this.connectionHandler.getPort(), this.connectionHandler);
this.connection = this.getReactor().connectionToHost(
this.connectionHandler.getOutboundSocketHostName(),
this.connectionHandler.getOutboundSocketPort(),
this.connectionHandler);
}

final Session session = this.connection.session();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
import org.slf4j.LoggerFactory;

public class WebSocketConnectionHandler extends ConnectionHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class);

public WebSocketConnectionHandler(AmqpConnection messagingFactory) {
super(messagingFactory);
}

@Override
protected void addTransportLayers(final Event event, final TransportInternal transport) {
final String hostName = event.getConnection().getHostname();

final WebSocketImpl webSocket = new WebSocketImpl();
webSocket.configure(
event.getConnection().getHostname(),
hostName,
"/$servicebus/websocket",
null,
"",
0,
"AMQPWSB10",
null,
Expand All @@ -32,12 +34,14 @@ protected void addTransportLayers(final Event event, final TransportInternal tra
transport.addTransportLayer(webSocket);

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + event.getConnection().getHostname() +"]");
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName +"]");
}

super.addTransportLayers(event, transport);
}

@Override
protected int getPort() {
protected int getProtocolPort() {
return ClientConstants.HTTPS_PORT;
}

Expand Down
Loading

0 comments on commit 09f5243

Please sign in to comment.