[HUDI-6480] Flink support non-blocking concurrency control #9850

Merged: danny0405 merged 4 commits into apache:master from beyond1920:flinkLocklessMultipleWriter on Oct 16, 2023
beyond1920 commented Oct 12, 2023

Change Logs

Now that #9776 is merged, this PR adds support for multiple streaming writers writing into the same MOR table with the simple bucket index.

Set the following configuration options to enable this feature:

  • table.type=MERGE_ON_READ
  • hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL
  • index.type=BUCKET
  • hoodie.index.bucket.engine=SIMPLE
  • hoodie.cleaner.policy.failed.writes=LAZY
    In addition, enable async cleaning for only one writer (by default, it is enabled for all of them).
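
As a rough illustration of the settings above, the following sketch builds the per-writer options and renders them as a Flink SQL WITH clause. The class and method names are hypothetical and not part of Hudi. The key `clean.async.enabled` is an assumption about how async cleaning is toggled; all other keys and values come from the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative helper, not part of Hudi: builds per-writer options for the setup above.
class LocklessWriterOptions {

  // Keys and values are taken from the list above, except "clean.async.enabled",
  // which is an ASSUMED key for switching async cleaning on or off.
  static Map<String, String> forWriter(boolean runsAsyncCleaning) {
    Map<String, String> opts = new LinkedHashMap<>();
    opts.put("table.type", "MERGE_ON_READ");
    opts.put("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL");
    opts.put("index.type", "BUCKET");
    opts.put("hoodie.index.bucket.engine", "SIMPLE");
    opts.put("hoodie.cleaner.policy.failed.writes", "LAZY");
    opts.put("clean.async.enabled", String.valueOf(runsAsyncCleaning));
    return opts;
  }

  // Renders the options as a Flink SQL WITH (...) clause.
  static String toWithClause(Map<String, String> opts) {
    return opts.entrySet().stream()
        .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
        .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
  }

  public static void main(String[] args) {
    System.out.println(toWithClause(forWriter(true)));  // the single writer that cleans
    System.out.println(toWithClause(forWriter(false))); // every other writer
  }
}
```

Only one of the concurrent writers would be created with `forWriter(true)`; the rest would use `forWriter(false)`.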

PS: This work is a follow-up to #9125. Thanks for the contribution, @danny0405.

Impact

NA

Risk level (write none, low, medium or high below)

NA

Documentation Update

Will be added in a separate PR.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

return OptionsResolver.isMorTable(conf)
    ? TestUtils.getLastDeltaCompleteInstant(basePath)
    : TestUtils.getLastCompleteInstant(basePath, HoodieTimeline.COMMIT_ACTION);
return this.ckpMetadata.lastCompleteInstant();
Contributor

Why must we fetch the instant from the ckp metadata?

Contributor Author

Updated.
I added a special branch to handle lockless multiple writers.
With multiple writers, a writer that started first might finish later, and a writer that started later might finish first. So the last completed instant in the ckp metadata might not equal the last completed instant on the timeline.
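
The ordering issue described here can be sketched with a small standalone model. This is not Hudi code: the `Completed` class, the writer names, and the instant times are all made up for illustration, and "last on the timeline" is modeled simply as the greatest instant time among completed instants.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Toy model (not Hudi code) of two writers completing instants out of start order.
class OutOfOrderCompletionSketch {

  static final class Completed {
    final String writer;
    final String instantTime;

    Completed(String writer, String instantTime) {
      this.writer = writer;
      this.instantTime = instantTime;
    }
  }

  // Last completed instant across all writers, as seen on a shared timeline.
  static Optional<String> lastOnTimeline(List<Completed> completed) {
    return completed.stream().map(c -> c.instantTime).max(Comparator.naturalOrder());
  }

  // Last completed instant that belongs to one specific writer.
  static Optional<String> lastForWriter(List<Completed> completed, String writer) {
    return completed.stream()
        .filter(c -> c.writer.equals(writer))
        .map(c -> c.instantTime)
        .max(Comparator.naturalOrder());
  }

  static List<Completed> scenario() {
    List<Completed> completed = new ArrayList<>();
    completed.add(new Completed("writer-1", "001")); // writer-1 finished an earlier instant
    // writer-1 then starts "002" and writer-2 starts "003"; writer-2 finishes first,
    // so "002" is still in flight when "003" completes.
    completed.add(new Completed("writer-2", "003"));
    return completed;
  }

  public static void main(String[] args) {
    List<Completed> completed = scenario();
    System.out.println("timeline: " + lastOnTimeline(completed));
    System.out.println("writer-1: " + lastForWriter(completed, "writer-1"));
  }
}
```

In this model, writer-1 reading the shared timeline would pick up writer-2's instant rather than its own, which is the mismatch the per-writer lookup avoids.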

pipeline1.checkpoint(1)
    .assertNextEvent()
    .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
testConcurrentCommit(pipeline1);
Contributor

Can we add new tests instead of amending the existing ones?

Contributor Author

The behavior of two existing test cases (testWriteMultiWriterInvolved and testWriteMultiWriterPartialOverlapping) changes once non-blocking multiple writers are supported, so those two tests need to be updated.
I also added some new tests.

Contributor

Thanks

beyond1920 (Contributor Author)

@danny0405 Thanks for the review. I will add more tests soon.

beyond1920 force-pushed the flinkLocklessMultipleWriter branch from e01944d to d6a5091 on October 13, 2023 09:04
    .checkpoint(1)
    .assertNextEvent();
if (OptionsResolver.isLocklessMultiWriter(conf)) {
  // should succeed for concurrent modification of the same file groups when using lockless multi-writers
Contributor

Shouldn't OptionsResolver.isLocklessMultiWriter(conf) always be true here?

Contributor Author

No, it is only true if all of the following conditions are satisfied:

  • the table is a MOR table
  • the index is the simple bucket index
  • OPTIMISTIC_CONCURRENCY_CONTROL is enabled

So it can be false for a COW table, or for a MOR table with another index type.
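
A minimal sketch of a check with these three conditions is shown below. It is a hypothetical stand-in over a plain map, not the actual OptionsResolver implementation. The option keys are borrowed from the PR description, and treating a missing option as "not satisfied" is a simplification made here for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for the check discussed above; NOT the real OptionsResolver code.
class LocklessCheckSketch {

  // Simplification: a missing option counts as "condition not satisfied".
  static boolean isLocklessMultiWriter(Map<String, String> conf) {
    boolean mor = "MERGE_ON_READ".equalsIgnoreCase(conf.getOrDefault("table.type", ""));
    boolean simpleBucket =
        "BUCKET".equalsIgnoreCase(conf.getOrDefault("index.type", ""))
            && "SIMPLE".equalsIgnoreCase(conf.getOrDefault("hoodie.index.bucket.engine", ""));
    boolean occ = "OPTIMISTIC_CONCURRENCY_CONTROL"
        .equalsIgnoreCase(conf.getOrDefault("hoodie.write.concurrency.mode", ""));
    return mor && simpleBucket && occ;
  }

  public static void main(String[] args) {
    Map<String, String> morBucket = Map.of(
        "table.type", "MERGE_ON_READ",
        "index.type", "BUCKET",
        "hoodie.index.bucket.engine", "SIMPLE",
        "hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL");
    Map<String, String> cow = Map.of(
        "table.type", "COPY_ON_WRITE",
        "index.type", "BUCKET",
        "hoodie.index.bucket.engine", "SIMPLE",
        "hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL");
    System.out.println(isLocklessMultiWriter(morBucket));
    System.out.println(isLocklessMultiWriter(cow));
  }
}
```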

  // If there are lockless multiple writers, fetch the last complete instant of the current writer from the ckp metadata,
  // because a writer that started first might finish later, and a writer that started later might finish first.
  return this.ckpMetadata.lastCompleteInstant();
} else {
Contributor

Got it. Maybe we can just use return this.ckpMetadata.lastCompleteInstant(); directly.

Contributor Author

In most cases, returning this.ckpMetadata.lastCompleteInstant() directly works fine.
But the testSubtaskFails case would fail: when a failover happens, the ckp metadata might be cleaned, so the instant has to be fetched from the timeline instead.
I added a branch to handle this case.
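
The branching described in this thread can be sketched roughly as follows. This is an illustrative reconstruction under assumptions, not the merged code: the method signature is hypothetical, the ckp metadata lookup is modeled as an Optional that is empty when the metadata has been cleaned, and the timeline lookup is passed in as a supplier.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Illustrative reconstruction of the lookup logic discussed above; not the merged code.
class LastInstantLookupSketch {

  // fromCkpMetadata is empty when the ckp metadata holds nothing (for example after
  // it was cleaned on failover); fromTimeline is only consulted when needed.
  static String lastCompleteInstant(
      boolean locklessMultiWriter,
      Optional<String> fromCkpMetadata,
      Supplier<String> fromTimeline) {
    if (locklessMultiWriter && fromCkpMetadata.isPresent()) {
      // Per-writer view: unaffected by other writers finishing out of order.
      return fromCkpMetadata.get();
    }
    // Single writer, or ckp metadata unavailable: fall back to the timeline.
    return fromTimeline.get();
  }

  public static void main(String[] args) {
    System.out.println(lastCompleteInstant(true, Optional.of("002"), () -> "003"));
    System.out.println(lastCompleteInstant(true, Optional.empty(), () -> "003"));
  }
}
```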

Contributor

Does this.ckpMetadata.isEmpty() indicate a failover?

beyond1920 force-pushed the flinkLocklessMultipleWriter branch 2 times, most recently from a247acb to f73c1b6, on October 13, 2023 12:39

danny0405 (Contributor)

6480.patch.zip
Thanks for the contribution. I have reviewed the change and created a patch; you need to rebase onto the latest master and then apply the patch.

beyond1920 force-pushed the flinkLocklessMultipleWriter branch from f73c1b6 to 1297401 on October 16, 2023 03:22
danny0405 merged commit 160f43c into apache:master on Oct 16, 2023
beyond1920 deleted the flinkLocklessMultipleWriter branch on October 16, 2023 12:54