[HUDI-9505] Hudi 1.1 blocker code change of index look up #13414

Closed

Davis-Zhang-Onehouse
Contributor

@Davis-Zhang-Onehouse Davis-Zhang-Onehouse commented Jun 9, 2025

Change Logs

  1. Make sure the index lookup API is wrapped around HoodieData or HoodiePairData, in preparation for scalable batch index lookup, where we will switch to an RDD-based implementation. For now we only target RLI and secondary index (SI) lookups.
  2. Change the hash partitioning strategy of SI file groups. Previously the storage format was
    <secondary col value>_<record col value>: null
    and SI records were hash partitioned by <secondary col value>_<record col value>.

Now SI records are hash partitioned by <secondary col value>; the storage format is unchanged.

The motivation is that index lookups typically have only the "secondary col value" available in the query.

By changing the partitioning strategy, we can prune SI file groups so that a lookup only involves the file groups of interest instead of scanning all SI records. The downside, a skewed distribution of SI records across file groups, is well understood.

  3. Because we change the partitioning strategy of SI, the change cannot go out as is. The functional requirement is "when the code change is active, the reader and writer paths all behave as if SI file groups are partitioned by <secondary col value>", and we DO NOT want to support old & new modes at the same time. The new strategy does not work with the existing SI data layout if a table already has an SI that is hash partitioned by <secondary col value>_<record col value>. Without extra protection, SI lookups fail to find records in the file groups that hold them because of the file group pruning enforced by the new read pattern, causing a data correctness issue. As discussed with the Hudi PMC, we agreed that a table version upgrade is indispensable.

  4. For future extensibility, we introduced an index version field in index_def.json. Starting with table version 9, each index comes with its version number, so later storage changes on an index do not require a table version upgrade anymore. If the field is not present, version 1 is assumed.
    We handle the differences between SI layouts 1 and 2 inside the reader/writer code, where the layout version is handled specifically for each index type.
    We still need to bump up the table version (to handle a downgrade, where SI layout 2 can't be read by releases < 1.1).

https://issues.apache.org/jira/browse/HUDI-9516
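
To make the partitioning change concrete, here is a minimal sketch of the two mappings. It is not Hudi's code: the class `SiPartitioningSketch`, the `String.hashCode`-based hash, and the plain `_` separator are assumptions chosen for illustration only.

```java
// Illustrative sketch only; names and the hash function are assumptions, not Hudi's implementation.
final class SiPartitioningSketch {

  // SI record key as described above: <secondary col value>_<record col value>
  static String siKey(String secondaryValue, String recordKey) {
    return secondaryValue + "_" + recordKey;
  }

  // Old strategy (layout 1): hash the full <secondary>_<record> key.
  static int fileGroupV1(String secondaryValue, String recordKey, int numFileGroups) {
    return Math.floorMod(siKey(secondaryValue, recordKey).hashCode(), numFileGroups);
  }

  // New strategy (layout 2): hash only the secondary col value.
  static int fileGroupV2(String secondaryValue, int numFileGroups) {
    return Math.floorMod(secondaryValue.hashCode(), numFileGroups);
  }
}
```

With `fileGroupV2`, every entry for a given secondary value lands in the same file group, so a lookup that only knows the secondary value can compute its target. With `fileGroupV1`, the target cannot be computed without the record key, which is why the old layout needs a scan across file groups.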

Impact

Faster SI lookup with acceptable downsides as explained above.

Risk level (write none, low medium or high below)

None

Documentation Update

Need to work on table version upgrade guide. Tracked by
https://issues.apache.org/jira/browse/HUDI-9505

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Jun 9, 2025
@Davis-Zhang-Onehouse Davis-Zhang-Onehouse force-pushed the HUDI-9505-1 branch 2 times, most recently from bff05a9 to a18a18b Compare June 9, 2025 22:55
@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Jun 9, 2025
@Davis-Zhang-Onehouse Davis-Zhang-Onehouse force-pushed the HUDI-9505-1 branch 2 times, most recently from cccb3cc to 7f6b80b Compare June 9, 2025 23:23
Contributor

@danny0405 danny0405 left a comment

I don't think it deserves a new table version based on the fact there is no file format or layout change, maybe just a new metadata config should be enough.

@Davis-Zhang-Onehouse Davis-Zhang-Onehouse changed the title from "[HUDI-9505]" to "[HUDI-9505] Hudi 1.1 blocker code change of index look up" Jun 10, 2025
@Davis-Zhang-Onehouse
Contributor Author

I don't think it deserves a new table version based on the fact there is no file format or layout change, maybe just a new metadata config should be enough.

Hi Danny, thanks for chiming in here. I added more details to the PR description. If your concerns persist, let's have a discussion with Ethan and vc about the decision. Thank you!

@vinothchandar vinothchandar self-assigned this Jun 13, 2025
@yihua
Contributor

yihua commented Jun 13, 2025

I don't think it deserves a new table version based on the fact there is no file format or layout change, maybe just a new metadata config should be enough.

There is a layout change in the secondary index partition in the MDT: the mapping from the record key to the file group index has changed. Unless the table version is changed and the mapping or sharding (we can decide on a name) algorithm is stored in the table config, there is no way to tell how the secondary index is sharded.

@yihua
Contributor

yihua commented Jun 13, 2025

In general, I prefer the mapping or sharding (we can decide a name to use) algorithm to be stored in the table config along with the table version upgrade so it's easier to change the mapping if needed and the reader can use that to determine how to look up the index. There are at least two occurrences, this secondary index and partitioned record index, in which having the new table config would help migration and avoid table version upgrade in the future.

@Davis-Zhang-Onehouse Davis-Zhang-Onehouse force-pushed the HUDI-9505-1 branch 2 times, most recently from eef34e1 to f202555 Compare June 16, 2025 00:10
@vinothchandar
Member

we DO NOT want to support old & new modes at the same time.

Mulling whether we should change this constraint instead. This may be a good point to introduce an index layout version and keep it inside the index definition metadata, similar to log files. This will help readers/writers evolve index storage; for example, compaction could seamlessly upgrade index partitions.

@vinothchandar
Member

I prefer the mapping or sharding (we can decide a name to use) algorithm to be stored in the table config

Prefer the index definition over adding a new table config.

@yihua
Contributor

yihua commented Jun 16, 2025

I prefer the mapping or sharding (we can decide a name to use) algorithm to be stored in the table config

Prefer the index definition over adding a new table config.

In general I prefer adding the info in the .hoodie folder as part of the table, so the reader can decide the sharding based on it. The index definition or metadata looks like a better place to store this sharding information. Let's do that.

@Davis-Zhang-Onehouse
Contributor Author

Davis-Zhang-Onehouse commented Jun 16, 2025

@yihua asked the PR owner, @Davis-Zhang-Onehouse, to work on the following action item (AI):

The index definition looks better: something to store as part of the table metadata, instead of implicitly inferring it. We should store something in the .index.def.

Proposed change

Introduce a new attribute "partitionStrategy". Possible values:

  • HASH_ON_SECONDARY_KEY: exclusive to SI. The SI file group is determined by hashing only the secondary data column value.
  • HASH_ON_SECONDARY_KEY_RECORD_KEY: exclusive to SI. This is the old behavior.
  • For other indexes we will also come up with proper enum values for their existing strategies.

From the schema evolution perspective of this JSON: if the field is missing, we infer the default strategy based on the index type. For SI, a missing value means HASH_ON_SECONDARY_KEY_RECORD_KEY.

Implementation details

Read path

Whenever we do a lookup using an index, we read the index def file and add the partitionStrategy value to the context. If it is HASH_ON_SECONDARY_KEY_RECORD_KEY, the lookup is routed to the prefix lookup code. If it is HASH_ON_SECONDARY_KEY, it is routed to the new code.

Also for all the other indexes, we need to do the same as they share the same code path. It is an index look up interface change so it impacts code shared by all indexes.
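
A rough sketch of the proposed attribute and the read-path routing, for illustration only. None of these types exist in Hudi: `PartitionStrategySketch`, its nested enum, and the returned route labels are hypothetical names that mirror the proposal text.

```java
import java.util.Optional;

// Hypothetical sketch of the proposal above; none of these types exist in Hudi.
final class PartitionStrategySketch {

  enum PartitionStrategy { HASH_ON_SECONDARY_KEY, HASH_ON_SECONDARY_KEY_RECORD_KEY }

  // A missing attribute in the index definition means the old behavior for SI.
  static PartitionStrategy resolveForSecondaryIndex(Optional<String> attributeValue) {
    return attributeValue
        .map(PartitionStrategy::valueOf)
        .orElse(PartitionStrategy.HASH_ON_SECONDARY_KEY_RECORD_KEY);
  }

  // Read path routing: returns a label for the code path that would serve the lookup.
  static String route(PartitionStrategy strategy) {
    switch (strategy) {
      case HASH_ON_SECONDARY_KEY:
        return "pruned-lookup";   // new code: hash the secondary key, read one file group
      case HASH_ON_SECONDARY_KEY_RECORD_KEY:
        return "prefix-lookup";   // old code: prefix match across file groups
      default:
        throw new IllegalArgumentException("Unknown strategy: " + strategy);
    }
  }
}
```

For example, `route(resolveForSecondaryIndex(Optional.empty()))` falls back to the prefix lookup path, which matches the default described above for index definitions written before the attribute existed.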

Write path

If we are creating a new index, we always write the partitionStrategy attribute in the index definition. For SI it will have the value HASH_ON_SECONDARY_KEY_RECORD_KEY. For others, we will come up with names that map to their existing behavior.

When updating existing indexes due to data changes, follow the "Read path" logic.

Misc

Revert the table-version-related code change.

@yihua please comment on the proposed plan when you get a chance. thanks

@yihua
Contributor

yihua commented Jun 16, 2025

Looks good to me overall.

@vinothchandar
Member

vinothchandar commented Jun 16, 2025

@yihua @Davis-Zhang-Onehouse

I prefer not to introduce a new concept like partitionStrategy, that may or may not be changing/defined per index.

Instead I propose

  • We introduce a layoutVersion field into the index definition. If the field is not present, version 1 is assumed.
  • We handle the differences between SI layouts 1 and 2 inside the reader/writer code. Inside the code, the layout version is handled specifically for each index type.
  • We document the different index storage layouts in the 1.0-tech-specs (needs a docs PR to the asf-site branch).
  • We still need to bump up the table version (to handle a downgrade, where SI layout 2 can't be read by releases < 1.1).
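
A small sketch of how the layout version could be resolved and applied per index type. This is an assumption-laden illustration: the attribute name `layoutVersion` follows the proposal above, while the class `LayoutVersionSketch`, the map-based index definition, the `"secondary_index"` type string, and the `_` separator are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch; only the "layoutVersion" name comes from the proposal, the rest is assumed.
final class LayoutVersionSketch {

  static final int DEFAULT_LAYOUT_VERSION = 1;

  // Reads the layout version from index definition attributes; an absent field means version 1.
  static int layoutVersion(Map<String, String> indexDefinition) {
    String raw = indexDefinition.get("layoutVersion");
    return raw == null ? DEFAULT_LAYOUT_VERSION : Integer.parseInt(raw);
  }

  // Per-index-type handling: only the secondary index distinguishes layouts 1 and 2 in this sketch.
  static String shardingKey(String indexType, int layoutVersion, String secondaryValue, String recordKey) {
    if ("secondary_index".equals(indexType) && layoutVersion >= 2) {
      return secondaryValue;                    // layout 2: shard on the secondary value only
    }
    if ("secondary_index".equals(indexType)) {
      return secondaryValue + "_" + recordKey;  // layout 1: shard on the full key
    }
    return recordKey;                           // other index types: unchanged here
  }
}
```

Keeping the version check inside one function per index type means readers and writers share the same decision, which is the point of handling the layout difference "inside the reader/writer code".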

@yihua
Contributor

yihua commented Jun 16, 2025

Actually this proposed index layout version is more general. I prefer this too.

@Davis-Zhang-Onehouse
Contributor Author

Davis-Zhang-Onehouse commented Jun 17, 2025

@yihua @vinothchandar new items to factor in: backwards and forward compatibility. I spotted major issues and the PR is blocked on your feedback

Compatibility

Forward compatibility

If the SI uses version 2 (hash partitioned on the data column value only) and Hudi runs an old binary, the old binary has no concept of an index version and treats the new SI version as if it were the old one. As a result:

Read path

It should be fine, since it uses prefix lookup, which is naturally compatible with the new partition strategy.

Write path

The write path is broken, because the old Hudi binary writes to the new index version using the old partition strategy.
What makes things worse is that the index version is not updated, since the old binary has no such logic.
So we end up with a corrupted version 2 index, because the old Hudi binary does not conform to the version 2 protocol for updating the index.
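
The failure mode can be reproduced with a toy in-memory model. This is only a sketch under stated assumptions: `MixedWriterSketch`, the `String.hashCode` bucketing, and the `_` separator are hypothetical and do not reflect Hudi's storage or hashing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of an SI partition with N file groups; all names here are hypothetical.
final class MixedWriterSketch {
  final List<Set<String>> fileGroups = new ArrayList<>();

  MixedWriterSketch(int numFileGroups) {
    for (int i = 0; i < numFileGroups; i++) {
      fileGroups.add(new HashSet<>());
    }
  }

  private static int bucket(String s, int n) {
    return Math.floorMod(s.hashCode(), n);
  }

  // Old binary: shards on the full <secondary>_<record> key.
  void writeWithOldBinary(String secondary, String recordKey) {
    String key = secondary + "_" + recordKey;
    fileGroups.get(bucket(key, fileGroups.size())).add(key);
  }

  // New binary: shards on the secondary value only.
  void writeWithNewBinary(String secondary, String recordKey) {
    fileGroups.get(bucket(secondary, fileGroups.size())).add(secondary + "_" + recordKey);
  }

  // New read pattern: prune to one file group, then prefix match.
  Set<String> prunedLookup(String secondary) {
    return prefixMatch(fileGroups.get(bucket(secondary, fileGroups.size())), secondary);
  }

  // Old read pattern: prefix match across all file groups.
  Set<String> fullScanLookup(String secondary) {
    Set<String> out = new HashSet<>();
    for (Set<String> fileGroup : fileGroups) {
      out.addAll(prefixMatch(fileGroup, secondary));
    }
    return out;
  }

  private static Set<String> prefixMatch(Set<String> fileGroup, String secondary) {
    Set<String> out = new HashSet<>();
    for (String key : fileGroup) {
      if (key.startsWith(secondary + "_")) {
        out.add(key);
      }
    }
    return out;
  }
}
```

In this model, entries written by `writeWithOldBinary` can land outside the file group that `prunedLookup` visits, so the pruned read silently misses them, while `fullScanLookup` (the prefix lookup used by the old read path) still finds entries from both writers.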

Backward compatibility

This should be fine, as the new Hudi binary will properly recognize the version (or the absence of the version) and adapt accordingly. Details are covered earlier in this thread.

Fundamental limitation of the index version design

The old Hudi binary only recognizes and respects the table version. Introducing an index version means users must run a version that recognizes and honors it.

In industry the standard procedure is:

  • Introduce a "compatibility patch" that recognizes the version and properly backs off if it is some future version (the old Hudi binary will choose not to use the index even if there is one).
  • Users must be aware of all readers/writers that touch a Hudi table. If the table has SI version 2, users must make sure every Hudi version involved is at least the compatibility patch.
  • This places a burden on the user, and failing to do so means the SI is silently corrupted, causing a correctness issue, which is not acceptable. We need a place to guide users, and this is far more cumbersome than a table version upgrade.
  • If all readers and writers are managed by some service provider, this might not be an issue: just introduce the compatibility patch and we are all good.
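
As an illustration of what such a compatibility patch amounts to, here is a minimal guard. The class name, the constant, and the version numbers are assumptions; no such check is claimed to exist in any released Hudi binary.

```java
// Hypothetical sketch of a "compatibility patch"; names and version numbers are assumptions.
final class IndexVersionGuardSketch {

  // Highest index version this (hypothetical) binary knows how to read and write.
  static final int MAX_SUPPORTED_INDEX_VERSION = 1;

  // Returns true when the index may be used; a newer, unknown version makes the binary back off.
  static boolean canUseIndex(Integer versionInDefinition) {
    int version = versionInDefinition == null ? 1 : versionInDefinition;
    return version <= MAX_SUPPORTED_INDEX_VERSION;
  }
}
```

The guard only helps if every reader and writer that touches the table actually ships it, which is the operational burden described in the bullets above.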

@Davis-Zhang-Onehouse
Contributor Author

An alternative approach - create SI V2 as a new index category

Instead of creating SI with multiple versions, we can ship the new version of SI as a completely different index type: we write to a different metadata partition path, etc. There is no compatibility issue anymore.

Also, with this approach, the old binary does not know about the new SI version, so it should not do anything with it (not sure if we follow this compatibility best practice).

The new binary can do the following:

  • It only creates SI v2.
  • It is capable of reading and writing both new and old SI.

This avoids version management of any form. But the old binary must tolerate unrecognized MDT partitions, which I'm not sure it does.

@yihua
Contributor

yihua commented Jun 17, 2025

@Davis-Zhang-Onehouse at a high level, both the reader and writer paths should follow the new index layout version added in the index definition (i.e., the index layout version is 1 if not set, and the index layout version is 2, which should be explicitly set for the new secondary index layout in this PR). The table version needs to be upgraded and the secondary index layout should be changed as part of the table upgrade and downgrade process.

Here are more details on how it should work:
Index definition and layout:

  • Table version 8: no index layout version; secondary index layout based on hard-coded logic
  • Table version 9: new index layout version introduced; secondary index layout version 2 (different from table version 8)

Upgrade and downgrade:

  • Upgrade from 8 -> 9: add the index layout version to index definitions; remove secondary index if exists
  • Downgrade from 9 -> 8: remove the index layout version; remove secondary index if exists

MDT reader and writer compatibility:

  • The writer and reader take the index definition and get the index layout version. If it is not present (meaning table version <= 8), infer the index layout version as 1. Use the index layout version to determine how the record keys are mapped to file groups (i.e., the secondary index has a different mapping between layouts 1 and 2).

@yihua
Contributor

yihua commented Jun 17, 2025

An alternative approach - create SI V2 as a new index category

Instead of creating SI with multiple versions, we can ship the new version of SI as a completely different index type: we write to a different metadata partition path, etc. There is no compatibility issue anymore.

Also, with this approach, the old binary does not know about the new SI version, so it should not do anything with it (not sure if we follow this compatibility best practice).

The new binary can do the following:

  • It only creates SI v2.
  • It is capable of reading and writing both new and old SI.

This avoids version management of any form. But the old binary must tolerate unrecognized MDT partitions, which I'm not sure it does.

A new metadata partition path should be avoided for only layout difference. That's why the layout version plays a role.

@Davis-Zhang-Onehouse
Contributor Author

OK, so the solution still requires a table version upgrade. Sorry, I should have read vc's reply.

@Davis-Zhang-Onehouse Davis-Zhang-Onehouse force-pushed the HUDI-9505-1 branch 3 times, most recently from 945a9e9 to 0c87abf Compare June 18, 2025 01:48
@Davis-Zhang-Onehouse
Contributor Author

CI failures are not relevant to the code changes

@Davis-Zhang-Onehouse Davis-Zhang-Onehouse force-pushed the HUDI-9505-1 branch 2 times, most recently from feff82f to 04e3a63 Compare June 23, 2025 21:21
@vinothchandar vinothchandar self-assigned this Jun 24, 2025
@vinothchandar
Member

@Davis-Zhang-Onehouse When you get a chance, please bring the PR description in line with the approach now implemented.

@Davis-Zhang-Onehouse
Contributor Author

@Davis-Zhang-Onehouse When you get a chance, please bring the PR description in line with the approach now implemented.

done

Member

@vinothchandar vinothchandar left a comment

Made a pass. Can take another once you address some of these.

public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context,
String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(config, context, table, "upgrading");
Member

change to "upgrading to table version 9"?

Member

Do we need to recreate the index as well, or do we push that to the first write after the upgrade?

Contributor Author

The user needs to explicitly recreate the index. We will make sure this is captured in the doc update.
If we want the first write after the upgrade to recreate it automatically, let's plan the priority and delivery time. As an MVP, I didn't include this.

@@ -134,6 +140,8 @@ public void createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient metaC
.withIndexType(PARTITION_NAME_COLUMN_STATS)
.withIndexFunction(PARTITION_NAME_COLUMN_STATS)
.withSourceFields(columnsToIndex)
// Use the existing version if exists, otherwise fall back to the default version.
.withVersion(getExistingHoodieIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, metaClient))
Member

I don't think we want to hand control of which version is written to the user, right? The version should be picked internally when building the index def for writing?

Contributor Author

No, the user cannot control it.

Then why do we have "existing" or "default"?

  • Existing: if the index is already created, write using the existing version.
  • Default: if the index is not created yet (first time creating the index), use the index version based on the table version.
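
The two cases can be sketched as follows. This is not the body of `getExistingHoodieIndexVersionOrDefault`, which is not shown in this thread; the class `IndexVersionChoiceSketch` and the rule "version 2 for a new secondary index from table version 9 on" are assumptions drawn from the discussion above.

```java
// Hypothetical sketch; the real helper in the diff is getExistingHoodieIndexVersionOrDefault.
final class IndexVersionChoiceSketch {

  // Assumed default for a newly created secondary index: version 2 from table version 9 on, else 1.
  static int defaultVersionFor(int tableVersion) {
    return tableVersion >= 9 ? 2 : 1;
  }

  // Existing index: keep writing its version. New index: derive the version from the table version.
  static int existingOrDefault(Integer existingVersion, int tableVersion) {
    return existingVersion != null ? existingVersion : defaultVersionFor(tableVersion);
  }
}
```

Neither branch takes user input, which is the point of the answer above: the version is derived from what is already on storage or from the table version.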

  JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
      .map(HoodieRecord::getRecordKey)
-     .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k, numFileGroups))
+     .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k, numFileGroups, partitionPath,
+         getExistingHoodieIndexVersionOrDefault(partitionPath, hoodieTable.getMetaClient())))
Member

Callout: we should ensure none of these calls from executors read the index defs JSON. It should be read once on the driver.

Contributor Author

That's an excellent callout. I reviewed all call sites and revised as appropriate.
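
The pattern behind the callout, sketched in plain Java. This is a stand-in, not Spark or Hudi code: `java.util.stream` replaces the RDD, and `loadIndexVersion()` is a hypothetical placeholder for reading the index definition file.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Plain-Java stand-in for the driver/executor pattern; all names are hypothetical.
final class ReadOnceSketch {
  static final AtomicInteger definitionReads = new AtomicInteger();

  // Pretend this is an expensive read of the index definition file.
  static int loadIndexVersion() {
    definitionReads.incrementAndGet();
    return 2;
  }

  static List<Integer> mapKeys(List<String> keys, int numFileGroups) {
    // Resolve the version once, up front (the "driver" side), and capture it in the lambda.
    final int version = loadIndexVersion();
    return keys.stream()
        .map(k -> Math.floorMod((version >= 2 ? k.split("_", 2)[0] : k).hashCode(), numFileGroups))
        .collect(Collectors.toList());
  }
}
```

Calling `loadIndexVersion()` inside the lambda instead would trigger one read per key; hoisting it keeps the per-record function free of file access.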

@@ -294,18 +303,20 @@ public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> reco
   * @param secondaryKeys The list of secondary keys to read
   */
  @Override
- public Map<String, HoodieRecordGlobalLocation> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation> readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {
Member

Can you point me to the changes where we read v1 and v2 here?

Member

Or do we assume we just read v2? The reason is: the writer version can be 6, 8, or 9, and if it's 6, don't we write v1 of SI? So shouldn't the code here read both?

Contributor Author

I didn't add that change in this PR, and it does not break anything.

Write path: it does the proper hashing based on the index, so it is good.
Read path: it reads all index files and uses prefix matching, which handles both v1 and v2.

The change you are asking for is in the productionization PR, which is separate from this PR; this one is the minimum code change to unblock Hudi 1.x.

The read path needs a larger code change, and we should not do everything in one giant PR.

Contributor

@danny0405 danny0405 left a comment

@vinothchandar @Davis-Zhang-Onehouse

Currently the SI is not used in any community production use case yet. Is it okay if we just migrate to the new hashing function for SI, document on our site that the SI in 1.0.2 and 1.1 are not compatible, and add tutorials to guide users through a manual upgrade? That way we avoid the chaos introduced in this PR.

each index comes with its version number. So later storage change on index do not require table version upgrade anymore

This is not true: whenever the index layout changes, a table upgrade/downgrade is required.

Also for all the other indexes, we need to do the same as they share the same code path. It is an index look up interface change so it impacts code shared by all indexes.

Can we limit the change to just SI as much as possible? I don't think the other index types will need multiple versions in the mid-term future.

@Davis-Zhang-Onehouse
Contributor Author

PR comments follow-up is in #13489.

@vinothchandar
Member

@danny0405 we need a versioning scheme for index storage as well, just like log blocks. So this is useful foundation work regardless.

5 participants