feat(common): When inferring checkpoint/schema from timeline, check non-ingestion write commits (in case they have metadata rolled-over)#18576
Conversation
92a41cb to
197d9e3
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR broadens schema and checkpoint inference to fall back to non-ingestion write commits (clustering/compaction/delete_partition) so those paths keep working after all ingestion commits have been archived. A couple of minor items worth double-checking in the inline comments — otherwise the change looks sensible and well tested. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of missed imports using fully-qualified java.util.Collections in both checkpoint constructors, and a redundant null-check in TableSchemaResolver that may mislead readers about the contract of getLastCommitMetadataWithSchema().
| throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e); | ||
| } | ||
| }) | ||
| .filter(pair -> !StringUtils.isNullOrEmpty(pair.getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) |
There was a problem hiding this comment.
🤖 The previous instantOpt.isPresent() branch went through getCachedCommitMetadata(instant), which memoizes reads via ConcurrentHashMap.computeIfAbsent. The new path calls metaClient.getActiveTimeline().readCommitMetadata(instant) directly inside the stream, so repeated schema lookups at the same instant re-read the log/commit file. Was dropping the cache intentional, or would it be cheap to thread getCachedCommitMetadata through this new scan?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| .orElse(Option.empty()); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
🤖 Previously this read one instant; now it streams getReverseOrderedInstants() and reads commit metadata lazily until a match is found. On a table where the requested key is truly absent, this will read every completed commit in the active timeline. For checkpoint/streamer keys that scenario is plausible (e.g. first-time enable). Should this have a bounded lookback (like findRollingMetadataFromTimeline's RollingMetadataTimelineLookbackCommits) to avoid pathological scans?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| } | ||
|
|
||
| public StreamerCheckpointV1(Map<String, String> metadata) { | ||
| Map<String, String> safeMetadata = metadata == null ? java.util.Collections.emptyMap() : metadata; |
There was a problem hiding this comment.
🤖 nit: could you add a proper import java.util.Collections; and use Collections.emptyMap() here instead of the fully-qualified name? The FQN reads like a missed import.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| } | ||
|
|
||
| public StreamerCheckpointV2(Map<String, String> metadata) { | ||
| Map<String, String> safeMetadata = metadata == null ? java.util.Collections.emptyMap() : metadata; |
There was a problem hiding this comment.
🤖 nit: same as V1 — java.util.Collections.emptyMap() should be Collections.emptyMap() with a proper import.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| metaClient.getActiveTimeline().getLastCommitMetadataWithSchema(); | ||
| if (instantAndCommitMetadata.isPresent()) { | ||
| String schemaStr = instantAndCommitMetadata.get().getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY); | ||
| if (!StringUtils.isNullOrEmpty(schemaStr)) { |
There was a problem hiding this comment.
🤖 nit: the !StringUtils.isNullOrEmpty(schemaStr) guard here seems redundant — getLastCommitMetadataWithSchema() already filters to commits where the schema key is non-empty. Could you remove it, or add a brief comment explaining why the double-check is intentional?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This PR introduces comprehensive support for Apache Hudi across Spark 4.0.2, Lance file format, Vector and Variant data types, blob column batched reading, Flink continuous-sort buffering, multi-writer rollback exclusivity, and timeline archival enhancements. It also consolidates utility functions and updates dependencies.
CodeRabbit: hudi-agent#20 (review)
| } else { | ||
| schema = HoodieSchemaUtils.removeMetadataFields(schema); | ||
| } | ||
| return Option.of(schema); |
There was a problem hiding this comment.
Please keep the old “valid schema” fallback semantics here.
This now stops at the first non-empty SCHEMA_KEY and parses it immediately. If the newest schema-bearing instant has malformed or partially rolled-over schema metadata, HoodieSchema.parse(...) will fail resolution even when an older valid schema is still available. The previous flow explicitly asked the timeline for a valid schema-bearing commit, so this is a regression.
Please keep scanning past parse failures in the bounded branch, and keep the unbounded branch on a “valid schema” API/contract rather than getLastCommitMetadataWithSchema().
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java`
around lines 221 - 243, The change regresses the previous “valid schema”
fallback: in the bounded scan (the stream/filter/map that parses
HoodieCommitMetadata.SCHEMA_KEY) wrap HoodieSchema.parse(...) so parse failures
are caught and skipped (i.e., continue scanning for the next non-empty schema)
instead of letting a parse exception stop the scan; in the unbounded branch,
stop using getLastCommitMetadataWithSchema() directly and call the timeline API
that guarantees a valid schema-bearing commit (the same "valid schema" contract
used previously) so you return only a successfully parsed HoodieSchema (apply
HoodieSchemaUtils.addMetadataFields/removeMetadataFields afterward as before).
— CodeRabbit (original) (source:comment#3139592453)
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback from the previous pass! The refactor simplifies things nicely: the new getLastCommitMetadataWithValidSchema(boolean) overload replaces the extra method, the cache is preserved on the instantOpt path, the unbounded timeline scan in TimelineUtils was reverted, and the clustering test now keeps the second clustering instant on the active timeline by skipping the post-round-1 inserts. Removing the Map<String,String> constructors on the checkpoint classes and the dead CheckpointUtils.getCheckpoint(Map) helper is a nice cleanup too. No issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
| getCommitMetadataStream() | ||
| .filter(instantCommitMetadataPair -> | ||
| WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType()) | ||
| (!filterByCanUpdateSchema || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) |
There was a problem hiding this comment.
filterForSchemaMutableOperations
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR extends schema/checkpoint inference to fall back through non-ingestion write commits (clustering, compaction, delete_partition) when rolling metadata is enabled, which is a sensible improvement for the post-archival case. Prior review rounds have already flagged the main concerns (valid-schema fallback semantics, caching behavior in TableSchemaResolver, the test archiving away the asserted-on clustering instant, and the parameter naming in the new overload). No additional critical correctness issues from this pass — please address the prior comments, and this should be ready for a Hudi committer or PMC member to take it from here. One minor readability suggestion around a magic boolean call site; otherwise the code reads cleanly.
cc @yihua
| if (latestCommitWithValidSchema == null) { | ||
| Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = | ||
| metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(); | ||
| metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false); |
There was a problem hiding this comment.
🤖 nit: the bare false here is opaque at the call site — a reader has to jump to the interface to know what it controls. Could you either introduce a small named local (e.g. boolean filterForSchemaMutableOps = false;) or expose two distinct methods (e.g. getLastCommitMetadataWithValidSchemaIncludingNonMutating())?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
…on-ingestion write commits (in case they have metadata rolled-over
…on-ingestion write commits (in case they have metadata rolled-over
272482e to
4b28bed
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR extends schema and checkpoint inference to fall back to non-ingestion write commits (clustering, compaction, delete_partition) so rolled-over metadata remains discoverable after archival removes ingestion commits. Prior review rounds have already flagged the main concerns around the getLatestCommitMetadataWithValidSchema semantics change, caching behavior, and a test archival ordering issue. No additional critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A couple of minor readability and consistency nits in the test and utility code.
cc @yihua
|
|
||
| // Only insert after the first round so that the second clustering instant | ||
| // remains on the active timeline after archival | ||
| if (round < 1) { |
There was a problem hiding this comment.
🤖 nit: round < 1 effectively means round == 0 — could you use round == 0 directly? The < 1 form makes a reader pause to ask whether a negative round value is ever possible.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| HoodieCommitMetadata metadata = completedTimeline.readCommitMetadata(instant); | ||
| if (metadata.getOperationType() == WriteOperationType.CLUSTER) { | ||
| String schema = metadata.getMetadata(schemaKey); | ||
| if (schema != null && !schema.isEmpty()) { |
There was a problem hiding this comment.
🤖 nit: could you use !StringUtils.isNullOrEmpty(schema) here instead of schema != null && !schema.isEmpty()? The rest of this PR consistently uses StringUtils.isNullOrEmpty for the same check.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| }).filter(Objects::nonNull).findFirst() | ||
| // Filter out null (from HoodieException) and empty strings (from commits | ||
| // that don't have checkpoint metadata, e.g. when rollover is not configured) | ||
| }).filter(key -> !StringUtils.isNullOrEmpty(key)).findFirst() |
There was a problem hiding this comment.
🤖 nit: the lambda parameter key here is actually a checkpoint value string (e.g. "topic:100"), not a metadata key name — something like checkpoint or checkpointValue might read more accurately.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18576 +/- ##
============================================
+ Coverage 68.89% 68.91% +0.01%
- Complexity 28549 28559 +10
============================================
Files 2480 2480
Lines 136904 136908 +4
Branches 16673 16673
============================================
+ Hits 94324 94353 +29
+ Misses 34994 34964 -30
- Partials 7586 7591 +5
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
When archival removes all ingestion commits from the active timeline, code paths that infer schema or checkpoint metadata can fail because they only inspect ingestion-type instants (commits whose
WriteOperationType.canUpdateSchema()is true). With Hudi's rolling metadata feature (hoodie.write.rolling.metadata.keys), non-ingestion commits like clustering, compaction, and delete_partition can carry rolled-over schema and checkpoint metadata. However, several inference paths don't search these commit types. This PR ensures schema and checkpoint resolution falls back to non-ingestion write commits when the latest instant doesn't carry the needed metadata.Summary and Changelog
Changes:
HoodieActiveTimeline/ActiveTimelineV1/ActiveTimelineV2: Added aboolean filterByCanUpdateSchemaoverload togetLastCommitMetadataWithValidSchema. Whenfalse, thecanUpdateSchemafilter is skipped, allowing schema discovery from any commit type (clustering, compaction, delete_partition). The no-arg version retains the original behavior (filter enabled).TableSchemaResolver: ChangedgetLatestCommitMetadataWithValidSchema()to callgetLastCommitMetadataWithValidSchema(false), so schema resolution searches all completed commit types instead of only ingestion commits.BaseHoodieClient: InmergeRollingMetadata, empty-string values are now treated as "missing" when checking both the current commit's existing metadata and values found in prior commits. This prevents an empty string from short-circuiting the walkback.InitialCheckpointFromAnotherHoodieTimelineProvider: Switched fromgetCommitsTimeline()togetWriteTimeline()to include compaction/logcompaction instants. Filters out empty checkpoint strings (not just nulls). Re-throwsIOExceptionasHoodieIOExceptioninstead of swallowing it.TestTimelineUtils(schema lookup ignoring operation type, empty schema returns empty) and 1 functional test inTestHoodieClientOnCopyOnWriteStorage(rolling metadata preserved across clustering after archival, withTableSchemaResolverstill able to find schema).Impact
TableSchemaResolvernow discovers schema from any commit type, not just ingestion commits. This is strictly more robust and has no impact on tables where ingestion commits are present on the timeline.getLastCommitMetadataWithValidSchema(boolean)overload is additive.Risk Level
Low — all changes are additive fallback paths. Existing behavior for tables with ingestion commits on the timeline is unchanged. The empty-string fix in
BaseHoodieClientcorrects a pre-existing edge case where an empty string value would prevent rolling metadata walkback.Documentation Update
None — no new configs or user-facing features. The rolling metadata config (
hoodie.write.rolling.metadata.keys) already exists; this PR ensures the metadata it produces is correctly discovered by all read paths.Contributor's checklist