fix: concurrency bug when doing high volume pull query over java client (alternative) #10077

Merged Oct 6, 2023 (2 commits)

@lucasbru lucasbru commented Oct 5, 2023

Description

Fixes a race inside Vert.x's RecordParser. Two threads hit RecordParserImpl.handleParsing concurrently: one enters from upstream via RecordParser.handle, the other from downstream via ReadStream.resume. Once our PullQueryWriteStream is half empty, we call the drain handler, which calls RecordParser.resume, which synchronously starts parsing new records from the buffer. RecordParser.handleParsing is not thread-safe, so the two calls can interleave.

The root cause is that the monitor inside PullQueryWriteStream does not protect against races inside the RecordParser. This PR makes two changes:

  • We run the drain handler on the ReadStream's context, so it executes on the same Netty event loop as every other call into RecordParser and cannot race with them.
  • To avoid clogging the event loop with a lot of drain handler calls, we call each drain handler only once. The PipeImpl re-registers the drain handler when the WriteStream reaches capacity.
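
A minimal sketch of the two changes, not the actual PullQueryWriteStream code. It assumes a plain java.util.concurrent.Executor as a stand-in for the ReadStream's context (in Vert.x this role would presumably be played by something like Context.runOnContext), and the class name OneShotDrainQueue and its methods are hypothetical. The point is that the polling thread never invokes a drain handler inline: it hands the handler to the read side's executor and forgets it, so each handler runs at most once per registration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a write stream with one-shot drain handlers. */
final class OneShotDrainQueue {
  private final Object lock = new Object();
  private final List<Runnable> drainHandlers = new ArrayList<>();
  private final Executor readContext; // assumption: models the ReadStream's event loop
  private final int capacity;
  private int size;

  OneShotDrainQueue(int capacity, Executor readContext) {
    this.capacity = capacity;
    this.readContext = readContext;
  }

  /** Registers a handler that fires at most once; callers re-register when full again. */
  void drainHandler(Runnable handler) {
    synchronized (lock) {
      drainHandlers.add(handler);
    }
  }

  boolean writeQueueFull() {
    synchronized (lock) {
      return size >= capacity;
    }
  }

  /** Adds one item (the item itself is omitted to keep the sketch small). */
  void write() {
    synchronized (lock) {
      size++;
    }
  }

  /** Removes one item; at or below half capacity, hands pending handlers to the read context. */
  void poll() {
    List<Runnable> toRun = List.of();
    synchronized (lock) {
      if (size > 0) {
        size--;
      }
      if (size <= capacity / 2 && !drainHandlers.isEmpty()) {
        toRun = new ArrayList<>(drainHandlers);
        drainHandlers.clear(); // one-shot: forget handlers once they are scheduled
      }
    }
    // Dispatch rather than call inline, so handlers never run on the polling thread.
    for (Runnable h : toRun) {
      readContext.execute(h);
    }
  }
}
```

With a single-threaded executor as the read context, the handler is serialized with all other work submitted to that executor, which is the property the first bullet relies on.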

Testing done

No behavior changes in existing integration tests. Tested locally.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g., if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")
  • Do these changes have compatibility implications for rollback? If so, ensure that the ksql command version is bumped.

@lucasbru lucasbru requested a review from a team as a code owner October 5, 2023 11:51
cla-assistant bot commented Oct 5, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.


@@ -97,12 +99,6 @@ public PullQueryWriteStream(
    ) {
      this.queryLimit = queryLimit;
      this.translator = translator;

      // register a drainHandler that will wake up anyone waiting on hasCapacity
Review comment (lucasbru, Member, Author):

I don't see why this is necessary: every time we call the drain handlers, we acquire the monitor anyway.

@lucasbru lucasbru changed the title fix: concurrency bug when doing high volume pull query over java client fix: concurrency bug when doing high volume pull query over java client (alternative) Oct 5, 2023
@cadonna cadonna left a comment

LGTM!

@@ -211,6 +207,7 @@ private PullQueryRow pollRow() {
    if (monitor.enterIf(atHalfCapacity)) {
      try {
        drainHandler.forEach(h -> h.handle(null));
        drainHandler.clear();
Review comment (Member):

It seems like other implementors only allow one handler at a time and simply clobber any previous one rather than keeping a list, but I couldn't find good documentation on this.

I also think well-behaved callers re-register after every callback they get, but I couldn't find good documentation on that either.

Ultimately, the only users of these APIs are the HTTP response objects, so if we test it out with them, we're good.
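
To make the two conventions mentioned above concrete, here is a small sketch of a single-slot, one-shot handler holder. It is an illustration only: the class SingleSlotDrain and its methods are hypothetical and do not correspond to any Vert.x or ksqlDB class, and whether other WriteStream implementors actually behave this way is exactly the undocumented point raised in this comment.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-slot, one-shot drain handler holder (illustration only). */
final class SingleSlotDrain {
  private final AtomicReference<Runnable> handler = new AtomicReference<>();

  /** A later registration replaces ("clobbers") any earlier one. */
  void drainHandler(Runnable h) {
    handler.set(h);
  }

  /** Runs the current handler at most once; returns false if none was registered. */
  boolean fireDrain() {
    Runnable h = handler.getAndSet(null); // clear the slot before running
    if (h == null) {
      return false;
    }
    h.run(); // the handler may re-register itself from inside this call
    return true;
  }
}
```

Under this model, a caller that wants repeated notifications has to call drainHandler again from inside its callback, which matches the re-registration behavior the PR description attributes to PipeImpl.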

@lucasbru lucasbru merged commit 019e061 into confluentinc:master Oct 6, 2023
3 of 4 checks passed