Skip to content

Commit

Permalink
fix #4891: removing the backpressure for close
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Feb 22, 2023
1 parent b83927f commit f344495
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* Fix #4823: (java-generator) handle special characters in field names
* Fix #4723: [java-generator] Fix a race in the use of JavaParser hitting large CRDs
* Fix #4885: addresses a potential hang in the jdk client with exec stream reading
* Fix #4891: address vertx not completely reading exec streams

#### Improvements
* Fix #4675: adding a fully client side timeout for informer watches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ public void onWebSocketText(String message) {
@Override
public void onWebSocketClose(int statusCode, String reason) {
closed.set(true);
backPressure();
listener.onClose(this, statusCode, reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ private void request() {

@Override
public void onClosing(okhttp3.WebSocket webSocket, int code, String reason) {
awaitMoreRequest();
listener.onClose(new OkHttpWebSocketImpl(webSocket, this::request), code, reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,24 @@ default void onOpen(WebSocket webSocket) {

/**
* Called once the full text message has been built. {@link WebSocket#request()} must
* be called to receive more messages or onClose.
* be called to receive more messages.
*/
default void onMessage(WebSocket webSocket, String text) {
webSocket.request();
}

/**
* Called once the full binary message has been built. {@link WebSocket#request()} must
* be called to receive more messages or onClose.
* be called to receive more messages.
*/
default void onMessage(WebSocket webSocket, ByteBuffer bytes) {
webSocket.request();
}

/**
* Called when the remote input closes. It's a terminal event, calls to {@link WebSocket#request()}
* do nothing after this.
* do nothing after this. Some {@link HttpClient} implementations will require {@link WebSocket#request()}
* to be called to calling onClose.
*/
default void onClose(WebSocket webSocket, int code, String reason) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ private void asyncWrite(WritableByteChannel channel, ByteBuffer b) {

@Override
public void close() {
if (closed.get()) {
return;
}
// simply sends a close, which will shut down the output
// it's expected that the server will respond with a close, but if not the input will be shutdown implicitly
closeWebSocketOnce(1000, "Closing...");
Expand All @@ -226,10 +229,6 @@ private void cleanUpOnce() {
}

private void closeWebSocketOnce(int code, String reason) {
if (closed.get()) {
return;
}

try {
WebSocket ws = webSocketRef.get();
if (ws != null) {
Expand Down Expand Up @@ -260,6 +259,7 @@ public void onOpen(WebSocket webSocket) {

@Override
public void onError(WebSocket webSocket, Throwable t) {
closed.set(true);
HttpResponse<?> response = null;

try {
Expand Down Expand Up @@ -358,22 +358,24 @@ private void handleExitStatus(ByteBuffer bytes) {

@Override
public void onClose(WebSocket webSocket, int code, String reason) {
if (!exitCode.isDone()) {
exitCode.complete(null);
}
closeWebSocketOnce(code, reason);
//If we already called onClosed() or onFailed() before, we need to abort.
if (!closed.compareAndSet(false, true)) {
return;
}
closeWebSocketOnce(code, reason);
LOGGER.debug("Exec Web Socket: On Close with code:[{}], due to: [{}]", code, reason);
try {
cleanUpOnce();
} finally {
if (listener != null) {
listener.onClose(code, reason);
serialExecutor.execute(() -> {
try {
if (!exitCode.isDone()) {
exitCode.complete(null);
}
cleanUpOnce();
} finally {
if (listener != null) {
listener.onClose(code, reason);
}
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,13 @@ public PortForward forward(URL resourceBaseUrl, int port, final ReadableByteChan
});

return new PortForward() {
private final AtomicBoolean closed = new AtomicBoolean();

@Override
public void close() {
socket.cancel(true);
if (!closed.compareAndSet(false, true)) {
return;
}
socket.whenComplete((w, t) -> {
if (w != null) {
listener.closeBothWays(w, 1001, "User closing");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,17 @@ void closeBothWays(WebSocket webSocket, int code, String message) {
}

private void closeForwarder() {
alive.set(false);
if (in != null) {
Utils.closeQuietly(in);
}
if (out != null && out != in) {
Utils.closeQuietly(out);
}
pumperService.shutdownNow();
serialExecutor.shutdownNow();
serialExecutor.execute(() -> {
alive.set(false);
if (in != null) {
Utils.closeQuietly(in);
}
if (out != null && out != in) {
Utils.closeQuietly(out);
}
pumperService.shutdownNow();
serialExecutor.shutdownNow();
});
}

private static void pipe(ReadableByteChannel in, WebSocket webSocket, BooleanSupplier isAlive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.utils.CommonThreadPool;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -122,8 +123,8 @@ void onClose_shouldCloseChannels() {
listener = new PortForwarderWebsocketListener(in, out, CommonThreadPool.get());
listener.onClose(webSocket, 1337, "Test ended");
// Then
Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !in.isOpen());
assertThat(listener.getServerThrowables()).isEmpty();
assertThat(in.isOpen()).isFalse();
assertThat(out.isOpen()).isFalse();
}

Expand Down Expand Up @@ -159,6 +160,7 @@ void onMessage_withEmptyMessage_shouldEndWithError() {
listener.onMessage(webSocket, "SKIP 1");
// Then
verify(webSocket, timeout(10_000)).sendClose(1002, "Protocol error");
await().atMost(10, TimeUnit.SECONDS).until(() -> !listener.isAlive());
assertThat(outputContent.toString()).isEmpty();
assertThat(listener.errorOccurred()).isTrue();
assertThat(listener.getServerThrowables()).isNotEmpty();
Expand Down Expand Up @@ -208,6 +210,7 @@ void onMessage_withWrongChannel_shouldLogAndEndWithError() {
listener.onMessage(webSocket, "SKIP 1");
// Then
verify(webSocket, timeout(10_000)).sendClose(1002, "Protocol error");
await().atMost(10, TimeUnit.SECONDS).until(() -> !listener.isAlive());
assertThat(outputContent.toString()).isEmpty();
assertThat(listener.errorOccurred()).isTrue();
assertThat(listener.getServerThrowables().iterator().next().getMessage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ void setUp() {
BaseClient client = mock(BaseClient.class, Mockito.RETURNS_SELF);
Mockito.when(client.adapt(BaseClient.class).getExecutor()).thenReturn(CommonThreadPool.get());
Config config = mock(Config.class, Mockito.RETURNS_DEEP_STUBS);
when(config.getRequestConfig().getUploadRequestTimeout()).thenReturn(10);
when(config.getMasterUrl()).thenReturn("https://openshift.com:8443");
when(config.getNamespace()).thenReturn("default");
when(client.getConfiguration()).thenReturn(config);
Expand Down

0 comments on commit f344495

Please sign in to comment.