[HUDI-9340] Adding streaming support to secondary index writes w/ metadata table #13436


@nsivabalan nsivabalan commented Jun 14, 2025

Change Logs

To be filled in.

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level (write none, low, medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default values of configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Jun 14, 2025
}

public void addSecondaryIndexStats(String secondaryIndexPartitionPath, String recordKey, String secondaryIndexValue, boolean isDeleted) {
secondaryIndexStats.computeIfAbsent(secondaryIndexPartitionPath, k -> new ArrayList<>())

Contributor Author

Can you add an explicit method named initializeSecondaryIndexStats?
Within the constructor of HoodieWriteHandle, we can call this method when we know secondary index is enabled and at least one secondary index definition is found.

Contributor

Addressed
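
A minimal sketch of the suggestion in this thread, assuming a simplified stats holder. `IndexStatsSketch` and `SecondaryIndexStat` are hypothetical stand-ins rather than Hudi's actual classes; only the method names `initializeSecondaryIndexStats` and `addSecondaryIndexStats` come from the discussion above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the index stats holder discussed above.
final class IndexStatsSketch {

  // One secondary index entry: record key, indexed value, and a delete flag.
  static final class SecondaryIndexStat {
    final String recordKey;
    final String secondaryIndexValue;
    final boolean isDeleted;

    SecondaryIndexStat(String recordKey, String secondaryIndexValue, boolean isDeleted) {
      this.recordKey = recordKey;
      this.secondaryIndexValue = secondaryIndexValue;
      this.isDeleted = isDeleted;
    }
  }

  // Keyed by secondary index partition path.
  private final Map<String, List<SecondaryIndexStat>> secondaryIndexStats = new HashMap<>();

  // Explicit initialization, meant to be called once per secondary index definition
  // (for example from a write handle constructor when secondary index is enabled).
  void initializeSecondaryIndexStats(String secondaryIndexPartitionPath) {
    secondaryIndexStats.putIfAbsent(secondaryIndexPartitionPath, new ArrayList<>());
  }

  void addSecondaryIndexStats(String secondaryIndexPartitionPath, String recordKey,
                              String secondaryIndexValue, boolean isDeleted) {
    // computeIfAbsent keeps the add path safe even if initialization was skipped.
    secondaryIndexStats.computeIfAbsent(secondaryIndexPartitionPath, k -> new ArrayList<>())
        .add(new SecondaryIndexStat(recordKey, secondaryIndexValue, isDeleted));
  }

  Map<String, List<SecondaryIndexStat>> getSecondaryIndexStats() {
    return secondaryIndexStats;
  }
}
```

With explicit initialization, a partition that receives no records still shows up with an empty list, which makes "index enabled but nothing written" distinguishable from "index not enabled".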

}

void clear() {
this.writtenRecordDelegates.clear();

Contributor Author

Can you clear secondaryIndexStats as well?

Contributor

Addressed

*/
public class SecondaryIndexStats {

private String recordKey;

Contributor Author

final

Contributor

Addressed

@@ -62,7 +65,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
protected long recordsDeleted = 0;
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;
-private final boolean preserveMetadata;
+private boolean preserveMetadata;

Contributor Author

Why is this being reverted?

Contributor

Removed this change. I had added it to fix one of the tests. It is not required anymore.

@@ -173,6 +178,23 @@ record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new Metadat
}
}

private void trackMetadataIndexStats(HoodieRecord record) {
if (!config.isSecondaryIndexEnabled() || secondaryIndexDefns.isEmpty() || !config.isMetadataStreamingWritesEnabled(hoodieTable.getMetaClient().getTableConfig().getTableVersion())) {

Contributor Author

Can you move this condition to HoodieWriteHandle and use it in all subclasses?

Contributor

Moved it
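
A rough sketch of the refactor requested in this thread, assuming the three-part condition is hoisted into a protected method on the base handle. All class names and the method name `isSecondaryIndexStatsStreamingWritesEnabled` are hypothetical, and the table-version argument from the original condition is folded into a plain boolean for brevity.

```java
import java.util.List;

// Hypothetical, trimmed-down config holding only the flags used by the guard.
final class WriteConfigSketch {
  final boolean secondaryIndexEnabled;
  final boolean metadataStreamingWritesEnabled;

  WriteConfigSketch(boolean secondaryIndexEnabled, boolean metadataStreamingWritesEnabled) {
    this.secondaryIndexEnabled = secondaryIndexEnabled;
    this.metadataStreamingWritesEnabled = metadataStreamingWritesEnabled;
  }
}

// Hypothetical base handle: the guard is defined once here.
abstract class WriteHandleSketch {
  protected final WriteConfigSketch config;
  protected final List<String> secondaryIndexDefns;

  protected WriteHandleSketch(WriteConfigSketch config, List<String> secondaryIndexDefns) {
    this.config = config;
    this.secondaryIndexDefns = secondaryIndexDefns;
  }

  // True only when all three conditions from the original inline check hold.
  protected boolean isSecondaryIndexStatsStreamingWritesEnabled() {
    return config.secondaryIndexEnabled
        && !secondaryIndexDefns.isEmpty()
        && config.metadataStreamingWritesEnabled;
  }
}

// Hypothetical subclass: reuses the guard instead of repeating the condition.
final class CreateHandleSketch extends WriteHandleSketch {
  int trackedRecords = 0;

  CreateHandleSketch(WriteConfigSketch config, List<String> secondaryIndexDefns) {
    super(config, secondaryIndexDefns);
  }

  void trackMetadataIndexStats(String recordKey) {
    if (!isSecondaryIndexStatsStreamingWritesEnabled()) {
      return; // nothing to track for this handle
    }
    trackedRecords++; // placeholder for the real stats collection
  }
}
```

Keeping the guard in one place means a later change, such as an extra condition, only has to be made in the base class.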

@@ -452,6 +458,51 @@ protected HoodieRecord<T> updateFileName(HoodieRecord<T> record, Schema schema,
return record.prependMetaFields(schema, targetSchema, metadataValues, prop);
}

private void trackMetadataIndexStats(Option<HoodieKey> hoodieKeyOpt, Option<HoodieRecord> combinedRecordOpt, Option<HoodieRecord<T>> oldRecordOpt, boolean isDelete) {
if (!config.isSecondaryIndexEnabled() || secondaryIndexDefns.isEmpty() || !config.isMetadataStreamingWritesEnabled(hoodieTable.getMetaClient().getTableConfig().getTableVersion())) {

Contributor Author

Based on the earlier feedback, we should be able to call a protected method from HoodieWriteHandle for this.

Contributor

Addressed

@@ -149,7 +152,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
private static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME;

// tracks the list of MDT partitions which can write to metadata table in a streaming manner.
-private static final List<MetadataPartitionType> STREAMING_WRITES_SUPPORTED_PARTITIONS = Arrays.asList(RECORD_INDEX);
+private static final List<MetadataPartitionType> STREAMING_WRITES_SUPPORTED_PARTITIONS = Arrays.asList(RECORD_INDEX, SECONDARY_INDEX);
+private static final List<MetadataPartitionType> STREAMING_WRITES_SUPPORTED_PARTITION_PREFIXES = Arrays.asList(SECONDARY_INDEX);

Contributor Author

Note to reviewer:
Certain partitions in the MDT map one-to-one to a MetadataPartitionType, but that is not always the case.
For example, for secondary index we generally use startsWith rather than an exact match, since there could be N secondary indexes built.
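
The distinction in the note above can be sketched as follows. This is an illustration only: the class name is hypothetical, and the partition strings `record_index` and `secondary_index_` are assumed values used to show exact versus prefix matching, not constants taken from this patch.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper contrasting exact-match partitions with prefix-match partitions.
final class StreamingPartitionMatcherSketch {

  // Partitions that map one-to-one to a partition type (assumed value).
  static final List<String> EXACT_PARTITIONS = Arrays.asList("record_index");

  // Partition types that can have N concrete partitions, one per index (assumed value).
  static final List<String> PARTITION_PREFIXES = Arrays.asList("secondary_index_");

  static boolean supportsStreamingWrites(String partitionPath) {
    if (EXACT_PARTITIONS.contains(partitionPath)) {
      return true;
    }
    // Prefix match covers every secondary index partition, whatever the index name.
    return PARTITION_PREFIXES.stream().anyMatch(partitionPath::startsWith);
  }
}
```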

Contributor Author

Can we enhance the tests we wrote for the patch that added the APIs and implementation for streaming metadata writes?

assertEquals(1, writeStatus.getIndexStats().getSecondaryIndexStats().size());
// Since the MDT is not populated during the create, the updates would be considered as new records by the Append handle
// Therefore only secondary index records for the 50 updates would appear here
assertEquals(50, writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get().size());

Contributor Author

Can we validate the content as well?

Contributor

Addressed

// validate write status has all record delegates
if (populateMetaFields) {
assertEquals(1, writeStatus.getIndexStats().getSecondaryIndexStats().size());
assertEquals(100, writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get().size());

Contributor Author

Same here.

Contributor

Addressed

// verify secondary index stats
assertEquals(1, writeStatus.getIndexStats().getSecondaryIndexStats().size());
// 10 si records for old secondary keys and 10 for new secondary keys
assertEquals(20, writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get().size());

Contributor Author

We should definitely validate the contents here.

Contributor Author

Let's ensure there is a mix of updates and deletes as well.

Contributor

Addressed
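
One way to validate contents rather than only counts, sketched with plain Java and no test framework. The helper names are hypothetical. The sketch assumes that an update which changes the secondary key yields two entries (one for the old value, flagged as deleted, and one for the new value), in line with the "10 for old, 10 for new" comment in the test above; the delete flag on the old entry is an assumption.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helpers for comparing secondary index stats by content.
final class SecondaryIndexStatsAssertSketch {

  // Encodes one entry as "recordKey|value|isDeleted" so entries compare by content.
  static String entry(String recordKey, String secondaryIndexValue, boolean isDeleted) {
    return recordKey + "|" + secondaryIndexValue + "|" + isDeleted;
  }

  // Assumed shape for an update that changes the secondary key: delete old, add new.
  static Set<String> expectedForUpdate(String recordKey, String oldValue, String newValue) {
    Set<String> expected = new HashSet<>();
    expected.add(entry(recordKey, oldValue, true));
    expected.add(entry(recordKey, newValue, false));
    return expected;
  }

  // Assumed shape for a record delete: a single delete entry for the old value.
  static Set<String> expectedForDelete(String recordKey, String oldValue) {
    return Collections.singleton(entry(recordKey, oldValue, true));
  }

  // Fails if the actual entries differ from the expected ones in size or content.
  static void assertSameContent(Set<String> expected, Collection<String> actual) {
    if (actual.size() != expected.size() || !expected.equals(new HashSet<>(actual))) {
      throw new AssertionError("expected " + expected + " but got " + actual);
    }
  }
}
```

A test mixing updates and deletes would union the expected sets for each record and compare them against the entries collected from the write status.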

@nsivabalan
Contributor Author

By the way, do we have tests for the FG reader based Merge Handle as well?
Compaction takes that route, right?

@lokeshj1703 lokeshj1703 force-pushed the HUDI-9417-HUDI-9405-end-to-end-stitching-collapsed-si-streaming branch from 73887b7 to 9c2caf5 Compare June 17, 2025 05:49
@nsivabalan nsivabalan changed the title [Hudi 9340][DNM] Adding streaming support to secondary index writes w/ metadata table [HUDI-9340] Adding streaming support to secondary index writes w/ metadata table Jun 17, 2025
}

/**
* Returns true if streaming is enabled and secondary index is configured for the table.

Contributor Author

Fix the Javadoc.

@nsivabalan
Contributor Author

We need to skip SI record generation for compaction and clustering.

@hudi-bot

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan nsivabalan closed this Jun 25, 2025