Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 209 additions & 159 deletions src/main/java/com/microsoft/azure/relay/ClientWebSocket.java

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/main/java/com/microsoft/azure/relay/HttpClientProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.microsoft.azure.relay;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;

public class HttpClientProvider {
public HttpClient getHttpClient() {
return new HttpClient(new SslContextFactory.Client());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import java.nio.channels.Channel;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import javax.websocket.CloseReason;

import org.eclipse.jetty.websocket.api.CloseStatus;

public interface HybridConnectionChannel extends Channel {

Expand All @@ -21,12 +22,12 @@ public interface HybridConnectionChannel extends Channel {
public CompletableFuture<Void> closeAsync();

/**
* Closes the connection with the remote websocket with a given CloseReason
* Closes the connection with the remote websocket with a given CloseStatus
*
* @param reason The CloseReason to be given for this operation. For details please see javax.websocket.CloseReason.
* @param closeStatus The CloseStatus to be given for this operation. For details please see org.eclipse.jetty.websocket.api.CloseStatus.
* @return Returns a CompletableFuture which completes when the connection is completely closed.
*/
public CompletableFuture<Void> closeAsync(CloseReason reason);
public CompletableFuture<Void> closeAsync(CloseStatus closeStatus);

/**
* Receives byte messages from the remote sender asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import javax.websocket.ClientEndpointConfig;

public class HybridConnectionClient implements RelayTraceSource {
static final AutoShutdownScheduledExecutor EXECUTOR = AutoShutdownScheduledExecutor.Create();
static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(70);
static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments()
.toString().indexOf("-agentlib:jdwp") > 0;
private HttpClientProvider httpClientProvider;
private String cachedString;
private TrackingContext trackingContext;
private URI address;
Expand Down Expand Up @@ -131,6 +130,14 @@ public HybridConnectionClient(String connectionString, String path) throws URISy
tokenProvider, tokenProvider != null);
}

public HttpClientProvider getHttpClientProvider() {
return httpClientProvider;
}

public void setHttpClientProvider(HttpClientProvider httpClientProvider) {
this.httpClientProvider = httpClientProvider;
}

/**
* The address on which this HybridConnection will connect to. This address should be of the format
* "sb://contoso.servicebus.windows.net/yourhybridconnection".
Expand Down Expand Up @@ -198,19 +205,16 @@ public CompletableFuture<HybridConnectionChannel> createConnectionAsync(Map<Stri
// Set the authentication in request header
Map<String, List<String>> headers = new HashMap<String, List<String>>();
headers.put(RelayConstants.SERVICEBUS_AUTHORIZATION_HEADER_NAME, Arrays.asList(token.join().getToken()));
HybridConnectionEndpointConfigurator configurator = new HybridConnectionEndpointConfigurator();
configurator.addHeaders(headers);
if (customHeaders != null) {
configurator.addHeaders(customHeaders);
headers.putAll(customHeaders);
}
ClientEndpointConfig config = ClientEndpointConfig.Builder.create().configurator(configurator).build();

try {
URI uri = HybridConnectionUtil.buildUri(this.address.getHost(), this.address.getPort(),
this.address.getPath(), this.address.getQuery(), HybridConnectionConstants.Actions.CONNECT,
trackingContext.getTrackingId());
WebSocketChannel channel = new WebSocketChannel(trackingContext, EXECUTOR);
return channel.getWebSocket().connectAsync(uri, this.operationTimeout, config).thenApply($void -> channel);
WebSocketChannel channel = new WebSocketChannel(trackingContext, this.getHttpClientProvider(), EXECUTOR);
return channel.getWebSocket().connectAsync(uri, this.operationTimeout, headers).thenApply($void -> channel);
} catch (URISyntaxException e) {
return CompletableFutureUtil.fromException(e);
}
Expand Down Expand Up @@ -251,5 +255,6 @@ private void initialize(URI address, Duration operationTimeout, TokenProvider to
this.address = address;
this.tokenProvider = tokenProvider;
this.operationTimeout = operationTimeout;
this.httpClientProvider = new HttpClientProvider();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import java.util.function.Consumer;
import java.util.function.Function;

import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;

import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.json.JSONObject;

Expand All @@ -36,6 +34,7 @@ public class HybridConnectionListener implements RelayTraceSource, AutoCloseable
private final Object thisLock = new Object();
private final AtomicBoolean openCalled;
private final AtomicBoolean closeCalled;
private HttpClientProvider httpClientProvider;
private Duration operationTimeout;
private int maxWebSocketBufferSize;
private String cachedString;
Expand Down Expand Up @@ -76,6 +75,7 @@ public HybridConnectionListener(URI address, TokenProvider tokenProvider) {
this.controlConnection = new ControlConnection(this);
this.openCalled = new AtomicBoolean(false);
this.closeCalled = new AtomicBoolean(false);
this.httpClientProvider = new HttpClientProvider();
}

/**
Expand Down Expand Up @@ -152,12 +152,21 @@ public HybridConnectionListener(String connectionString, String path) throws URI
this.controlConnection = new ControlConnection(this);
this.openCalled = new AtomicBoolean(false);
this.closeCalled = new AtomicBoolean(false);
this.httpClientProvider = new HttpClientProvider();
}

public boolean isOnline() {
return this.controlConnection.isOnline();
}

public HttpClientProvider getHttpClientProvider() {
return httpClientProvider;
}

public void setHttpClientProvider(HttpClientProvider httpClientProvider) {
this.httpClientProvider = httpClientProvider;
}

public Function<RelayedHttpListenerContext, Boolean> getAcceptHandler() {
return acceptHandler;
}
Expand Down Expand Up @@ -345,7 +354,7 @@ public CompletableFuture<Void> closeAsync(Duration timeout) {
closeTasks = new CompletableFuture<?>[this.connectionInputQueue.getPendingCount()];
for (int i = 0; i < this.connectionInputQueue.getPendingCount(); i++) {
closeTasks[i] = this.connectionInputQueue.dequeueAsync(timeoutHelper.remainingTime()).thenAccept(connection -> {
connection.closeAsync(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Client closing the socket normally"));
connection.closeAsync(new CloseStatus(StatusCode.NORMAL, "Client closing the socket normally"));
});
}
}
Expand Down Expand Up @@ -480,7 +489,7 @@ private CompletableFuture<Void> completeAcceptAsync(RelayedHttpListenerContext l

if (shouldAccept) {
synchronized (this.thisLock) {
WebSocketChannel rendezvousConnection = new WebSocketChannel(listenerContext.getTrackingContext(), EXECUTOR);
WebSocketChannel rendezvousConnection = new WebSocketChannel(listenerContext.getTrackingContext(), this.getHttpClientProvider(), EXECUTOR);

if (this.closeCalled.get()) {
RelayLogger.logEvent("rendezvousClose", this, rendezvousUri.toString());
Expand Down Expand Up @@ -594,9 +603,9 @@ public CompletableFuture<Void> openAsync(Duration timeout) {
}).whenComplete(($void, err) -> {
if (err != null) {
RelayLogger.throwingException(err, this.listener);
CloseReason closeReason = new CloseReason(CloseCodes.UNEXPECTED_CONDITION,
CloseStatus closeStatus = new CloseStatus(StatusCode.ABNORMAL,
"closing web socket connection because something went wrong trying to connect.");
this.closeOrAbortWebSocketAsync(connectTask, closeReason);
this.closeOrAbortWebSocketAsync(connectTask, closeStatus);
throw new CompletionException(err);
}
});
Expand Down Expand Up @@ -625,8 +634,8 @@ private CompletableFuture<Void> closeAsync(Duration duration) {
if (connectTask != null) {
return connectTask.thenCompose((webSocket) -> {
return this.sendAsyncLock.acquireThenCompose(duration, () -> {
CloseReason reason = new CloseReason(CloseCodes.NORMAL_CLOSURE, "Normal Closure");
return webSocket.closeAsync(reason);
CloseStatus closeStatus = new CloseStatus(StatusCode.NORMAL, "Normal Closure");
return webSocket.closeAsync(closeStatus);
});
});
}
Expand Down Expand Up @@ -715,9 +724,6 @@ private CompletableFuture<ClientWebSocket> connectAsync(Duration timeout) {
// Set the authentication in request header
Map<String, List<String>> headers = new HashMap<String, List<String>>();
headers.put(RelayConstants.SERVICEBUS_AUTHORIZATION_HEADER_NAME, Arrays.asList(token.join().getToken()));
HybridConnectionEndpointConfigurator configurator = new HybridConnectionEndpointConfigurator();
configurator.addHeaders(headers);
ClientEndpointConfig config = ClientEndpointConfig.Builder.create().configurator(configurator).build();

// When we reconnect we need to remove the "_GXX" suffix otherwise trackingId
// gets longer after each reconnect
Expand All @@ -729,13 +735,13 @@ private CompletableFuture<ClientWebSocket> connectAsync(Duration timeout) {
this.address.getPath(), this.address.getQuery(), HybridConnectionConstants.Actions.LISTEN,
trackingId);

ClientWebSocket webSocket = new ClientWebSocket(this.listener.trackingContext, EXECUTOR);
ClientWebSocket webSocket = new ClientWebSocket(this.listener.trackingContext, this.listener.getHttpClientProvider(), EXECUTOR);
return delayTask.thenCompose(($void) -> {
if (this.listener.injectedFault != null && this.listener.injectedFault instanceof UpgradeException) {
return CompletableFutureUtil.fromException(this.listener.injectedFault);
}

return webSocket.connectAsync(websocketUri, timeout, config)
return webSocket.connectAsync(websocketUri, timeout, headers)
.thenApply(($void2) -> {
this.onOnline();
return webSocket;
Expand All @@ -747,7 +753,7 @@ private CompletableFuture<ClientWebSocket> connectAsync(Duration timeout) {
}
}

private CompletableFuture<Void> closeOrAbortWebSocketAsync(CompletableFuture<ClientWebSocket> connectTask, CloseReason reason) {
private CompletableFuture<Void> closeOrAbortWebSocketAsync(CompletableFuture<ClientWebSocket> connectTask, CloseStatus closeStatus) {
assert CompletableFutureUtil.isDoneNormally(connectTask);

synchronized (this.thisLock) {
Expand All @@ -756,7 +762,7 @@ private CompletableFuture<Void> closeOrAbortWebSocketAsync(CompletableFuture<Cli
}
}

return connectTask.thenCompose((webSocket) -> webSocket.closeAsync(reason))
return connectTask.thenCompose((webSocket) -> webSocket.closeAsync(closeStatus))
.exceptionally((exception) -> {
// catch and do not rethrow
RelayLogger.handledExceptionAsWarning(exception, this.listener);
Expand Down Expand Up @@ -823,8 +829,8 @@ private CompletableFuture<Boolean> receivePumpCoreAsync() {
keepGoing = false;
}
else {
CloseReason reason = webSocket.getCloseReason();
keepGoing = this.onDisconnect(new ConnectionLostException(reason.toString()));
CloseStatus closeStatus = webSocket.getCloseReason();
keepGoing = this.onDisconnect(new ConnectionLostException(closeStatus.toString()));
}
return keepGoing;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;

import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.json.JSONObject;

class HybridHttpConnection implements RelayTraceSource {
Expand Down Expand Up @@ -270,7 +270,7 @@ private CompletableFuture<Void> sendBytesOverRendezvousAsync(ByteBuffer buffer,
private CompletableFuture<Void> ensureRendezvousAsync(Duration timeout) throws CompletionException {
if (this.rendezvousWebSocket == null) {
RelayLogger.logEvent("httpCreateRendezvous", this);
this.rendezvousWebSocket = new ClientWebSocket(this.trackingContext, this.executor);
this.rendezvousWebSocket = new ClientWebSocket(this.trackingContext, this.controlWebSocket.getHttpClientProvider(), this.executor);
return this.rendezvousWebSocket.connectAsync(this.rendezvousAddress, timeout);
}
return CompletableFuture.completedFuture(null);
Expand All @@ -280,7 +280,7 @@ private CompletableFuture<Void> closeRendezvousAsync() {
if (this.rendezvousWebSocket != null) {
RelayLogger.logEvent("closing", this);
return this.rendezvousWebSocket
.closeAsync(new CloseReason(CloseCodes.NORMAL_CLOSURE, "NormalClosure"))
.closeAsync(new CloseStatus(StatusCode.NORMAL, "NormalClosure"))
.thenRun(() -> RelayLogger.logEvent("closed", this));
} else {
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ CompletableFuture<ClientWebSocket> acceptAsync(URI rendezvousUri) {
// clientWebSocket.Options.AddSubProtocol(subProtocol);
// }

ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, HybridConnectionListener.EXECUTOR);
ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, this.listener.getHttpClientProvider(), HybridConnectionListener.EXECUTOR);
return webSocket.connectAsync(rendezvousUri, ACCEPT_TIMEOUT).thenApply(result -> webSocket);
}

Expand All @@ -97,7 +97,7 @@ CompletableFuture<Void> rejectAsync(URI rendezvousUri) {
.append(URLEncoder.encode(this.response.getStatusDescription(), StringUtil.UTF8.name()));
URI rejectURI = new URI(builder.toString());

ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, HybridConnectionListener.EXECUTOR);
ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, this.listener.getHttpClientProvider(), HybridConnectionListener.EXECUTOR);
return webSocket.connectAsync(rejectURI, ACCEPT_TIMEOUT).thenCompose((result) -> webSocket.closeAsync());
} catch (IOException | URISyntaxException e) {
return CompletableFutureUtil.fromException(e);
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/com/microsoft/azure/relay/WebSocketChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import javax.websocket.CloseReason;

import org.eclipse.jetty.websocket.api.CloseStatus;

public class WebSocketChannel implements HybridConnectionChannel {
private final ClientWebSocket websocket;
private final TrackingContext trackingContext;

WebSocketChannel(TrackingContext trackingContext, AutoShutdownScheduledExecutor executor) {
this(new ClientWebSocket(trackingContext, executor), trackingContext);
WebSocketChannel(TrackingContext trackingContext, HttpClientProvider httpClientProvider, AutoShutdownScheduledExecutor executor) {
this(new ClientWebSocket(trackingContext, httpClientProvider, executor), trackingContext);
}

WebSocketChannel(ClientWebSocket websocket, TrackingContext trackingContext) {
Expand Down Expand Up @@ -56,13 +57,13 @@ public CompletableFuture<Void> closeAsync() {
}

/**
* Closes the connection with the remote websocket with a given CloseReason
* Closes the connection with the remote websocket with a given CloseStatus
*
* @param reason The CloseReason to be given for this operation. For details please see javax.websocket.CloseReason.
* @param closeStatus The CloseStatus to be given for this operation. For details please see org.eclipse.jetty.websocket.api.CloseStatus.
* @return Returns a CompletableFuture which completes when the connection is completely closed.
*/
public CompletableFuture<Void> closeAsync(CloseReason reason) {
return this.websocket.closeAsync(reason);
public CompletableFuture<Void> closeAsync(CloseStatus closeStatus) {
return this.websocket.closeAsync(closeStatus);
}

/**
Expand Down
Loading