fix: concurrency bug when doing high volume pull query over java client #10075
Conversation
Fix for a race inside vert.x’s `RecordParser`. Two threads are hitting `RecordParserImpl.handleParsing`: one is entering from upstream via `RecordParser.handle`, and one is coming from downstream via `ReadStream.resume`. Once our `PullQueryWriteStream` is half empty, we call the drain handler, which calls `RecordParser.resume`, which synchronously starts parsing new records from the buffer. `RecordParser.handleParsing` is not thread-safe.

The cause is that the monitor inside `PullQueryWriteStream` is insufficient to protect from races inside the `RecordParser`. This fix uses a separate monitor (the object monitor of the HTTP response object) to prevent races inside the `RecordParser`. To implement the fix, we define a shim wrapper around `RecordParser`, which makes sure to acquire the monitor both on `handle` and on `resume`.

In the future, we may want to fix this by changing the way the drain handler is called.
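The shim described above can be sketched roughly like so (a minimal sketch with hypothetical, simplified types — `Parser` and `SynchronizedParser` stand in for the real vert.x `RecordParser` and the PR's wrapper; this is not the actual vert.x API):

```java
// Hypothetical simplified stand-in for the vert.x RecordParser surface.
interface Parser {
  void handle(String chunk);
  void resume();
}

// Shim that funnels both the upstream handle() path and the downstream
// resume() path through one shared monitor, so the non-thread-safe
// delegate parser is never entered by two threads at once.
final class SynchronizedParser implements Parser {
  private final Parser delegate;
  private final Object monitor; // e.g. the HTTP response object's monitor

  SynchronizedParser(final Parser delegate, final Object monitor) {
    this.delegate = delegate;
    this.monitor = monitor;
  }

  @Override
  public void handle(final String chunk) {
    synchronized (monitor) {   // upstream path: data arriving from the source
      delegate.handle(chunk);
    }
  }

  @Override
  public void resume() {
    synchronized (monitor) {   // downstream path: drain handler resuming parsing
      delegate.resume();
    }
  }
}
```

Because both entry points lock the same object, a `resume` triggered by the drain handler can no longer interleave with a concurrent `handle` from the event loop.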
@lucasbru Thanks for the fix!
Here is my feedback!
} finally {
  monitor.leave();
}
if (isDone() || size() <= queueCapacity / 2) {
Why did you remove the monitor guard?
Firstly, it's not clear that we need it, because the drain handler doesn't modify the state of the WriteStream - it only modifies the state of the RecordParser, and the RecordParser now has a separate monitor.
More importantly, the monitor can now cause a deadlock:
- `RecordParser.handle` acquires the new monitor that I introduced, and subsequently calls `PullQueryWriteStream.handle`, which attempts to acquire the `monitor` in this context.
- `PullQueryWriteStream.pollRow` acquires `monitor` and calls `RecordParser.resume`, which attempts to acquire the monitor that I introduced.

So we remove acquiring `monitor` here.
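The inversion above is the classic lock-ordering deadlock. A plain-Java sketch (hypothetical locks standing in for the two monitors; `handlePath` and `pollPathFixed` are illustrative names, not code from the PR) of why dropping the second acquisition on the poll path breaks the cycle:

```java
final class LockInversionDemo {
  static final Object parserLock = new Object(); // the monitor the shim adds
  static final Object streamLock = new Object(); // PullQueryWriteStream's monitor

  // RecordParser.handle -> PullQueryWriteStream.handle:
  // takes parserLock, then streamLock.
  static void handlePath(StringBuffer log) {
    synchronized (parserLock) {
      synchronized (streamLock) {
        log.append("h");
      }
    }
  }

  // PullQueryWriteStream.pollRow after the fix: the drain handler
  // (and thus RecordParser.resume) runs OUTSIDE streamLock, so the
  // locks are never held in the reverse order and no cycle can form.
  static void pollPathFixed(StringBuffer log) {
    synchronized (streamLock) {
      log.append("p");
    }
    synchronized (parserLock) {
      log.append("r");
    }
  }
}
```

If `pollPathFixed` instead acquired `parserLock` while still holding `streamLock`, two threads running the two paths concurrently could each hold one lock and wait forever on the other.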
I believe the main original thought was that you might want to call the drain handler while the condition of it being half full was guaranteed to be true. It's true that if you are not holding a lock, the condition might no longer hold when you act on it, but the queue will never drop data, so worst case, you go over the soft limit. Looking through some other examples, I don't think they hold any locks either.
Below, in the definition of `drainHandler`, can you do something like this:
@Override
public PullQueryWriteStream drainHandler(final Handler<Void> handler) {
Context context = Vertx.currentContext();
drainHandler.add(v -> {
context.runOnContext(handler);
});
return this;
}
This might be worth trying before doing the synchronization. I believe with the existing synchronization in this class protecting internal state from the write calls and polls from different threads, and then the callbacks always happening on the same Vertx thread, it would also work.
@cadonna and I tried running it on the context of the `ReadStream`, but failed to fix it this way. The problem was that the pull queries would not make progress, and I couldn't determine what was causing it. Today I gave this another look, and I was able to figure it out -- we are calling the drain handler way too often, so we are clogging the netty event loop completely with drain handler calls. The pull queries make progress, but the netty event loops are essentially at 100% CPU just going through drain handler calls. I think the correct fix is to call the drain handler only exactly once - the pipe implementation will re-register its drain handler when the `WriteStream` reaches capacity. @AlanConfluent could you have a look at the alternative fix here: #10077
if (isDone() || size() <= queueCapacity / 2) {
  drainHandler.forEach(h -> h.handle(null));
}
I think the drain handler is called too often in the original code as well. As far as I understand, the drain handler should be called to resume the source that is piped into the write stream (i.e. the http response) after it has been paused. Just because the size of the queue is less than half its capacity does not mean the http response has been paused; it could also be that the response gets content slower.
During our investigation, I tried to call the drain handler only when the http response has been paused before and that has already reduced the error rate significantly but did not solve the issue, though.
What do you think of also reducing the calls to the drain handler?
Probably, we should not do it in this PR since for testing your fix we want to hit the issue often. Maybe let's consider it an optimization that we might or might not add afterwards.
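The optimization discussed here — firing the drain handler only when the source was actually paused — could be sketched like this (a hypothetical, heavily simplified queue; `HalfDrainQueue` and its members are illustrative names, not the real `PullQueryWriteStream`):

```java
import java.util.ArrayDeque;

// Simplified stand-in for a bounded write-stream queue that tracks
// whether the source paused, and only signals "drained" in that case.
final class HalfDrainQueue {
  private final int capacity;
  private final ArrayDeque<String> q = new ArrayDeque<>();
  private boolean paused;     // set when the queue filled up (source pauses)
  private int drainCalls;     // counts drain-handler invocations

  HalfDrainQueue(final int capacity) {
    this.capacity = capacity;
  }

  // Returns false once the queue is full, i.e. the source should pause.
  synchronized boolean offer(final String row) {
    q.add(row);
    if (q.size() >= capacity) {
      paused = true;
    }
    return !paused;
  }

  synchronized String poll() {
    final String row = q.poll();
    // Fire the drain handler only if the source actually paused,
    // not every time the queue dips below half capacity.
    if (paused && q.size() <= capacity / 2) {
      paused = false;
      drainCalls++; // stands in for drainHandler.forEach(h -> h.handle(null))
    }
    return row;
  }

  synchronized int drainCalls() {
    return drainCalls;
  }
}
```

With this guard, a slow producer that never fills the queue triggers zero drain calls, while a paused producer gets exactly one call per pause, matching the pipe contract of re-registering the drain handler on each `writeQueueFull`.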
Yes, I think the whole way the drain handler is used is quite mysterious to me. It's called too often, it's protected by a monitor whose purpose is not clear, and it's called from a different thread (other WriteStreams call it from `write`, i.e. from the same context).
I think you are right that it should be called less often; however, this change is meant to be the minimal change necessary. It's a good idea to try this in a separate PR.
ksqldb-rest-client/src/main/java/io/confluent/ksql/rest/client/SyncronizedRecordParser.java
There are some build errors.
This is an interesting insight. It occurs to me that many of these pieces, like `RecordParser`, assume a single-threaded model. I think we've broken that model here, as you've pointed out. There's the Vertx thread which is writing to the `RecordParser`, and then the pull query thread which is pulling from the queue (and invoking the drain callback and calling `resume`). One solution would be to just invoke the handler using the same Vertx thread which registers the drain handler. That might be simpler than adding synchronization.
@Override
public RecordParser resume() {
  synchronized (source) {
This ensures it's synchronized for anything sharing the same response object, but doesn't prevent concurrent calls with `handle`. Are you trying to protect the state in the `RecordParser` delegate between calls to `resume` and `handle` from different threads? If so, I would think they should both use `delegate` (or even `source`) as the lock, but not different ones.
Also, I'm slightly worried about other internal uses of state in `RecordParser` that could access state concurrently with these synchronized calls. It seems like you would have to audit the delegate implementation to know which methods require additional synchronization, and potentially add them here.
}

@Override
public RecordParser exceptionHandler(final Handler<Throwable> handler) {
For all of these "fluent" calls that return a `RecordParser`, wouldn't you want to return `this` rather than the underlying `RecordParser`? Otherwise, they might circumvent your locking.
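The point about fluent returns can be shown with a tiny self-contained example (hypothetical names — `Fluent`, `Delegate`, `Wrapper` — not the actual classes): if the wrapper's fluent method returns the delegate, chained calls escape the synchronizing wrapper.

```java
// Hypothetical fluent interface standing in for RecordParser's fluent API.
interface Fluent {
  Fluent exceptionHandler(String handler);
}

// The underlying (unsynchronized) implementation.
final class Delegate implements Fluent {
  @Override
  public Fluent exceptionHandler(final String handler) {
    return this;
  }
}

// The synchronizing wrapper: its fluent method must return the wrapper
// itself, so chained calls keep going through the wrapper's locking.
final class Wrapper implements Fluent {
  private final Fluent delegate;

  Wrapper(final Fluent delegate) {
    this.delegate = delegate;
  }

  @Override
  public Fluent exceptionHandler(final String handler) {
    delegate.exceptionHandler(handler);
    return this; // NOT delegate.exceptionHandler(...): that would hand the
                 // caller the raw delegate and circumvent the wrapper
  }
}
```

Returning `delegate.exceptionHandler(handler)` directly would compile fine but silently drop the wrapper from any chained call.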
Testing done
No behavior changes in existing integration tests. Tested locally.
Reviewer checklist