Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-6134] Prevent two clean run concurrently in flink #8568

Conversation

hbgstc123
Copy link
Contributor

Change Logs

prevent two clean run concurrently in flink.

Impact

no

Risk level (write none, low medium or high below)

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@danny0405
Copy link
Contributor

Can you elaborate the details a little, the #open method can only be invoked by 1 times?

@hbgstc123
Copy link
Contributor Author

Can you elaborate the details a little, the #open method can only be invoked by 1 times?

If clean called in #open take long time, snapshotState() can trigger another clean, which will run together with the one trigger in #open

@danny0405
Copy link
Contributor

Can you elaborate the details a little, the #open method can only be invoked by 1 times?

If clean called in #open take long time, snapshotState() can trigger another clean, which will run together with the one trigger in #open

If the operator is not opened, how could it execute the #snapshotState()?

@hbgstc123
Copy link
Contributor Author

If the operator is not opened, how could it execute the #snapshotState()?

In #open clean is run asynchronously in NonThrownExecutor, so #open can finish right away before clean finished.

this.isCleaning = true;
try {
this.writeClient.clean(instantTime);
} catch (Throwable throwable) {
Copy link
Contributor

@danny0405 danny0405 Apr 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to execute the cleaning no matter whether async cleaning is true or false.
And this is a NonThrownExecutor, there is no need to catch the exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Removed the catch clause and ASYNC_ENABLED condition, and add !isCleaning condition before clean function calls in subclass ClusteringCommitSink and CompactionCommitSink.
Pls take another look, thanks

@hbgstc123 hbgstc123 force-pushed the HUDI-6134_prevent_clean_run_concurrently_in_flink branch from 6926fef to 708a1e0 Compare April 26, 2023 02:29
@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);

// whether to clean up the input base parquet files used for clustering
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
LOG.info("Running inline clean");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the matter if there are multiple cleaning tasks runing here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple cleaning tasks maybe running the same clean instant, it's unnecessary.
And now multi clean can leave .hoodie/20230425173418352.clean.tmp this tmp file in timeline because the slower clean task will fail to rename this tmp file to the final 20230425173418352.clean because it will be there created by an earlier clean task.
We can try fix the rename issue in HoodieActiveTimeline.transitionState too, delete the tmp file when rename is unsuccessful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple cleaning tasks maybe running the same clean instant, it's unnecessary.

That's not possible, cleaning service may have clean plan, one file can only belongs to one cleaning plan.

Copy link
Contributor Author

@hbgstc123 hbgstc123 May 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First I see the clean.tmp file in a flink write hudi job with online compaction,
recently I see the clean tmp file in timeline again in a flink job with offline compaction.
I write an issue about it:
#8726

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple cleaning tasks maybe running the same clean instant

Can you explain why the same cleaning instant is executed by multiple tasks? You mean the cleaning service would try to execute the existing pending cleaning instant? Can you show us the code there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some analysis I find that there is an option hoodie.clean.allow.multiple for multiple writer cleaning, and by default it is true.

I also find that the clean action executor would try to clean all the pending cleaning instants on the timeline if hoodie.clean.allow.multiple is enabled.

But in flink streaming, the cleaning is scheduled and executed in a single worker thread pool, that means at most 1 cleaning task is running for streaming pipeline.

But there is possibility for batch ingestion job, the #open method and #doCommit method can trigger the cleaning in separeate threads. Is that your case here?

@hudi-bot
Copy link

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@danny0405 danny0405 changed the title [HUDI-6134] prevent two clean run concurrently in flink. [HUDI-6134] Prevent two clean run concurrently in flink. May 16, 2023
@danny0405 danny0405 changed the title [HUDI-6134] Prevent two clean run concurrently in flink. [HUDI-6134] Prevent two clean run concurrently in flink May 16, 2023
@danny0405 danny0405 merged commit 3e788ba into apache:master May 16, 2023
17 checks passed
yihua pushed a commit to yihua/hudi that referenced this pull request May 17, 2023
Co-authored-by: hbg <bingeng.huang@shopee.com>
yihua pushed a commit to yihua/hudi that referenced this pull request May 17, 2023
Co-authored-by: hbg <bingeng.huang@shopee.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

None yet

3 participants