Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -29,8 +31,8 @@ public class HybridConnectionListener implements RelayTraceSource, AutoCloseable
private final InputQueue<HybridConnectionChannel> connectionInputQueue;
private final ControlConnection controlConnection;
private final Object thisLock = new Object();
private boolean openCalled;
private volatile boolean closeCalled;
private final AtomicBoolean openCalled;
private final AtomicBoolean closeCalled;
private Duration operationTimeout;
private int maxWebSocketBufferSize;
private String cachedString;
Expand Down Expand Up @@ -69,6 +71,8 @@ public HybridConnectionListener(URI address, TokenProvider tokenProvider) {
this.trackingContext = TrackingContext.create(this.address);
this.connectionInputQueue = new InputQueue<HybridConnectionChannel>(EXECUTOR);
this.controlConnection = new ControlConnection(this);
this.openCalled = new AtomicBoolean(false);
this.closeCalled = new AtomicBoolean(false);
}

/**
Expand Down Expand Up @@ -143,6 +147,8 @@ public HybridConnectionListener(String connectionString, String path) throws URI
this.trackingContext = TrackingContext.create(this.address);
this.connectionInputQueue = new InputQueue<HybridConnectionChannel>(EXECUTOR);
this.controlConnection = new ControlConnection(this);
this.openCalled = new AtomicBoolean(false);
this.closeCalled = new AtomicBoolean(false);
}

public boolean isOnline() {
Expand Down Expand Up @@ -293,7 +299,7 @@ public CompletableFuture<Void> openAsync(Duration timeout) {
} catch (RelayException e) {
return CompletableFutureUtil.fromException(e);
}
this.openCalled = true;
this.openCalled.set(true);
}

return this.controlConnection.openAsync(timeout);
Expand All @@ -320,12 +326,12 @@ public CompletableFuture<Void> closeAsync(Duration timeout) {
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
CompletableFuture<?>[] closeTasks;
synchronized (this.thisLock) {
if (this.closeCalled) {
if (this.closeCalled.get()) {
return CompletableFuture.completedFuture(null);
}

RelayLogger.logEvent("closing", this);
this.closeCalled = true;
this.closeCalled.set(true);

// If the input queue is empty this completes all pending waiters with null and
// prevents any new items being added to the input queue.
Expand Down Expand Up @@ -364,12 +370,15 @@ public void close() {
* @return A CompletableFuture which completes when a websocket connection from the sender is established.
*/
public CompletableFuture<HybridConnectionChannel> acceptConnectionAsync() {
CompletableFuture<HybridConnectionChannel> connection = null;
synchronized (this.thisLock) {
if (!this.openCalled) {
if (!this.openCalled.get()) {
throw RelayLogger.invalidOperation("cannot accept connection because listener is not open.", this);
}

connection = this.connectionInputQueue.dequeueAsync();
}
return this.connectionInputQueue.dequeueAsync();
return connection;
}

@Override
Expand All @@ -385,16 +394,14 @@ CompletableFuture<Void> sendControlCommandAndStreamAsync(ListenerCommand command
}

void throwIfDisposed() throws RelayException {
if (this.closeCalled) {
if (this.closeCalled.get()) {
throw RelayLogger.invalidOperation("Invalid operation. Cannot call open when it's already closed.", this);
}
}

void throwIfReadOnly() throws RelayException {
synchronized (this.thisLock) {
if (this.openCalled) {
throw RelayLogger.invalidOperation("Invalid operation. Cannot call open when it's already open.", this);
}
if (this.openCalled.get()) {
throw RelayLogger.invalidOperation("Invalid operation. Cannot call open when it's already open.", this);
}
}

Expand Down Expand Up @@ -472,7 +479,7 @@ private CompletableFuture<Void> completeAcceptAsync(RelayedHttpListenerContext l
synchronized (this.thisLock) {
WebSocketChannel rendezvousConnection = new WebSocketChannel(listenerContext.getTrackingContext(), EXECUTOR);

if (this.closeCalled) {
if (this.closeCalled.get()) {
RelayLogger.logEvent("rendezvousClose", this, rendezvousUri.toString());
completeAcceptTask = CompletableFuture.completedFuture(null);
} else {
Expand Down Expand Up @@ -531,7 +538,7 @@ void injectFault(Throwable throwable) {
* Connects, maintains, and transparently reconnects this listener's control
* connection with the cloud service.
*/
static final class ControlConnection implements AutoCloseable {
final class ControlConnection implements AutoCloseable {
private final HybridConnectionListener listener;
Copy link
Copy Markdown
Contributor

@dlstucki dlstucki Mar 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final HybridConnectionListener listener; [](start = 2, length = 48)

It's a minor point and might introduce unnecessary churn but once this class is no longer static you no longer need to explicitly pass in the "listener" reference, right? #WontFix

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case the "listener" reference would have to be referred to as "HybridConnectionListener.this", I personally feel like it's more readable as it is currently.


In reply to: 397338730 [](ancestors = 397338730)

private final URI address;
@SuppressWarnings("unused")
Expand All @@ -540,16 +547,19 @@ static final class ControlConnection implements AutoCloseable {
private final AsyncLock sendAsyncLock;
private final Object thisLock = new Object();
private CompletableFuture<ClientWebSocket> connectAsyncTask;
private int connectDelayIndex;
private AtomicInteger connectDelayIndex;
private AtomicBoolean closeCalled;
private Throwable lastError;
private boolean closeCalled;


ControlConnection(HybridConnectionListener listener) {
this.listener = listener;
this.address = listener.address;
String rawPath = this.address.getPath();
this.path = (rawPath.startsWith("/")) ? rawPath.substring(1) : rawPath;
this.sendAsyncLock = new AsyncLock(EXECUTOR);
this.connectDelayIndex = new AtomicInteger(0);
this.closeCalled = new AtomicBoolean(false);
this.tokenRenewer = new TokenRenewer(
this.listener, this.address.toString(), TokenProvider.DEFAULT_TOKEN_TIMEOUT);
}
Expand Down Expand Up @@ -600,10 +610,10 @@ private CompletableFuture<Void> closeAsync(Duration duration) {

CompletableFuture<ClientWebSocket> connectTask;
synchronized (this.thisLock) {
if (this.closeCalled) {
if (this.closeCalled.get()) {
return CompletableFuture.completedFuture(null);
}
this.closeCalled = true;
this.closeCalled.set(true);
connectTask = this.connectAsyncTask;
this.connectAsyncTask = null;
}
Expand Down Expand Up @@ -696,7 +706,7 @@ private CompletableFuture<ClientWebSocket> connectAsync(Duration timeout) {
try {
this.listener.throwIfDisposed();

CompletableFuture<Void> delayTask = CompletableFutureUtil.delayAsync(RelayConstants.CONNECTION_DELAY_INTERVALS[this.connectDelayIndex], EXECUTOR);
CompletableFuture<Void> delayTask = CompletableFutureUtil.delayAsync(RelayConstants.CONNECTION_DELAY_INTERVALS[this.connectDelayIndex.get()], EXECUTOR);
CompletableFuture<SecurityToken> token = this.tokenRenewer.getTokenAsync();

// Set the authentication in request header
Expand Down Expand Up @@ -806,7 +816,7 @@ private CompletableFuture<Boolean> receivePumpCoreAsync() {
try {
if (!webSocket.isOpen()) {
this.closeOrAbortWebSocketAsync(connectTask, webSocket.getCloseReason());
if (this.closeCalled) {
if (this.closeCalled.get()) {
keepGoing = false;
}
else {
Expand Down Expand Up @@ -835,7 +845,7 @@ private void onOnline() {
}

this.lastError = null;
this.connectDelayIndex = -1;
this.connectDelayIndex.set(-1);
}
RelayLogger.logEvent("connected", this.listener);

Expand Down Expand Up @@ -863,9 +873,9 @@ private boolean onDisconnect(Throwable lastError) {
synchronized (this.thisLock) {
this.lastError = lastError;

if (this.connectDelayIndex < RelayConstants.CONNECTION_DELAY_INTERVALS.length - 1) {
this.connectDelayIndex++;
}
this.connectDelayIndex.updateAndGet((index) -> {
return index < RelayConstants.CONNECTION_DELAY_INTERVALS.length - 1 ? ++index : index;
});
}

// Inspect the close status/description to see if this is a terminal case
Expand Down