Skip to content

Commit

Permalink
fix #4885 fixing potential issues with jdk logwatch
Browse files Browse the repository at this point in the history
- there is a possibility that the buffer gets modified after the work is
queued
- there is a race condition between the done handling and the async
buffer handling.  If the executor is shutdown before the task starts
running the buffers will be lost.
  • Loading branch information
shawkins authored and manusa committed Feb 16, 2023
1 parent fb158cf commit f701b7b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.fabric8.kubernetes.client.http.AsyncBody;
import io.fabric8.kubernetes.client.http.AsyncBody.Consumer;
import io.fabric8.kubernetes.client.http.BufferUtil;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.http.StandardHttpClient;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.fabric8.kubernetes.client.http.StandardHttpHeaders.CONTENT_TYPE;

Expand Down Expand Up @@ -86,7 +88,9 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onNext(List<ByteBuffer> item) {
bodySubscriber.onNext(item);
// there doesn't seem to be a guarantee that the buffer won't be modified by the caller
// after passing it in, so we'll create a copy
bodySubscriber.onNext(item.stream().map(BufferUtil::copy).collect(Collectors.toList()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -46,7 +45,7 @@ public class LogWatchCallback implements LogWatch, AutoCloseable {
private volatile InputStream output;

private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile Optional<AsyncBody> asyncBody = Optional.empty();
private final CompletableFuture<AsyncBody> asyncBody = new CompletableFuture<>();
private final SerialExecutor serialExecutor;

public LogWatchCallback(OutputStream out, Executor executor) {
Expand All @@ -66,7 +65,7 @@ private void cleanUp() {
if (!closed.compareAndSet(false, true)) {
return;
}
asyncBody.ifPresent(AsyncBody::cancel);
asyncBody.thenAccept(AsyncBody::cancel);
serialExecutor.shutdownNow();
}

Expand Down Expand Up @@ -109,15 +108,15 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) {
onFailure(e);
}
if (a != null) {
asyncBody = Optional.of(a.body());
asyncBody.complete(a.body());
a.body().consume();
a.body().done().whenComplete((v, t) -> {
a.body().done().whenComplete((v, t) -> CompletableFuture.runAsync(() -> {
if (t != null) {
onFailure(t);
} else {
cleanUp();
}
});
}, serialExecutor));
}
});
}
Expand Down

0 comments on commit f701b7b

Please sign in to comment.