Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #5152: expanding the error detection #5153

Merged
merged 8 commits into from
May 29, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
* Fix #5125: TLS 1.3 only should be supported
* Fix #5126: fallback to changeit only if null/empty does not work
* Fix #5145: [java-generator] handle `additionalProperties: true` emitting a field of type `AnyType`
* Fix #5152: preventing JDK WebSocket errors from terminating watches and improving watch termination and its logging
* Fix #5164: [java-generator] handle more special characters in field names
* Fix #5125: TLS 1.3 only should be supported

#### Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.fabric8.kubernetes.client.http.StandardHttpHeaders.CONTENT_TYPE;
Expand Down Expand Up @@ -316,17 +315,15 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(
newBuilder.connectTimeout(timeout);
}

AtomicLong queueSize = new AtomicLong();

// use a responseholder to convey both the exception and the websocket
CompletableFuture<WebSocketResponse> response = new CompletableFuture<>();

URI uri = WebSocket.toWebSocketUri(request.uri());
newBuilder.buildAsync(uri, new JdkWebSocketImpl.ListenerAdapter(listener, queueSize)).whenComplete((jdkWebSocket, t) -> {
final JdkWebSocketImpl fabric8WebSocket = new JdkWebSocketImpl(listener);
newBuilder.buildAsync(uri, fabric8WebSocket).whenComplete((jdkWebSocket, t) -> {
if (t instanceof CompletionException && t.getCause() != null) {
t = t.getCause();
}
final JdkWebSocketImpl fabric8WebSocket = new JdkWebSocketImpl(queueSize, jdkWebSocket);
if (t instanceof java.net.http.WebSocketHandshakeException) {
final java.net.http.HttpResponse<?> jdkResponse = ((java.net.http.WebSocketHandshakeException) t).getResponse();
final WebSocketUpgradeResponse upgradeResponse = new WebSocketUpgradeResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import io.fabric8.kubernetes.client.http.BufferUtil;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -29,75 +32,70 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class JdkWebSocketImpl implements WebSocket {
class JdkWebSocketImpl implements WebSocket, java.net.http.WebSocket.Listener {

static final class ListenerAdapter implements java.net.http.WebSocket.Listener {
private static final Logger LOG = LoggerFactory.getLogger(JdkWebSocketImpl.class);

private final Listener listener;
private final AtomicLong queueSize;
private final StringBuilder stringBuilder = new StringBuilder();
private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private final WritableByteChannel byteChannel = Channels.newChannel(byteArrayOutputStream);
private volatile java.net.http.WebSocket webSocket;
private final AtomicLong queueSize = new AtomicLong();
private final Listener listener;
private final StringBuilder stringBuilder = new StringBuilder();
private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private final WritableByteChannel byteChannel = Channels.newChannel(byteArrayOutputStream);
private final CompletableFuture<Void> terminated = new CompletableFuture<>();

ListenerAdapter(Listener listener, AtomicLong queueSize) {
this.listener = listener;
this.queueSize = queueSize;
}

@Override
public CompletionStage<?> onBinary(java.net.http.WebSocket webSocket, ByteBuffer data, boolean last) {
try {
byteChannel.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (last) {
ByteBuffer value = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
byteArrayOutputStream.reset();
listener.onMessage(new JdkWebSocketImpl(queueSize, webSocket), value);
} else {
webSocket.request(1);
}
return null;
}

@Override
public CompletionStage<?> onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) {
stringBuilder.append(data);
if (last) {
String value = stringBuilder.toString();
stringBuilder.setLength(0);
listener.onMessage(new JdkWebSocketImpl(queueSize, webSocket), value);
} else {
webSocket.request(1);
}
return null;
}
public JdkWebSocketImpl(Listener listener) {
this.listener = listener;
}

@Override
public CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) {
listener.onClose(new JdkWebSocketImpl(queueSize, webSocket), statusCode, reason);
return null;
@Override
public CompletionStage<?> onBinary(java.net.http.WebSocket webSocket, ByteBuffer data, boolean last) {
try {
byteChannel.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}

@Override
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error, false);
if (last) {
ByteBuffer value = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
byteArrayOutputStream.reset();
listener.onMessage(this, value);
} else {
webSocket.request(1);
}
return null;
}

@Override
public void onOpen(java.net.http.WebSocket webSocket) {
@Override
public CompletionStage<?> onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) {
stringBuilder.append(data);
if (last) {
String value = stringBuilder.toString();
stringBuilder.setLength(0);
listener.onMessage(this, value);
} else {
webSocket.request(1);
listener.onOpen(new JdkWebSocketImpl(queueSize, webSocket));
}
return null;
}

private java.net.http.WebSocket webSocket;
private AtomicLong queueSize;
@Override
public CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) {
terminated.complete(null);
listener.onClose(this, statusCode, reason);
return null; // should immediately initiate an implicit sendClose
}

@Override
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
terminated.complete(null);
listener.onError(this, error);
}

public JdkWebSocketImpl(AtomicLong queueSize, java.net.http.WebSocket webSocket) {
this.queueSize = queueSize;
@Override
public void onOpen(java.net.http.WebSocket webSocket) {
this.webSocket = webSocket;
webSocket.request(1);
listener.onOpen(this);
}

@Override
Expand All @@ -106,29 +104,46 @@ public boolean send(ByteBuffer buffer) {
final int size = buffer.remaining();
queueSize.addAndGet(size);
CompletableFuture<java.net.http.WebSocket> cf = webSocket.sendBinary(buffer, true);
cf.whenComplete((b, t) -> queueSize.addAndGet(-size));
return asBoolean(cf);
if (cf.isDone()) {
queueSize.addAndGet(-size);
return !cf.isCompletedExceptionally();
}
cf.whenComplete((b, t) -> {
if (t != null) {
LOG.warn("Queued write did not succeed", t);
abort();
}
queueSize.addAndGet(-size);
});
return true;
}

/**
* If there is an illegalstateexception or other immediate failure, return
* false, otherwise it should have been enqueued
*/
private boolean asBoolean(CompletableFuture<java.net.http.WebSocket> cf) {
try {
cf.getNow(null);
return true;
} catch (Exception e) {
@Override
public synchronized boolean sendClose(int code, String reason) {
if (webSocket.isOutputClosed()) {
return false;
}
CompletableFuture<java.net.http.WebSocket> cf = webSocket.sendClose(code, reason == null ? "Closing" : reason);
cf = cf.whenComplete((w, t) -> {
if (t != null) {
abort();
} else if (w != null) {
webSocket.request(1); // there may not be demand, so request more
CompletableFuture<Void> future = Utils.schedule(Runnable::run, this::abort, 1, TimeUnit.MINUTES);
terminated.whenComplete((v, ignored) -> future.cancel(true));
}
});
return !cf.isCompletedExceptionally();
}

@Override
public boolean sendClose(int code, String reason) {
CompletableFuture<java.net.http.WebSocket> cf = webSocket.sendClose(code, reason == null ? "Closing" : reason);
// matches the behavior of the okhttp implementation and will ensure input closure after 1 minute
cf.thenRunAsync(() -> webSocket.abort(), CompletableFuture.delayedExecutor(1, TimeUnit.MINUTES));
return asBoolean(cf);
private void abort() {
if (!webSocket.isOutputClosed() || !webSocket.isInputClosed()) {
LOG.warn("Aborting WebSocket due to a write error or failure with sendClose");
webSocket.abort();
if (terminated.complete(null)) {
listener.onClose(this, 1006, "Aborted the WebSocket");
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,35 @@
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocketResponse;
import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse;
import io.fabric8.kubernetes.client.utils.Utils;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.exceptions.CloseException;
import org.eclipse.jetty.websocket.api.exceptions.UpgradeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ProtocolException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class JettyWebSocket implements WebSocket, WebSocketListener {

private static final Logger LOG = LoggerFactory.getLogger(JettyWebSocket.class);

private final WebSocket.Listener listener;
private final AtomicLong sendQueue;
private final Lock lock;
private final Condition backPressure;
private final AtomicBoolean closed;
private final CompletableFuture<Void> terminated = new CompletableFuture<>();
private boolean moreMessages;
private volatile Session webSocketSession;

Expand All @@ -51,13 +58,12 @@ public JettyWebSocket(WebSocket.Listener listener) {
sendQueue = new AtomicLong();
lock = new ReentrantLock();
backPressure = lock.newCondition();
closed = new AtomicBoolean();
moreMessages = true;
}

@Override
public boolean send(ByteBuffer buffer) {
if (closed.get() || !webSocketSession.isOpen()) {
if (terminated.isDone() || !webSocketSession.isOpen()) {
return false;
}
buffer = BufferUtil.copy(buffer);
Expand All @@ -67,6 +73,10 @@ public boolean send(ByteBuffer buffer) {
@Override
public void writeFailed(Throwable x) {
sendQueue.addAndGet(-size);
if (webSocketSession.isOpen()) {
LOG.warn("Queued write did not succeed", x);
}
webSocketSession.disconnect(); // prevent further writes
}

@Override
Expand All @@ -78,12 +88,24 @@ public void writeSuccess() {
}

@Override
public boolean sendClose(int code, String reason) {
if (webSocketSession.isOpen() && !closed.getAndSet(true)) {
webSocketSession.close(code, reason);
return true;
public synchronized boolean sendClose(int code, String reason) {
if (!webSocketSession.isOpen()) {
return false;
}
return false;
webSocketSession.close(code, reason, new WriteCallback() {
@Override
public void writeFailed(Throwable x) {
LOG.warn("Queued close did not succeed", x);
webSocketSession.disconnect(); // immediately terminate
}

@Override
public void writeSuccess() {
CompletableFuture<Void> future = Utils.schedule(Runnable::run, webSocketSession::disconnect, 1, TimeUnit.MINUTES);
terminated.whenComplete((v, ignored) -> future.cancel(true));
}
});
return true;
}

@Override
Expand Down Expand Up @@ -118,7 +140,7 @@ public void onWebSocketText(String message) {

@Override
public void onWebSocketClose(int statusCode, String reason) {
closed.set(true);
terminated.complete(null);
listener.onClose(this, statusCode, reason);
}

Expand All @@ -130,7 +152,8 @@ public void onWebSocketConnect(Session session) {

@Override
public void onWebSocketError(Throwable cause) {
if (cause instanceof ClosedChannelException && closed.get()) {
boolean completed = terminated.complete(null);
if (cause instanceof ClosedChannelException && !completed) {
// TODO: Check better
// It appears to be a race condition in Jetty:
// - The server sends a close frame (but we haven't received it)
Expand All @@ -140,7 +163,10 @@ public void onWebSocketError(Throwable cause) {
// - Jetty throws a ClosedChannelException
return;
}
listener.onError(this, cause, cause instanceof IOException);
if (cause instanceof CloseException) {
cause = new ProtocolException().initCause(cause);
}
listener.onError(this, cause);
}

private void backPressure() {
Expand Down