From 9fdd95ccf08dba4c85e92c5c228dacc16425a9a6 Mon Sep 17 00:00:00 2001
From: Jacob <108886350+jbjordan@users.noreply.github.com>
Date: Thu, 21 May 2026 16:20:19 -0400
Subject: [PATCH] Revert "Merge dev into master and resolve conflicts (#97)"
This reverts commit bc10c8c4a7f02401e5f48489c187c93677816df4.
---
.classpath | 23 -
.env.sample | 1 -
.gitignore | 3 -
.project | 11 -
pom.xml | 49 +--
.../azure/relay/ClientWebSocket.java | 407 +++++++-----------
.../azure/relay/HttpClientProvider.java | 10 -
.../azure/relay/HybridConnectionChannel.java | 9 +-
.../azure/relay/HybridConnectionClient.java | 21 +-
.../HybridConnectionEndpointConfigurator.java | 28 ++
.../azure/relay/HybridConnectionListener.java | 76 +---
.../azure/relay/HybridHttpConnection.java | 8 +-
.../microsoft/azure/relay/RelayConstants.java | 1 -
.../microsoft/azure/relay/RelayLogger.java | 2 -
.../relay/RelayedHttpListenerContext.java | 4 +-
.../azure/relay/WebSocketChannel.java | 16 +-
.../relay/HybridConnectionListenerTest.java | 19 -
.../azure/relay/SendReceiveTest.java | 16 +-
.../com/microsoft/azure/relay/TestUtil.java | 24 +-
src/test/resources/log4j2.xml | 36 +-
20 files changed, 278 insertions(+), 486 deletions(-)
delete mode 100644 .env.sample
delete mode 100644 src/main/java/com/microsoft/azure/relay/HttpClientProvider.java
create mode 100644 src/main/java/com/microsoft/azure/relay/HybridConnectionEndpointConfigurator.java
diff --git a/.classpath b/.classpath
index cd575e5..5e8a55f 100644
--- a/.classpath
+++ b/.classpath
@@ -23,28 +23,5 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/.env.sample b/.env.sample
deleted file mode 100644
index 557fc39..0000000
--- a/.env.sample
+++ /dev/null
@@ -1 +0,0 @@
-RELAY_CONNECTION_STRING=
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 36b758f..9e962d0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,6 +27,3 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
/src/test/resources/log4j2.xml
-
-# .env file
-.env
\ No newline at end of file
diff --git a/.project b/.project
index 26247bf..e098f33 100644
--- a/.project
+++ b/.project
@@ -20,15 +20,4 @@
org.eclipse.jdt.core.javanature
org.eclipse.m2e.core.maven2Nature
-
-
- 1667084030105
-
- 30
-
- org.eclipse.core.resources.regexFilterMatcher
- node_modules|\.git|__CREATED_BY_JAVA_LANGUAGE_SERVER__
-
-
-
diff --git a/pom.xml b/pom.xml
index 0fc4609..60169df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,28 +31,20 @@
UTF-8
+ 1.12.2
+ 9.4.44.v20210927
1.8
1.8
- 1.12.2
- 2.0.3
- 9.4.49.v20220914
- 20231013
- 2.19.0
- 4.13.2
- 5.2.2
+ 1.7.0
+ 1.2.17
-
- com.azure
- azure-identity
- ${azure.identity.version}
-
-
- org.slf4j
- slf4j-api
- ${slf4j.version}
-
+
+ com.azure
+ azure-identity
+ ${azure.identity.version}
+
org.eclipse.jetty.websocket
javax-websocket-client-impl
@@ -66,31 +58,18 @@
org.json
json
- ${json.version}
+ 20231013
junit
junit
- ${junit.version}
+ 4.13.1
test
- io.github.cdimascio
- java-dotenv
- ${java.dotenv.version}
- test
-
-
- org.apache.logging.log4j
- log4j-core
- ${log4j.version}
- test
-
-
- org.apache.logging.log4j
- log4j-slf4j2-impl
- ${log4j.version}
- test
+ org.slf4j
+ slf4j-api
+ ${slf4j-version}
diff --git a/src/main/java/com/microsoft/azure/relay/ClientWebSocket.java b/src/main/java/com/microsoft/azure/relay/ClientWebSocket.java
index d1512d0..0bf3389 100644
--- a/src/main/java/com/microsoft/azure/relay/ClientWebSocket.java
+++ b/src/main/java/com/microsoft/azure/relay/ClientWebSocket.java
@@ -3,65 +3,47 @@
package com.microsoft.azure.relay;
-import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.io.IOException;
+
+import javax.websocket.*;
import org.eclipse.jetty.io.RuntimeIOException;
-import org.eclipse.jetty.websocket.api.CloseStatus;
-import org.eclipse.jetty.websocket.api.RemoteEndpoint;
-import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.api.UpgradeException;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
-import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
-import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
-import org.eclipse.jetty.websocket.client.WebSocketClient;
-class ClientWebSocket extends WebSocketAdapter implements RelayTraceSource, WebSocketPingPongListener {
+class ClientWebSocket extends Endpoint implements RelayTraceSource {
private final AutoShutdownScheduledExecutor executor;
+ private final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
private final TrackingContext trackingContext;
- private final WebSocketClient wsClient;
- private HttpClientProvider httpClientProvider;
+ private Session session;
private int maxMessageBufferSize = RelayConstants.DEFAULT_CONNECTION_BUFFER_SIZE;
- private CloseStatus closeStatus;
+ private CloseReason closeReason;
private InputQueue fragmentQueue;
private InputQueue textQueue;
private CompletableFuture closeTask;
private String cachedString;
- private Duration keepAliveInterval = Duration.ofSeconds(RelayConstants.DEFAULT_PING_INTERVAL_SECONDS);
- private Runnable keepAliveHandler;
/**
* Creates a websocket instance
*/
- public ClientWebSocket(TrackingContext trackingContext, HttpClientProvider httpClientProvider,
- AutoShutdownScheduledExecutor executor) {
+ public ClientWebSocket(TrackingContext trackingContext, AutoShutdownScheduledExecutor executor) {
this.executor = executor;
this.textQueue = new InputQueue(this.executor);
this.fragmentQueue = new InputQueue(this.executor);
+ this.closeReason = null;
this.trackingContext = trackingContext;
- this.httpClientProvider = httpClientProvider;
-
- wsClient = new WebSocketClient(httpClientProvider.getHttpClient());
}
-
- public HttpClientProvider getHttpClientProvider() {
- return httpClientProvider;
- }
-
+
public TrackingContext getTrackingContext() {
return trackingContext;
}
-
+
@Override
public String toString() {
if (this.cachedString == null) {
@@ -69,9 +51,9 @@ public String toString() {
}
return this.cachedString;
}
-
- CloseStatus getCloseReason() {
- return this.closeStatus;
+
+ CloseReason getCloseReason() {
+ return this.closeReason;
}
int getMaxMessageBufferSize() {
@@ -81,22 +63,12 @@ int getMaxMessageBufferSize() {
void setMaxMessageBufferSize(int maxMessageBufferSize) {
if (maxMessageBufferSize > 0) {
this.maxMessageBufferSize = maxMessageBufferSize;
+ this.container.setDefaultMaxTextMessageBufferSize(this.maxMessageBufferSize);
} else {
throw new IllegalArgumentException("MaxBufferSize of the web socket must be a positive value.");
}
}
- void setKeepAliveInterval(Duration interval) {
- this.keepAliveInterval = interval;
- }
-
- /**
- * Sets the handler that will be run after the listener has received a keep alive (aka pong) response
- */
- void setKeepAliveHandler(Runnable onKeepAlive) {
- this.keepAliveHandler = onKeepAlive;
- }
-
/**
* Establish websocket connection between the control websocket and the cloud
* service if not already established.
@@ -124,7 +96,7 @@ public CompletableFuture connectAsync(URI uri) {
public CompletableFuture connectAsync(URI uri, Duration timeout) {
return this.connectAsync(uri, timeout, null);
}
-
+
/**
* Establish websocket connection between the control websocket and the cloud
* service if not already established.
@@ -137,42 +109,35 @@ public CompletableFuture connectAsync(URI uri, Duration timeout) {
* @throws CompletionException Throws when connection could not be established
* within the given timeout
*/
- public CompletableFuture connectAsync(URI uri, Duration timeout, Map> headers) {
+ public CompletableFuture connectAsync(URI uri, Duration timeout, ClientEndpointConfig config) {
if (this.isOpen()) {
return CompletableFutureUtil.fromException(new RuntimeIOException("This connection is already connected."));
}
+ this.container.setDefaultMaxTextMessageBufferSize(this.maxMessageBufferSize);
return CompletableFutureUtil.timedRunAsync(timeout, () -> {
RelayLogger.logEvent("connecting", this);
-
- if (!wsClient.isStarted()) {
- try {
- wsClient.start();
- } catch (Exception e) {
- throw RelayLogger.throwingException(e, this);
- }
- }
-
try {
- ClientUpgradeRequest request = new ClientUpgradeRequest();
- if (headers != null) {
- request.setHeaders(headers);
+ if (config != null) {
+ this.container.connectToServer(this, config, uri);
+ } else {
+ this.container.connectToServer(this, uri);
}
- wsClient.connect(this, uri, request).get();
- } catch (IOException | InterruptedException | ExecutionException e) {
+ } catch (DeploymentException | IOException e) {
if (e.getCause() instanceof UpgradeException) {
throw RelayLogger.throwingException(e.getCause(), this);
}
throw RelayLogger.throwingException(e, this);
}
-
- if (this.isNotConnected()) {
+
+ if (this.session == null || !this.session.isOpen()) {
throw RelayLogger.throwingException(new RuntimeIOException("connection to the server failed."), this);
}
- }, this.executor).whenComplete(($void, ex) -> {
- if (ex != null) {
- this.dispose();
- }
+ },
+ this.executor).whenComplete(($void, ex) -> {
+ if (ex != null) {
+ this.dispose();
+ }
});
}
@@ -183,14 +148,13 @@ public CompletableFuture connectAsync(URI uri, Duration timeout, Map readTextAsync() {
return this.textQueue.dequeueAsync().thenApply(text -> {
@@ -200,38 +164,34 @@ CompletableFuture readTextAsync() {
return text;
});
}
-
+
/**
* Receives byte messages from the remote sender asynchronously.
*
- * @return Returns a CompletableFuture of the bytes which completes when
- * websocket receives an entire message
+ * @return Returns a CompletableFuture of the bytes which completes when websocket receives an entire message
*/
public CompletableFuture readBinaryAsync() {
return this.readBinaryAsync(null);
}
-
+
/**
* Receives byte messages from the remote sender asynchronously.
*
* @param timeout The timeout duration for this operation.
- * @return Returns a CompletableFuture of the bytes which completes when
- * websocket receives the entire message.
- * @throws TimeoutException thrown when a complete message frame is not received
- * within the timeout.
+ * @return Returns a CompletableFuture of the bytes which completes when websocket receives the entire message.
+ * @throws TimeoutException thrown when a complete message frame is not received within the timeout.
*/
public CompletableFuture readBinaryAsync(Duration timeout) {
// Gather all fragments and return a single buffer
BinaryMessageReader messageReader = new BinaryMessageReader(timeout);
return messageReader.readAsync();
}
-
+
/**
* Sends the data to the remote endpoint as binary.
*
* @param data Message to be sent.
- * @return A CompletableFuture which completes when websocket finishes sending
- * the bytes.
+ * @return A CompletableFuture which completes when websocket finishes sending the bytes.
*/
public CompletableFuture writeAsync(Object data) {
return this.writeAsync(data, null);
@@ -240,67 +200,61 @@ public CompletableFuture writeAsync(Object data) {
/**
* Sends the data to the remote endpoint within a timeout as binary.
*
- * @param data Message to be sent.
- * @param timeout The timeout to connect to send the data within. May be null to
- * indicate no timeout limit.
- * @return A CompletableFuture which completes when websocket finishes sending
- * the bytes.
- * @throws TimeoutException Throws when the sending task does not complete
- * within the given timeout.
+ * @param data Message to be sent.
+ * @param timeout The timeout to connect to send the data within. May be null to indicate no timeout limit.
+ * @return A CompletableFuture which completes when websocket finishes sending the bytes.
+ * @throws TimeoutException Throws when the sending task does not complete within the given timeout.
*/
public CompletableFuture writeAsync(Object data, Duration timeout) {
return writeAsync(data, timeout, true, WriteMode.BINARY);
}
-
+
/**
- * Sends the data to the remote endpoint within a timeout in one of the
- * WriteModes.
+ * Sends the data to the remote endpoint within a timeout in one of the WriteModes.
*
- * @param data Message to be sent.
- * @param timeout The timeout to connect to send the data within. May be null to
- * indicate no timeout limit.
- * @param isEnd Indicates if the data sent is the end of a message
- * @param mode The type of the message to be sent.
- * @return A CompletableFuture which completes when websocket finishes sending
- * the bytes.
- * @throws TimeoutException Throws when the sending task does not complete
- * within the given timeout.
+ * @param data Message to be sent.
+ * @param timeout The timeout to connect to send the data within. May be null to indicate no timeout limit.
+ * @param isEnd Indicates if the data sent is the end of a message
+ * @param mode The type of the message to be sent.
+ * @return A CompletableFuture which completes when websocket finishes sending the bytes.
+ * @throws TimeoutException Throws when the sending task does not complete within the given timeout.
*/
CompletableFuture writeAsync(Object data, Duration timeout, boolean isEnd, WriteMode mode) {
if (this.isOpen()) {
if (data == null) {
// TODO: Log warns sending nothing because message is null
return CompletableFuture.completedFuture(null);
- } else {
- RemoteEndpoint remote = this.getRemote();
+ }
+ else {
+ RemoteEndpoint.Basic remote = this.session.getBasicRemote();
RelayLogger.logEvent("writingBytes", this, mode.toString());
-
- // The websocket API will throw if multiple sends are attempted on the same
- // websocket simultaneously
+
+ // The websocket API will throw if multiple sends are attempted on the same websocket simultaneously
return CompletableFutureUtil.timedRunAsync(timeout, () -> {
try {
if (mode.equals(WriteMode.TEXT)) {
String text = data.toString();
- remote.sendPartialString(text, isEnd);
+ remote.sendText(text, isEnd);
RelayLogger.logEvent("writingBytesFinished", this, String.valueOf(text.length()));
- } else {
+ }
+ else {
byte[] bytes;
if (data instanceof byte[]) {
bytes = ((byte[]) data).clone();
- } else if (data instanceof ByteBuffer) {
+ }
+ else if (data instanceof ByteBuffer) {
ByteBuffer buffer = (ByteBuffer) data;
bytes = new byte[buffer.remaining()];
buffer.get(bytes);
- } else {
+ }
+ else {
throw new IllegalArgumentException(
- "The data to be sent should be ByteBuffer or byte[], but received "
- + data.getClass().getSimpleName());
+ "The data to be sent should be ByteBuffer or byte[], but received " + data.getClass().getSimpleName());
}
-
+
int bytesToSend = bytes.length;
- // sendBinary() will cause the content of the byte array within the ByteBuffer
- // to change
- remote.sendPartialBytes(ByteBuffer.wrap(bytes), isEnd);
+ // sendBinary() will cause the content of the byte array within the ByteBuffer to change
+ remote.sendBinary(ByteBuffer.wrap(bytes), isEnd);
RelayLogger.logEvent("writingBytesFinished", this, String.valueOf(bytesToSend));
}
} catch (Exception e) {
@@ -308,12 +262,12 @@ CompletableFuture writeAsync(Object data, Duration timeout, boolean isEnd,
}
}, executor);
}
- } else {
- return CompletableFutureUtil
- .fromException(new RuntimeIOException("cannot send because the session is not connected."));
+ }
+ else {
+ return CompletableFutureUtil.fromException(new RuntimeIOException("cannot send because the session is not connected."));
}
}
-
+
/**
* Closes the connection with the remote websocket
*
@@ -327,23 +281,21 @@ public CompletableFuture closeAsync() {
/**
* Closes the connection with the remote websocket with a given CloseReason
*
- * @param reason The CloseReason to be given for this operation. For details
- * please see javax.websocket.CloseReason.
- * @return Returns a CompletableFuture which completes when the connection is
- * completely closed.
+ * @param reason The CloseReason to be given for this operation. For details please see javax.websocket.CloseReason.
+ * @return Returns a CompletableFuture which completes when the connection is completely closed.
*/
- public CompletableFuture closeAsync(CloseStatus reason) {
- RelayLogger.logEvent("clientWebSocketClosing", this, (reason != null) ? reason.getPhrase() : "NONE");
-
- if (this.isNotConnected()) {
+ public CompletableFuture closeAsync(CloseReason reason) {
+ RelayLogger.logEvent("clientWebSocketClosing", this, (reason != null) ? reason.getReasonPhrase() : "NONE");
+
+ if (this.session == null || !this.session.isOpen()) {
return this.closeTask;
}
try {
if (reason != null) {
- this.getSession().close(reason);
+ this.session.close(reason);
} else {
- this.getSession().close();
+ this.session.close();
}
} catch (Throwable e) {
this.closeTask.completeExceptionally(e);
@@ -351,96 +303,65 @@ public CompletableFuture closeAsync(CloseStatus reason) {
return this.closeTask;
}
-
- /**
- * Release the resources taken by this websocket. This is a very lengthy
- * execution.
- */
- void dispose() {
- try {
- wsClient.stop();
- } catch (Exception e) {
- RelayLogger.handledExceptionAsWarning(e, this);
- }
- }
-
- @Override
- public void onWebSocketConnect(Session session) {
- super.onWebSocketConnect(session);
-
+
+ /**
+ * Release the resources taken by this websocket. This is a very lengthy execution.
+ */
+ void dispose() {
+ try {
+ ((LifeCycle) this.container).stop();
+ } catch (Exception e) {
+ RelayLogger.handledExceptionAsWarning(e, this);
+ }
+ }
+
+ @OnOpen
+ public void onOpen(Session session, EndpointConfig config) {
RelayLogger.logEvent("connected", this);
-
- session.getPolicy().setMaxBinaryMessageBufferSize(maxMessageBufferSize);
- session.getPolicy().setMaxTextMessageBufferSize(maxMessageBufferSize);
-
- session.getPolicy().setMaxBinaryMessageSize(Integer.MAX_VALUE);
- session.getPolicy().setMaxTextMessageSize(Integer.MAX_VALUE);
-
- this.closeStatus = null;
+ this.closeReason = null;
+ this.session = session;
+ session.setMaxBinaryMessageBufferSize(this.maxMessageBufferSize);
+ session.setMaxTextMessageBufferSize(this.maxMessageBufferSize);
this.closeTask = new CompletableFuture();
+
+ session.addMessageHandler(new MessageHandler.Whole() {
+ @Override
+ public void onMessage(String text) {
+ textQueue.enqueueAndDispatch(text);
+ }
+ });
- this.executor.schedule(new PingRunnable(), keepAliveInterval.toMillis(), TimeUnit.MILLISECONDS);
+ session.addMessageHandler(new MessageHandler.Partial() {
+ @Override
+ public void onMessage(byte[] inputBytes, boolean isEnd) {
+ fragmentQueue.enqueueAndDispatch(new MessageFragment(inputBytes, isEnd));
+ }
+ });
}
-
- @Override
- public void onWebSocketClose(int statusCode, String reason) {
- super.onWebSocketClose(statusCode, reason);
-
- if (reason == null) {
- reason = "";
- }
-
- CompletableFuture.runAsync(() -> {
- this.dispose();
- }, executor);
-
- this.closeStatus = new CloseStatus(statusCode, reason);
- RelayLogger.logEvent("clientWebSocketClosed", this, closeStatus.getPhrase());
+
+ @OnClose
+ public void onClose(Session session, CloseReason reason) {
+ CompletableFuture.runAsync(() -> {
+ this.dispose();
+ }, executor);
+ this.closeReason = reason;
+ RelayLogger.logEvent("clientWebSocketClosed", this, reason.getReasonPhrase());
this.textQueue.shutdown();
this.fragmentQueue.shutdown();
this.closeTask.complete(null);
}
- @Override
- public void onWebSocketText(String message) {
- super.onWebSocketText(message);
-
- textQueue.enqueueAndDispatch(message);
- }
-
- @Override
- public void onWebSocketBinary(byte[] payload, int offset, int len) {
- super.onWebSocketBinary(payload, offset, len);
-
- fragmentQueue.enqueueAndDispatch(new MessageFragment(payload, true));
- }
-
- @Override
- public void onWebSocketError(Throwable cause) {
- super.onWebSocketError(cause);
-
+ @OnError
+ public void onError(Throwable cause) {
if (!this.isOpen()) {
- // A new websocket will be created through reconnection attempt, dispose this
- // one
- CompletableFuture.runAsync(() -> {
- this.dispose();
- }, executor);
- }
- RelayLogger.throwingException(cause, this);
- }
-
- @Override
- public void onWebSocketPing(ByteBuffer payload) {
-
- }
-
- @Override
- public void onWebSocketPong(ByteBuffer payload) {
- if (keepAliveHandler != null) {
- keepAliveHandler.run();
+ // A new websocket will be created through reconnection attempt, dispose this one
+ CompletableFuture.runAsync(() -> {
+ this.dispose();
+ }, executor);
}
+ RelayLogger.throwingException(cause, this);
}
-
+
private static class MessageFragment {
private final byte[] bytes;
private final boolean ended;
@@ -458,66 +379,50 @@ boolean isEnd() {
return this.ended;
}
}
-
+
private final class BinaryMessageReader {
private final TimeoutHelper timeoutHelper;
private final LinkedList fragments;
private int messageSize;
-
+
BinaryMessageReader(Duration timeout) {
timeoutHelper = new TimeoutHelper(timeout);
fragments = new LinkedList();
}
-
+
public CompletableFuture readAsync() {
return readFragmentsAsync()
- .thenApply((voidResult) -> {
- byte[] message = new byte[messageSize];
- int offset = 0;
- for (byte[] bytes : fragments) {
- System.arraycopy(bytes, 0, message, offset, bytes.length);
- offset += bytes.length;
- }
-
- RelayLogger.logEvent("receivedBytes", this, Integer.toString(message.length));
- return ByteBuffer.wrap(message);
- });
+ .thenApply((voidResult) -> {
+ byte[] message = new byte[messageSize];
+ int offset = 0;
+ for (byte[] bytes : fragments) {
+ System.arraycopy(bytes, 0, message, offset, bytes.length);
+ offset += bytes.length;
+ }
+
+ RelayLogger.logEvent("receivedBytes", this, Integer.toString(message.length));
+ return ByteBuffer.wrap(message);
+ });
}
-
+
private CompletableFuture readFragmentsAsync() {
return fragmentQueue.dequeueAsync(timeoutHelper.remainingTime())
- .thenCompose((fragment) -> {
- if (fragment == null) {
- // TODO: In the case of shutdown should we throw if we don't make it to the end
- // of message? We can't just give the user partial data without telling them.
- return CompletableFuture.completedFuture(null);
- }
-
- messageSize += fragment.getBytes().length;
- fragments.add(fragment.getBytes());
-
- if (!fragment.isEnd()) {
- return readFragmentsAsync();
- }
-
+ .thenCompose((fragment) -> {
+ if (fragment == null) {
+ // TODO: In the case of shutdown should we throw if we don't make it to the end
+ // of message? We can't just give the user partial data without telling them.
return CompletableFuture.completedFuture(null);
- });
- }
- }
+ }
- private class PingRunnable implements Runnable {
- @Override
- public void run() {
- if (ClientWebSocket.this.isConnected()) {
- ClientWebSocket.this.executor.schedule(new PingRunnable(), keepAliveInterval.toMillis(),
- TimeUnit.MILLISECONDS);
- try {
- ClientWebSocket.this.getRemote().sendPing(ByteBuffer.allocate(0));
- RelayLogger.logEvent("pingSuccess", this);
- } catch (IOException e) {
- RelayLogger.logEvent("pingFailed", this);
- }
- }
+ messageSize += fragment.getBytes().length;
+ fragments.add(fragment.getBytes());
+
+ if (!fragment.isEnd()) {
+ return readFragmentsAsync();
+ }
+
+ return CompletableFuture.completedFuture(null);
+ });
}
}
}
diff --git a/src/main/java/com/microsoft/azure/relay/HttpClientProvider.java b/src/main/java/com/microsoft/azure/relay/HttpClientProvider.java
deleted file mode 100644
index d12ee9c..0000000
--- a/src/main/java/com/microsoft/azure/relay/HttpClientProvider.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.microsoft.azure.relay;
-
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-
-public class HttpClientProvider {
- public HttpClient getHttpClient() {
- return new HttpClient(new SslContextFactory.Client());
- }
-}
diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionChannel.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionChannel.java
index d664db8..2d4fe81 100644
--- a/src/main/java/com/microsoft/azure/relay/HybridConnectionChannel.java
+++ b/src/main/java/com/microsoft/azure/relay/HybridConnectionChannel.java
@@ -7,8 +7,7 @@
import java.nio.channels.Channel;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-
-import org.eclipse.jetty.websocket.api.CloseStatus;
+import javax.websocket.CloseReason;
public interface HybridConnectionChannel extends Channel {
@@ -22,12 +21,12 @@ public interface HybridConnectionChannel extends Channel {
public CompletableFuture closeAsync();
/**
- * Closes the connection with the remote websocket with a given CloseStatus
+ * Closes the connection with the remote websocket with a given CloseReason
*
- * @param closeStatus The CloseStatus to be given for this operation. For details please see org.eclipse.jetty.websocket.api.CloseStatus.
+ * @param reason The CloseReason to be given for this operation. For details please see javax.websocket.CloseReason.
* @return Returns a CompletableFuture which completes when the connection is completely closed.
*/
- public CompletableFuture closeAsync(CloseStatus closeStatus);
+ public CompletableFuture closeAsync(CloseReason reason);
/**
* Receives byte messages from the remote sender asynchronously.
diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionClient.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionClient.java
index 0301aaf..b94f0a5 100644
--- a/src/main/java/com/microsoft/azure/relay/HybridConnectionClient.java
+++ b/src/main/java/com/microsoft/azure/relay/HybridConnectionClient.java
@@ -12,12 +12,13 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import javax.websocket.ClientEndpointConfig;
+
public class HybridConnectionClient implements RelayTraceSource {
static final AutoShutdownScheduledExecutor EXECUTOR = AutoShutdownScheduledExecutor.Create();
static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(70);
static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments()
.toString().indexOf("-agentlib:jdwp") > 0;
- private HttpClientProvider httpClientProvider;
private String cachedString;
private TrackingContext trackingContext;
private URI address;
@@ -130,14 +131,6 @@ public HybridConnectionClient(String connectionString, String path) throws URISy
tokenProvider, tokenProvider != null);
}
- public HttpClientProvider getHttpClientProvider() {
- return httpClientProvider;
- }
-
- public void setHttpClientProvider(HttpClientProvider httpClientProvider) {
- this.httpClientProvider = httpClientProvider;
- }
-
/**
* The address on which this HybridConnection will connect to. This address should be of the format
* "sb://contoso.servicebus.windows.net/yourhybridconnection".
@@ -205,16 +198,19 @@ public CompletableFuture createConnectionAsync(Map> headers = new HashMap>();
headers.put(RelayConstants.SERVICEBUS_AUTHORIZATION_HEADER_NAME, Arrays.asList(token.join().getToken()));
+ HybridConnectionEndpointConfigurator configurator = new HybridConnectionEndpointConfigurator();
+ configurator.addHeaders(headers);
if (customHeaders != null) {
- headers.putAll(customHeaders);
+ configurator.addHeaders(customHeaders);
}
+ ClientEndpointConfig config = ClientEndpointConfig.Builder.create().configurator(configurator).build();
try {
URI uri = HybridConnectionUtil.buildUri(this.address.getHost(), this.address.getPort(),
this.address.getPath(), this.address.getQuery(), HybridConnectionConstants.Actions.CONNECT,
trackingContext.getTrackingId());
- WebSocketChannel channel = new WebSocketChannel(trackingContext, this.getHttpClientProvider(), EXECUTOR);
- return channel.getWebSocket().connectAsync(uri, this.operationTimeout, headers).thenApply($void -> channel);
+ WebSocketChannel channel = new WebSocketChannel(trackingContext, EXECUTOR);
+ return channel.getWebSocket().connectAsync(uri, this.operationTimeout, config).thenApply($void -> channel);
} catch (URISyntaxException e) {
return CompletableFutureUtil.fromException(e);
}
@@ -255,6 +251,5 @@ private void initialize(URI address, Duration operationTimeout, TokenProvider to
this.address = address;
this.tokenProvider = tokenProvider;
this.operationTimeout = operationTimeout;
- this.httpClientProvider = new HttpClientProvider();
}
}
diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionEndpointConfigurator.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionEndpointConfigurator.java
new file mode 100644
index 0000000..121e20b
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/relay/HybridConnectionEndpointConfigurator.java
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+package com.microsoft.azure.relay;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.websocket.ClientEndpointConfig;
+
+class HybridConnectionEndpointConfigurator extends ClientEndpointConfig.Configurator {
+ private Map> customHeaders;
+
+ HybridConnectionEndpointConfigurator() {
+ this.customHeaders = new HashMap>();
+ }
+
+ void addHeaders(Map> headers) {
+ if (headers != null && !headers.isEmpty()) {
+ this.customHeaders.putAll(headers);
+ }
+ }
+
+ @Override
+ public void beforeRequest(Map> requestHeaders) {
+ requestHeaders.putAll(customHeaders);
+ }
+}
diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java
index 738a706..0084867 100644
--- a/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java
+++ b/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java
@@ -19,9 +19,11 @@
import java.util.function.Consumer;
import java.util.function.Function;
+import javax.websocket.ClientEndpointConfig;
+import javax.websocket.CloseReason;
+import javax.websocket.CloseReason.CloseCodes;
+
import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.websocket.api.CloseStatus;
-import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.json.JSONObject;
@@ -34,7 +36,6 @@ public class HybridConnectionListener implements RelayTraceSource, AutoCloseable
private final Object thisLock = new Object();
private final AtomicBoolean openCalled;
private final AtomicBoolean closeCalled;
- private HttpClientProvider httpClientProvider;
private Duration operationTimeout;
private int maxWebSocketBufferSize;
private String cachedString;
@@ -47,8 +48,6 @@ public class HybridConnectionListener implements RelayTraceSource, AutoCloseable
private Consumer connectingHandler;
private Consumer offlineHandler;
private Runnable onlineHandler;
- private Runnable keepAliveHandler;
- private Duration keepAliveInterval = Duration.ofSeconds(RelayConstants.DEFAULT_PING_INTERVAL_SECONDS);
/**
* Create a new HybridConnectionListener instance for accepting
@@ -77,7 +76,6 @@ public HybridConnectionListener(URI address, TokenProvider tokenProvider) {
this.controlConnection = new ControlConnection(this);
this.openCalled = new AtomicBoolean(false);
this.closeCalled = new AtomicBoolean(false);
- this.httpClientProvider = new HttpClientProvider();
}
/**
@@ -154,21 +152,12 @@ public HybridConnectionListener(String connectionString, String path) throws URI
this.controlConnection = new ControlConnection(this);
this.openCalled = new AtomicBoolean(false);
this.closeCalled = new AtomicBoolean(false);
- this.httpClientProvider = new HttpClientProvider();
}
public boolean isOnline() {
return this.controlConnection.isOnline();
}
- public HttpClientProvider getHttpClientProvider() {
- return httpClientProvider;
- }
-
- public void setHttpClientProvider(HttpClientProvider httpClientProvider) {
- this.httpClientProvider = httpClientProvider;
- }
-
public Function getAcceptHandler() {
return acceptHandler;
}
@@ -229,10 +218,6 @@ public Duration getOperationTimeout() {
public int getMaxWebSocketBufferSize() {
return maxWebSocketBufferSize;
}
-
- public void setKeepAliveInterval(Duration interval) {
- this.keepAliveInterval = interval;
- }
public void setMaxWebSocketBufferSize(int maxWebSocketBufferSize) {
if (maxWebSocketBufferSize > 0) {
@@ -288,20 +273,6 @@ public void setOnlineHandler(Runnable onOnline) {
this.onlineHandler = onOnline;
}
- /**
- * Returns the handler that will be run after the listener has received a keep alive (aka pong) response
- */
- public Runnable getKeepAliveHandler() {
- return keepAliveHandler;
- }
-
- /**
- * Sets the handler that will be run after the listener has received a keep alive (aka pong) response
- */
- public void setKeepAliveHandler(Runnable onKeepAlive) {
- this.keepAliveHandler = onKeepAlive;
- }
-
/**
* Opens the HybridConnectionListener and registers it as a listener in
* ServiceBus.
@@ -374,7 +345,7 @@ public CompletableFuture closeAsync(Duration timeout) {
closeTasks = new CompletableFuture>[this.connectionInputQueue.getPendingCount()];
for (int i = 0; i < this.connectionInputQueue.getPendingCount(); i++) {
closeTasks[i] = this.connectionInputQueue.dequeueAsync(timeoutHelper.remainingTime()).thenAccept(connection -> {
- connection.closeAsync(new CloseStatus(StatusCode.NORMAL, "Client closing the socket normally"));
+ connection.closeAsync(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Client closing the socket normally"));
});
}
}
@@ -509,7 +480,7 @@ private CompletableFuture completeAcceptAsync(RelayedHttpListenerContext l
if (shouldAccept) {
synchronized (this.thisLock) {
- WebSocketChannel rendezvousConnection = new WebSocketChannel(listenerContext.getTrackingContext(), this.getHttpClientProvider(), EXECUTOR);
+ WebSocketChannel rendezvousConnection = new WebSocketChannel(listenerContext.getTrackingContext(), EXECUTOR);
if (this.closeCalled.get()) {
RelayLogger.logEvent("rendezvousClose", this, rendezvousUri.toString());
@@ -623,9 +594,9 @@ public CompletableFuture openAsync(Duration timeout) {
}).whenComplete(($void, err) -> {
if (err != null) {
RelayLogger.throwingException(err, this.listener);
- CloseStatus closeStatus = new CloseStatus(StatusCode.ABNORMAL,
+ CloseReason closeReason = new CloseReason(CloseCodes.UNEXPECTED_CONDITION,
"closing web socket connection because something went wrong trying to connect.");
- this.closeOrAbortWebSocketAsync(connectTask, closeStatus);
+ this.closeOrAbortWebSocketAsync(connectTask, closeReason);
throw new CompletionException(err);
}
});
@@ -654,8 +625,8 @@ private CompletableFuture closeAsync(Duration duration) {
if (connectTask != null) {
return connectTask.thenCompose((webSocket) -> {
return this.sendAsyncLock.acquireThenCompose(duration, () -> {
- CloseStatus closeStatus = new CloseStatus(StatusCode.NORMAL, "Normal Closure");
- return webSocket.closeAsync(closeStatus);
+ CloseReason reason = new CloseReason(CloseCodes.NORMAL_CLOSURE, "Normal Closure");
+ return webSocket.closeAsync(reason);
});
});
}
@@ -744,6 +715,9 @@ private CompletableFuture connectAsync(Duration timeout) {
// Set the authentication in request header
Map> headers = new HashMap>();
headers.put(RelayConstants.SERVICEBUS_AUTHORIZATION_HEADER_NAME, Arrays.asList(token.join().getToken()));
+ HybridConnectionEndpointConfigurator configurator = new HybridConnectionEndpointConfigurator();
+ configurator.addHeaders(headers);
+ ClientEndpointConfig config = ClientEndpointConfig.Builder.create().configurator(configurator).build();
// When we reconnect we need to remove the "_GXX" suffix otherwise trackingId
// gets longer after each reconnect
@@ -755,20 +729,15 @@ private CompletableFuture connectAsync(Duration timeout) {
this.address.getPath(), this.address.getQuery(), HybridConnectionConstants.Actions.LISTEN,
trackingId);
- ClientWebSocket webSocket = new ClientWebSocket(this.listener.trackingContext, this.listener.getHttpClientProvider(), EXECUTOR);
- webSocket.setKeepAliveInterval(this.listener.keepAliveInterval);
-
+ ClientWebSocket webSocket = new ClientWebSocket(this.listener.trackingContext, EXECUTOR);
return delayTask.thenCompose(($void) -> {
if (this.listener.injectedFault != null && this.listener.injectedFault instanceof UpgradeException) {
return CompletableFutureUtil.fromException(this.listener.injectedFault);
}
- return webSocket.connectAsync(websocketUri, timeout, headers)
+ return webSocket.connectAsync(websocketUri, timeout, config)
.thenApply(($void2) -> {
this.onOnline();
- webSocket.setKeepAliveHandler(() -> {
- this.onKeepAlive();
- });
return webSocket;
});
});
@@ -778,7 +747,7 @@ private CompletableFuture connectAsync(Duration timeout) {
}
}
- private CompletableFuture closeOrAbortWebSocketAsync(CompletableFuture connectTask, CloseStatus closeStatus) {
+ private CompletableFuture closeOrAbortWebSocketAsync(CompletableFuture connectTask, CloseReason reason) {
assert CompletableFutureUtil.isDoneNormally(connectTask);
synchronized (this.thisLock) {
@@ -787,7 +756,7 @@ private CompletableFuture closeOrAbortWebSocketAsync(CompletableFuture webSocket.closeAsync(closeStatus))
+ return connectTask.thenCompose((webSocket) -> webSocket.closeAsync(reason))
.exceptionally((exception) -> {
// catch and do not rethrow
RelayLogger.handledExceptionAsWarning(exception, this.listener);
@@ -854,8 +823,8 @@ private CompletableFuture receivePumpCoreAsync() {
keepGoing = false;
}
else {
- CloseStatus closeStatus = webSocket.getCloseReason();
- keepGoing = this.onDisconnect(new ConnectionLostException(closeStatus.toString()));
+ CloseReason reason = webSocket.getCloseReason();
+ keepGoing = this.onDisconnect(new ConnectionLostException(reason.toString()));
}
return keepGoing;
}
@@ -901,13 +870,6 @@ private void onOffline(Throwable lastError) {
}
}
- private void onKeepAlive() {
- Runnable keepAliveHandler = this.listener.getKeepAliveHandler();
- if (keepAliveHandler != null) {
- keepAliveHandler.run();
- }
- }
-
// Returns true if this control connection should attempt to reconnect after this exception.
private boolean onDisconnect(Throwable lastError) {
diff --git a/src/main/java/com/microsoft/azure/relay/HybridHttpConnection.java b/src/main/java/com/microsoft/azure/relay/HybridHttpConnection.java
index daf9625..6be6d56 100644
--- a/src/main/java/com/microsoft/azure/relay/HybridHttpConnection.java
+++ b/src/main/java/com/microsoft/azure/relay/HybridHttpConnection.java
@@ -16,11 +16,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
+import javax.websocket.CloseReason;
+import javax.websocket.CloseReason.CloseCodes;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.URIUtil;
-import org.eclipse.jetty.websocket.api.CloseStatus;
-import org.eclipse.jetty.websocket.api.StatusCode;
import org.json.JSONObject;
class HybridHttpConnection implements RelayTraceSource {
@@ -270,7 +270,7 @@ private CompletableFuture sendBytesOverRendezvousAsync(ByteBuffer buffer,
private CompletableFuture ensureRendezvousAsync(Duration timeout) throws CompletionException {
if (this.rendezvousWebSocket == null) {
RelayLogger.logEvent("httpCreateRendezvous", this);
- this.rendezvousWebSocket = new ClientWebSocket(this.trackingContext, this.controlWebSocket.getHttpClientProvider(), this.executor);
+ this.rendezvousWebSocket = new ClientWebSocket(this.trackingContext, this.executor);
return this.rendezvousWebSocket.connectAsync(this.rendezvousAddress, timeout);
}
return CompletableFuture.completedFuture(null);
@@ -280,7 +280,7 @@ private CompletableFuture closeRendezvousAsync() {
if (this.rendezvousWebSocket != null) {
RelayLogger.logEvent("closing", this);
return this.rendezvousWebSocket
- .closeAsync(new CloseStatus(StatusCode.NORMAL, "NormalClosure"))
+ .closeAsync(new CloseReason(CloseCodes.NORMAL_CLOSURE, "NormalClosure"))
.thenRun(() -> RelayLogger.logEvent("closed", this));
} else {
return CompletableFuture.completedFuture(null);
diff --git a/src/main/java/com/microsoft/azure/relay/RelayConstants.java b/src/main/java/com/microsoft/azure/relay/RelayConstants.java
index f939d4a..7045d09 100644
--- a/src/main/java/com/microsoft/azure/relay/RelayConstants.java
+++ b/src/main/java/com/microsoft/azure/relay/RelayConstants.java
@@ -18,7 +18,6 @@ private RelayConstants() { }
static final Duration MAX_DURATION = Duration.ofMillis(Integer.MAX_VALUE);
static final Duration MIN_DURATION = Duration.ofMillis(Integer.MIN_VALUE);
static final int DEFAULT_CONNECTION_BUFFER_SIZE = 64 * 1024;
- static final int DEFAULT_PING_INTERVAL_SECONDS = 30;
// Listener should reconnect after 0, 1, 2, 5, 10, 30 seconds backoff delay
static final Duration[] CONNECTION_DELAY_INTERVALS = { Duration.ZERO, Duration.ofSeconds(1), Duration.ofSeconds(2),
diff --git a/src/main/java/com/microsoft/azure/relay/RelayLogger.java b/src/main/java/com/microsoft/azure/relay/RelayLogger.java
index 492a48e..df2842c 100644
--- a/src/main/java/com/microsoft/azure/relay/RelayLogger.java
+++ b/src/main/java/com/microsoft/azure/relay/RelayLogger.java
@@ -131,8 +131,6 @@ private static void init() {
map.put("objectNotSet", "%s: %s was not set to the given value.");
map.put("offline", "%s is offline.");
map.put("parsingUUIDFailed", "%s: Parsing TrackingId '%s' as Guid failed, created new ActivityId '%s' for trace correlation.");
- map.put("pingSuccess", "%s Ping Success");
- map.put("pingFailed", "%s Ping Failed");
map.put("receivedBytes", "%s: received bytes from remote. Total length: %s");
map.put("receivedText", "%s: received text from remote. Total length: %s");
map.put("rendezvousClose", "%s: Relayed Listener has received call to close and will not accept the incoming connection. ConnectionAddress: %s");
diff --git a/src/main/java/com/microsoft/azure/relay/RelayedHttpListenerContext.java b/src/main/java/com/microsoft/azure/relay/RelayedHttpListenerContext.java
index 6aa4415..8f7d02b 100644
--- a/src/main/java/com/microsoft/azure/relay/RelayedHttpListenerContext.java
+++ b/src/main/java/com/microsoft/azure/relay/RelayedHttpListenerContext.java
@@ -78,7 +78,7 @@ CompletableFuture acceptAsync(URI rendezvousUri) {
// clientWebSocket.Options.AddSubProtocol(subProtocol);
// }
- ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, this.listener.getHttpClientProvider(), HybridConnectionListener.EXECUTOR);
+ ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, HybridConnectionListener.EXECUTOR);
return webSocket.connectAsync(rendezvousUri, ACCEPT_TIMEOUT).thenApply(result -> webSocket);
}
@@ -97,7 +97,7 @@ CompletableFuture rejectAsync(URI rendezvousUri) {
.append(URLEncoder.encode(this.response.getStatusDescription(), StringUtil.UTF8.name()));
URI rejectURI = new URI(builder.toString());
- ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, this.listener.getHttpClientProvider(), HybridConnectionListener.EXECUTOR);
+ ClientWebSocket webSocket = new ClientWebSocket(this.trackingContext, HybridConnectionListener.EXECUTOR);
return webSocket.connectAsync(rejectURI, ACCEPT_TIMEOUT).thenCompose((result) -> webSocket.closeAsync());
} catch (IOException | URISyntaxException e) {
return CompletableFutureUtil.fromException(e);
diff --git a/src/main/java/com/microsoft/azure/relay/WebSocketChannel.java b/src/main/java/com/microsoft/azure/relay/WebSocketChannel.java
index 7246242..93f75e0 100644
--- a/src/main/java/com/microsoft/azure/relay/WebSocketChannel.java
+++ b/src/main/java/com/microsoft/azure/relay/WebSocketChannel.java
@@ -7,15 +7,14 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-
-import org.eclipse.jetty.websocket.api.CloseStatus;
+import javax.websocket.CloseReason;
public class WebSocketChannel implements HybridConnectionChannel {
private final ClientWebSocket websocket;
private final TrackingContext trackingContext;
- WebSocketChannel(TrackingContext trackingContext, HttpClientProvider httpClientProvider, AutoShutdownScheduledExecutor executor) {
- this(new ClientWebSocket(trackingContext, httpClientProvider, executor), trackingContext);
+ WebSocketChannel(TrackingContext trackingContext, AutoShutdownScheduledExecutor executor) {
+ this(new ClientWebSocket(trackingContext, executor), trackingContext);
}
WebSocketChannel(ClientWebSocket websocket, TrackingContext trackingContext) {
@@ -57,13 +56,13 @@ public CompletableFuture closeAsync() {
}
/**
- * Closes the connection with the remote websocket with a given CloseStatus
+ * Closes the connection with the remote websocket with a given CloseReason
*
- * @param closeStatus The CloseStatus to be given for this operation. For details please see org.eclipse.jetty.websocket.api.CloseStatus.
+ * @param reason The CloseReason to be given for this operation. For details please see javax.websocket.CloseReason.
* @return Returns a CompletableFuture which completes when the connection is completely closed.
*/
- public CompletableFuture closeAsync(CloseStatus closeStatus) {
- return this.websocket.closeAsync(closeStatus);
+ public CompletableFuture closeAsync(CloseReason reason) {
+ return this.websocket.closeAsync(reason);
}
/**
@@ -120,7 +119,6 @@ public CompletableFuture writeAsync(ByteBuffer data, Duration timeout) {
* @param textData Text message to be sent.
* @param timeout The timeout to connect to send the data within. May be null to indicate no timeout limit.
* @return A CompletableFuture which completes when websocket finishes sending the data.
- * @throws TimeoutException Throws when the sending task does not complete within the given timeout.
*/
public CompletableFuture writeTextAsync(String textData, Duration timeout) {
return this.websocket.writeAsync(textData, timeout, true, WriteMode.TEXT);
diff --git a/src/test/java/com/microsoft/azure/relay/HybridConnectionListenerTest.java b/src/test/java/com/microsoft/azure/relay/HybridConnectionListenerTest.java
index 2cf64a9..48f05fa 100644
--- a/src/test/java/com/microsoft/azure/relay/HybridConnectionListenerTest.java
+++ b/src/test/java/com/microsoft/azure/relay/HybridConnectionListenerTest.java
@@ -77,25 +77,6 @@ public void openAndCloseTest() {
assertFalse("Listener should be closed", listener.isOnline());
assertEquals("Listener offline handler was not called exactly once", 1, offlineHandlerCalled.get());
}
-
- @Test
- public void keepAliveTest() throws InterruptedException {
- AtomicInteger keepAliveHandlerCalled = new AtomicInteger(0);
-
- listener.setKeepAliveInterval(Duration.ofSeconds(3));
- listener.setKeepAliveHandler(() -> {
- keepAliveHandlerCalled.incrementAndGet();
- });
-
- listener.openAsync(Duration.ofSeconds(15)).join();
- assertTrue("Listener failed to open.", listener.isOnline());
-
- Thread.sleep(5000);
-
- listener.close();
- assertFalse("Listener should be closed", listener.isOnline());
- assertEquals("Keep alive handler was not called exactly once", 1, keepAliveHandlerCalled.get());
- }
@Test
public void customHeadersTest() throws Exception {
diff --git a/src/test/java/com/microsoft/azure/relay/SendReceiveTest.java b/src/test/java/com/microsoft/azure/relay/SendReceiveTest.java
index 5b15228..b638471 100644
--- a/src/test/java/com/microsoft/azure/relay/SendReceiveTest.java
+++ b/src/test/java/com/microsoft/azure/relay/SendReceiveTest.java
@@ -2,13 +2,13 @@
import com.microsoft.azure.relay.HybridHttpConnection.ResponseStream;
import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.websocket.api.CloseStatus;
-import org.eclipse.jetty.websocket.api.StatusCode;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import javax.websocket.CloseReason;
+import javax.websocket.CloseReason.CloseCodes;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -216,8 +216,8 @@ public void websocketMultipleSenderTest() throws URISyntaxException {
websocket.writeAsync(ByteBuffer.wrap(SMALL_BYTES)).join();
websocket
.closeAsync(
- new CloseStatus(
- StatusCode.NORMAL, "Normal closure from listener"))
+ new CloseReason(
+ CloseCodes.NORMAL_CLOSURE, "Normal closure from listener"))
.join();
});
});
@@ -388,8 +388,8 @@ private static CompletableFuture sendAndReceiveWithWebsocketListener(
nullResult -> {
checkMsgToSendIsModified(origMsgToSend, msgToSend);
return channel.closeAsync(
- new CloseStatus(
- StatusCode.NORMAL, "Listener closing normally."));
+ new CloseReason(
+ CloseCodes.NORMAL_CLOSURE, "Listener closing normally."));
});
});
}
@@ -451,8 +451,8 @@ private static CompletableFuture sendAndReceiveTextWithWebsocketListener(
nullResult -> {
assertTrue("Message was modified.", origMsgToSend.equals(msgToSend));
return channel.closeAsync(
- new CloseStatus(
- StatusCode.NORMAL, "Listener closing normally."));
+ new CloseReason(
+ CloseCodes.NORMAL_CLOSURE, "Listener closing normally."));
});
});
}
diff --git a/src/test/java/com/microsoft/azure/relay/TestUtil.java b/src/test/java/com/microsoft/azure/relay/TestUtil.java
index ec7ab2e..6663d91 100644
--- a/src/test/java/com/microsoft/azure/relay/TestUtil.java
+++ b/src/test/java/com/microsoft/azure/relay/TestUtil.java
@@ -2,24 +2,20 @@
import java.net.URI;
-import io.github.cdimascio.dotenv.Dotenv;
-
public class TestUtil {
- public static final Dotenv dotenv = Dotenv.configure().load();
- public static final String CONNECTION_STRING_ENV_VARIABLE_NAME = "RELAY_CONNECTION_STRING";
- public static final RelayConnectionStringBuilder CONNECTION_STRING_BUILDER = new RelayConnectionStringBuilder(
- dotenv.get(CONNECTION_STRING_ENV_VARIABLE_NAME));
- public static final URI RELAY_NAMESPACE_URI = CONNECTION_STRING_BUILDER.getEndpoint();
- public static final String ENTITY_PATH = CONNECTION_STRING_BUILDER.getEntityPath();
- public static final String KEY_NAME = CONNECTION_STRING_BUILDER.getSharedAccessKeyName();
- public static final String KEY = CONNECTION_STRING_BUILDER.getSharedAccessKey();
-
- static byte[] concatByteArrays(byte[]... arrays) {
- int totalSize = 0;
+ public static final String CONNECTION_STRING_ENV_VARIABLE_NAME = "RELAY_CONNECTION_STRING";
+ public static final RelayConnectionStringBuilder CONNECTION_STRING_BUILDER = new RelayConnectionStringBuilder(System.getenv(CONNECTION_STRING_ENV_VARIABLE_NAME));
+ public static final URI RELAY_NAMESPACE_URI = CONNECTION_STRING_BUILDER.getEndpoint();
+ public static final String ENTITY_PATH = CONNECTION_STRING_BUILDER.getEntityPath();
+ public static final String KEY_NAME = CONNECTION_STRING_BUILDER.getSharedAccessKeyName();
+ public static final String KEY = CONNECTION_STRING_BUILDER.getSharedAccessKey();
+
+ static byte[] concatByteArrays(byte[]... arrays) {
+ int totalSize = 0;
for (byte[] array : arrays) {
totalSize += array.length;
}
-
+
byte[] result = new byte[totalSize];
int offset = 0;
for (byte[] array : arrays) {
diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml
index fea2ad2..96a398f 100644
--- a/src/test/resources/log4j2.xml
+++ b/src/test/resources/log4j2.xml
@@ -8,29 +8,29 @@
-
+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
-
+
-
-
+
+
-
-
-
-
-
+
+
+
+
+
\ No newline at end of file