Skip to content

Commit

Permalink
Merge pull request #382 from Azure/dev
Browse files Browse the repository at this point in the history
release eventhubs java client 1.2.0
  • Loading branch information
SreeramGarlapati committed Sep 21, 2018
2 parents 13ee706 + f8493e5 commit ca94465
Show file tree
Hide file tree
Showing 22 changed files with 780 additions and 41 deletions.
4 changes: 2 additions & 2 deletions ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>
```

Expand Down Expand Up @@ -199,7 +199,7 @@ an outdated offset.
Azure Event Hubs requires using the AMQP 1.0 protocol for consuming events.

AMQP 1.0 is a TCP based protocol. For Azure Event Hubs, all traffic *must* be protected using TLS (SSL) and is using
TCP port 5672. For the WebSocket binding of AMQP, traffic flows via port 443.
TCP port 5671. For the WebSocket binding of AMQP, traffic flows via port 443.

## Connection Strings

Expand Down
2 changes: 1 addition & 1 deletion PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<version>2.0.1</version>
Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
import org.apache.qpid.proton.engine.Link;

public interface AmqpConnection {

/**
* Host name intended to be used on Amqp Connection Open frame
* @return host name
*/
String getHostName();

void onOpenComplete(Exception exception);

void onConnectionError(ErrorCondition error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ public final class AmqpErrorCode {

// connection errors
public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced");

// proton library introduced this amqpsymbol in their code-base to communicate IOExceptions
// while performing operations on SocketChannel (in IOHandler.java)
public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol("proton:io");
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class ClientConstants {
public final static String NO_RETRY = "NoRetry";
public final static String DEFAULT_RETRY = "Default";
public final static String PRODUCT_NAME = "MSJavaClient";
public final static String CURRENT_JAVACLIENT_VERSION = "1.1.0";
public final static String CURRENT_JAVACLIENT_VERSION = "1.2.0";
public static final String PLATFORM_INFO = getPlatformInfo();
public static final String FRAMEWORK_INFO = getFrameworkInfo();
public static final String CBS_ADDRESS = "$cbs";
Expand Down Expand Up @@ -74,6 +74,7 @@ public final class ClientConstants {
public static final Symbol LAST_ENQUEUED_TIME_UTC = Symbol.valueOf(MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC);
public static final String AMQP_REQUEST_FAILED_ERROR = "status-code: %s, status-description: %s";
public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
public static final String HTTPS_URI_FORMAT = "https://%s:%s";
public static final int MAX_RECEIVER_NAME_LENGTH = 64;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.TransportType;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
Expand All @@ -27,12 +28,30 @@ public class ConnectionHandler extends BaseHandler {

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);

private final AmqpConnection messagingFactory;
private final AmqpConnection amqpConnection;

public ConnectionHandler(final AmqpConnection messagingFactory) {
protected ConnectionHandler(final AmqpConnection amqpConnection) {

add(new Handshaker());
this.messagingFactory = messagingFactory;
this.amqpConnection = amqpConnection;
}

static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) {
switch (transportType) {
case AMQP_WEB_SOCKETS:
if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) {
return new WebSocketProxyConnectionHandler(amqpConnection);
} else {
return new WebSocketConnectionHandler(amqpConnection);
}
case AMQP:
default:
return new ConnectionHandler(amqpConnection);
}
}

protected AmqpConnection getAmqpConnection() {
return this.amqpConnection;
}

private static SslDomain makeDomain(SslDomain.Mode mode) {
Expand All @@ -49,7 +68,10 @@ private static SslDomain makeDomain(SslDomain.Mode mode) {
public void onConnectionInit(Event event) {

final Connection connection = event.getConnection();
final String hostName = event.getReactor().getConnectionAddress(connection);
final String hostName = new StringBuilder(this.amqpConnection.getHostName())
.append(":")
.append(String.valueOf(this.getProtocolPort()))
.toString();

connection.setHostname(hostName);
connection.setContainer(StringUtil.getRandomString());
Expand All @@ -72,9 +94,37 @@ public void onConnectionInit(Event event) {
}

protected void addTransportLayers(final Event event, final TransportInternal transport) {
final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
transport.ssl(domain);
}

protected void notifyTransportErrors(final Event event) {
// no-op
}

protected int getPort() {
/**
* HostName to be used for socket creation.
* for ex: in case of proxy server - this could be proxy ip address
* @return host name
*/
public String getRemoteHostName() {
return amqpConnection.getHostName();
}

/**
* port used to create socket.
* for ex: in case of talking to event hubs service via proxy - use proxy port
* @return port
*/
protected int getRemotePort() {
return this.getProtocolPort();
}

/**
* Port used on connection open frame
* @return port
*/
protected int getProtocolPort() {
return ClientConstants.AMQPS_PORT;
}

Expand All @@ -88,9 +138,6 @@ public void onConnectionBound(Event event) {
final Transport transport = event.getTransport();

this.addTransportLayers(event, (TransportInternal) transport);

final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
transport.ssl(domain);
}

@Override
Expand Down Expand Up @@ -121,11 +168,13 @@ public void onTransportError(Event event) {
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
// if the remote-peer abruptly closes the connection without issuing close frame
// issue one
this.messagingFactory.onConnectionError(condition);
this.amqpConnection.onConnectionError(condition);
}

// onTransportError event is not handled by the global IO Handler for cleanup
transport.unbind();

this.notifyTransportErrors(event);
}

@Override
Expand All @@ -142,7 +191,7 @@ public void onTransportClosed(Event event) {
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
// if the remote-peer abruptly closes the connection without issuing close frame
// issue one
this.messagingFactory.onConnectionError(condition);
this.amqpConnection.onConnectionError(condition);
}
}

Expand All @@ -153,7 +202,7 @@ public void onConnectionRemoteOpen(Event event) {
TRACE_LOGGER.info("onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + ", " + event.getConnection().getRemoteContainer() + "]");
}

this.messagingFactory.onOpenComplete(null);
this.amqpConnection.onOpenComplete(null);
}

@Override
Expand Down Expand Up @@ -191,6 +240,6 @@ public void onConnectionRemoteClose(Event event) {
: "]"));
}

this.messagingFactory.onConnectionError(error);
this.amqpConnection.onConnectionError(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl
* It will be truncated to 128 characters
*/
public static String USER_AGENT = null;

private final String eventHubName;
private final Object senderCreateSync;
private volatile boolean isSenderCreateStarted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ public void onEvent() {

private void drainPendingReceives(final Exception exception) {
WorkItem<Collection<Message>> workItem;
final boolean shouldReturnNull = (exception == null || exception instanceof TimeoutException);
final boolean shouldReturnNull = (exception == null
|| (exception instanceof EventHubException && ((EventHubException) exception).getIsTransient()));

while ((workItem = this.pendingReceives.poll()) != null) {
final CompletableFuture<Collection<Message>> future = workItem.getWork();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.TimeoutException;
import com.microsoft.azure.eventhubs.TransportType;

/**
* Abstracts all amqp related details and exposes AmqpConnection object
Expand Down Expand Up @@ -82,10 +81,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
this.retryPolicy = retryPolicy;
this.registeredLinks = new LinkedList<>();
this.reactorLock = new Object();
this.connectionHandler =
builder.getTransportType() == TransportType.AMQP
? new ConnectionHandler(this)
: new WebSocketConnectionHandler(this);
this.connectionHandler = ConnectionHandler.create(builder.getTransportType(), this);
this.cbsChannelCreateLock = new Object();
this.mgmtChannelCreateLock = new Object();
this.tokenProvider = builder.getSharedAccessSignature() == null
Expand Down Expand Up @@ -146,6 +142,7 @@ public void run() {
return messagingFactory.open;
}

@Override
public String getHostName() {
return this.hostName;
}
Expand Down Expand Up @@ -174,7 +171,10 @@ public void onReactorInit(Event e) {
super.onReactorInit(e);

final Reactor r = e.getReactor();
connection = r.connectionToHost(hostName, connectionHandler.getPort(), connectionHandler);
connection = r.connectionToHost(
connectionHandler.getRemoteHostName(),
connectionHandler.getRemotePort(),
connectionHandler);
}
});
}
Expand Down Expand Up @@ -219,7 +219,10 @@ public Session getSession(final String path, final Consumer<Session> onRemoteSes
}

if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
this.connection = this.getReactor().connectionToHost(this.hostName, this.connectionHandler.getPort(), this.connectionHandler);
this.connection = this.getReactor().connectionToHost(
this.connectionHandler.getRemoteHostName(),
this.connectionHandler.getRemotePort(),
this.connectionHandler);
}

final Session session = this.connection.session();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
import org.slf4j.LoggerFactory;

public class WebSocketConnectionHandler extends ConnectionHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class);

public WebSocketConnectionHandler(AmqpConnection messagingFactory) {
super(messagingFactory);
public WebSocketConnectionHandler(AmqpConnection amqpConnection) {
super(amqpConnection);
}

@Override
protected void addTransportLayers(final Event event, final TransportInternal transport) {
final String hostName = event.getConnection().getHostname();

final WebSocketImpl webSocket = new WebSocketImpl();
webSocket.configure(
event.getConnection().getHostname(),
hostName,
"/$servicebus/websocket",
null,
"",
0,
"AMQPWSB10",
null,
Expand All @@ -32,12 +34,14 @@ protected void addTransportLayers(final Event event, final TransportInternal tra
transport.addTransportLayer(webSocket);

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + event.getConnection().getHostname() +"]");
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName +"]");
}

super.addTransportLayers(event, transport);
}

@Override
protected int getPort() {
protected int getProtocolPort() {
return ClientConstants.HTTPS_PORT;
}

Expand Down
Loading

0 comments on commit ca94465

Please sign in to comment.