fix: concurrency bug when doing high volume pull query over java client #10075

Closed · wants to merge 3 commits

Changes from 1 commit
@@ -208,12 +208,8 @@ private PullQueryRow pollRow() {

polled.handler.handle(new SucceededFuture<>(null, null));

-    if (monitor.enterIf(atHalfCapacity)) {
-      try {
-        drainHandler.forEach(h -> h.handle(null));
-      } finally {
-        monitor.leave();
-      }
+    if (isDone() || size() <= queueCapacity / 2) {

Member:
Why did you remove the monitor guard?

Member Author:
Firstly, it's not clear that we need it, because the drain handler doesn't modify the state of the WriteStream - it only modifies the state of the RecordParser, and the RecordParser now has a separate monitor.

More importantly, the monitor now can cause a deadlock:

  • RecordParser.handle acquires the new monitor that I introduced, and subsequently calls PullQueryWriteStream.handle, which attempts to acquire the monitor in this context.
  • PullQueryWriteStream.pollRow acquires monitor and calls RecordParser.resume which attempts to acquire the monitor that I introduced.

So we remove acquiring the monitor here.
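
A minimal sketch of that inverted lock order, with made-up monitor names (the real code uses a Monitor with enterIf/leave rather than synchronized blocks):

// Illustrative only: the two call paths acquire two locks in opposite order.
final Object recordParserMonitor = new Object(); // the new monitor introduced in this PR
final Object writeStreamMonitor = new Object();  // guards PullQueryWriteStream state

// Vert.x event-loop thread: RecordParser.handle -> PullQueryWriteStream.handle
final Runnable handlePath = () -> {
  synchronized (recordParserMonitor) {
    synchronized (writeStreamMonitor) { /* enqueue the parsed row */ }
  }
};

// Pull-query consumer thread: PullQueryWriteStream.pollRow -> drain handler -> RecordParser.resume
final Runnable pollPath = () -> {
  synchronized (writeStreamMonitor) {
    synchronized (recordParserMonitor) { /* resume the upstream read stream */ }
  }
};
// Opposite acquisition order on two threads is the classic deadlock recipe.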

Member:
I believe the main original thought was that you might want to call the drain handler while the condition of it being half full was guaranteed to be true. It's true that if you are not holding a lock, the condition might not be true when you act on it, but the queue will never drop data, so worst case you go over the soft limit. Looking through some other examples, I don't think they hold any locks either.

Member:
Below in the definition of drainHandler, can you do something like this:

@Override
public PullQueryWriteStream drainHandler(final Handler<Void> handler) {
  // Capture the Vert.x context of the thread that registers the drain handler,
  // so the handler itself is always dispatched back onto that context.
  final Context context = Vertx.currentContext();
  drainHandler.add(v -> context.runOnContext(handler));
  return this;
}

This might be worth trying before doing the synchronization. I believe it would also work: the existing synchronization in this class already protects the internal state from write calls and polls on different threads, and the callbacks would then always happen on the same Vert.x thread.

Member Author:
@cadonna and I tried running it on the context of the Read Stream, but failed to fix it this way. The problem was that the pull queries would not make progress, and I couldn't determine what was causing it. Today I gave this another look, and I was able to figure it out -- we are calling the drain handler way too often, so we are clogging the netty event loop completely with drain handler calls. The pull queries make progress, but the netty event loops are essentially at 100% CPU just going through drain handler calls. I think the correct fix is to call the drain handler exactly once - the pipe implementation will re-register its drain handler when the WriteStream reaches capacity. @AlanConfluent could you have a look at the alternative fix here: #10077
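
A rough sketch of the "fire each drain handler at most once" idea (illustrative only, not necessarily how #10077 implements it; this assumes drainHandler is a List<Handler<Void>>, as suggested by the add/forEach calls above):

if (isDone() || size() <= queueCapacity / 2) {
  // Notify each registered drain handler exactly once; Vert.x's pipe registers
  // a fresh drain handler the next time writeQueueFull() reports true.
  final List<Handler<Void>> toNotify = new ArrayList<>(drainHandler);
  drainHandler.clear();
  toNotify.forEach(h -> h.handle(null));
}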

+      drainHandler.forEach(h -> h.handle(null));
     }
Comment on lines +211 to 213

Member:
I think the drain handler is called too often also in the original code. As far as I understand, the drain handler should be called to resume the source that is piped into the write stream (i.e. the http response) after it has been paused. Just because the size of the queue is less than half its capacity does not mean the http response has been paused; it could also be that the response delivers content more slowly.
During our investigation, I tried to call the drain handler only when the http response had been paused before, and that already reduced the error rate significantly, but it did not solve the issue.
What do you think of also reducing the calls to the drain handler?
Probably we should not do it in this PR, since for testing your fix we want to hit the issue often. Maybe let's consider it an optimization that we might or might not add afterwards.
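
One possible shape of that optimization, sketched under the assumption that writeQueueFull() compares size() against queueCapacity (names other than writeQueueFull/drainHandler/queueCapacity are made up):

// Remember whether the pipe ever observed the queue as full (and may have paused the source).
private final AtomicBoolean sourceMayBePaused = new AtomicBoolean(false);

@Override
public boolean writeQueueFull() {
  final boolean full = size() >= queueCapacity;
  if (full) {
    sourceMayBePaused.set(true);
  }
  return full;
}

// In pollRow(): only notify the drain handlers if a pause may actually have happened.
if ((isDone() || size() <= queueCapacity / 2) && sourceMayBePaused.compareAndSet(true, false)) {
  drainHandler.forEach(h -> h.handle(null));
}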

Member Author (@lucasbru, Oct 4, 2023):
Yes, I think the whole way the drain handler is used is quite mysterious to me. It's called too often, it's protected by a monitor whose purpose is not clear, it's called from a different thread (other WriteStreams call it from write, so basically from the same context).

I think you are right that it should be called less often; however, this change is meant to be the minimal change necessary. It's a good idea to try this in a separate PR.


return polled.row;
@@ -351,7 +351,11 @@ private <R, T> RestResponse<R> executeRequestSync(
return executeSync(httpMethod, path, Optional.empty(), requestBody,
resp -> responseSupplier.get(),
(resp, vcf) -> {
-    final RecordParser recordParser = RecordParser.newDelimited(delimiter, resp);
+    // Use synchronized record parser to prevent races between the `RecordParser.resume` called
+    // by `PullQueryWriteStream` via the drain handler of the pipe, and `RecordParser.handle`.
+    // We may want to refactor PullQueryWriteStream to not call the drain handler from
+    // another thread.
+    final RecordParser recordParser = SyncronizedRecordParser.newDelimited(delimiter, resp);
final AtomicBoolean end = new AtomicBoolean(false);

final WriteStream<Buffer> ws = new BufferMapWriteStream<>(chunkMapper, chunkHandler);
@@ -0,0 +1,138 @@
package io.confluent.ksql.rest.client;

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.streams.ReadStream;

/**
 * A RecordParser wrapper that funnels buffer handling and resume() through a single
 * lock (the wrapped source stream), so that a resume() issued from another thread
 * cannot race with the parser's handle() invocations.
 */
@SuppressWarnings("unused")
public class SyncronizedRecordParser implements RecordParser {

/**
 * Wraps the upstream ReadStream so that every buffer delivered to the parser is
 * handled while holding the lock on the original stream.
 */
private static class WrappedUpstream implements ReadStream<Buffer> {

private final ReadStream<Buffer> delegate;

WrappedUpstream(final ReadStream<Buffer> delegate) {
this.delegate = delegate;
}

@Override
public ReadStream<Buffer> exceptionHandler(@Nullable final Handler<Throwable> handler) {
return delegate.exceptionHandler(handler);
}

@Override
@Fluent
public ReadStream<Buffer> handler(@Nullable final Handler<Buffer> handler) {
  if (handler == null) {
    // Unregistering the handler: nothing to wrap.
    return delegate.handler(null);
  }
  // Deliver each buffer while holding the lock on the source stream, so that
  // handle() cannot run concurrently with a resume() from another thread.
  return delegate.handler(bf -> {
    synchronized (delegate) {
      handler.handle(bf);
    }
  });
}

@Override
@Fluent
public ReadStream<Buffer> pause() {
return delegate.pause();
}

@Override
@Fluent
public ReadStream<Buffer> resume() {
return delegate.resume();
}

@Override
@Fluent
public ReadStream<Buffer> fetch(final long l) {
return delegate.fetch(l);
}

@Override
@Fluent
public ReadStream<Buffer> endHandler(@Nullable final Handler<Void> handler) {
return delegate.endHandler(handler);
}
}

private final RecordParser delegate;
private final ReadStream<Buffer> source;

public SyncronizedRecordParser(final RecordParser delegate, final ReadStream<Buffer> source) {
this.delegate = delegate;
this.source = source;
}

@Override
public void setOutput(final Handler<Buffer> handler) {
delegate.setOutput(handler);
}

public static RecordParser newDelimited(final String delim, final ReadStream<Buffer> stream) {
return new SyncronizedRecordParser(
RecordParser.newDelimited(delim, new WrappedUpstream(stream)),
stream
);
}

@Override
public void delimitedMode(final String s) {
delegate.delimitedMode(s);
}

@Override
public void delimitedMode(final Buffer buffer) {
delegate.delimitedMode(buffer);
}

@Override
public void fixedSizeMode(final int i) {
delegate.fixedSizeMode(i);
}

@Override
@Fluent
public RecordParser maxRecordSize(final int i) {
return delegate.maxRecordSize(i);
}

@Override
public void handle(final Buffer buffer) {
delegate.handle(buffer);
}

@Override
public RecordParser exceptionHandler(final Handler<Throwable> handler) {
return delegate.exceptionHandler(handler);
}

@Override
public RecordParser handler(final Handler<Buffer> handler) {
return delegate.handler(handler);
}

@Override
public RecordParser pause() {
return delegate.pause();
}

@Override
public RecordParser fetch(final long l) {
return delegate.fetch(l);
}

@Override
public RecordParser resume() {
  // Take the same lock under which buffers are delivered to the parser
  // (see WrappedUpstream.handler), so resume() cannot interleave with handle().
  synchronized (source) {
    return delegate.resume();
  }
}

@Override
public RecordParser endHandler(final Handler<Void> handler) {
return delegate.endHandler(handler);
}
}
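
For reference, a small usage sketch mirroring the call site above (the "\n" delimiter and the process call are placeholders; resp is the http response ReadStream): every record handler invocation runs while holding the lock on the source stream, and resume() takes the same lock, so a resume() issued from the write-stream side cannot interleave with record handling.

final RecordParser parser = SyncronizedRecordParser.newDelimited("\n", resp);
parser.handler(buffer -> {
  // Runs on the Vert.x event loop while holding the lock on `resp`.
  process(buffer);
});

// Later, e.g. from the pull-query consumer thread via the drain handler:
parser.resume(); // also synchronizes on `resp`, so it cannot race with handle()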