Skip to content

Commit

Permalink
fix: Block writer thread if response output buffer is full (#5386)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed May 19, 2020
1 parent fb838fe commit 0edda40
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@

package io.confluent.ksql.api.server;

import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import java.io.OutputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;

/*
Expand All @@ -41,7 +45,7 @@ public void write(final int b) {
}

@Override
public void write(final @NotNull byte[] bytes, final int offset, final int length) {
public synchronized void write(final @NotNull byte[] bytes, final int offset, final int length) {
Objects.requireNonNull(bytes);
if ((offset < 0) || (offset > bytes.length)) {
throw new IndexOutOfBoundsException();
Expand All @@ -55,13 +59,29 @@ public void write(final @NotNull byte[] bytes, final int offset, final int lengt
final byte[] bytesToWrite = new byte[length];
System.arraycopy(bytes, offset, bytesToWrite, 0, length);
final Buffer buffer = Buffer.buffer(bytesToWrite);
blockIfWriteQueueFull();
response.write(buffer);
}

@Override
public void close() {
response.end();
}

private void blockIfWriteQueueFull() {
VertxUtils.checkIsWorker();
if (response.writeQueueFull()) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
response.drainHandler(v -> cf.complete(null));
try {
cf.get(60, TimeUnit.SECONDS);
} catch (Exception e) {
// Very slow consumers will result in a timeout, this will cause the push query to be closed
throw new KsqlException(e);
}
}
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ private void handleQueryRequest(final RoutingContext routingContext) {
final CompletableFuture<Void> connectionClosedFuture = new CompletableFuture<>();
routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null));

routingContext.response().setWriteQueueMaxSize(200 * 1024);

handleOldApiRequest(server, routingContext, KsqlRequest.class,
(request, apiSecurityContext) ->
endpoints
Expand Down

0 comments on commit 0edda40

Please sign in to comment.