From 94744cf8bda497c297ca7a8baff3f9881400d83c Mon Sep 17 00:00:00 2001 From: bailiu Date: Mon, 23 Mar 2020 12:29:18 -0700 Subject: [PATCH] improve synchronization of fields --- .../azure/relay/HybridConnectionListener.java | 58 +++++++++++-------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java b/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java index ba8800f..c78412e 100644 --- a/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java +++ b/src/main/java/com/microsoft/azure/relay/HybridConnectionListener.java @@ -11,6 +11,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @@ -29,8 +31,8 @@ public class HybridConnectionListener implements RelayTraceSource, AutoCloseable private final InputQueue connectionInputQueue; private final ControlConnection controlConnection; private final Object thisLock = new Object(); - private boolean openCalled; - private volatile boolean closeCalled; + private final AtomicBoolean openCalled; + private final AtomicBoolean closeCalled; private Duration operationTimeout; private int maxWebSocketBufferSize; private String cachedString; @@ -69,6 +71,8 @@ public HybridConnectionListener(URI address, TokenProvider tokenProvider) { this.trackingContext = TrackingContext.create(this.address); this.connectionInputQueue = new InputQueue(EXECUTOR); this.controlConnection = new ControlConnection(this); + this.openCalled = new AtomicBoolean(false); + this.closeCalled = new AtomicBoolean(false); } /** @@ -143,6 +147,8 @@ public HybridConnectionListener(String connectionString, String path) throws URI this.trackingContext = TrackingContext.create(this.address); this.connectionInputQueue = new InputQueue(EXECUTOR); this.controlConnection = new ControlConnection(this); + this.openCalled = new AtomicBoolean(false); + this.closeCalled = new AtomicBoolean(false); } public boolean isOnline() { @@ -293,7 +299,7 @@ public CompletableFuture openAsync(Duration timeout) { } catch (RelayException e) { return CompletableFutureUtil.fromException(e); } - this.openCalled = true; + this.openCalled.set(true); } return this.controlConnection.openAsync(timeout); @@ -320,12 +326,12 @@ public CompletableFuture closeAsync(Duration timeout) { TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); CompletableFuture[] closeTasks; synchronized (this.thisLock) { - if (this.closeCalled) { + if (this.closeCalled.get()) { return CompletableFuture.completedFuture(null); } RelayLogger.logEvent("closing", this); - this.closeCalled = true; + this.closeCalled.set(true); // If the input queue is empty this completes all pending waiters with null and // prevents any new items being added to the input queue. @@ -364,12 +370,15 @@ public void close() { * @return A CompletableFuture which completes when a websocket connection from the sender is established. */ public CompletableFuture acceptConnectionAsync() { + CompletableFuture connection = null; synchronized (this.thisLock) { - if (!this.openCalled) { + if (!this.openCalled.get()) { throw RelayLogger.invalidOperation("cannot accept connection because listener is not open.", this); } + + connection = this.connectionInputQueue.dequeueAsync(); } - return this.connectionInputQueue.dequeueAsync(); + return connection; } @Override @@ -385,16 +394,14 @@ CompletableFuture sendControlCommandAndStreamAsync(ListenerCommand command } void throwIfDisposed() throws RelayException { - if (this.closeCalled) { + if (this.closeCalled.get()) { throw RelayLogger.invalidOperation("Invalid operation. Cannot call open when it's already closed.", this); } } void throwIfReadOnly() throws RelayException { - synchronized (this.thisLock) { - if (this.openCalled) { - throw RelayLogger.invalidOperation("Invalid operation. Cannot call open when it's already open.", this); - } + if (this.openCalled.get()) { + throw RelayLogger.invalidOperation("Invalid operation. Cannot call open when it's already open.", this); } } @@ -472,7 +479,7 @@ private CompletableFuture completeAcceptAsync(RelayedHttpListenerContext l synchronized (this.thisLock) { WebSocketChannel rendezvousConnection = new WebSocketChannel(listenerContext.getTrackingContext(), EXECUTOR); - if (this.closeCalled) { + if (this.closeCalled.get()) { RelayLogger.logEvent("rendezvousClose", this, rendezvousUri.toString()); completeAcceptTask = CompletableFuture.completedFuture(null); } else { @@ -531,7 +538,7 @@ void injectFault(Throwable throwable) { * Connects, maintains, and transparently reconnects this listener's control * connection with the cloud service. */ - static final class ControlConnection implements AutoCloseable { + final class ControlConnection implements AutoCloseable { private final HybridConnectionListener listener; private final URI address; @SuppressWarnings("unused") @@ -540,9 +547,10 @@ static final class ControlConnection implements AutoCloseable { private final AsyncLock sendAsyncLock; private final Object thisLock = new Object(); private CompletableFuture connectAsyncTask; - private int connectDelayIndex; + private AtomicInteger connectDelayIndex; + private AtomicBoolean closeCalled; private Throwable lastError; - private boolean closeCalled; + ControlConnection(HybridConnectionListener listener) { this.listener = listener; @@ -550,6 +558,8 @@ static final class ControlConnection implements AutoCloseable { String rawPath = this.address.getPath(); this.path = (rawPath.startsWith("/")) ? rawPath.substring(1) : rawPath; this.sendAsyncLock = new AsyncLock(EXECUTOR); + this.connectDelayIndex = new AtomicInteger(0); + this.closeCalled = new AtomicBoolean(false); this.tokenRenewer = new TokenRenewer( this.listener, this.address.toString(), TokenProvider.DEFAULT_TOKEN_TIMEOUT); } @@ -600,10 +610,10 @@ private CompletableFuture closeAsync(Duration duration) { CompletableFuture connectTask; synchronized (this.thisLock) { - if (this.closeCalled) { + if (this.closeCalled.get()) { return CompletableFuture.completedFuture(null); } - this.closeCalled = true; + this.closeCalled.set(true); connectTask = this.connectAsyncTask; this.connectAsyncTask = null; } @@ -696,7 +706,7 @@ private CompletableFuture connectAsync(Duration timeout) { try { this.listener.throwIfDisposed(); - CompletableFuture delayTask = CompletableFutureUtil.delayAsync(RelayConstants.CONNECTION_DELAY_INTERVALS[this.connectDelayIndex], EXECUTOR); + CompletableFuture delayTask = CompletableFutureUtil.delayAsync(RelayConstants.CONNECTION_DELAY_INTERVALS[this.connectDelayIndex.get()], EXECUTOR); CompletableFuture token = this.tokenRenewer.getTokenAsync(); // Set the authentication in request header @@ -806,7 +816,7 @@ private CompletableFuture receivePumpCoreAsync() { try { if (!webSocket.isOpen()) { this.closeOrAbortWebSocketAsync(connectTask, webSocket.getCloseReason()); - if (this.closeCalled) { + if (this.closeCalled.get()) { keepGoing = false; } else { @@ -835,7 +845,7 @@ private void onOnline() { } this.lastError = null; - this.connectDelayIndex = -1; + this.connectDelayIndex.set(-1); } RelayLogger.logEvent("connected", this.listener); @@ -863,9 +873,9 @@ private boolean onDisconnect(Throwable lastError) { synchronized (this.thisLock) { this.lastError = lastError; - if (this.connectDelayIndex < RelayConstants.CONNECTION_DELAY_INTERVALS.length - 1) { - this.connectDelayIndex++; - } + this.connectDelayIndex.updateAndGet((index) -> { + return index < RelayConstants.CONNECTION_DELAY_INTERVALS.length - 1 ? ++index : index; + }); } // Inspect the close status/description to see if this is a terminal case