Skip to content

Commit

Permalink
fix #5152: expanding the error detection (#5153)
Browse files Browse the repository at this point in the history
* fix #5152: consolidating logic for watch ending with improved logging

also obeying the Status retryAfterSeconds if provided

* 4.4.2 includes a fix to address demand with ping/pong

* fix #5152 adding a fail-safe check that the watch gets restarted

* fix #5152: ensuring ws errors are logged and expanding close handling

* fix #5152: correcting the jetty ws close expectation

also adding logging and refining termination

---------

Co-authored-by: Marc Nuri <marc@marcnuri.com>
  • Loading branch information
shawkins and manusa committed May 29, 2023
1 parent 51d4a25 commit 1c3baf9
Show file tree
Hide file tree
Showing 24 changed files with 631 additions and 225 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -7,7 +7,9 @@
* Fix #5125: TLS 1.3 only should be supported
* Fix #5126: fallback to changeit only if null/empty does not work
* Fix #5145: [java-generator] handle `additionalProperties: true` emitting a field of type `AnyType`
* Fix #5152: preventing JDK WebSocket errors from terminating watches and improving watch termination and its logging
* Fix #5164: [java-generator] handle more special characters in field names
* Fix #5125: TLS 1.3 only should be supported

#### Improvements

Expand Down
Expand Up @@ -52,7 +52,6 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.fabric8.kubernetes.client.http.StandardHttpHeaders.CONTENT_TYPE;
Expand Down Expand Up @@ -316,17 +315,15 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(
newBuilder.connectTimeout(timeout);
}

AtomicLong queueSize = new AtomicLong();

// use a responseholder to convey both the exception and the websocket
CompletableFuture<WebSocketResponse> response = new CompletableFuture<>();

URI uri = WebSocket.toWebSocketUri(request.uri());
newBuilder.buildAsync(uri, new JdkWebSocketImpl.ListenerAdapter(listener, queueSize)).whenComplete((jdkWebSocket, t) -> {
final JdkWebSocketImpl fabric8WebSocket = new JdkWebSocketImpl(listener);
newBuilder.buildAsync(uri, fabric8WebSocket).whenComplete((jdkWebSocket, t) -> {
if (t instanceof CompletionException && t.getCause() != null) {
t = t.getCause();
}
final JdkWebSocketImpl fabric8WebSocket = new JdkWebSocketImpl(queueSize, jdkWebSocket);
if (t instanceof java.net.http.WebSocketHandshakeException) {
final java.net.http.HttpResponse<?> jdkResponse = ((java.net.http.WebSocketHandshakeException) t).getResponse();
final WebSocketUpgradeResponse upgradeResponse = new WebSocketUpgradeResponse(
Expand Down
Expand Up @@ -18,6 +18,9 @@

import io.fabric8.kubernetes.client.http.BufferUtil;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -29,75 +32,70 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class JdkWebSocketImpl implements WebSocket {
class JdkWebSocketImpl implements WebSocket, java.net.http.WebSocket.Listener {

static final class ListenerAdapter implements java.net.http.WebSocket.Listener {
private static final Logger LOG = LoggerFactory.getLogger(JdkWebSocketImpl.class);

private final Listener listener;
private final AtomicLong queueSize;
private final StringBuilder stringBuilder = new StringBuilder();
private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private final WritableByteChannel byteChannel = Channels.newChannel(byteArrayOutputStream);
private volatile java.net.http.WebSocket webSocket;
private final AtomicLong queueSize = new AtomicLong();
private final Listener listener;
private final StringBuilder stringBuilder = new StringBuilder();
private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private final WritableByteChannel byteChannel = Channels.newChannel(byteArrayOutputStream);
private final CompletableFuture<Void> terminated = new CompletableFuture<>();

ListenerAdapter(Listener listener, AtomicLong queueSize) {
this.listener = listener;
this.queueSize = queueSize;
}

@Override
public CompletionStage<?> onBinary(java.net.http.WebSocket webSocket, ByteBuffer data, boolean last) {
try {
byteChannel.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (last) {
ByteBuffer value = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
byteArrayOutputStream.reset();
listener.onMessage(new JdkWebSocketImpl(queueSize, webSocket), value);
} else {
webSocket.request(1);
}
return null;
}

@Override
public CompletionStage<?> onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) {
stringBuilder.append(data);
if (last) {
String value = stringBuilder.toString();
stringBuilder.setLength(0);
listener.onMessage(new JdkWebSocketImpl(queueSize, webSocket), value);
} else {
webSocket.request(1);
}
return null;
}
public JdkWebSocketImpl(Listener listener) {
this.listener = listener;
}

@Override
public CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) {
listener.onClose(new JdkWebSocketImpl(queueSize, webSocket), statusCode, reason);
return null;
@Override
public CompletionStage<?> onBinary(java.net.http.WebSocket webSocket, ByteBuffer data, boolean last) {
try {
byteChannel.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}

@Override
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error, false);
if (last) {
ByteBuffer value = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
byteArrayOutputStream.reset();
listener.onMessage(this, value);
} else {
webSocket.request(1);
}
return null;
}

@Override
public void onOpen(java.net.http.WebSocket webSocket) {
@Override
public CompletionStage<?> onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) {
stringBuilder.append(data);
if (last) {
String value = stringBuilder.toString();
stringBuilder.setLength(0);
listener.onMessage(this, value);
} else {
webSocket.request(1);
listener.onOpen(new JdkWebSocketImpl(queueSize, webSocket));
}
return null;
}

private java.net.http.WebSocket webSocket;
private AtomicLong queueSize;
@Override
public CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) {
terminated.complete(null);
listener.onClose(this, statusCode, reason);
return null; // should immediately initiate an implicit sendClose
}

@Override
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
terminated.complete(null);
listener.onError(this, error);
}

public JdkWebSocketImpl(AtomicLong queueSize, java.net.http.WebSocket webSocket) {
this.queueSize = queueSize;
@Override
public void onOpen(java.net.http.WebSocket webSocket) {
this.webSocket = webSocket;
webSocket.request(1);
listener.onOpen(this);
}

@Override
Expand All @@ -106,29 +104,46 @@ public boolean send(ByteBuffer buffer) {
final int size = buffer.remaining();
queueSize.addAndGet(size);
CompletableFuture<java.net.http.WebSocket> cf = webSocket.sendBinary(buffer, true);
cf.whenComplete((b, t) -> queueSize.addAndGet(-size));
return asBoolean(cf);
if (cf.isDone()) {
queueSize.addAndGet(-size);
return !cf.isCompletedExceptionally();
}
cf.whenComplete((b, t) -> {
if (t != null) {
LOG.warn("Queued write did not succeed", t);
abort();
}
queueSize.addAndGet(-size);
});
return true;
}

/**
* If there is an illegalstateexception or other immediate failure, return
* false, otherwise it should have been enqueued
*/
private boolean asBoolean(CompletableFuture<java.net.http.WebSocket> cf) {
try {
cf.getNow(null);
return true;
} catch (Exception e) {
@Override
public synchronized boolean sendClose(int code, String reason) {
if (webSocket.isOutputClosed()) {
return false;
}
CompletableFuture<java.net.http.WebSocket> cf = webSocket.sendClose(code, reason == null ? "Closing" : reason);
cf = cf.whenComplete((w, t) -> {
if (t != null) {
abort();
} else if (w != null) {
webSocket.request(1); // there may not be demand, so request more
CompletableFuture<Void> future = Utils.schedule(Runnable::run, this::abort, 1, TimeUnit.MINUTES);
terminated.whenComplete((v, ignored) -> future.cancel(true));
}
});
return !cf.isCompletedExceptionally();
}

@Override
public boolean sendClose(int code, String reason) {
CompletableFuture<java.net.http.WebSocket> cf = webSocket.sendClose(code, reason == null ? "Closing" : reason);
// matches the behavior of the okhttp implementation and will ensure input closure after 1 minute
cf.thenRunAsync(() -> webSocket.abort(), CompletableFuture.delayedExecutor(1, TimeUnit.MINUTES));
return asBoolean(cf);
private void abort() {
if (!webSocket.isOutputClosed() || !webSocket.isInputClosed()) {
LOG.warn("Aborting WebSocket due to a write error or failure with sendClose");
webSocket.abort();
if (terminated.complete(null)) {
listener.onClose(this, 1006, "Aborted the WebSocket");
}
}
}

@Override
Expand Down
Expand Up @@ -21,28 +21,35 @@
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocketResponse;
import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse;
import io.fabric8.kubernetes.client.utils.Utils;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.exceptions.CloseException;
import org.eclipse.jetty.websocket.api.exceptions.UpgradeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ProtocolException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class JettyWebSocket implements WebSocket, WebSocketListener {

private static final Logger LOG = LoggerFactory.getLogger(JettyWebSocket.class);

private final WebSocket.Listener listener;
private final AtomicLong sendQueue;
private final Lock lock;
private final Condition backPressure;
private final AtomicBoolean closed;
private final CompletableFuture<Void> terminated = new CompletableFuture<>();
private boolean moreMessages;
private volatile Session webSocketSession;

Expand All @@ -51,13 +58,12 @@ public JettyWebSocket(WebSocket.Listener listener) {
sendQueue = new AtomicLong();
lock = new ReentrantLock();
backPressure = lock.newCondition();
closed = new AtomicBoolean();
moreMessages = true;
}

@Override
public boolean send(ByteBuffer buffer) {
if (closed.get() || !webSocketSession.isOpen()) {
if (terminated.isDone() || !webSocketSession.isOpen()) {
return false;
}
buffer = BufferUtil.copy(buffer);
Expand All @@ -67,6 +73,10 @@ public boolean send(ByteBuffer buffer) {
@Override
public void writeFailed(Throwable x) {
sendQueue.addAndGet(-size);
if (webSocketSession.isOpen()) {
LOG.warn("Queued write did not succeed", x);
}
webSocketSession.disconnect(); // prevent further writes
}

@Override
Expand All @@ -78,12 +88,24 @@ public void writeSuccess() {
}

@Override
public boolean sendClose(int code, String reason) {
if (webSocketSession.isOpen() && !closed.getAndSet(true)) {
webSocketSession.close(code, reason);
return true;
public synchronized boolean sendClose(int code, String reason) {
if (!webSocketSession.isOpen()) {
return false;
}
return false;
webSocketSession.close(code, reason, new WriteCallback() {
@Override
public void writeFailed(Throwable x) {
LOG.warn("Queued close did not succeed", x);
webSocketSession.disconnect(); // immediately terminate
}

@Override
public void writeSuccess() {
CompletableFuture<Void> future = Utils.schedule(Runnable::run, webSocketSession::disconnect, 1, TimeUnit.MINUTES);
terminated.whenComplete((v, ignored) -> future.cancel(true));
}
});
return true;
}

@Override
Expand Down Expand Up @@ -118,7 +140,7 @@ public void onWebSocketText(String message) {

@Override
public void onWebSocketClose(int statusCode, String reason) {
closed.set(true);
terminated.complete(null);
listener.onClose(this, statusCode, reason);
}

Expand All @@ -130,7 +152,8 @@ public void onWebSocketConnect(Session session) {

@Override
public void onWebSocketError(Throwable cause) {
if (cause instanceof ClosedChannelException && closed.get()) {
boolean completed = terminated.complete(null);
if (cause instanceof ClosedChannelException && !completed) {
// TODO: Check better
// It appears to be a race condition in Jetty:
// - The server sends a close frame (but we haven't received it)
Expand All @@ -140,7 +163,10 @@ public void onWebSocketError(Throwable cause) {
// - Jetty throws a ClosedChannelException
return;
}
listener.onError(this, cause, cause instanceof IOException);
if (cause instanceof CloseException) {
cause = new ProtocolException().initCause(cause);
}
listener.onError(this, cause);
}

private void backPressure() {
Expand Down

0 comments on commit 1c3baf9

Please sign in to comment.