[HUDI-5488] Make sure the Disruptor queue starts first, then insert records #7582
@alexeykudinkin @zhangyue19921010 Could you please take a look?
Nice catch, @boneanxs!
We do have to make sure the Disruptor is set up before we start consuming records.
But do we need a synchronized check for every consumed record (a performance concern)? Alternatively, we could add a setup() step in BaseHoodieQueueBasedExecutor#execute that runs before startConsumingAsync and startProducingAsync.
Hi @alexeykudinkin, what do you think? :)
For example, something like this
and
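The setup-first ordering suggested above can be sketched roughly as follows. This is a minimal, hypothetical illustration, not Hudi's code: `SetupFirstExecutor`, its `setup()`, `startConsumingAsync` and `startProducingAsync` methods, and the `END` marker are illustrative names, and a `java.util.concurrent.ArrayBlockingQueue` stands in for the Disruptor ring buffer. The only point it makes is that the queue is ready on the calling thread before either worker task is submitted, so the ordering needs no per-record check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for a queue-based executor (not Hudi's class).
// An ArrayBlockingQueue replaces the Disruptor ring buffer.
class SetupFirstExecutor {
  // Hypothetical end-of-input marker; inputs are assumed to be non-negative.
  private static final int END = -1;
  private BlockingQueue<Integer> queue; // stays null until setup() runs

  // Hypothetical setup(): create the queue on the calling thread,
  // before any consumer or producer task is submitted.
  void setup() {
    queue = new ArrayBlockingQueue<>(16);
  }

  List<Integer> execute(List<Integer> input) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      setup(); // ordering guarantee: the queue is ready before either task starts
      Future<List<Integer>> consumer = startConsumingAsync(pool);
      Future<?> producer = startProducingAsync(pool, input);
      producer.get(10, TimeUnit.SECONDS);
      return consumer.get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException("execution failed", e);
    } finally {
      pool.shutdownNow();
    }
  }

  private Future<List<Integer>> startConsumingAsync(ExecutorService pool) {
    return pool.submit(() -> {
      List<Integer> consumed = new ArrayList<>();
      // Drain until the end marker arrives.
      for (int v = queue.take(); v != END; v = queue.take()) {
        consumed.add(v);
      }
      return consumed;
    });
  }

  private Future<?> startProducingAsync(ExecutorService pool, List<Integer> input) {
    return pool.submit(() -> {
      for (int v : input) {
        queue.put(v);
      }
      queue.put(END); // signal the consumer to stop
      return null;
    });
  }
}
```

Because `ExecutorService` guarantees that actions taken before a task is submitted happen-before the task runs, the plain `queue` field written in `setup()` is visible to both workers without being `volatile`.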
This fix doesn't perform a synchronized check for each record; it only blocks consumption while the queue has not started yet. But I think adding a new
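The behavior described here, where consumption waits only until the queue has started, can be approximated with a one-shot gate. This is a hedged sketch that assumes a `java.util.concurrent.CountDownLatch` as the mechanism; `StartGate`, `start()`, `isStarted()`, and `awaitStart()` are hypothetical names, not the PR's actual change.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical one-shot start gate (not the PR's code): callers block only
// until start() has been called; afterwards awaitStart() returns at once.
class StartGate {
  private final CountDownLatch started = new CountDownLatch(1);

  // Opens the gate. Extra calls are no-ops because the count stays at zero.
  void start() {
    started.countDown();
  }

  boolean isStarted() {
    return started.getCount() == 0;
  }

  // Blocks until start() runs or the timeout elapses; returns whether it started.
  boolean awaitStart(long timeout, TimeUnit unit) {
    try {
      return started.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

Once the latch count reaches zero, `await` returns `true` immediately, so after start-up each caller pays only a state check and never takes a lock.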
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
Typo in the title.
This PR may be related to https://issues.apache.org/jira/browse/HUDI-5369
Gentle ping @alexeykudinkin
We should do what @zhangyue19921010 suggested in his review.
@@ -60,6 +61,10 @@ public long size() {

  @Override
  public void insertRecord(I value) throws Exception {
    if (!isStart) {
I don't think we need this; it would throw anyway if the queue has not been started yet.
We assert in insertRecord that the queue has been started.
This check only asserts that the queue has been started, in case someone uses this queue without calling start().
@boneanxs we need to keep in mind that this is on the hot path and executes for every record, so we should keep the bar really high for what we allow to run per record.
That being said, I also see your point in adding it to make sure it fails definitively unless the queue has been started.
Let's keep it; please rename it to isStarted.
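The guard kept after review (renamed to `isStarted`) can be sketched as a fail-fast check, assuming it throws when the queue was never started. `GuardedQueue` is a hypothetical stand-in that buffers records in a `ConcurrentLinkedQueue` instead of the Disruptor; only the guard itself corresponds to the diff above. Marking the flag `volatile` keeps the per-record cost on the hot path to a single volatile read while still making a `start()` on another thread visible to producers.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a message queue (not Hudi's DisruptorMessageQueue):
// only the isStarted guard mirrors the review discussion.
class GuardedQueue<I> {
  private final Queue<I> buffer = new ConcurrentLinkedQueue<>();
  // volatile: a start() on one thread becomes visible to producer threads.
  // On the per-record hot path this costs a single volatile read.
  private volatile boolean isStarted = false;

  void start() {
    isStarted = true;
  }

  void insertRecord(I value) {
    if (!isStarted) {
      // Fail definitively if the queue was never started.
      throw new IllegalStateException("insertRecord called before start()");
    }
    buffer.offer(value);
  }

  int size() {
    return buffer.size();
  }
}
```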
@hudi-bot run azure
0f230ae to a94ec9c
LGTM. We can land it once the nit Alex mentioned is addressed. Thanks for your contribution!
Landing this since there are no material changes since the last revision and CI passed previously.
…pache#7582) We must make sure Disruptor's queue is set up first, before the producer inserts records into it. Currently there is no guarantee which thread starts first, so this PR tries to fix that.
Change Logs
We must make sure Disruptor's queue is set up first, before the producer inserts records into it. Currently there is no guarantee which thread starts first, so this PR tries to fix that.
Also, I think the TestDisruptorExecutionInSpark#testExecutor and TestDisruptorMessageQueue#testRecordReading test failures are related to this bug.
Impact
none
Risk level
none