Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: concurrency bug when doing high volume pull query over java client (alternative) #10077

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.RowMetadata;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.future.SucceededFuture;
import io.vertx.core.streams.WriteStream;
Expand Down Expand Up @@ -97,12 +99,6 @@ public PullQueryWriteStream(
) {
this.queryLimit = queryLimit;
this.translator = translator;

// register a drainHandler that will wake up anyone waiting on hasCapacity
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how this is necessary, since every time we call the drain handlers, we anyways acquire the monitor.

drainHandler.add(ignored -> {
monitor.enter();
monitor.leave();
});
}

private static final class HandledRow {
Expand Down Expand Up @@ -211,6 +207,7 @@ private PullQueryRow pollRow() {
if (monitor.enterIf(atHalfCapacity)) {
try {
drainHandler.forEach(h -> h.handle(null));
drainHandler.clear();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like other implementors only allow one handler at a time, and they just clobber any previous one rather than keeping a list, but I couldn't find good documentation on this.

I also think proper callers also reregister after every callback they get, but I couldn't find a very good documentation of this either.

Ultimately, the only user of these APIs are the http response objects, so if we test it out with them, we're good.

} finally {
monitor.leave();
}
Expand Down Expand Up @@ -273,7 +270,8 @@ public PullQueryWriteStream exceptionHandler(final Handler<Throwable> handler) {

@Override
public PullQueryWriteStream drainHandler(final Handler<Void> handler) {
drainHandler.add(handler);
final Context context = Vertx.currentContext();
drainHandler.add(v -> context.runOnContext(handler));
return this;
}

Expand Down