Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release eventhubs java client 1.2.0 #382

Merged
merged 2 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>
```

Expand Down Expand Up @@ -199,7 +199,7 @@ an outdated offset.
Azure Event Hubs requires using the AMQP 1.0 protocol for consuming events.

AMQP 1.0 is a TCP based protocol. For Azure Event Hubs, all traffic *must* be protected using TLS (SSL) and is using
TCP port 5672. For the WebSocket binding of AMQP, traffic flows via port 443.
TCP port 5671. For the WebSocket binding of AMQP, traffic flows via port 443.

## Connection Strings

Expand Down
2 changes: 1 addition & 1 deletion PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<version>2.0.1</version>
Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
import org.apache.qpid.proton.engine.Link;

public interface AmqpConnection {

/**
* Host name intended to be used on Amqp Connection Open frame
* @return host name
*/
String getHostName();

void onOpenComplete(Exception exception);

void onConnectionError(ErrorCondition error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ public final class AmqpErrorCode {

// connection errors
public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced");

// proton library introduced this amqpsymbol in their code-base to communicate IOExceptions
// while performing operations on SocketChannel (in IOHandler.java)
public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol("proton:io");
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class ClientConstants {
public final static String NO_RETRY = "NoRetry";
public final static String DEFAULT_RETRY = "Default";
public final static String PRODUCT_NAME = "MSJavaClient";
public final static String CURRENT_JAVACLIENT_VERSION = "1.1.0";
public final static String CURRENT_JAVACLIENT_VERSION = "1.2.0";
public static final String PLATFORM_INFO = getPlatformInfo();
public static final String FRAMEWORK_INFO = getFrameworkInfo();
public static final String CBS_ADDRESS = "$cbs";
Expand Down Expand Up @@ -74,6 +74,7 @@ public final class ClientConstants {
public static final Symbol LAST_ENQUEUED_TIME_UTC = Symbol.valueOf(MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC);
public static final String AMQP_REQUEST_FAILED_ERROR = "status-code: %s, status-description: %s";
public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
public static final String HTTPS_URI_FORMAT = "https://%s:%s";
public static final int MAX_RECEIVER_NAME_LENGTH = 64;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.TransportType;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
Expand All @@ -27,12 +28,30 @@ public class ConnectionHandler extends BaseHandler {

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);

private final AmqpConnection messagingFactory;
private final AmqpConnection amqpConnection;

public ConnectionHandler(final AmqpConnection messagingFactory) {
protected ConnectionHandler(final AmqpConnection amqpConnection) {

add(new Handshaker());
this.messagingFactory = messagingFactory;
this.amqpConnection = amqpConnection;
}

static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) {
switch (transportType) {
case AMQP_WEB_SOCKETS:
if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) {
return new WebSocketProxyConnectionHandler(amqpConnection);
} else {
return new WebSocketConnectionHandler(amqpConnection);
}
case AMQP:
default:
return new ConnectionHandler(amqpConnection);
}
}

protected AmqpConnection getAmqpConnection() {
return this.amqpConnection;
}

private static SslDomain makeDomain(SslDomain.Mode mode) {
Expand All @@ -49,7 +68,10 @@ private static SslDomain makeDomain(SslDomain.Mode mode) {
public void onConnectionInit(Event event) {

final Connection connection = event.getConnection();
final String hostName = event.getReactor().getConnectionAddress(connection);
final String hostName = new StringBuilder(this.amqpConnection.getHostName())
.append(":")
.append(String.valueOf(this.getProtocolPort()))
.toString();

connection.setHostname(hostName);
connection.setContainer(StringUtil.getRandomString());
Expand All @@ -72,9 +94,37 @@ public void onConnectionInit(Event event) {
}

protected void addTransportLayers(final Event event, final TransportInternal transport) {
final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
transport.ssl(domain);
}

protected void notifyTransportErrors(final Event event) {
// no-op
}

protected int getPort() {
/**
* HostName to be used for socket creation.
* for ex: in case of proxy server - this could be proxy ip address
* @return host name
*/
public String getRemoteHostName() {
return amqpConnection.getHostName();
}

/**
* port used to create socket.
* for ex: in case of talking to event hubs service via proxy - use proxy port
* @return port
*/
protected int getRemotePort() {
return this.getProtocolPort();
}

/**
* Port used on connection open frame
* @return port
*/
protected int getProtocolPort() {
return ClientConstants.AMQPS_PORT;
}

Expand All @@ -88,9 +138,6 @@ public void onConnectionBound(Event event) {
final Transport transport = event.getTransport();

this.addTransportLayers(event, (TransportInternal) transport);

final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
transport.ssl(domain);
}

@Override
Expand Down Expand Up @@ -121,11 +168,13 @@ public void onTransportError(Event event) {
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
// if the remote-peer abruptly closes the connection without issuing close frame
// issue one
this.messagingFactory.onConnectionError(condition);
this.amqpConnection.onConnectionError(condition);
}

// onTransportError event is not handled by the global IO Handler for cleanup
transport.unbind();

this.notifyTransportErrors(event);
}

@Override
Expand All @@ -142,7 +191,7 @@ public void onTransportClosed(Event event) {
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
// if the remote-peer abruptly closes the connection without issuing close frame
// issue one
this.messagingFactory.onConnectionError(condition);
this.amqpConnection.onConnectionError(condition);
}
}

Expand All @@ -153,7 +202,7 @@ public void onConnectionRemoteOpen(Event event) {
TRACE_LOGGER.info("onConnectionRemoteOpen: hostname[" + event.getConnection().getHostname() + ", " + event.getConnection().getRemoteContainer() + "]");
}

this.messagingFactory.onOpenComplete(null);
this.amqpConnection.onOpenComplete(null);
}

@Override
Expand Down Expand Up @@ -191,6 +240,6 @@ public void onConnectionRemoteClose(Event event) {
: "]"));
}

this.messagingFactory.onConnectionError(error);
this.amqpConnection.onConnectionError(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl
* It will be truncated to 128 characters
*/
public static String USER_AGENT = null;

private final String eventHubName;
private final Object senderCreateSync;
private volatile boolean isSenderCreateStarted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ public void onEvent() {

private void drainPendingReceives(final Exception exception) {
WorkItem<Collection<Message>> workItem;
final boolean shouldReturnNull = (exception == null || exception instanceof TimeoutException);
final boolean shouldReturnNull = (exception == null
|| (exception instanceof EventHubException && ((EventHubException) exception).getIsTransient()));

while ((workItem = this.pendingReceives.poll()) != null) {
final CompletableFuture<Collection<Message>> future = workItem.getWork();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.TimeoutException;
import com.microsoft.azure.eventhubs.TransportType;

/**
* Abstracts all amqp related details and exposes AmqpConnection object
Expand Down Expand Up @@ -82,10 +81,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti
this.retryPolicy = retryPolicy;
this.registeredLinks = new LinkedList<>();
this.reactorLock = new Object();
this.connectionHandler =
builder.getTransportType() == TransportType.AMQP
? new ConnectionHandler(this)
: new WebSocketConnectionHandler(this);
this.connectionHandler = ConnectionHandler.create(builder.getTransportType(), this);
this.cbsChannelCreateLock = new Object();
this.mgmtChannelCreateLock = new Object();
this.tokenProvider = builder.getSharedAccessSignature() == null
Expand Down Expand Up @@ -146,6 +142,7 @@ public void run() {
return messagingFactory.open;
}

@Override
public String getHostName() {
return this.hostName;
}
Expand Down Expand Up @@ -174,7 +171,10 @@ public void onReactorInit(Event e) {
super.onReactorInit(e);

final Reactor r = e.getReactor();
connection = r.connectionToHost(hostName, connectionHandler.getPort(), connectionHandler);
connection = r.connectionToHost(
connectionHandler.getRemoteHostName(),
connectionHandler.getRemotePort(),
connectionHandler);
}
});
}
Expand Down Expand Up @@ -219,7 +219,10 @@ public Session getSession(final String path, final Consumer<Session> onRemoteSes
}

if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
this.connection = this.getReactor().connectionToHost(this.hostName, this.connectionHandler.getPort(), this.connectionHandler);
this.connection = this.getReactor().connectionToHost(
this.connectionHandler.getRemoteHostName(),
this.connectionHandler.getRemotePort(),
this.connectionHandler);
}

final Session session = this.connection.session();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
import org.slf4j.LoggerFactory;

public class WebSocketConnectionHandler extends ConnectionHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class);

public WebSocketConnectionHandler(AmqpConnection messagingFactory) {
super(messagingFactory);
public WebSocketConnectionHandler(AmqpConnection amqpConnection) {
super(amqpConnection);
}

@Override
protected void addTransportLayers(final Event event, final TransportInternal transport) {
final String hostName = event.getConnection().getHostname();

final WebSocketImpl webSocket = new WebSocketImpl();
webSocket.configure(
event.getConnection().getHostname(),
hostName,
"/$servicebus/websocket",
null,
"",
0,
"AMQPWSB10",
null,
Expand All @@ -32,12 +34,14 @@ protected void addTransportLayers(final Event event, final TransportInternal tra
transport.addTransportLayer(webSocket);

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + event.getConnection().getHostname() +"]");
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName +"]");
}

super.addTransportLayers(event, transport);
}

@Override
protected int getPort() {
protected int getProtocolPort() {
return ClientConstants.HTTPS_PORT;
}

Expand Down
Loading