Skip to content

Commit

Permalink
[Java] Don't call onClose when WebSocket connection is not open (#28004)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Nov 23, 2020
1 parent 4b46b26 commit 4731c87
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
private WebSocketOnClosedCallback onClose;
private CompletableSubject startSubject = CompletableSubject.create();
private CompletableSubject closeSubject = CompletableSubject.create();
private final ReentrantLock closeLock = new ReentrantLock();
private final ReentrantLock stateLock = new ReentrantLock();

private final Logger logger = LoggerFactory.getLogger(OkHttpWebSocketWrapper.class);

Expand Down Expand Up @@ -82,7 +82,12 @@ public void setOnClose(WebSocketOnClosedCallback onClose) {
private class SignalRWebSocketListener extends WebSocketListener {
@Override
public void onOpen(WebSocket webSocket, Response response) {
startSubject.onComplete();
stateLock.lock();
try {
startSubject.onComplete();
} finally {
stateLock.unlock();
}
}

@Override
Expand All @@ -97,39 +102,64 @@ public void onMessage(WebSocket webSocket, ByteString bytes) {

@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
onClose.invoke(code, reason);
boolean isOpen = false;
stateLock.lock();
try {
isOpen = startSubject.hasComplete();
} finally {
stateLock.unlock();
}

logger.info("WebSocket closing with status code '{}' and reason '{}'.", code, reason);

// Only call onClose if connection is open
if (isOpen) {
onClose.invoke(code, reason);
}

try {
closeLock.lock();
stateLock.lock();
closeSubject.onComplete();
}
finally {
closeLock.unlock();
stateLock.unlock();
}
checkStartFailure();
checkStartFailure(null);
}

@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
logger.error("WebSocket closed from an error: {}.", t.getMessage());
logger.error("WebSocket closed from an error.", t);

boolean isOpen = false;
try {
closeLock.lock();
stateLock.lock();
if (!closeSubject.hasComplete()) {
closeSubject.onError(new RuntimeException(t));
}

isOpen = startSubject.hasComplete();
}
finally {
closeLock.unlock();
stateLock.unlock();
}
onClose.invoke(null, t.getMessage());
checkStartFailure();
// Only call onClose if connection is open
if (isOpen) {
onClose.invoke(null, t.getMessage());
}
checkStartFailure(t);
}

private void checkStartFailure() {
// If the start task hasn't completed yet, then we need to complete it
// exceptionally.
if (!startSubject.hasComplete()) {
startSubject.onError(new RuntimeException("There was an error starting the WebSocket transport."));
private void checkStartFailure(Throwable t) {
stateLock.lock();
try {
// If the start task hasn't completed yet, then we need to complete it
// exceptionally.
if (!startSubject.hasComplete()) {
startSubject.onError(new RuntimeException("There was an error starting the WebSocket transport.", t));
}
} finally {
stateLock.unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public Completable stop() {
}

void onClose(Integer code, String reason) {
logger.info("WebSocket connection stopping with " +
"code {} and reason '{}'.", code, reason);
if (code == null || code != 1000) {
onClose.invoke(reason);
}
Expand Down

0 comments on commit 4731c87

Please sign in to comment.