Skip to content

[HUDI-6822] Fix deletes handling in hbase index when partition path is updated#9630

Merged
nsivabalan merged 5 commits intoapache:masterfrom
flashJd:Fix_hbase_index_update_partition
Dec 4, 2023
Merged

[HUDI-6822] Fix deletes handling in hbase index when partition path is updated#9630
nsivabalan merged 5 commits intoapache:masterfrom
flashJd:Fix_hbase_index_update_partition

Conversation

@flashJd
Copy link
Contributor

@flashJd flashJd commented Sep 6, 2023

Change Logs

Similar to #9114, corner case when a record moves from 1 partition to another with partition path update set to true

Impact

private <R> Function2<Integer, Iterator<HoodieRecord<R>>, Iterator<HoodieRecord<R>>> locationTagFunction(
HoodieTableMetaClient metaClient) {
// `multiGetBatchSize` is intended to be a batch per 100ms. To create a rate limiter that measures
// operations per second, we need to multiply `multiGetBatchSize` by 10.
Integer multiGetBatchSize = config.getHbaseIndexGetBatchSize();
return (partitionNum, hoodieRecordIterator) -> {
boolean updatePartitionPath = config.getHbaseIndexUpdatePartitionPath();

  1. Hbase index tagLocation is executed concurrently in different spark partitions, so the index delete operation may happen after index add operation, cause the undefined behavior as codope said.
  2. ReduceByKey can be a costly operation as [HUDI-6467] Fix deletes handling in rli when partition path is updated #9114 (comment) discussed
    so add a flag to ignore this index delete operations in partitionPathUpdate condition

Risk level (write none, low medium or high below)

N/A

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

List<HoodieRecord> recordList = new LinkedList<>();
for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
if (recordDelegate.getIgnoreFlag()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ingores the delete records for old partitions directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only ignore the deleted record in partition change scene, not all deleted record:
existing record(id1, par1), new record(id1, par2), if updatePartitionPath set to true, will emit delete record to par1 and add new record(id1, par2) to par2, in this scene, the delete record is no need to update index as the new record will override the index

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we handle deletes. i.e. if we get deletes for a record in partition p1, when it reaches metadata writer, we might just have 1 recordDelegate but theignore flag will not be set since we are not setting it in any of write handles? and so we should be good.

we are setting the ignore flag only in indexing code and specifically when indexing could reutrn two version of record delegate.

just wanted to confirm my understanding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's right, we are setting the ignore flag only in indexing code and specifically when indexing could reutrn two version of record delegate.

@bvaradar bvaradar added the type:bug Bug reports and fixes label Oct 4, 2023
@bvaradar bvaradar self-assigned this Oct 4, 2023
@nsivabalan nsivabalan added priority:blocker Production down; release blocker release-0.14.1 labels Nov 15, 2023
@nsivabalan nsivabalan self-assigned this Nov 29, 2023
List<HoodieRecord> recordList = new LinkedList<>();
for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
if (recordDelegate.getIgnoreFlag()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we handle deletes. i.e. if we get deletes for a record in partition p1, when it reaches metadata writer, we might just have 1 recordDelegate but theignore flag will not be set since we are not setting it in any of write handles? and so we should be good.

we are setting the ignore flag only in indexing code and specifically when indexing could reutrn two version of record delegate.

just wanted to confirm my understanding.

@bvaradar bvaradar force-pushed the Fix_hbase_index_update_partition branch from a37708f to b6faf77 Compare November 29, 2023 04:22
@nsivabalan nsivabalan force-pushed the Fix_hbase_index_update_partition branch from 36306f0 to fe66314 Compare December 1, 2023 23:38
@hudi-bot
Copy link
Collaborator

hudi-bot commented Dec 2, 2023

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@nsivabalan nsivabalan merged commit b9fb9f6 into apache:master Dec 4, 2023
nsivabalan pushed a commit to nsivabalan/hudi that referenced this pull request Dec 5, 2023
…s updated (apache#9630)

---------

Co-authored-by: Balaji Varadarajan <balaji.varadarajan@robinhood.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

priority:blocker Production down; release blocker release-0.14.1 type:bug Bug reports and fixes

Projects

Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

5 participants