Skip to content

Commit

Permalink
Websockets support (#362)
Browse files Browse the repository at this point in the history
1. Supports max_frame_size of 4k (current limitation of <a href="https://github.com/Azure/qpid-proton-j-extensions">qpid-proton-j-extensions</a> library)

2. doesn't support PROXY on websockets - this will follow.

Amqp over WebSockets is particularly used when enterprise policies (ex: firewall outbound port rules) doesn't allow traffic on the default Amqp secure port (`5671`).
To send or receive over websockets - which uses port `443`, set the `TransportType` on `ConnectionStringBuilder`, like this:

```
connectionStringBuilder.setTransportType(TransportType.AmqpWebSockets)
```

related: #264
  • Loading branch information
SreeramGarlapati committed Aug 9, 2018
1 parent 8a901ab commit 42d7363
Show file tree
Hide file tree
Showing 25 changed files with 389 additions and 47 deletions.
2 changes: 1 addition & 1 deletion azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.0.2</version>
<version>1.1.0-SNAPSHOT</version>
</parent>

<version>2.0.1</version>
Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.0.2</version>
<version>1.1.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public final class EventHubsAppender extends AbstractAppender {
private static final int MAX_BATCH_SIZE_BYTES = 200 * 1024;

// this constant is tuned to use the MaximumAllowedMessageSize(256K) including Amqp-Headers for a LogEvent of 1Char
// this constant is tuned to use the MaximumAllowedMessageSize(256K) including AMQP-Headers for a LogEvent of 1Char
private static final int MAX_BATCH_SIZE = 21312;
private static final long serialVersionUID = 1L;

Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.0.2</version>
<version>1.1.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
* </ul>
*/
public final class ConnectionStringBuilder {

final static String endpointFormat = "sb://%s.%s";
final static String hostnameFormat = "sb://%s";
final static String defaultDomainName = "servicebus.windows.net";
Expand All @@ -60,10 +61,11 @@ public final class ConnectionStringBuilder {
final static String SharedAccessKeyNameConfigName = "SharedAccessKeyName"; // We use a (KeyName, Key) pair OR the SAS token - never both.
final static String SharedAccessKeyConfigName = "SharedAccessKey";
final static String SharedAccessSignatureConfigName = "SharedAccessSignature";
final static String TransportTypeConfigName = "TransportType";

private static final String AllKeyEnumerateRegex = "(" + HostnameConfigName + "|" + EndpointConfigName + "|" + SharedAccessKeyNameConfigName
+ "|" + SharedAccessKeyConfigName + "|" + SharedAccessSignatureConfigName + "|" + EntityPathConfigName + "|" + OperationTimeoutConfigName
+ "|" + ")";
+ "|" + TransportTypeConfigName + ")";

private static final String KeysWithDelimitersRegex = KeyValuePairDelimiter + AllKeyEnumerateRegex
+ KeyValueSeparator;
Expand All @@ -74,6 +76,7 @@ public final class ConnectionStringBuilder {
private String sharedAccessKey;
private String sharedAccessSignature;
private Duration operationTimeout;
private TransportType transportType;

/**
* Creates an empty {@link ConnectionStringBuilder}. At minimum, a namespace name, an entity path, SAS key name, and SAS key
Expand Down Expand Up @@ -256,6 +259,27 @@ public ConnectionStringBuilder setOperationTimeout(final Duration operationTimeo
return this;
}

/**
* TransportType on which all the communication for the EventHub objects created using this ConnectionString.
* Default value is {@link TransportType#AMQP}.
*
* @return transportType
*/
public TransportType getTransportType() {
return (this.transportType == null ? TransportType.AMQP : transportType);
}

/**
* Set the TransportType value in the Connection String. If no TransportType is set, this defaults to {@link TransportType#AMQP}.
*
* @param transportType Transport Type
* @return the {@link ConnectionStringBuilder} instance being set.
*/
public ConnectionStringBuilder setTransportType(final TransportType transportType) {
this.transportType = transportType;
return this;
}

/**
* Returns an inter-operable connection string that can be used to connect to EventHubs instances.
*
Expand Down Expand Up @@ -294,6 +318,11 @@ public String toString() {
KeyValueSeparator, this.operationTimeout.toString(), KeyValuePairDelimiter));
}

if (this.transportType != null) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", TransportTypeConfigName,
KeyValueSeparator, this.transportType.toString(), KeyValuePairDelimiter));
}

connectionStringBuilder.deleteCharAt(connectionStringBuilder.length() - 1);
return connectionStringBuilder.toString();
}
Expand Down Expand Up @@ -373,6 +402,14 @@ private void parseConnectionString(final String connectionString) {
} catch (DateTimeParseException exception) {
throw new IllegalConnectionStringFormatException("Invalid value specified for property 'Duration' in the ConnectionString.", exception);
}
} else if (key.equalsIgnoreCase(TransportTypeConfigName)) {
try {
this.transportType = TransportType.fromString(values[valueIndex]);
} catch (IllegalArgumentException exception) {
throw new IllegalConnectionStringFormatException(
String.format("Invalid value specified for property '%s' in the ConnectionString.", TransportTypeConfigName),
exception);
}
} else {
throw new IllegalConnectionStringFormatException(
String.format(Locale.US, "Illegal connection string parameter name: %s", key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package com.microsoft.azure.eventhubs;

/**
* This exception is thrown when the underlying Amqp layer encounter an abnormal link abort or disconnect of connection in an unexpected fashion.
* This exception is thrown when the underlying AMQP layer encounter an abnormal link abort or disconnect of connection in an unexpected fashion.
*/
public class OperationCancelledException extends EventHubException {
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs;

/**
* All TransportType switches available for communicating to EventHubs service.
*/
public enum TransportType {
/**
* AMQP over TCP. Uses port 5671 - assigned by IANA for secure AMQP (AMQPS).
*/
AMQP("Amqp"),

/**
* AMQP over Web Sockets. Uses port 443.
*/
AMQP_WEB_SOCKETS("AmqpWebSockets");

private final String value;

TransportType(final String value) {
this.value = value;
}

@Override
public String toString() {
return this.value;
}

static TransportType fromString(final String value) {
for (TransportType transportType : values()) {
if (transportType.value.equalsIgnoreCase(value)) {
return transportType;
}
}

throw new IllegalArgumentException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

public final class ClientConstants {
public final static int AMQPS_PORT = 5671;
public final static int HTTPS_PORT = 443;
public final static int MAX_PARTITION_KEY_LENGTH = 128;
public final static Symbol SERVER_BUSY_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":server-busy");
public final static Symbol ARGUMENT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-error");
Expand Down Expand Up @@ -37,7 +38,7 @@ public final class ClientConstants {
public final static String NO_RETRY = "NoRetry";
public final static String DEFAULT_RETRY = "Default";
public final static String PRODUCT_NAME = "MSJavaClient";
public final static String CURRENT_JAVACLIENT_VERSION = "1.0.2";
public final static String CURRENT_JAVACLIENT_VERSION = "1.1.0-SNAPSHOT";
public static final String PLATFORM_INFO = getPlatformInfo();
public static final String FRAMEWORK_INFO = getFrameworkInfo();
public static final String CBS_ADDRESS = "$cbs";
Expand Down Expand Up @@ -74,6 +75,7 @@ public final class ClientConstants {
public static final String AMQP_REQUEST_FAILED_ERROR = "status-code: %s, status-description: %s";
public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
public static final int MAX_RECEIVER_NAME_LENGTH = 64;

/**
* This is a constant defined to represent the start of a partition stream in EventHub.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.reactor.Handshaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,7 +23,7 @@

// ServiceBus <-> ProtonReactor interaction handles all
// amqp_connection/transport related events from reactor
public final class ConnectionHandler extends BaseHandler {
public class ConnectionHandler extends BaseHandler {

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);

Expand Down Expand Up @@ -65,11 +71,24 @@ public void onConnectionInit(Event event) {
connection.open();
}

protected void addTransportLayers(final Event event, final TransportInternal transport) {
}

protected int getPort() {
return ClientConstants.AMQPS_PORT;
}

protected int getMaxFrameSize() {
return AmqpConstants.MAX_FRAME_SIZE;
}

@Override
public void onConnectionBound(Event event) {

final Transport transport = event.getTransport();

this.addTransportLayers(event, (TransportInternal) transport);

final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
transport.ssl(domain);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void onError(Exception error) {
this.underlyingFactory);
}

// @param connection Connection on which the MessageReceiver's receive Amqp link need to be created on.
// @param connection Connection on which the MessageReceiver's receive AMQP link need to be created on.
// Connection has to be associated with Reactor before Creating a receiver on it.
public static CompletableFuture<MessageReceiver> create(
final MessagingFactory factory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.TimeoutException;
import com.microsoft.azure.eventhubs.TransportType;

/**
* Abstracts all amqp related details and exposes AmqpConnection object
Expand Down Expand Up @@ -81,7 +82,10 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
this.retryPolicy = retryPolicy;
this.registeredLinks = new LinkedList<>();
this.reactorLock = new Object();
this.connectionHandler = new ConnectionHandler(this);
this.connectionHandler =
builder.getTransportType() == TransportType.AMQP
? new ConnectionHandler(this)
: new WebSocketConnectionHandler(this);
this.cbsChannelCreateLock = new Object();
this.mgmtChannelCreateLock = new Object();
this.tokenProvider = builder.getSharedAccessSignature() == null
Expand Down Expand Up @@ -170,13 +174,13 @@ public void onReactorInit(Event e) {
super.onReactorInit(e);

final Reactor r = e.getReactor();
connection = r.connectionToHost(hostName, ClientConstants.AMQPS_PORT, connectionHandler);
connection = r.connectionToHost(hostName, connectionHandler.getPort(), connectionHandler);
}
});
}

private void startReactor(final ReactorHandler reactorHandler) throws IOException {
final Reactor newReactor = this.reactorFactory.create(reactorHandler);
final Reactor newReactor = this.reactorFactory.create(reactorHandler, this.connectionHandler.getMaxFrameSize());
synchronized (this.reactorLock) {
this.reactor = newReactor;
this.reactorScheduler = new ReactorDispatcher(newReactor);
Expand Down Expand Up @@ -215,7 +219,7 @@ public Session getSession(final String path, final Consumer<Session> onRemoteSes
}

if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
this.connection = this.getReactor().connectionToHost(this.hostName, ClientConstants.AMQPS_PORT, this.connectionHandler);
this.connection = this.getReactor().connectionToHost(this.hostName, this.connectionHandler.getPort(), this.connectionHandler);
}

final Session session = this.connection.session();
Expand Down Expand Up @@ -381,8 +385,8 @@ public void scheduleOnReactorThread(final int delay, final DispatchHandler handl

public static class ReactorFactory {

public Reactor create(final ReactorHandler reactorHandler) throws IOException {
return ProtonUtil.reactor(reactorHandler);
public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize) throws IOException {
return ProtonUtil.reactor(reactorHandler, maxFrameSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public final class ProtonUtil {
private ProtonUtil() {
}

public static Reactor reactor(ReactorHandler reactorHandler) throws IOException {
public static Reactor reactor(final ReactorHandler reactorHandler, final int maxFrameSize) throws IOException {

final ReactorOptions reactorOptions = new ReactorOptions();
reactorOptions.setMaxFrameSize(AmqpConstants.MAX_FRAME_SIZE);
reactorOptions.setMaxFrameSize(maxFrameSize);
reactorOptions.setEnableSaslByDefault(true);

final Reactor reactor = Proton.reactor(reactorOptions, reactorHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebSocketConnectionHandler extends ConnectionHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);

public WebSocketConnectionHandler(AmqpConnection messagingFactory) {
super(messagingFactory);
}

@Override
protected void addTransportLayers(final Event event, final TransportInternal transport) {
final WebSocketImpl webSocket = new WebSocketImpl();
webSocket.configure(
event.getConnection().getHostname(),
"/$servicebus/websocket",
null,
0,
"AMQPWSB10",
null,
null);

transport.addTransportLayer(webSocket);

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + event.getConnection().getHostname() +"]");
}
}

@Override
protected int getPort() {
return ClientConstants.HTTPS_PORT;
}

@Override
protected int getMaxFrameSize() {
// This is the current limitation of https://github.com/Azure/qpid-proton-j-extensions
// once, this library enables larger frames - this property can be removed.
return 4 * 1024;
}
}
Loading

0 comments on commit 42d7363

Please sign in to comment.