[MINOR] improve comaction operator shuffle#11757

Merged: danny0405 merged 1 commit into apache:master from xicm:improve_compaction on Aug 14, 2024

Conversation

xicm (Contributor) commented Aug 12, 2024

Change Logs

I found data skew in the Flink compaction task.

[Image: WeChat screenshot 20240812172514]

The compaction task shuffles the compaction operations with keyBy. The series of hash and modulo steps results in skew.

Each compaction operation covers exactly one file group, so we don't need keyBy; a plain rebalance is enough.

This is the result with rebalance.

[Image: WeChat screenshot 20240812172545]
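
To illustrate why hash-based routing can skew while round-robin does not, here is a minimal sketch in plain Java. It is a toy model, not Flink's implementation: the class `ShuffleSkewSketch`, the `scramble` hash (a stand-in for Flink's murmur-based key-group hash), and the `file-group-N` ids are all hypothetical. It only mirrors the general shape of keyed routing (key hash, then key group, then subtask index) and compares it with round-robin distribution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of two shuffle strategies; NOT Flink's actual code. */
final class ShuffleSkewSketch {

  /** Stand-in bit mixer (assumption); Flink uses a murmur hash here. */
  static int scramble(int h) {
    h ^= h >>> 16;
    h *= 0x85ebca6b;
    h ^= h >>> 13;
    return h;
  }

  /** Keyed routing: key hash -> key group -> subtask index. */
  static int subtaskForKey(String key, int maxParallelism, int parallelism) {
    int keyGroup = Math.floorMod(scramble(key.hashCode()), maxParallelism);
    // Key groups are split into contiguous ranges, one range per subtask.
    return keyGroup * parallelism / maxParallelism;
  }

  /** Number of records each subtask receives under keyed routing. */
  static int[] countsWithKeyBy(List<String> fileIds, int maxParallelism, int parallelism) {
    int[] counts = new int[parallelism];
    for (String id : fileIds) {
      counts[subtaskForKey(id, maxParallelism, parallelism)]++;
    }
    return counts;
  }

  /** Number of records each subtask receives under round-robin routing. */
  static int[] countsWithRoundRobin(int records, int parallelism) {
    int[] counts = new int[parallelism];
    for (int i = 0; i < records; i++) {
      counts[i % parallelism]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String> fileIds = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      fileIds.add("file-group-" + i); // hypothetical file ids
    }
    // With few keys, the keyed counts are usually uneven.
    System.out.println("keyBy:       " + Arrays.toString(countsWithKeyBy(fileIds, 128, 4)));
    // Round-robin counts differ by at most one.
    System.out.println("round-robin: " + Arrays.toString(countsWithRoundRobin(fileIds.size(), 4)));
  }
}
```

With only a handful of file groups per compaction plan, nothing forces the keyed counts to be even, while the round-robin counts can differ by at most one record per subtask.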

Impact

none

Risk level (write none, low medium or high below)

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:XS PR with lines of changes in <= 10 label Aug 12, 2024

  // make the distribution strategy deterministic to avoid concurrent modifications
  // on the same bucket files
- .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
+ .rebalance()

Contributor Author

Each compaction operation handles one file group, so rebalance is OK.
This is what we already do in HoodieFlinkCompactor:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CompactionPlanSourceFunction(compactionPlans, conf))
    .name("compaction_source")
    .uid("uid_compaction_source")
    .rebalance()
    .transform("compact_task",
        TypeInformation.of(CompactionCommitEvent.class),
        new CompactOperator(conf))
    .setParallelism(compactionParallelism)
    .addSink(new CompactionCommitSink(conf))
    .name("compaction_commit")
    .uid("uid_compaction_commit")
    .setParallelism(1)
    .getTransformation()
    .setMaxParallelism(1);

Contributor

Is rebalance deterministic across multiple concurrent tasks? When there are task failovers and retries, two tasks may risk working on the same compaction file group.
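
The concern above can be pictured with a small toy model in plain Java. This is a sketch under stated assumptions, not Flink code: `ReplayRoutingSketch` and its methods are hypothetical, and it assumes the round-robin starting channel may differ between attempts. Whether this matters in practice depends on how compaction plan events are re-emitted after recovery, which this thread does not settle.

```java
/** Toy model of routing stability across a replay; NOT Flink's actual code. */
final class ReplayRoutingSketch {

  /** Round-robin: the target depends on the starting channel and the record's position. */
  static int roundRobinTarget(int startChannel, int position, int parallelism) {
    return (startChannel + position) % parallelism;
  }

  /** Keyed: the target is a pure function of the key (simplified hash, assumption). */
  static int keyedTarget(String fileId, int parallelism) {
    return Math.floorMod(fileId.hashCode(), parallelism);
  }

  public static void main(String[] args) {
    int parallelism = 4;
    int position = 2;                 // third record emitted by the upstream task
    String fileId = "file-group-2";   // hypothetical file id carried by that record

    // First attempt starts at channel 0, the retry starts at channel 1.
    System.out.println("round-robin, attempt 1: " + roundRobinTarget(0, position, parallelism));
    System.out.println("round-robin, attempt 2: " + roundRobinTarget(1, position, parallelism));

    // Keyed routing gives the same target on every attempt.
    System.out.println("keyed, attempt 1: " + keyedTarget(fileId, parallelism));
    System.out.println("keyed, attempt 2: " + keyedTarget(fileId, parallelism));
  }
}
```

In this model the same record can land on a different subtask after a retry under round-robin, while keyed routing always sends a given file id to the same subtask.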

Contributor

Did you test it offline already?

Contributor Author

I tested it on my cluster, but I didn't test the failure and retry case.

Contributor Author

Offline compaction uses rebalance.

@xicm changed the title from "[MINNOR] improve comaction operator shuffle" to "[MINOR] improve comaction operator shuffle" Aug 12, 2024
@xicm force-pushed the improve_compaction branch from ec36c19 to 039fbc7 August 13, 2024 06:09
@github-actions github-actions bot added size:S PR with lines of changes in (10, 100] and removed size:XS PR with lines of changes in <= 10 labels Aug 13, 2024
hudi-bot (Collaborator)

CI report:

@danny0405 danny0405 merged commit 35c00da into apache:master Aug 14, 2024

Labels

size:S PR with lines of changes in (10, 100]

3 participants