-
Notifications
You must be signed in to change notification settings - Fork 2.4k
[HUDI-9340] Adding streaming support to secondary index writes w/ metadata table #13436
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-9340] Adding streaming support to secondary index writes w/ metadata table #13436
Conversation
} | ||
|
||
public void addSecondaryIndexStats(String secondaryIndexPartitionPath, String recordKey, String secondaryIndexValue, boolean isDeleted) { | ||
secondaryIndexStats.computeIfAbsent(secondaryIndexPartitionPath, k -> new ArrayList<>()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add an explicit method named initializeSecondaryIndexStats.
and within constructor of HoodieWriteHandle, we can call this method when we know sec index is enabled and if there are any sec index defns found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
} | ||
|
||
void clear() { | ||
this.writtenRecordDelegates.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you clear secondaryIndexStats as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
*/ | ||
public class SecondaryIndexStats { | ||
|
||
private String recordKey; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
@@ -62,7 +65,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O | |||
protected long recordsDeleted = 0; | |||
private Map<String, HoodieRecord<T>> recordMap; | |||
private boolean useWriterSchema = false; | |||
private final boolean preserveMetadata; | |||
private boolean preserveMetadata; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why reverting this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this change. I had addded it for fixing one of the tests. Not required anymore.
@@ -173,6 +178,23 @@ record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new Metadat | |||
} | |||
} | |||
|
|||
private void trackMetadataIndexStats(HoodieRecord record) { | |||
if (!config.isSecondaryIndexEnabled() || secondaryIndexDefns.isEmpty() || !config.isMetadataStreamingWritesEnabled(hoodieTable.getMetaClient().getTableConfig().getTableVersion())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you move this condition to HoodieWriteHandle and use it all sub class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved it
@@ -452,6 +458,51 @@ protected HoodieRecord<T> updateFileName(HoodieRecord<T> record, Schema schema, | |||
return record.prependMetaFields(schema, targetSchema, metadataValues, prop); | |||
} | |||
|
|||
private void trackMetadataIndexStats(Option<HoodieKey> hoodieKeyOpt, Option<HoodieRecord> combinedRecordOpt, Option<HoodieRecord<T>> oldRecordOpt, boolean isDelete) { | |||
if (!config.isSecondaryIndexEnabled() || secondaryIndexDefns.isEmpty() || !config.isMetadataStreamingWritesEnabled(hoodieTable.getMetaClient().getTableConfig().getTableVersion())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
based on earlier feedback, we should be able to access a protected method from HoodieWriteHandle for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
@@ -149,7 +152,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab | |||
private static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME; | |||
|
|||
// tracks the list of MDT partitions which can write to metadata table in a streaming manner. | |||
private static final List<MetadataPartitionType> STREAMING_WRITES_SUPPORTED_PARTITIONS = Arrays.asList(RECORD_INDEX); | |||
private static final List<MetadataPartitionType> STREAMING_WRITES_SUPPORTED_PARTITIONS = Arrays.asList(RECORD_INDEX, SECONDARY_INDEX); | |||
private static final List<MetadataPartitionType> STREAMING_WRITES_SUPPORTED_PARTITION_PREFIXES = Arrays.asList(SECONDARY_INDEX); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to Reviewer:
Certain partitions in MDT are one on one mapped to the MetadataPartitionType. but its not always the case.
For eg, in case of secondary index, we generally do startsWtith
and not really exact match, since we could have N no of sec indexes built.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we enhance tests that we wrote for the patch where we added APIs and impl for the streaming metadata writes.
assertEquals(1, writeStatus.getIndexStats().getSecondaryIndexStats().size()); | ||
// Since the MDT is not populated during the create, the updates would be considered as new records by the Append handle | ||
// Therefore only secondary index records for the 50 updates would appear here | ||
assertEquals(50, writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get().size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we validate the content as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
// validate write status has all record delegates | ||
if (populateMetaFields) { | ||
assertEquals(1, writeStatus.getIndexStats().getSecondaryIndexStats().size()); | ||
assertEquals(100, writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get().size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
// verify secondary index stats | ||
assertEquals(1, writeStatus.getIndexStats().getSecondaryIndexStats().size()); | ||
// 10 si records for old secondary keys and 10 for new secondary keys | ||
assertEquals(20, writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get().size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should def validate the contents here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets ensure there is a mix of updates and deletes as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
btw, do we have tests for FG reader based Merge Handle as well? |
73887b7
to
9c2caf5
Compare
} | ||
|
||
/** | ||
* Returns true if streaming is enabled and secondary index is configured for the table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix java docs
we need to ignore SI record generation for compaction and clustering. |
Change Logs
To be filled in.
Impact
Describe any public API or user-facing feature change or any performance impact.
Risk level (write none, low medium or high below)
If medium or high, explain what verification was done to mitigate the risks.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist