diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ResponseOutputStream.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ResponseOutputStream.java index 59bb15fd1abd..a36b1ed93565 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ResponseOutputStream.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ResponseOutputStream.java @@ -15,10 +15,14 @@ package io.confluent.ksql.api.server; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.VertxUtils; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerResponse; import java.io.OutputStream; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.jetbrains.annotations.NotNull; /* @@ -41,7 +45,7 @@ public void write(final int b) { } @Override - public void write(final @NotNull byte[] bytes, final int offset, final int length) { + public synchronized void write(final @NotNull byte[] bytes, final int offset, final int length) { Objects.requireNonNull(bytes); if ((offset < 0) || (offset > bytes.length)) { throw new IndexOutOfBoundsException(); @@ -55,6 +59,7 @@ public void write(final @NotNull byte[] bytes, final int offset, final int lengt final byte[] bytesToWrite = new byte[length]; System.arraycopy(bytes, offset, bytesToWrite, 0, length); final Buffer buffer = Buffer.buffer(bytesToWrite); + blockIfWriteQueueFull(); response.write(buffer); } @@ -62,6 +67,21 @@ public void write(final @NotNull byte[] bytes, final int offset, final int lengt public void close() { response.end(); } + + private void blockIfWriteQueueFull() { + VertxUtils.checkIsWorker(); + if (response.writeQueueFull()) { + final CompletableFuture cf = new CompletableFuture<>(); + response.drainHandler(v -> cf.complete(null)); + try { + cf.get(60, TimeUnit.SECONDS); + } catch (Exception e) { + // Very slow consumers will result in a timeout, this will cause the push query to be closed + throw new KsqlException(e); + } + } + } + } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java index 4d9264d8ecb1..8d1e74d8a025 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerVerticle.java @@ -226,6 +226,8 @@ private void handleQueryRequest(final RoutingContext routingContext) { final CompletableFuture connectionClosedFuture = new CompletableFuture<>(); routingContext.request().connection().closeHandler(v -> connectionClosedFuture.complete(null)); + routingContext.response().setWriteQueueMaxSize(200 * 1024); + handleOldApiRequest(server, routingContext, KsqlRequest.class, (request, apiSecurityContext) -> endpoints