[HUDI-4329] Add separate control for Flink compaction operation sync/async mode #5991

Open
chenshzh wants to merge 1 commit into apache:master from chenshzh:csz/HUDI-4329

@chenshzh (Contributor) commented Jun 28, 2022

Change Logs

Add separate control for Flink compaction operation sync/async mode.

Details in https://issues.apache.org/jira/projects/HUDI/issues/HUDI-4329

Problem Review

The sync/async mode of the compaction operation in CompactFunction is currently controlled by FlinkOptions#COMPACTION_ASYNC_ENABLED:

  public CompactFunction(Configuration conf) {
    this.conf = conf;
    this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
  }

In practice, however, it cannot be switched to sync mode: when that option is false, the pipeline includes only the clean operator and no compact operators.

  // compaction
  if (StreamerUtil.needsAsyncCompaction(conf)) {
    return Pipelines.compact(conf, pipeline);
  } else {
    return Pipelines.clean(conf, pipeline);
  }

Improvement

Add another separate control switch for compaction operation sync/async mode.
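
A minimal sketch of how the two switches could combine, written as plain Java with no Hudi or Flink dependency. The key compaction.operation.async.enabled is the one this PR proposes; the Plan enum, the resolve helper, and the default values of true are assumptions made for illustration, not code from the patch.

```java
import java.util.Map;

final class CompactionSwitches {
  // Existing key: decides whether the compact operators are part of the pipeline.
  static final String COMPACTION_ASYNC_ENABLED = "compaction.async.enabled";
  // Key proposed in this PR: decides how the compaction operation is executed.
  static final String COMPACTION_OPERATION_ASYNC_ENABLED = "compaction.operation.async.enabled";

  // Hypothetical summary of the resulting behaviour.
  enum Plan { CLEAN_ONLY, COMPACT_SYNC, COMPACT_ASYNC }

  static Plan resolve(Map<String, String> conf) {
    // Defaults of "true" are assumed here for illustration.
    boolean compactionEnabled =
        Boolean.parseBoolean(conf.getOrDefault(COMPACTION_ASYNC_ENABLED, "true"));
    if (!compactionEnabled) {
      // No compact operators at all, only the clean operator.
      return Plan.CLEAN_ONLY;
    }
    boolean asyncOperation =
        Boolean.parseBoolean(conf.getOrDefault(COMPACTION_OPERATION_ASYNC_ENABLED, "true"));
    return asyncOperation ? Plan.COMPACT_ASYNC : Plan.COMPACT_SYNC;
  }
}
```

With the first key set to true and the second set to false, this model yields COMPACT_SYNC, which is the combination the single existing switch cannot express.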

Impact

Adds a sync compaction switch for MoR compaction.

Risk level (write none, low medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Add compaction.operation.async.enabled: controls whether the compaction operation runs asynchronously; set it to false to run the operation synchronously.

The original compaction.async.enabled: still used to turn on the compaction process.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@danny0405 danny0405 self-assigned this Jun 30, 2022
@danny0405 danny0405 added the engine:flink Flink integration label Jun 30, 2022
@chenshzh (Contributor Author)

@danny0405 If it's convenient, please also take a look at this one. It has been open for some time. Thanks!

@yihua yihua added priority:high Significant impact; potential bugs area:table-service Table services labels Sep 13, 2022
    this.conf = conf;
    this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
    this.asyncCompactionOperation = OptionsResolver.needsAsyncCompactionOperation(conf);
  }
Contributor

Why switch to a new option key? What's the purpose here?

Contributor

We already support sync compaction for bounded sources; shouldn't async compaction be fine for streaming sources?

Contributor Author

Normally we use compaction.async.enabled to turn on compaction. But then we cannot make the operation sync, because the option is already true.

    if (asyncCompaction) {
      // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
      executor.execute(
          () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
          (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
          "Execute compaction for instant %s from task %d", instantTime, taskID);
    } else {
      // executes the compaction task synchronously for batch mode.
      LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
      doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
    }
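
The difference between the two branches above can be sketched in plain Java. This is a simplified model under stated assumptions: a standard ExecutorService stands in for the executor used in the operator, the CompactionDispatch class and its committed list are hypothetical, and doCompaction only records the instant time instead of compacting anything.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CompactionDispatch {
  private final boolean asyncOperation;
  private final ExecutorService executor;
  // Records finished instants; thread-safe because the async branch writes from another thread.
  final List<String> committed = new CopyOnWriteArrayList<>();

  CompactionDispatch(boolean asyncOperation) {
    this.asyncOperation = asyncOperation;
    this.executor = asyncOperation ? Executors.newSingleThreadExecutor() : null;
  }

  void process(String instantTime) {
    if (asyncOperation) {
      // Hand the work to a background thread so the caller returns immediately.
      executor.execute(() -> doCompaction(instantTime));
    } else {
      // Run inline: the caller is blocked until the compaction has finished.
      doCompaction(instantTime);
    }
  }

  // Placeholder for the real compaction work.
  private void doCompaction(String instantTime) {
    committed.add(instantTime);
  }

  // Waits for outstanding async work before returning.
  void close() {
    if (executor == null) {
      return;
    }
    executor.shutdown();
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

In the sync case the result is visible as soon as process returns; in the async case it is only guaranteed to be visible after close has waited for the background thread.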

Regarding "support sync compaction for bounded source":

We do use sync compaction mode for unbounded sources in some scenarios. And the bounded-source sync compaction actually looks odd: it relies on compaction.async.enabled being true to turn on compaction, and then switches it to false to get sync mode.

      // compaction
      if (OptionsResolver.needsAsyncCompaction(conf)) {   // here FlinkOptions.COMPACTION_ASYNC_ENABLED decides that we need compaction
        // use synchronous compaction for bounded source.
        if (context.isBounded()) {
          // we get here because the option is true, so it is odd to set it to false;
          // all we actually want is for the operation to be executed synchronously.
          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        }
        return Pipelines.compact(conf, pipeline);
      } else {
        return Pipelines.clean(conf, pipeline);
      }
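
To make the double meaning of the flag concrete, here is a small plain-Java model of the snippet above. It is only an illustration: BoundedOverrideDemo and its string return values are hypothetical, the default of true is assumed, and the model ignores any other checks the real OptionsResolver.needsAsyncCompaction may perform.

```java
import java.util.Map;

final class BoundedOverrideDemo {
  static final String COMPACTION_ASYNC_ENABLED = "compaction.async.enabled";

  // Simplified stand-in: looks only at the single option.
  static boolean needsAsyncCompaction(Map<String, String> conf) {
    return Boolean.parseBoolean(conf.getOrDefault(COMPACTION_ASYNC_ENABLED, "true"));
  }

  // Mirrors the branch structure of the snippet; returns the chosen pipeline name.
  static String buildPipeline(Map<String, String> conf, boolean bounded) {
    if (needsAsyncCompaction(conf)) {
      if (bounded) {
        // The same flag that enabled compaction is flipped to request sync execution.
        conf.put(COMPACTION_ASYNC_ENABLED, "false");
      }
      return "compact";
    }
    return "clean";
  }
}
```

In this model, building the pipeline for a bounded source leaves the flag set to false, so a second pass over the same configuration would pick the clean-only pipeline. That is the kind of ambiguity a separate operation-level switch avoids.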

Contributor Author

@danny0405 please take a look and let me know whether there are any remaining problems here.

Contributor

"And actually the bounded source sync compaction seems weird. It use compaction.async.enabled true to turn on compaction, and then switch it to false for sync mode."

I agree the logic here is a little weird, and we can refactor it. Still, sync compaction for bounded sources is meaningful: when people do batch ingestion into a MoR table, they do not want a separate compaction job running afterwards.

Contributor Author

keep sync compaction as an option for unbounded source.

And looking at Hudi as a whole, you might also agree that users should have a sync compaction option for unbounded sources, regardless of the scenarios mentioned above?

Contributor Author

@danny0405 do we have any other questions here?

Contributor

and at the same time we use async thread to execute compaction async and collect the result compaction msgs, the output.collector will become thread unsafe

The output collector is not thread-safe here; let's fix it to be thread-safe then.
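
One generic way to make a shared collector safe for concurrent callers is to serialize access through a lock. The sketch below is plain Java and is not the fix adopted in Hudi: SynchronizedCollector is a hypothetical name, and java.util.function.Consumer stands in for the Flink collector type. It only protects callers that go through the wrapper; any code that writes to the underlying collector directly would have to share the same lock.

```java
import java.util.function.Consumer;

final class SynchronizedCollector<T> implements Consumer<T> {
  private final Consumer<T> delegate;
  private final Object lock = new Object();

  SynchronizedCollector(Consumer<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void accept(T record) {
    // Only one thread at a time may hand a record to the underlying collector.
    synchronized (lock) {
      delegate.accept(record);
    }
  }
}
```

A typical use would wrap the collector once and pass the wrapper to every compaction thread, for example `new SynchronizedCollector<>(events::add)` for a plain list named events.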

Contributor Author

OK, but I think that should be a separate issue.
What about this PR? We have already discussed at length the need to keep a sync compaction option for unbounded sources.

Contributor

The watermark conflict issue has been addressed in #8379, but I still consider this PR valid. My suggestions:

  1. rename compaction.async.enabled to compaction.enabled
  2. store the isBounded option in the conf to switch to sync compaction for bounded source.
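
The two suggestions above could look roughly like this in plain Java. This is a sketch under assumptions: compaction.enabled is the rename proposed in the comment, while the key used to record boundedness, the class name, and the defaults are hypothetical, since the thread does not specify them.

```java
import java.util.Map;

final class SuggestedCompactionOptions {
  // Suggestion 1: a flag that only says whether compaction is part of the pipeline.
  static final String COMPACTION_ENABLED = "compaction.enabled";
  // Suggestion 2: hypothetical key under which the planner records context.isBounded().
  static final String SOURCE_BOUNDED = "example.source.bounded";

  static boolean compactionEnabled(Map<String, String> conf) {
    // Default of "true" is assumed for illustration.
    return Boolean.parseBoolean(conf.getOrDefault(COMPACTION_ENABLED, "true"));
  }

  // Sync execution is derived from boundedness instead of mutating the enable flag.
  static boolean syncCompaction(Map<String, String> conf) {
    boolean bounded = Boolean.parseBoolean(conf.getOrDefault(SOURCE_BOUNDED, "false"));
    return compactionEnabled(conf) && bounded;
  }
}
```

Because the enable flag is never overwritten, reading it a second time gives the same answer regardless of whether the source is bounded.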

@hudi-bot (Collaborator)

CI report:

Bot commands
@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@github-actions github-actions bot added the size:S PR with lines of changes in (10, 100] label Feb 26, 2024
@yihua (Contributor) commented Mar 26, 2024

@danny0405 is this still needed for Hudi Flink?

@danny0405 (Contributor)

Yeah, let's move it to the 1.0 release.

Projects

Status: 🚧 Needs Repro
Status: 🔖 Ready for review

4 participants