diff --git a/.classpath b/.classpath
index 5e8a55f..cd575e5 100644
--- a/.classpath
+++ b/.classpath
@@ -23,5 +23,28 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/.env.sample b/.env.sample
new file mode 100644
index 0000000..557fc39
--- /dev/null
+++ b/.env.sample
@@ -0,0 +1 @@
+RELAY_CONNECTION_STRING=
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 9e962d0..36b758f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,6 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
/src/test/resources/log4j2.xml
+
+# .env file
+.env
\ No newline at end of file
diff --git a/.project b/.project
index e098f33..26247bf 100644
--- a/.project
+++ b/.project
@@ -20,4 +20,15 @@
org.eclipse.jdt.core.javanature
org.eclipse.m2e.core.maven2Nature
+
+
+ 1667084030105
+
+ 30
+
+ org.eclipse.core.resources.regexFilterMatcher
+ node_modules|\.git|__CREATED_BY_JAVA_LANGUAGE_SERVER__
+
+
+
diff --git a/pom.xml b/pom.xml
index 60169df..0fc4609 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,20 +31,28 @@
UTF-8
- 1.12.2
- 9.4.44.v20210927
1.8
1.8
- 1.7.0
- 1.2.17
+ 1.12.2
+ 2.0.3
+ 9.4.49.v20220914
+ 20231013
+ 2.19.0
+ 4.13.2
+ 5.2.2
-
- com.azure
- azure-identity
- ${azure.identity.version}
-
+
+ com.azure
+ azure-identity
+ ${azure.identity.version}
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
org.eclipse.jetty.websocket
javax-websocket-client-impl
@@ -58,18 +66,31 @@
org.json
json
- 20231013
+ ${json.version}
junit
junit
- 4.13.1
+ ${junit.version}
test
- org.slf4j
- slf4j-api
- ${slf4j-version}
+ io.github.cdimascio
+ java-dotenv
+ ${java.dotenv.version}
+ test
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+ ${log4j.version}
+ test
diff --git a/src/main/java/com/microsoft/azure/relay/ClientWebSocket.java b/src/main/java/com/microsoft/azure/relay/ClientWebSocket.java
index 0bf3389..d1512d0 100644
--- a/src/main/java/com/microsoft/azure/relay/ClientWebSocket.java
+++ b/src/main/java/com/microsoft/azure/relay/ClientWebSocket.java
@@ -3,47 +3,65 @@
package com.microsoft.azure.relay;
+import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.io.IOException;
-
-import javax.websocket.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.RuntimeIOException;
-import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.websocket.api.CloseStatus;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
-class ClientWebSocket extends Endpoint implements RelayTraceSource {
+class ClientWebSocket extends WebSocketAdapter implements RelayTraceSource, WebSocketPingPongListener {
private final AutoShutdownScheduledExecutor executor;
- private final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
private final TrackingContext trackingContext;
- private Session session;
+ private final WebSocketClient wsClient;
+ private HttpClientProvider httpClientProvider;
private int maxMessageBufferSize = RelayConstants.DEFAULT_CONNECTION_BUFFER_SIZE;
- private CloseReason closeReason;
+ private CloseStatus closeStatus;
private InputQueue fragmentQueue;
private InputQueue textQueue;
private CompletableFuture closeTask;
private String cachedString;
+ private Duration keepAliveInterval = Duration.ofSeconds(RelayConstants.DEFAULT_PING_INTERVAL_SECONDS);
+ private Runnable keepAliveHandler;
/**
* Creates a websocket instance
*/
- public ClientWebSocket(TrackingContext trackingContext, AutoShutdownScheduledExecutor executor) {
+ public ClientWebSocket(TrackingContext trackingContext, HttpClientProvider httpClientProvider,
+ AutoShutdownScheduledExecutor executor) {
this.executor = executor;
this.textQueue = new InputQueue(this.executor);
this.fragmentQueue = new InputQueue(this.executor);
- this.closeReason = null;
this.trackingContext = trackingContext;
+ this.httpClientProvider = httpClientProvider;
+
+ wsClient = new WebSocketClient(httpClientProvider.getHttpClient());
}
-
+
+ public HttpClientProvider getHttpClientProvider() {
+ return httpClientProvider;
+ }
+
public TrackingContext getTrackingContext() {
return trackingContext;
}
-
+
@Override
public String toString() {
if (this.cachedString == null) {
@@ -51,9 +69,9 @@ public String toString() {
}
return this.cachedString;
}
-
- CloseReason getCloseReason() {
- return this.closeReason;
+
+ CloseStatus getCloseReason() {
+ return this.closeStatus;
}
int getMaxMessageBufferSize() {
@@ -63,12 +81,22 @@ int getMaxMessageBufferSize() {
void setMaxMessageBufferSize(int maxMessageBufferSize) {
if (maxMessageBufferSize > 0) {
this.maxMessageBufferSize = maxMessageBufferSize;
- this.container.setDefaultMaxTextMessageBufferSize(this.maxMessageBufferSize);
} else {
throw new IllegalArgumentException("MaxBufferSize of the web socket must be a positive value.");
}
}
+ void setKeepAliveInterval(Duration interval) {
+ this.keepAliveInterval = interval;
+ }
+
+ /**
+ * Sets the handler that will be run after the listener has received a keep alive (aka pong) response
+ */
+ void setKeepAliveHandler(Runnable onKeepAlive) {
+ this.keepAliveHandler = onKeepAlive;
+ }
+
/**
* Establish websocket connection between the control websocket and the cloud
* service if not already established.
@@ -96,7 +124,7 @@ public CompletableFuture connectAsync(URI uri) {
public CompletableFuture connectAsync(URI uri, Duration timeout) {
return this.connectAsync(uri, timeout, null);
}
-
+
/**
* Establish websocket connection between the control websocket and the cloud
* service if not already established.
@@ -109,35 +137,42 @@ public CompletableFuture connectAsync(URI uri, Duration timeout) {
* @throws CompletionException Throws when connection could not be established
* within the given timeout
*/
- public CompletableFuture connectAsync(URI uri, Duration timeout, ClientEndpointConfig config) {
+ public CompletableFuture connectAsync(URI uri, Duration timeout, Map> headers) {
if (this.isOpen()) {
return CompletableFutureUtil.fromException(new RuntimeIOException("This connection is already connected."));
}
- this.container.setDefaultMaxTextMessageBufferSize(this.maxMessageBufferSize);
return CompletableFutureUtil.timedRunAsync(timeout, () -> {
RelayLogger.logEvent("connecting", this);
+
+ if (!wsClient.isStarted()) {
+ try {
+ wsClient.start();
+ } catch (Exception e) {
+ throw RelayLogger.throwingException(e, this);
+ }
+ }
+
try {
- if (config != null) {
- this.container.connectToServer(this, config, uri);
- } else {
- this.container.connectToServer(this, uri);
+ ClientUpgradeRequest request = new ClientUpgradeRequest();
+ if (headers != null) {
+ request.setHeaders(headers);
}
- } catch (DeploymentException | IOException e) {
+ wsClient.connect(this, uri, request).get();
+ } catch (IOException | InterruptedException | ExecutionException e) {
if (e.getCause() instanceof UpgradeException) {
throw RelayLogger.throwingException(e.getCause(), this);
}
throw RelayLogger.throwingException(e, this);
}
-
- if (this.session == null || !this.session.isOpen()) {
+
+ if (this.isNotConnected()) {
throw RelayLogger.throwingException(new RuntimeIOException("connection to the server failed."), this);
}
- },
- this.executor).whenComplete(($void, ex) -> {
- if (ex != null) {
- this.dispose();
- }
+ }, this.executor).whenComplete(($void, ex) -> {
+ if (ex != null) {
+ this.dispose();
+ }
});
}
@@ -148,13 +183,14 @@ public CompletableFuture connectAsync(URI uri, Duration timeout, ClientEnd
* endpoint
*/
boolean isOpen() {
- return this.session != null && this.session.isOpen();
+ return this.isConnected();
}
/**
* Receives text messages asynchronously.
*
- * @return Returns a CompletableFuture which completes when websocket receives text messages
+ * @return Returns a CompletableFuture which completes when websocket receives
+ * text messages
*/
CompletableFuture readTextAsync() {
return this.textQueue.dequeueAsync().thenApply(text -> {
@@ -164,34 +200,38 @@ CompletableFuture readTextAsync() {
return text;
});
}
-
+
/**
* Receives byte messages from the remote sender asynchronously.
*
- * @return Returns a CompletableFuture of the bytes which completes when websocket receives an entire message
+ * @return Returns a CompletableFuture of the bytes which completes when
+ * websocket receives an entire message
*/
public CompletableFuture readBinaryAsync() {
return this.readBinaryAsync(null);
}
-
+
/**
* Receives byte messages from the remote sender asynchronously.
*
* @param timeout The timeout duration for this operation.
- * @return Returns a CompletableFuture of the bytes which completes when websocket receives the entire message.
- * @throws TimeoutException thrown when a complete message frame is not received within the timeout.
+ * @return Returns a CompletableFuture of the bytes which completes when
+ * websocket receives the entire message.
+ * @throws TimeoutException thrown when a complete message frame is not received
+ * within the timeout.
*/
public CompletableFuture readBinaryAsync(Duration timeout) {
// Gather all fragments and return a single buffer
BinaryMessageReader messageReader = new BinaryMessageReader(timeout);
return messageReader.readAsync();
}
-
+
/**
* Sends the data to the remote endpoint as binary.
*
* @param data Message to be sent.
- * @return A CompletableFuture which completes when websocket finishes sending the bytes.
+ * @return A CompletableFuture which completes when websocket finishes sending
+ * the bytes.
*/
public CompletableFuture writeAsync(Object data) {
return this.writeAsync(data, null);
@@ -200,61 +240,67 @@ public CompletableFuture writeAsync(Object data) {
/**
* Sends the data to the remote endpoint within a timeout as binary.
*
- * @param data Message to be sent.
- * @param timeout The timeout to connect to send the data within. May be null to indicate no timeout limit.
- * @return A CompletableFuture which completes when websocket finishes sending the bytes.
- * @throws TimeoutException Throws when the sending task does not complete within the given timeout.
+ * @param data Message to be sent.
+ * @param timeout The timeout to connect to send the data within. May be null to
+ * indicate no timeout limit.
+ * @return A CompletableFuture which completes when websocket finishes sending
+ * the bytes.
+ * @throws TimeoutException Throws when the sending task does not complete
+ * within the given timeout.
*/
public CompletableFuture writeAsync(Object data, Duration timeout) {
return writeAsync(data, timeout, true, WriteMode.BINARY);
}
-
+
/**
- * Sends the data to the remote endpoint within a timeout in one of the WriteModes.
+ * Sends the data to the remote endpoint within a timeout in one of the
+ * WriteModes.
*
- * @param data Message to be sent.
- * @param timeout The timeout to connect to send the data within. May be null to indicate no timeout limit.
- * @param isEnd Indicates if the data sent is the end of a message
- * @param mode The type of the message to be sent.
- * @return A CompletableFuture which completes when websocket finishes sending the bytes.
- * @throws TimeoutException Throws when the sending task does not complete within the given timeout.
+ * @param data Message to be sent.
+ * @param timeout The timeout to connect to send the data within. May be null to
+ * indicate no timeout limit.
+ * @param isEnd Indicates if the data sent is the end of a message
+ * @param mode The type of the message to be sent.
+ * @return A CompletableFuture which completes when websocket finishes sending
+ * the bytes.
+ * @throws TimeoutException Throws when the sending task does not complete
+ * within the given timeout.
*/
CompletableFuture writeAsync(Object data, Duration timeout, boolean isEnd, WriteMode mode) {
if (this.isOpen()) {
if (data == null) {
// TODO: Log warns sending nothing because message is null
return CompletableFuture.completedFuture(null);
- }
- else {
- RemoteEndpoint.Basic remote = this.session.getBasicRemote();
+ } else {
+ RemoteEndpoint remote = this.getRemote();
RelayLogger.logEvent("writingBytes", this, mode.toString());
-
- // The websocket API will throw if multiple sends are attempted on the same websocket simultaneously
+
+ // The websocket API will throw if multiple sends are attempted on the same
+ // websocket simultaneously
return CompletableFutureUtil.timedRunAsync(timeout, () -> {
try {
if (mode.equals(WriteMode.TEXT)) {
String text = data.toString();
- remote.sendText(text, isEnd);
+ remote.sendPartialString(text, isEnd);
RelayLogger.logEvent("writingBytesFinished", this, String.valueOf(text.length()));
- }
- else {
+ } else {
byte[] bytes;
if (data instanceof byte[]) {
bytes = ((byte[]) data).clone();
- }
- else if (data instanceof ByteBuffer) {
+ } else if (data instanceof ByteBuffer) {
ByteBuffer buffer = (ByteBuffer) data;
bytes = new byte[buffer.remaining()];
buffer.get(bytes);
- }
- else {
+ } else {
throw new IllegalArgumentException(
- "The data to be sent should be ByteBuffer or byte[], but received " + data.getClass().getSimpleName());
+ "The data to be sent should be ByteBuffer or byte[], but received "
+ + data.getClass().getSimpleName());
}
-
+
int bytesToSend = bytes.length;
- // sendBinary() will cause the content of the byte array within the ByteBuffer to change
- remote.sendBinary(ByteBuffer.wrap(bytes), isEnd);
+ // sendBinary() will cause the content of the byte array within the ByteBuffer
+ // to change
+ remote.sendPartialBytes(ByteBuffer.wrap(bytes), isEnd);
RelayLogger.logEvent("writingBytesFinished", this, String.valueOf(bytesToSend));
}
} catch (Exception e) {
@@ -262,12 +308,12 @@ else if (data instanceof ByteBuffer) {
}
}, executor);
}
- }
- else {
- return CompletableFutureUtil.fromException(new RuntimeIOException("cannot send because the session is not connected."));
+ } else {
+ return CompletableFutureUtil
+ .fromException(new RuntimeIOException("cannot send because the session is not connected."));
}
}
-
+
/**
* Closes the connection with the remote websocket
*
@@ -281,21 +327,23 @@ public CompletableFuture closeAsync() {
/**
* Closes the connection with the remote websocket with a given CloseReason
*
- * @param reason The CloseReason to be given for this operation. For details please see javax.websocket.CloseReason.
- * @return Returns a CompletableFuture which completes when the connection is completely closed.
+ * @param reason The CloseReason to be given for this operation. For details
+ * please see javax.websocket.CloseReason.
+ * @return Returns a CompletableFuture which completes when the connection is
+ * completely closed.
*/
- public CompletableFuture closeAsync(CloseReason reason) {
- RelayLogger.logEvent("clientWebSocketClosing", this, (reason != null) ? reason.getReasonPhrase() : "NONE");
-
- if (this.session == null || !this.session.isOpen()) {
+ public CompletableFuture closeAsync(CloseStatus reason) {
+ RelayLogger.logEvent("clientWebSocketClosing", this, (reason != null) ? reason.getPhrase() : "NONE");
+
+ if (this.isNotConnected()) {
return this.closeTask;
}
try {
if (reason != null) {
- this.session.close(reason);
+ this.getSession().close(reason);
} else {
- this.session.close();
+ this.getSession().close();
}
} catch (Throwable e) {
this.closeTask.completeExceptionally(e);
@@ -303,65 +351,96 @@ public CompletableFuture closeAsync(CloseReason reason) {
return this.closeTask;
}
-
- /**
- * Release the resources taken by this websocket. This is a very lengthy execution.
- */
- void dispose() {
- try {
- ((LifeCycle) this.container).stop();
- } catch (Exception e) {
- RelayLogger.handledExceptionAsWarning(e, this);
- }
- }
-
- @OnOpen
- public void onOpen(Session session, EndpointConfig config) {
+
+ /**
+ * Release the resources taken by this websocket. This is a very lengthy
+ * execution.
+ */
+ void dispose() {
+ try {
+ wsClient.stop();
+ } catch (Exception e) {
+ RelayLogger.handledExceptionAsWarning(e, this);
+ }
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+ super.onWebSocketConnect(session);
+
RelayLogger.logEvent("connected", this);
- this.closeReason = null;
- this.session = session;
- session.setMaxBinaryMessageBufferSize(this.maxMessageBufferSize);
- session.setMaxTextMessageBufferSize(this.maxMessageBufferSize);
+
+ session.getPolicy().setMaxBinaryMessageBufferSize(maxMessageBufferSize);
+ session.getPolicy().setMaxTextMessageBufferSize(maxMessageBufferSize);
+
+ session.getPolicy().setMaxBinaryMessageSize(Integer.MAX_VALUE);
+ session.getPolicy().setMaxTextMessageSize(Integer.MAX_VALUE);
+
+ this.closeStatus = null;
this.closeTask = new CompletableFuture();
-
- session.addMessageHandler(new MessageHandler.Whole() {
- @Override
- public void onMessage(String text) {
- textQueue.enqueueAndDispatch(text);
- }
- });
- session.addMessageHandler(new MessageHandler.Partial() {
- @Override
- public void onMessage(byte[] inputBytes, boolean isEnd) {
- fragmentQueue.enqueueAndDispatch(new MessageFragment(inputBytes, isEnd));
- }
- });
+ this.executor.schedule(new PingRunnable(), keepAliveInterval.toMillis(), TimeUnit.MILLISECONDS);
}
-
- @OnClose
- public void onClose(Session session, CloseReason reason) {
- CompletableFuture.runAsync(() -> {
- this.dispose();
- }, executor);
- this.closeReason = reason;
- RelayLogger.logEvent("clientWebSocketClosed", this, reason.getReasonPhrase());
+
+ @Override
+ public void onWebSocketClose(int statusCode, String reason) {
+ super.onWebSocketClose(statusCode, reason);
+
+ if (reason == null) {
+ reason = "";
+ }
+
+ CompletableFuture.runAsync(() -> {
+ this.dispose();
+ }, executor);
+
+ this.closeStatus = new CloseStatus(statusCode, reason);
+ RelayLogger.logEvent("clientWebSocketClosed", this, closeStatus.getPhrase());
this.textQueue.shutdown();
this.fragmentQueue.shutdown();
this.closeTask.complete(null);
}
- @OnError
- public void onError(Throwable cause) {
+ @Override
+ public void onWebSocketText(String message) {
+ super.onWebSocketText(message);
+
+ textQueue.enqueueAndDispatch(message);
+ }
+
+ @Override
+ public void onWebSocketBinary(byte[] payload, int offset, int len) {
+ super.onWebSocketBinary(payload, offset, len);
+
+ fragmentQueue.enqueueAndDispatch(new MessageFragment(payload, true));
+ }
+
+ @Override
+ public void onWebSocketError(Throwable cause) {
+ super.onWebSocketError(cause);
+
if (!this.isOpen()) {
- // A new websocket will be created through reconnection attempt, dispose this one
- CompletableFuture.runAsync(() -> {
- this.dispose();
- }, executor);
+ // A new websocket will be created through reconnection attempt, dispose this
+ // one
+ CompletableFuture.runAsync(() -> {
+ this.dispose();
+ }, executor);
}
- RelayLogger.throwingException(cause, this);
+ RelayLogger.throwingException(cause, this);
}
-
+
+ @Override
+ public void onWebSocketPing(ByteBuffer payload) {
+
+ }
+
+ @Override
+ public void onWebSocketPong(ByteBuffer payload) {
+ if (keepAliveHandler != null) {
+ keepAliveHandler.run();
+ }
+ }
+
private static class MessageFragment {
private final byte[] bytes;
private final boolean ended;
@@ -379,50 +458,66 @@ boolean isEnd() {
return this.ended;
}
}
-
+
private final class BinaryMessageReader {
private final TimeoutHelper timeoutHelper;
private final LinkedList fragments;
private int messageSize;
-
+
BinaryMessageReader(Duration timeout) {
timeoutHelper = new TimeoutHelper(timeout);
fragments = new LinkedList();
}
-
+
public CompletableFuture readAsync() {
return readFragmentsAsync()
- .thenApply((voidResult) -> {
- byte[] message = new byte[messageSize];
- int offset = 0;
- for (byte[] bytes : fragments) {
- System.arraycopy(bytes, 0, message, offset, bytes.length);
- offset += bytes.length;
- }
-
- RelayLogger.logEvent("receivedBytes", this, Integer.toString(message.length));
- return ByteBuffer.wrap(message);
- });
+ .thenApply((voidResult) -> {
+ byte[] message = new byte[messageSize];
+ int offset = 0;
+ for (byte[] bytes : fragments) {
+ System.arraycopy(bytes, 0, message, offset, bytes.length);
+ offset += bytes.length;
+ }
+
+ RelayLogger.logEvent("receivedBytes", this, Integer.toString(message.length));
+ return ByteBuffer.wrap(message);
+ });
}
-
+
private CompletableFuture readFragmentsAsync() {
return fragmentQueue.dequeueAsync(timeoutHelper.remainingTime())
- .thenCompose((fragment) -> {
- if (fragment == null) {
- // TODO: In the case of shutdown should we throw if we don't make it to the end
- // of message? We can't just give the user partial data without telling them.
+ .thenCompose((fragment) -> {
+ if (fragment == null) {
+ // TODO: In the case of shutdown should we throw if we don't make it to the end
+ // of message? We can't just give the user partial data without telling them.
+ return CompletableFuture.completedFuture(null);
+ }
+
+ messageSize += fragment.getBytes().length;
+ fragments.add(fragment.getBytes());
+
+ if (!fragment.isEnd()) {
+ return readFragmentsAsync();
+ }
+
return CompletableFuture.completedFuture(null);
- }
-
- messageSize += fragment.getBytes().length;
- fragments.add(fragment.getBytes());
+ });
+ }
+ }
- if (!fragment.isEnd()) {
- return readFragmentsAsync();
- }
-
- return CompletableFuture.completedFuture(null);
- });
+ private class PingRunnable implements Runnable {
+ @Override
+ public void run() {
+ if (ClientWebSocket.this.isConnected()) {
+ ClientWebSocket.this.executor.schedule(new PingRunnable(), keepAliveInterval.toMillis(),
+ TimeUnit.MILLISECONDS);
+ try {
+ ClientWebSocket.this.getRemote().sendPing(ByteBuffer.allocate(0));
+ RelayLogger.logEvent("pingSuccess", this);
+ } catch (IOException e) {
+ RelayLogger.logEvent("pingFailed", this);
+ }
+ }
}
}
}
diff --git a/src/main/java/com/microsoft/azure/relay/HttpClientProvider.java b/src/main/java/com/microsoft/azure/relay/HttpClientProvider.java
new file mode 100644
index 0000000..d12ee9c
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/relay/HttpClientProvider.java
@@ -0,0 +1,10 @@
+package com.microsoft.azure.relay;
+
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+public class HttpClientProvider {
+ public HttpClient getHttpClient() {
+ return new HttpClient(new SslContextFactory.Client());
+ }
+}
diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionChannel.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionChannel.java
index 2d4fe81..d664db8 100644
--- a/src/main/java/com/microsoft/azure/relay/HybridConnectionChannel.java
+++ b/src/main/java/com/microsoft/azure/relay/HybridConnectionChannel.java
@@ -7,7 +7,8 @@
import java.nio.channels.Channel;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-import javax.websocket.CloseReason;
+
+import org.eclipse.jetty.websocket.api.CloseStatus;
public interface HybridConnectionChannel extends Channel {
@@ -21,12 +22,12 @@ public interface HybridConnectionChannel extends Channel {
public CompletableFuture closeAsync();
/**
- * Closes the connection with the remote websocket with a given CloseReason
+ * Closes the connection with the remote websocket with a given CloseStatus
*
- * @param reason The CloseReason to be given for this operation. For details please see javax.websocket.CloseReason.
+ * @param closeStatus The CloseStatus to be given for this operation. For details please see org.eclipse.jetty.websocket.api.CloseStatus.
* @return Returns a CompletableFuture which completes when the connection is completely closed.
*/
- public CompletableFuture closeAsync(CloseReason reason);
+ public CompletableFuture closeAsync(CloseStatus closeStatus);
/**
* Receives byte messages from the remote sender asynchronously.
diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionClient.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionClient.java
index b94f0a5..0301aaf 100644
--- a/src/main/java/com/microsoft/azure/relay/HybridConnectionClient.java
+++ b/src/main/java/com/microsoft/azure/relay/HybridConnectionClient.java
@@ -12,13 +12,12 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import javax.websocket.ClientEndpointConfig;
-
public class HybridConnectionClient implements RelayTraceSource {
static final AutoShutdownScheduledExecutor EXECUTOR = AutoShutdownScheduledExecutor.Create();
static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(70);
static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments()
.toString().indexOf("-agentlib:jdwp") > 0;
+ private HttpClientProvider httpClientProvider;
private String cachedString;
private TrackingContext trackingContext;
private URI address;
@@ -131,6 +130,14 @@ public HybridConnectionClient(String connectionString, String path) throws URISy
tokenProvider, tokenProvider != null);
}
+ public HttpClientProvider getHttpClientProvider() {
+ return httpClientProvider;
+ }
+
+ public void setHttpClientProvider(HttpClientProvider httpClientProvider) {
+ this.httpClientProvider = httpClientProvider;
+ }
+
/**
* The address on which this HybridConnection will connect to. This address should be of the format
* "sb://contoso.servicebus.windows.net/yourhybridconnection".
@@ -198,19 +205,16 @@ public CompletableFuture createConnectionAsync(Map> headers = new HashMap>();
headers.put(RelayConstants.SERVICEBUS_AUTHORIZATION_HEADER_NAME, Arrays.asList(token.join().getToken()));
- HybridConnectionEndpointConfigurator configurator = new HybridConnectionEndpointConfigurator();
- configurator.addHeaders(headers);
if (customHeaders != null) {
- configurator.addHeaders(customHeaders);
+ headers.putAll(customHeaders);
}
- ClientEndpointConfig config = ClientEndpointConfig.Builder.create().configurator(configurator).build();
try {
URI uri = HybridConnectionUtil.buildUri(this.address.getHost(), this.address.getPort(),
this.address.getPath(), this.address.getQuery(), HybridConnectionConstants.Actions.CONNECT,
trackingContext.getTrackingId());
- WebSocketChannel channel = new WebSocketChannel(trackingContext, EXECUTOR);
- return channel.getWebSocket().connectAsync(uri, this.operationTimeout, config).thenApply($void -> channel);
+ WebSocketChannel channel = new WebSocketChannel(trackingContext, this.getHttpClientProvider(), EXECUTOR);
+ return channel.getWebSocket().connectAsync(uri, this.operationTimeout, headers).thenApply($void -> channel);
} catch (URISyntaxException e) {
return CompletableFutureUtil.fromException(e);
}
@@ -251,5 +255,6 @@ private void initialize(URI address, Duration operationTimeout, TokenProvider to
this.address = address;
this.tokenProvider = tokenProvider;
this.operationTimeout = operationTimeout;
+ this.httpClientProvider = new HttpClientProvider();
}
}
diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionEndpointConfigurator.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionEndpointConfigurator.java
deleted file mode 100644
index 121e20b..0000000
--- a/src/main/java/com/microsoft/azure/relay/HybridConnectionEndpointConfigurator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright (c) Microsoft. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-
-package com.microsoft.azure.relay;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.websocket.ClientEndpointConfig;
-
-class HybridConnectionEndpointConfigurator extends ClientEndpointConfig.Configurator {
- private Map> customHeaders;
-
- HybridConnectionEndpointConfigurator() {
- this.customHeaders = new HashMap>();
- }
-
- void addHeaders(Map> headers) {
- if (headers != null && !headers.isEmpty()) {
- this.customHeaders.putAll(headers);
- }
- }
-
- @Override
- public void beforeRequest(Map> requestHeaders) {
- requestHeaders.putAll(customHeaders);
- }
-}
diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java
index 0084867..738a706 100644
--- a/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java
+++ b/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java
@@ -19,11 +19,9 @@
import java.util.function.Consumer;
import java.util.function.Function;
-import javax.websocket.ClientEndpointConfig;
-import javax.websocket.CloseReason;
-import javax.websocket.CloseReason.CloseCodes;
-
import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.websocket.api.CloseStatus;
+import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.json.JSONObject;
@@ -36,6 +34,7 @@ public class HybridConnectionListener implements RelayTraceSource, AutoCloseable
private final Object thisLock = new Object();
private final AtomicBoolean openCalled;
private final AtomicBoolean closeCalled;
+ private HttpClientProvider httpClientProvider;
private Duration operationTimeout;
private int maxWebSocketBufferSize;
private String cachedString;
@@ -48,6 +47,8 @@ public class HybridConnectionListener implements RelayTraceSource, AutoCloseable
private Consumer connectingHandler;
private Consumer offlineHandler;
private Runnable onlineHandler;
+ private Runnable keepAliveHandler;
+ private Duration keepAliveInterval = Duration.ofSeconds(RelayConstants.DEFAULT_PING_INTERVAL_SECONDS);
/**
* Create a new HybridConnectionListener instance for accepting
@@ -76,6 +77,7 @@ public HybridConnectionListener(URI address, TokenProvider tokenProvider) {
this.controlConnection = new ControlConnection(this);
this.openCalled = new AtomicBoolean(false);
this.closeCalled = new AtomicBoolean(false);
+ this.httpClientProvider = new HttpClientProvider();
}
/**
@@ -152,12 +154,21 @@ public HybridConnectionListener(String connectionString, String path) throws URI
this.controlConnection = new ControlConnection(this);
this.openCalled = new AtomicBoolean(false);
this.closeCalled = new AtomicBoolean(false);
+ this.httpClientProvider = new HttpClientProvider();
}
public boolean isOnline() {
return this.controlConnection.isOnline();
}
+ public HttpClientProvider getHttpClientProvider() {
+ return httpClientProvider;
+ }
+
+ public void setHttpClientProvider(HttpClientProvider httpClientProvider) {
+ this.httpClientProvider = httpClientProvider;
+ }
+
public Function getAcceptHandler() {
return acceptHandler;
}
@@ -218,6 +229,10 @@ public Duration getOperationTimeout() {
public int getMaxWebSocketBufferSize() {
return maxWebSocketBufferSize;
}
+
+ public void setKeepAliveInterval(Duration interval) {
+ this.keepAliveInterval = interval;
+ }
public void setMaxWebSocketBufferSize(int maxWebSocketBufferSize) {
if (maxWebSocketBufferSize > 0) {
@@ -273,6 +288,20 @@ public void setOnlineHandler(Runnable onOnline) {
this.onlineHandler = onOnline;
}
+ /**
+ * Returns the handler that will be run after the listener has received a keep alive (aka pong) response
+ */
+ public Runnable getKeepAliveHandler() {
+ return keepAliveHandler;
+ }
+
+ /**
+ * Sets the handler that will be run after the listener has received a keep alive (aka pong) response
+ */
+ public void setKeepAliveHandler(Runnable onKeepAlive) {
+ this.keepAliveHandler = onKeepAlive;
+ }
+
/**
* Opens the HybridConnectionListener and registers it as a listener in
* ServiceBus.
@@ -345,7 +374,7 @@ public CompletableFuture closeAsync(Duration timeout) {
closeTasks = new CompletableFuture>[this.connectionInputQueue.getPendingCount()];
for (int i = 0; i < this.connectionInputQueue.getPendingCount(); i++) {
closeTasks[i] = this.connectionInputQueue.dequeueAsync(timeoutHelper.remainingTime()).thenAccept(connection -> {
- connection.closeAsync(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Client closing the socket normally"));
+ connection.closeAsync(new CloseStatus(StatusCode.NORMAL, "Client closing the socket normally"));
});
}
}
@@ -480,7 +509,7 @@ private CompletableFuture completeAcceptAsync(RelayedHttpListenerContext l
if (shouldAccept) {
synchronized (this.thisLock) {
- WebSocketChannel rendezvousConnection = new WebSocketChannel(listenerContext.getTrackingContext(), EXECUTOR);
+ WebSocketChannel rendezvousConnection = new WebSocketChannel(listenerContext.getTrackingContext(), this.getHttpClientProvider(), EXECUTOR);
if (this.closeCalled.get()) {
RelayLogger.logEvent("rendezvousClose", this, rendezvousUri.toString());
@@ -594,9 +623,9 @@ public CompletableFuture openAsync(Duration timeout) {
}).whenComplete(($void, err) -> {
if (err != null) {
RelayLogger.throwingException(err, this.listener);
- CloseReason closeReason = new CloseReason(CloseCodes.UNEXPECTED_CONDITION,
+ CloseStatus closeStatus = new CloseStatus(StatusCode.ABNORMAL,
"closing web socket connection because something went wrong trying to connect.");
- this.closeOrAbortWebSocketAsync(connectTask, closeReason);
+ this.closeOrAbortWebSocketAsync(connectTask, closeStatus);
throw new CompletionException(err);
}
});
@@ -625,8 +654,8 @@ private CompletableFuture closeAsync(Duration duration) {
if (connectTask != null) {
return connectTask.thenCompose((webSocket) -> {
return this.sendAsyncLock.acquireThenCompose(duration, () -> {
- CloseReason reason = new CloseReason(CloseCodes.NORMAL_CLOSURE, "Normal Closure");
- return webSocket.closeAsync(reason);
+ CloseStatus closeStatus = new CloseStatus(StatusCode.NORMAL, "Normal Closure");
+ return webSocket.closeAsync(closeStatus);
});
});
}
@@ -715,9 +744,6 @@ private CompletableFuture connectAsync(Duration timeout) {
// Set the authentication in request header
Map> headers = new HashMap>();
headers.put(RelayConstants.SERVICEBUS_AUTHORIZATION_HEADER_NAME, Arrays.asList(token.join().getToken()));
- HybridConnectionEndpointConfigurator configurator = new HybridConnectionEndpointConfigurator();
- configurator.addHeaders(headers);
- ClientEndpointConfig config = ClientEndpointConfig.Builder.create().configurator(configurator).build();
// When we reconnect we need to remove the "_GXX" suffix otherwise trackingId
// gets longer after each reconnect
@@ -729,15 +755,20 @@ private CompletableFuture connectAsync(Duration timeout) {
this.address.getPath(), this.address.getQuery(), HybridConnectionConstants.Actions.LISTEN,
trackingId);
- ClientWebSocket webSocket = new ClientWebSocket(this.listener.trackingContext, EXECUTOR);
+ ClientWebSocket webSocket = new ClientWebSocket(this.listener.trackingContext, this.listener.getHttpClientProvider(), EXECUTOR);
+ webSocket.setKeepAliveInterval(this.listener.keepAliveInterval);
+
return delayTask.thenCompose(($void) -> {
if (this.listener.injectedFault != null && this.listener.injectedFault instanceof UpgradeException) {
return CompletableFutureUtil.fromException(this.listener.injectedFault);
}
- return webSocket.connectAsync(websocketUri, timeout, config)
+ return webSocket.connectAsync(websocketUri, timeout, headers)
.thenApply(($void2) -> {
this.onOnline();
+ webSocket.setKeepAliveHandler(() -> {
+ this.onKeepAlive();
+ });
return webSocket;
});
});
@@ -747,7 +778,7 @@ private CompletableFuture connectAsync(Duration timeout) {
}
}
- private CompletableFuture closeOrAbortWebSocketAsync(CompletableFuture connectTask, CloseReason reason) {
+ private CompletableFuture closeOrAbortWebSocketAsync(CompletableFuture connectTask, CloseStatus closeStatus) {
assert CompletableFutureUtil.isDoneNormally(connectTask);
synchronized (this.thisLock) {
@@ -756,7 +787,7 @@ private CompletableFuture closeOrAbortWebSocketAsync(CompletableFuture webSocket.closeAsync(reason))
+ return connectTask.thenCompose((webSocket) -> webSocket.closeAsync(closeStatus))
.exceptionally((exception) -> {
// catch and do not rethrow
RelayLogger.handledExceptionAsWarning(exception, this.listener);
@@ -823,8 +854,8 @@ private CompletableFuture receivePumpCoreAsync() {
keepGoing = false;
}
else {
- CloseReason reason = webSocket.getCloseReason();
- keepGoing = this.onDisconnect(new ConnectionLostException(reason.toString()));
+ CloseStatus closeStatus = webSocket.getCloseReason();
+ keepGoing = this.onDisconnect(new ConnectionLostException(closeStatus.toString()));
}
return keepGoing;
}
@@ -870,6 +901,13 @@ private void onOffline(Throwable lastError) {
}
}
+ private void onKeepAlive() {
+ Runnable keepAliveHandler = this.listener.getKeepAliveHandler();
+ if (keepAliveHandler != null) {
+ keepAliveHandler.run();
+ }
+ }
+
// Returns true if this control connection should attempt to reconnect after this exception.
private boolean onDisconnect(Throwable lastError) {
diff --git a/src/main/java/com/microsoft/azure/relay/HybridHttpConnection.java b/src/main/java/com/microsoft/azure/relay/HybridHttpConnection.java
index 6be6d56..daf9625 100644
--- a/src/main/java/com/microsoft/azure/relay/HybridHttpConnection.java
+++ b/src/main/java/com/microsoft/azure/relay/HybridHttpConnection.java
@@ -16,11 +16,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
-import javax.websocket.CloseReason;
-import javax.websocket.CloseReason.CloseCodes;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.URIUtil;
+import org.eclipse.jetty.websocket.api.CloseStatus;
+import org.eclipse.jetty.websocket.api.StatusCode;
import org.json.JSONObject;
class HybridHttpConnection implements RelayTraceSource {
@@ -270,7 +270,7 @@ private CompletableFuture sendBytesOverRendezvousAsync(ByteBuffer buffer,
private CompletableFuture ensureRendezvousAsync(Duration timeout) throws CompletionException {
if (this.rendezvousWebSocket == null) {
RelayLogger.logEvent("httpCreateRendezvous", this);
- this.rendezvousWebSocket = new ClientWebSocket(this.trackingContext, this.executor);
+ this.rendezvousWebSocket = new ClientWebSocket(this.trackingContext, this.controlWebSocket.getHttpClientProvider(), this.executor);
return this.rendezvousWebSocket.connectAsync(this.rendezvousAddress, timeout);
}
return CompletableFuture.completedFuture(null);
@@ -280,7 +280,7 @@ private CompletableFuture closeRendezvousAsync() {
if (this.rendezvousWebSocket != null) {
RelayLogger.logEvent("closing", this);
return this.rendezvousWebSocket
- .closeAsync(new CloseReason(CloseCodes.NORMAL_CLOSURE, "NormalClosure"))
+ .closeAsync(new CloseStatus(StatusCode.NORMAL, "NormalClosure"))
.thenRun(() -> RelayLogger.logEvent("closed", this));
} else {
return CompletableFuture.completedFuture(null);
diff --git a/src/main/java/com/microsoft/azure/relay/RelayConstants.java b/src/main/java/com/microsoft/azure/relay/RelayConstants.java
index 7045d09..f939d4a 100644
--- a/src/main/java/com/microsoft/azure/relay/RelayConstants.java
+++ b/src/main/java/com/microsoft/azure/relay/RelayConstants.java
@@ -18,6 +18,7 @@ private RelayConstants() { }
static final Duration MAX_DURATION = Duration.ofMillis(Integer.MAX_VALUE);
static final Duration MIN_DURATION = Duration.ofMillis(Integer.MIN_VALUE);
static final int DEFAULT_CONNECTION_BUFFER_SIZE = 64 * 1024;
+ static final int DEFAULT_PING_INTERVAL_SECONDS = 30;
// Listener should reconnect after 0, 1, 2, 5, 10, 30 seconds backoff delay
static final Duration[] CONNECTION_DELAY_INTERVALS = { Duration.ZERO, Duration.ofSeconds(1), Duration.ofSeconds(2),
diff --git a/src/main/java/com/microsoft/azure/relay/RelayLogger.java b/src/main/java/com/microsoft/azure/relay/RelayLogger.java
index df2842c..492a48e 100644
--- a/src/main/java/com/microsoft/azure/relay/RelayLogger.java
+++ b/src/main/java/com/microsoft/azure/relay/RelayLogger.java
@@ -131,6 +131,8 @@ private static void init() {
map.put("objectNotSet", "%s: %s was not set to the given value.");
map.put("offline", "%s is offline.");
map.put("parsingUUIDFailed", "%s: Parsing TrackingId '%s' as Guid failed, created new ActivityId '%s' for trace correlation.");
+ map.put("pingSuccess", "%s Ping Success");
+ map.put("pingFailed", "%s Ping Failed");
map.put("receivedBytes", "%s: received bytes from remote. Total length: %s");
map.put("receivedText", "%s: received text from remote. Total length: %s");
map.put("rendezvousClose", "%s: Relayed Listener has received call to close and will not accept the incoming connection. ConnectionAddress: %s");
diff --git a/src/main/java/com/microsoft/azure/relay/RelayedHttpListenerContext.java b/src/main/java/com/microsoft/azure/relay/RelayedHttpListenerContext.java
index 8f7d02b..6aa4415 100644
--- a/src/main/java/com/microsoft/azure/relay/RelayedHttpListenerContext.java
+++ b/src/main/java/com/microsoft/azure/relay/RelayedHttpListenerContext.java
@@ -78,7 +78,7 @@ CompletableFuture acceptAsync(URI rendezvousUri) {
// clientWebSocket.Options.AddSubProtocol(subProtocol);
// }
- ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, HybridConnectionListener.EXECUTOR);
+ ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, this.listener.getHttpClientProvider(), HybridConnectionListener.EXECUTOR);
return webSocket.connectAsync(rendezvousUri, ACCEPT_TIMEOUT).thenApply(result -> webSocket);
}
@@ -97,7 +97,7 @@ CompletableFuture rejectAsync(URI rendezvousUri) {
.append(URLEncoder.encode(this.response.getStatusDescription(), StringUtil.UTF8.name()));
URI rejectURI = new URI(builder.toString());
- ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, HybridConnectionListener.EXECUTOR);
+ ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, this.listener.getHttpClientProvider(), HybridConnectionListener.EXECUTOR);
return webSocket.connectAsync(rejectURI, ACCEPT_TIMEOUT).thenCompose((result) -> webSocket.closeAsync());
} catch (IOException | URISyntaxException e) {
return CompletableFutureUtil.fromException(e);
diff --git a/src/main/java/com/microsoft/azure/relay/WebSocketChannel.java b/src/main/java/com/microsoft/azure/relay/WebSocketChannel.java
index 93f75e0..7246242 100644
--- a/src/main/java/com/microsoft/azure/relay/WebSocketChannel.java
+++ b/src/main/java/com/microsoft/azure/relay/WebSocketChannel.java
@@ -7,14 +7,15 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-import javax.websocket.CloseReason;
+
+import org.eclipse.jetty.websocket.api.CloseStatus;
public class WebSocketChannel implements HybridConnectionChannel {
private final ClientWebSocket websocket;
private final TrackingContext trackingContext;
- WebSocketChannel(TrackingContext trackingContext, AutoShutdownScheduledExecutor executor) {
- this(new ClientWebSocket(trackingContext, executor), trackingContext);
+ WebSocketChannel(TrackingContext trackingContext, HttpClientProvider httpClientProvider, AutoShutdownScheduledExecutor executor) {
+ this(new ClientWebSocket(trackingContext, httpClientProvider, executor), trackingContext);
}
WebSocketChannel(ClientWebSocket websocket, TrackingContext trackingContext) {
@@ -56,13 +57,13 @@ public CompletableFuture closeAsync() {
}
/**
- * Closes the connection with the remote websocket with a given CloseReason
+ * Closes the connection with the remote websocket with a given CloseStatus
*
- * @param reason The CloseReason to be given for this operation. For details please see javax.websocket.CloseReason.
+ * @param closeStatus The CloseStatus to be given for this operation. For details please see org.eclipse.jetty.websocket.api.CloseStatus.
* @return Returns a CompletableFuture which completes when the connection is completely closed.
*/
- public CompletableFuture closeAsync(CloseReason reason) {
- return this.websocket.closeAsync(reason);
+ public CompletableFuture closeAsync(CloseStatus closeStatus) {
+ return this.websocket.closeAsync(closeStatus);
}
/**
@@ -119,6 +120,7 @@ public CompletableFuture writeAsync(ByteBuffer data, Duration timeout) {
* @param textData Text message to be sent.
* @param timeout The timeout to connect to send the data within. May be null to indicate no timeout limit.
* @return A CompletableFuture which completes when websocket finishes sending the data.
+ * @throws TimeoutException Throws when the sending task does not complete within the given timeout.
*/
public CompletableFuture writeTextAsync(String textData, Duration timeout) {
return this.websocket.writeAsync(textData, timeout, true, WriteMode.TEXT);
diff --git a/src/test/java/com/microsoft/azure/relay/HybridConnectionListenerTest.java b/src/test/java/com/microsoft/azure/relay/HybridConnectionListenerTest.java
index 48f05fa..2cf64a9 100644
--- a/src/test/java/com/microsoft/azure/relay/HybridConnectionListenerTest.java
+++ b/src/test/java/com/microsoft/azure/relay/HybridConnectionListenerTest.java
@@ -77,6 +77,25 @@ public void openAndCloseTest() {
assertFalse("Listener should be closed", listener.isOnline());
assertEquals("Listener offline handler was not called exactly once", 1, offlineHandlerCalled.get());
}
+
+ @Test
+ public void keepAliveTest() throws InterruptedException {
+ AtomicInteger keepAliveHandlerCalled = new AtomicInteger(0);
+
+ listener.setKeepAliveInterval(Duration.ofSeconds(3));
+ listener.setKeepAliveHandler(() -> {
+ keepAliveHandlerCalled.incrementAndGet();
+ });
+
+ listener.openAsync(Duration.ofSeconds(15)).join();
+ assertTrue("Listener failed to open.", listener.isOnline());
+
+ Thread.sleep(5000);
+
+ listener.close();
+ assertFalse("Listener should be closed", listener.isOnline());
+ assertEquals("Keep alive handler was not called exactly once", 1, keepAliveHandlerCalled.get());
+ }
@Test
public void customHeadersTest() throws Exception {
diff --git a/src/test/java/com/microsoft/azure/relay/SendReceiveTest.java b/src/test/java/com/microsoft/azure/relay/SendReceiveTest.java
index b638471..5b15228 100644
--- a/src/test/java/com/microsoft/azure/relay/SendReceiveTest.java
+++ b/src/test/java/com/microsoft/azure/relay/SendReceiveTest.java
@@ -2,13 +2,13 @@
import com.microsoft.azure.relay.HybridHttpConnection.ResponseStream;
import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.websocket.api.CloseStatus;
+import org.eclipse.jetty.websocket.api.StatusCode;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import javax.websocket.CloseReason;
-import javax.websocket.CloseReason.CloseCodes;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -216,8 +216,8 @@ public void websocketMultipleSenderTest() throws URISyntaxException {
websocket.writeAsync(ByteBuffer.wrap(SMALL_BYTES)).join();
websocket
.closeAsync(
- new CloseReason(
- CloseCodes.NORMAL_CLOSURE, "Normal closure from listener"))
+ new CloseStatus(
+ StatusCode.NORMAL, "Normal closure from listener"))
.join();
});
});
@@ -388,8 +388,8 @@ private static CompletableFuture sendAndReceiveWithWebsocketListener(
nullResult -> {
checkMsgToSendIsModified(origMsgToSend, msgToSend);
return channel.closeAsync(
- new CloseReason(
- CloseCodes.NORMAL_CLOSURE, "Listener closing normally."));
+ new CloseStatus(
+ StatusCode.NORMAL, "Listener closing normally."));
});
});
}
@@ -451,8 +451,8 @@ private static CompletableFuture sendAndReceiveTextWithWebsocketListener(
nullResult -> {
assertTrue("Message was modified.", origMsgToSend.equals(msgToSend));
return channel.closeAsync(
- new CloseReason(
- CloseCodes.NORMAL_CLOSURE, "Listener closing normally."));
+ new CloseStatus(
+ StatusCode.NORMAL, "Listener closing normally."));
});
});
}
diff --git a/src/test/java/com/microsoft/azure/relay/TestUtil.java b/src/test/java/com/microsoft/azure/relay/TestUtil.java
index 6663d91..ec7ab2e 100644
--- a/src/test/java/com/microsoft/azure/relay/TestUtil.java
+++ b/src/test/java/com/microsoft/azure/relay/TestUtil.java
@@ -2,20 +2,24 @@
import java.net.URI;
+import io.github.cdimascio.dotenv.Dotenv;
+
public class TestUtil {
- public static final String CONNECTION_STRING_ENV_VARIABLE_NAME = "RELAY_CONNECTION_STRING";
- public static final RelayConnectionStringBuilder CONNECTION_STRING_BUILDER = new RelayConnectionStringBuilder(System.getenv(CONNECTION_STRING_ENV_VARIABLE_NAME));
- public static final URI RELAY_NAMESPACE_URI = CONNECTION_STRING_BUILDER.getEndpoint();
- public static final String ENTITY_PATH = CONNECTION_STRING_BUILDER.getEntityPath();
- public static final String KEY_NAME = CONNECTION_STRING_BUILDER.getSharedAccessKeyName();
- public static final String KEY = CONNECTION_STRING_BUILDER.getSharedAccessKey();
-
- static byte[] concatByteArrays(byte[]... arrays) {
- int totalSize = 0;
+ public static final Dotenv dotenv = Dotenv.configure().load();
+ public static final String CONNECTION_STRING_ENV_VARIABLE_NAME = "RELAY_CONNECTION_STRING";
+ public static final RelayConnectionStringBuilder CONNECTION_STRING_BUILDER = new RelayConnectionStringBuilder(
+ dotenv.get(CONNECTION_STRING_ENV_VARIABLE_NAME));
+ public static final URI RELAY_NAMESPACE_URI = CONNECTION_STRING_BUILDER.getEndpoint();
+ public static final String ENTITY_PATH = CONNECTION_STRING_BUILDER.getEntityPath();
+ public static final String KEY_NAME = CONNECTION_STRING_BUILDER.getSharedAccessKeyName();
+ public static final String KEY = CONNECTION_STRING_BUILDER.getSharedAccessKey();
+
+ static byte[] concatByteArrays(byte[]... arrays) {
+ int totalSize = 0;
for (byte[] array : arrays) {
totalSize += array.length;
}
-
+
byte[] result = new byte[totalSize];
int offset = 0;
for (byte[] array : arrays) {
diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml
index 96a398f..fea2ad2 100644
--- a/src/test/resources/log4j2.xml
+++ b/src/test/resources/log4j2.xml
@@ -8,29 +8,29 @@
-
+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
-
+
-
-
+
+
-
-
-
-
-
+
+
+
+
+
\ No newline at end of file