
feat(common): Ensure schema can be rolled-over and that commit metadata roll-over will apply to clean#18543

Closed
kbuci wants to merge 12 commits into
apache:masterfrom
kbuci:clean-rolling-metadata

Conversation

@kbuci
Contributor

@kbuci kbuci commented Apr 22, 2026

Describe the issue this Pull Request addresses

After a backfill followed by an archival cycle, the active timeline can end up containing only non-ingestion instants (clustering, delete_partition, clean). In addition, the metadata roll-over currently does not preserve the schema. Together, these break readers and writers that rely on finding the schema in ingestion commit metadata.

This PR ensures that schema and checkpoint metadata are always discoverable on the active timeline, regardless of which commit types survive archival.

Summary and Changelog

Extends the rolling extra metadata feature to clean operations and adds fallback lookup paths so that schema and checkpoint metadata can always be resolved — even when the active timeline has no ingestion commits and commit roll-over is not set for clustering.

Changes:

  • BaseHoodieClient: Refactored mergeRollingMetadata to extract a reusable findRollingMetadataFromTimeline() utility that searches both the commits timeline and the cleaner timeline. Empty string values are now treated as "missing" for rolling purposes.
  • CleanActionExecutor: runClean() now calls findRollingMetadataFromTimeline() to populate HoodieCleanMetadata.extraMetadata with rolled-over keys (e.g., schema, checkpoint).
  • TableSchemaResolver: Added two fallback methods in getTableSchemaInternal:
    • getTableSchemaFromAnyCommitMetadata — uses the new getLastCommitMetadataWithSchema() (no canUpdateSchema filter) to find schema in clustering/delete_partition commits.
    • getTableSchemaFromCleanMetadata — searches clean instants' extraMetadata for a rolled-over schema.
  • HoodieActiveTimeline / ActiveTimelineV1 / ActiveTimelineV2: Added getLastCommitMetadataWithSchema() which returns the latest commit with a non-empty schema, regardless of WriteOperationType.
  • TimelineUtils: getExtraMetadataFromLatest and getExtraMetadataFromLatestIncludeClustering now fall back to searching clean instants' extraMetadata when the key is not found in completed commits.
  • StreamerCheckpointUtils: resolveCheckpointToResumeFrom now falls back to getCheckpointFromCleanInstants when no checkpoint is found on the commits timeline.
  • Error handling: All fallback paths re-throw IOException as HoodieIOException instead of suppressing it.
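As an illustration of the lookup order described in the list above, here is a minimal sketch under stated assumptions: it is not the PR's code, the class `RollingMetadataSketch` and its `Instant` type are hypothetical stand-ins for Hudi's timeline types, and instants are reduced to the `extraMetadata` map they carry. It searches commits first, then clean instants, and treats empty strings as missing.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these types are Hudi classes.
final class RollingMetadataSketch {

  /** A completed instant reduced to the extraMetadata it carries. */
  static final class Instant {
    final Map<String, String> extraMetadata;

    Instant(Map<String, String> extraMetadata) {
      this.extraMetadata = extraMetadata;
    }
  }

  /** Searches commits first, then clean instants; both lists are ordered newest first. */
  static Map<String, String> findRollingMetadata(
      List<Instant> commitsNewestFirst, List<Instant> cleansNewestFirst, Set<String> rollingKeys) {
    Map<String, String> found = new HashMap<>();
    Set<String> remaining = new HashSet<>(rollingKeys);
    scan(commitsNewestFirst, remaining, found);
    if (!remaining.isEmpty()) {
      scan(cleansNewestFirst, remaining, found);
    }
    return found;
  }

  private static void scan(List<Instant> instants, Set<String> remaining, Map<String, String> found) {
    for (Instant instant : instants) {
      if (remaining.isEmpty()) {
        return;
      }
      // Iterate over a copy so keys can be removed from 'remaining' while looping.
      for (String key : new HashSet<>(remaining)) {
        String value = instant.extraMetadata == null ? null : instant.extraMetadata.get(key);
        // Null and empty values both count as "missing" for rolling purposes.
        if (value != null && !value.isEmpty()) {
          found.put(key, value);
          remaining.remove(key);
        }
      }
    }
  }
}
```

In this sketch a commit that carries an empty `schema` value does not stop the search, so a non-empty value rolled into a clean instant can still be picked up.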

Tests:

  • TestTimelineUtils: 6 new unit tests covering clean instant fallback for getExtraMetadataFromLatest, getExtraMetadataFromLatestIncludeClustering, commit-over-clean priority, missing key in clean, and getLastCommitMetadataWithSchema behavior.
  • TestHoodieClientOnCopyOnWriteStorage: 2 new functional tests:
    • testRollingMetadataPreservedAcrossClusteringAfterArchival — verifies schema is rolled into clustering commits and TableSchemaResolver finds it after archival removes ingestion commits.
    • testRollingMetadataPreservedInCleanCommits — verifies clean commits contain rolled-over schema in extraMetadata.

Impact

  • No public API changes. All changes are internal to metadata propagation and lookup logic.
  • Existing behavior is unchanged when rolling metadata is not configured — all new code paths are additive fallbacks.
  • When hoodie.write.rolling.metadata.keys includes schema or checkpoint keys, non-ingestion operations (clustering, delete_partition, clean) will now carry those values forward, making the table resilient to archival of ingestion commits.

Risk Level

Low — The changes are additive fallback paths that only activate when existing lookups return empty. The core write path is unchanged; only the metadata propagation in CleanActionExecutor and the lookup fallbacks in TableSchemaResolver, TimelineUtils, and StreamerCheckpointUtils are new. All changes are covered by unit and functional tests.

Documentation Update

None — no new configs are introduced. The existing hoodie.write.rolling.metadata.keys and hoodie.write.rolling.metadata.timeline.lookback.commits configs now apply to clean operations as well, which is a natural extension of their documented behavior.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions Bot added the size:L PR with lines of changes in (300, 1000] label Apr 22, 2026
@kbuci kbuci changed the title feature(common) Ensure schema can be rolled-over and clean can contai… feature(common) Ensure schema can be rolled-over and that commit metadata roll-over will apply to clean Apr 22, 2026
@kbuci kbuci changed the title feature(common) Ensure schema can be rolled-over and that commit metadata roll-over will apply to clean feature(common): Ensure schema can be rolled-over and that commit metadata roll-over will apply to clean Apr 22, 2026
@kbuci kbuci marked this pull request as ready for review April 22, 2026 20:18
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The PR extends rolling metadata to clean commits and adds fallback paths so schema/checkpoint can still be resolved when archival leaves only non-ingestion instants on the active timeline. A few edge cases around lock/timeline-freshness semantics, lookback-limit double counting, and the broadened semantics of TimelineUtils.getExtraMetadataFromLatest are worth double-checking in the inline comments. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A few small readability issues below, mostly in the new getCheckpointFromCleanInstants method and the getTableSchemaFromCleanMetadata stream chain.

Set<String> rollingKeys = config.getRollingMetadataKeys();
if (!rollingKeys.isEmpty() && !table.isMetadataTable()) {
Map<String, String> existingExtra = metadata.getExtraMetadata() != null
? metadata.getExtraMetadata() : new HashMap<>();

🤖 The rolling-metadata merge happens before txnManager.beginStateChange(...) on the next line — so the timeline read inside findRollingMetadataFromTimeline is not inside the state-change lock. The equivalent call for commits (mergeRollingMetadata) explicitly documents that it must run inside the lock after the timeline is refreshed. Could you move this merge to after beginStateChange (and refresh the active timeline first) so it follows the same contract? Otherwise a concurrent write could complete between here and the transition, and we may roll forward a stale schema/checkpoint.

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

String value = commitMetadata.getMetadata(key);
if (!StringUtils.isNullOrEmpty(value)) {
foundRollingMetadata.put(key, value);
remainingKeys.remove(key);

🤖 lookbackLimit is applied separately to commits AND clean instants, so we can end up reading up to 2 * lookbackLimit instants (and instantsWalkedBack is also shared / aggregated across both paths). Was the intent for the config to cap the total instants scanned? If so, the clean loop should subtract instantsWalkedBack from the limit (lookbackLimit - instantsWalkedBack).

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
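To make the `lookbackLimit - instantsWalkedBack` suggestion concrete, here is a small sketch, assuming the config is meant to cap the total number of instants scanned. The class and method names are hypothetical, and instants are plain strings rather than Hudi types.

```java
import java.util.List;

// Hypothetical sketch of a lookback budget shared between the commit loop and the clean loop.
final class LookbackBudgetSketch {

  /** Returns how many instants are scanned in total when both loops draw from one budget. */
  static int scanWithSharedBudget(List<String> commits, List<String> cleans, int lookbackLimit) {
    int instantsWalkedBack = 0;
    for (String commit : commits) {
      if (instantsWalkedBack >= lookbackLimit) {
        break;
      }
      instantsWalkedBack++; // a real implementation would read the commit metadata here
    }

    // The clean loop only gets whatever the commit loop left over.
    int remainingBudget = lookbackLimit - instantsWalkedBack;
    int cleansScanned = 0;
    for (String clean : cleans) {
      if (cleansScanned >= remainingBudget) {
        break;
      }
      cleansScanned++; // a real implementation would read the clean metadata here
    }
    return instantsWalkedBack + cleansScanned;
  }
}
```

With three commits, five cleans, and a limit of 4, this scans three commits and one clean instant, so the total never exceeds the configured limit.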

return result;
}
return getExtraMetadataFromCleanInstants(metaClient, extraMetadataKey);
}

🤖 Silently broadening getExtraMetadataFromLatest / getExtraMetadataFromLatestIncludeClustering to also search clean instants changes the semantics for every existing caller. A caller that today uses this API to fetch a user-set key from commits will now inadvertently receive any value with the same key that happened to be rolled forward into a clean. Could this be made opt-in (e.g. a new includeCleans parameter or a distinct method) instead of changing the default behavior? At minimum it's worth auditing all call sites to confirm this expansion is safe for each.

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
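One possible shape for the opt-in variant mentioned above is sketched below. This is only an illustration: `ExtraMetadataLookupSketch`, `fromLatestCommit`, and the `includeCleans` flag are hypothetical names, the timelines are modelled as lists of plain maps ordered newest first, and `java.util.Optional` stands in for Hudi's `Option`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the clean-instant fallback is opt-in rather than the default.
final class ExtraMetadataLookupSketch {

  /** Existing-style behaviour: only the newest commit is consulted. */
  static Optional<String> fromLatestCommit(List<Map<String, String>> commitsNewestFirst, String key) {
    // Optional.map yields an empty Optional when the key is absent (get returns null).
    return commitsNewestFirst.stream().findFirst().map(extra -> extra.get(key));
  }

  /** Callers must pass includeCleans = true to also search clean instants. */
  static Optional<String> fromLatest(
      List<Map<String, String>> commitsNewestFirst,
      List<Map<String, String>> cleansNewestFirst,
      String key,
      boolean includeCleans) {
    Optional<String> result = fromLatestCommit(commitsNewestFirst, key);
    if (result.isPresent() || !includeCleans) {
      return result;
    }
    return cleansNewestFirst.stream()
        .map(extra -> extra.get(key))
        .filter(value -> value != null && !value.isEmpty())
        .findFirst();
  }
}
```

Existing callers would keep passing `false` (or keep calling the commit-only method), so their results would not change when a clean instant happens to carry the same key.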

// Fallback: if no checkpoint found in commits timeline, check clean instants' extraMetadata.
// Clean instants carry rolled-over metadata when rolling metadata is configured.
if (!checkpoint.isPresent()) {
checkpoint = getCheckpointFromCleanInstants(metaClient, streamerConfig, props);

🤖 resolveCheckpointBetweenConfigAndPrevCommit does more than just look up a checkpoint — it also handles source-class-change detection, reset/ignore keys, and various edge cases relative to the commits timeline. The fallback here bypasses all that logic when a checkpoint is only present in a clean instant. Is that deliberate? In particular, could a rolled-forward V1/V2 checkpoint from an old source be resumed without triggering the source-mismatch guard that the commit path enforces?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

(instantOpt.isPresent()
? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields)
: getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
// Fallback: look in ANY commit type (clustering, compaction, delete_partition, etc.)

🤖 The two fallbacks (getTableSchemaFromAnyCommitMetadata, getTableSchemaFromCleanMetadata) are now invoked on every schema resolution even when the primary lookup already failed for a benign reason (brand new table, only metadata commits, etc.). getTableSchemaFromCleanMetadata in particular does a stream scan over all clean instants. For callers that resolve schema on hot paths, is there a concern about extra I/O here — and would it be worth short-circuiting when tableCreateSchema is already present and is known to be fresh?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

.filter(cleanMeta -> cleanMeta.getExtraMetadata() != null)
.map(cleanMeta -> cleanMeta.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY))
.filter(schemaStr -> !StringUtils.isNullOrEmpty(schemaStr))
.map(schemaStr -> parseSchemaString(schemaStr, includeMetadataFields).orElse(null))

🤖 nit: .orElse(null) followed immediately by .filter(schema -> schema != null) is a known Java streams anti-pattern — could you replace these two lines with a single .flatMap(schemaStr -> parseSchemaString(schemaStr, includeMetadataFields).toJavaOptional()) instead?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
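For reference, the general pattern behind this nit can be shown with plain `java.util.Optional`. This is a sketch, not the Hudi code: `parse` is a hypothetical stand-in for `parseSchemaString`, and the Java 8 compatible `map(Stream::of).orElseGet(Stream::empty)` idiom is used to turn an `Optional` into a zero-or-one element stream.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical sketch: flatMap over an Optional instead of orElse(null) plus a null filter.
final class OptionalFlatMapSketch {

  /** Stand-in parser: accepts only strings that look like a JSON object. */
  static Optional<String> parse(String candidate) {
    return candidate.startsWith("{") ? Optional.of(candidate.trim()) : Optional.empty();
  }

  /** Returns the first candidate that parses, skipping null, empty, and unparsable entries. */
  static Optional<String> firstParsable(List<String> candidates) {
    return candidates.stream()
        .filter(s -> s != null && !s.isEmpty())
        // An empty Optional contributes no element, so no null ever enters the stream.
        .flatMap(s -> parse(s).map(Stream::of).orElseGet(Stream::empty))
        .findFirst();
  }
}
```

On Java 9 and later the same step can be written as `.flatMap(s -> parse(s).stream())`.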

* of checkpoint metadata on the active timeline.
*/
private static Option<Checkpoint> getCheckpointFromCleanInstants(
HoodieTableMetaClient metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {

🤖 nit: streamerConfig and props are accepted as parameters but never referenced in the method body — could you remove them (or, if they're placeholders for future use, add a short comment explaining why they're there)?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

.map(instant -> {
try {
HoodieCleanMetadata cleanMetadata = cleanerTimeline.readCleanMetadata(instant);
java.util.Map<String, String> extraMetadata = cleanMetadata.getExtraMetadata();

🤖 nit: java.util.Map is fully qualified here while the rest of the file uses the short form — could you add a Map import and write Map<String, String> extraMetadata to keep it consistent?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Krishen Bhan added 2 commits April 22, 2026 17:38
@danny0405
Contributor

This PR ensures that schema and checkpoint metadata are always discoverable on the active timeline

The LSM timeline access is incremental and fast, so why can't we just load the rolling metadata from the archived timeline in such a case? It looks complicated (and also hard to maintain) to add the rolling metadata to table services.

@kbuci kbuci changed the title feature(common): Ensure schema can be rolled-over and that commit metadata roll-over will apply to clean feat(common): Ensure schema can be rolled-over and that commit metadata roll-over will apply to clean Apr 23, 2026
@kbuci kbuci requested a review from hudi-agent April 23, 2026 03:13
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for addressing the feedback! The CleanActionExecutor lock-ordering issue is now handled correctly (merge runs inside beginStateChange with a fresh timeline reload), the lookback is now decremented across commits and cleans, and the readability/nit findings in StreamerCheckpointUtils and TableSchemaResolver are cleaned up. The two earlier open questions around resolveCheckpointBetweenConfigAndPrevCommit source-mismatch semantics and extra I/O on the schema-resolution fallback were not addressed in code, but they are exploratory questions suitable for follow-up. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

CodeRabbit Walkthrough: This pull request consolidates utility classes (MapUtils → CollectionUtils), enhances timeline-based metadata resolution with clean instant fallbacks, introduces batched blob reading for Spark, adds VECTOR type support in SQL parsers, implements continuous-sort buffering for Flink sink, improves rolling metadata propagation across commit and clean actions, and adds a standalone Hive sync job utility.

Sequence Diagram (CodeRabbit):

sequenceDiagram
    participant User as Spark SQL User
    participant Parser as SQL Parser<br/>(ReadBlobRule)
    participant Optimizer as Catalyst Optimizer
    participant Strategy as Planner Strategy
    participant Executor as Physical Executor<br/>(BatchedBlobReadExec)
    participant Reader as BatchedBlobReader
    participant Storage as HoodieStorage

    User->>Parser: SELECT read_blob(data) FROM table
    Parser->>Parser: Detect read_blob() expression<br/>Extract blob columns
    Parser->>Parser: Wrap child plan with<br/>BatchedBlobRead nodes
    Parser->>Optimizer: Rewritten logical plan<br/>(Project[read_blob→dataAttr]<br/>  BatchedBlobRead<br/>    child)
    Optimizer->>Strategy: Logical plan
    Strategy->>Strategy: Match BatchedBlobRead<br/>Read config params
    Strategy->>Executor: Create BatchedBlobReadExec
    Optimizer->>Executor: Optimized physical plan
    Executor->>Executor: Execute child RDD
    Executor->>Reader: processRDD(childRDD,<br/>maxGapBytes,<br/>lookaheadSize)
    Reader->>Reader: Batch rows<br/>Merge byte ranges
    Reader->>Storage: Perform batched<br/>range reads
    Storage-->>Reader: Merged bytes
    Reader->>Reader: Split buffer into<br/>per-row slices
    Reader-->>Executor: RDD[InternalRow]<br/>with blob data
    Executor-->>User: Result set

Sequence Diagram (CodeRabbit):

sequenceDiagram
    participant Action as CleanAction
    participant Timeline as HoodieActiveTimeline
    participant Commits as Commit Metadata
    participant Clean as Clean Metadata
    participant Helper as findRollingMetadata<br/>Helper

    Action->>Timeline: Lock & reload timeline
    Action->>Action: Get existing<br/>extraMetadata
    Action->>Helper: Compute rolling keys<br/>(existingExtraMetadata,<br/>rollingKeys)
    Helper->>Timeline: Walk back timeline<br/>(commits + clean)
    Helper->>Commits: Skip keys with<br/>non-empty values
    Helper->>Commits: Search commits<br/>(up to lookback limit)
    Commits-->>Helper: Found key/values
    Helper->>Clean: Search clean instants<br/>if keys missing
    Clean-->>Helper: Extra metadata<br/>from cleaners
    Helper-->>Action: Computed rolling<br/>key/values
    Action->>Action: Merge into<br/>cleanMetadata.extraMetadata
    Action->>Action: Persist via<br/>writeTableMetadata

CodeRabbit: hudi-agent#14 (review)

this.checkpointKey = metadata.get(STREAMER_CHECKPOINT_KEY_V1);
this.checkpointResetKey = metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1);
this.checkpointIgnoreKey = metadata.get(CHECKPOINT_IGNORE_KEY);
}

⚠️ Potential issue | 🟡 Minor

Handle null metadata defensively in constructor.

Line 50 assumes metadata is non-null; a null input would fail checkpoint reconstruction with an NPE.

Suggested fix
 public StreamerCheckpointV1(Map<String, String> metadata) {
-  this.checkpointKey = metadata.get(STREAMER_CHECKPOINT_KEY_V1);
-  this.checkpointResetKey = metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1);
-  this.checkpointIgnoreKey = metadata.get(CHECKPOINT_IGNORE_KEY);
+  Map<String, String> safeMetadata = metadata == null ? java.util.Collections.emptyMap() : metadata;
+  this.checkpointKey = safeMetadata.get(STREAMER_CHECKPOINT_KEY_V1);
+  this.checkpointResetKey = safeMetadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1);
+  this.checkpointIgnoreKey = safeMetadata.get(CHECKPOINT_IGNORE_KEY);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java`
around lines 50 - 54, The constructor StreamerCheckpointV1(Map<String, String>
metadata) currently dereferences metadata and can NPE; guard against null by
treating metadata as empty when null (e.g., set localMap = metadata != null ?
metadata : Collections.emptyMap()) and then read checkpointKey,
checkpointResetKey, and checkpointIgnoreKey from that localMap using the
existing constants (STREAMER_CHECKPOINT_KEY_V1,
STREAMER_CHECKPOINT_RESET_KEY_V1, CHECKPOINT_IGNORE_KEY); ensure to import
Collections if needed and avoid changing the public API.

CodeRabbit (original) (source:comment#3128223853)

this.checkpointKey = metadata.get(STREAMER_CHECKPOINT_KEY_V2);
this.checkpointResetKey = metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2);
this.checkpointIgnoreKey = metadata.get(CHECKPOINT_IGNORE_KEY);
}

⚠️ Potential issue | 🟡 Minor

Add null-safety for metadata-map constructor.

If metadata is null, this constructor throws immediately; guarding here avoids fragile call-site assumptions.

Suggested fix
 public StreamerCheckpointV2(Map<String, String> metadata) {
-  this.checkpointKey = metadata.get(STREAMER_CHECKPOINT_KEY_V2);
-  this.checkpointResetKey = metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2);
-  this.checkpointIgnoreKey = metadata.get(CHECKPOINT_IGNORE_KEY);
+  Map<String, String> safeMetadata = metadata == null ? java.util.Collections.emptyMap() : metadata;
+  this.checkpointKey = safeMetadata.get(STREAMER_CHECKPOINT_KEY_V2);
+  this.checkpointResetKey = safeMetadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2);
+  this.checkpointIgnoreKey = safeMetadata.get(CHECKPOINT_IGNORE_KEY);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java`
around lines 51 - 55, The StreamerCheckpointV2(Map<String, String>) constructor
should be made null-safe: check if the incoming metadata is null and treat it as
an empty map (or use null-safe lookups) before assigning checkpointKey,
checkpointResetKey, and checkpointIgnoreKey from STREAMER_CHECKPOINT_KEY_V2,
STREAMER_CHECKPOINT_RESET_KEY_V2, and CHECKPOINT_IGNORE_KEY; update the
constructor in StreamerCheckpointV2 to guard against metadata == null so it does
not throw NPE at call sites.

CodeRabbit (original) (source:comment#3128223861)

if (result.isPresent()) {
return result;
}
return getExtraMetadataFromCleanInstants(metaClient, extraMetadataKey);

⚠️ Potential issue | 🟠 Major

Keep scanning commit instants until the key is found.

Both methods stop at the newest completed commit because findFirst() runs before checking whether getMetadataValue(...) is present. If that latest instant does not carry extraMetadataKey but an older active commit still does, this now returns clean metadata or empty instead of the correct commit value.

Proposed fix
   public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
     Option<String> result = metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
         // exclude clustering commits for returning user stored extra metadata
         .filter(instant -> !isClusteringCommit(metaClient, instant))
-        .findFirst().map(instant ->
-            getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
+        .map(instant -> getMetadataValue(metaClient, extraMetadataKey, instant))
+        .filter(Option::isPresent)
+        .findFirst()
+        .orElse(Option.empty());
     if (result.isPresent()) {
       return result;
     }
     return getExtraMetadataFromCleanInstants(metaClient, extraMetadataKey);
   }
@@
   public static Option<String> getExtraMetadataFromLatestIncludeClustering(HoodieTableMetaClient metaClient, String extraMetadataKey) {
     Option<String> result = metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
-        .findFirst().map(instant ->
-            getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
+        .map(instant -> getMetadataValue(metaClient, extraMetadataKey, instant))
+        .filter(Option::isPresent)
+        .findFirst()
+        .orElse(Option.empty());
     if (result.isPresent()) {
       return result;
     }
     return getExtraMetadataFromCleanInstants(metaClient, extraMetadataKey);
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java`
around lines 241 - 265, Both getExtraMetadataFromLatest and
getExtraMetadataFromLatestIncludeClustering stop at the newest instant because
they call findFirst() before checking whether getMetadataValue(...) returned a
non-empty Option; change the stream to evaluate getMetadataValue for each
instant and select the first non-empty Option. Concretely, in
getExtraMetadataFromLatest and getExtraMetadataFromLatestIncludeClustering
replace the current getReverseOrderedInstants().findFirst().map(instant ->
getMetadataValue(...)).orElse(Option.empty()) logic with something like
getReverseOrderedInstants().map(instant -> getMetadataValue(metaClient,
extraMetadataKey, instant)).filter(opt ->
opt.isPresent()).findFirst().orElse(Option.empty()) (or equivalent flatMap) so
the code returns the first instant that actually contains the extraMetadataKey
before falling back to getExtraMetadataFromCleanInstants.

CodeRabbit (original) (source:comment#3128223865)
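The behavioural difference described in this comment can be reproduced without Hudi. The sketch below is illustrative only: the class and method names are hypothetical, instants are modelled as maps ordered newest first, and `java.util.Optional` stands in for Hudi's `Option`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch contrasting "newest instant only" with "newest instant that carries the key".
final class FirstPresentSketch {

  /** findFirst() runs before the key check, so only the newest instant is ever consulted. */
  static Optional<String> latestOnly(List<Map<String, String>> newestFirst, String key) {
    return newestFirst.stream()
        .findFirst()
        .flatMap(extra -> Optional.ofNullable(extra.get(key)));
  }

  /** The key check runs first, so older instants are consulted until one carries the key. */
  static Optional<String> firstCarryingKey(List<Map<String, String>> newestFirst, String key) {
    return newestFirst.stream()
        .map(extra -> Optional.ofNullable(extra.get(key)))
        .filter(Optional::isPresent)
        .map(Optional::get)
        .findFirst();
  }
}
```

When the newest instant lacks the key but an older one has it, `latestOnly` returns empty while `firstCarryingKey` returns the older value. Whether that broader search is the desired contract for the Hudi methods is a separate question for the reviewers.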

// Clean instants carry rolled-over metadata when rolling metadata is configured.
if (!checkpoint.isPresent()) {
checkpoint = getCheckpointFromCleanInstants(metaClient);
}

⚠️ Potential issue | 🟠 Major

Apply --checkpoint / --ignore-checkpoint precedence to the clean fallback too.

This path returns a checkpoint directly from clean metadata, so a clean-derived value suppresses the later override handling and ignoreCheckpoint is never honored here. It also only accepts non-empty checkpointKey, while the commit-based path uses reset metadata as part of conflict resolution. That changes resume semantics once the active timeline relies on rolled-over clean metadata.

Also applies to: 249-275

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java`
around lines 87 - 91, The clean-instants fallback currently returns a checkpoint
unconditionally and ignores the same override/ignore logic used for
commit-derived checkpoints; update the code so that when
getCheckpointFromCleanInstants(metaClient) yields a value it is passed through
the same checkpoint-precedence logic as the commit path (respecting the
checkpointKey override and the ignoreCheckpoint flag) before assigning to
checkpoint, and apply the same change in the other occurrence (the block around
lines 249-275). Locate the usages of getCheckpointFromCleanInstants and the
variables/methods that implement override/ignore handling (e.g., checkpointKey,
ignoreCheckpoint and the commit-path resolution logic) and reuse that resolution
code so clean-derived checkpoints do not bypass precedence rules.

CodeRabbit (original) (source:comment#3128223929)

@codecov-commenter

Codecov Report

❌ Patch coverage is 53.43915% with 88 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.82%. Comparing base (a834736) to head (f35f52e).
⚠️ Report is 11 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../java/org/apache/hudi/client/BaseHoodieClient.java | 65.62% | 20 Missing and 2 partials ⚠️ |
| .../apache/hudi/common/table/TableSchemaResolver.java | 42.85% | 17 Missing and 3 partials ⚠️ |
| ...di/utilities/streamer/StreamerCheckpointUtils.java | 18.18% | 17 Missing and 1 partial ⚠️ |
| .../hudi/common/table/checkpoint/CheckpointUtils.java | 0.00% | 11 Missing ⚠️ |
| .../common/table/checkpoint/StreamerCheckpointV1.java | 0.00% | 5 Missing ⚠️ |
| .../common/table/checkpoint/StreamerCheckpointV2.java | 0.00% | 5 Missing ⚠️ |
| ...e/hudi/table/action/clean/CleanActionExecutor.java | 72.72% | 0 Missing and 3 partials ⚠️ |
| ...ache/hudi/common/table/timeline/TimelineUtils.java | 88.46% | 2 Missing and 1 partial ⚠️ |
| ...table/timeline/versioning/v1/ActiveTimelineV1.java | 80.00% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18543      +/-   ##
============================================
- Coverage     68.85%   68.82%   -0.03%     
- Complexity    28452    28495      +43     
============================================
  Files          2475     2475              
  Lines        136485   136804     +319     
  Branches      16590    16657      +67     
============================================
+ Hits          93974    94161     +187     
- Misses        34955    35071     +116     
- Partials       7556     7572      +16     
| Flag | Coverage Δ |
|---|---|
| common-and-other-modules | 44.43% <11.64%> (-0.05%) ⬇️ |
| hadoop-mr-java-client | 44.71% <6.58%> (-0.10%) ⬇️ |
| spark-client-hadoop-common | 48.53% <48.50%> (+0.10%) ⬆️ |
| spark-java-tests | 49.39% <35.44%> (-0.02%) ⬇️ |
| spark-scala-tests | 45.27% <12.69%> (-0.05%) ⬇️ |
| utilities | 38.01% <14.81%> (-0.03%) ⬇️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Files with missing lines | Coverage Δ |
|---|---|
| ...table/timeline/versioning/v2/ActiveTimelineV2.java | 82.92% <100.00%> (+0.23%) ⬆️ |
| ...table/timeline/versioning/v1/ActiveTimelineV1.java | 67.19% <80.00%> (+0.20%) ⬆️ |
| ...e/hudi/table/action/clean/CleanActionExecutor.java | 84.96% <72.72%> (-0.95%) ⬇️ |
| ...ache/hudi/common/table/timeline/TimelineUtils.java | 80.66% <88.46%> (+0.66%) ⬆️ |
| .../common/table/checkpoint/StreamerCheckpointV1.java | 82.14% <0.00%> (-17.86%) ⬇️ |
| .../common/table/checkpoint/StreamerCheckpointV2.java | 73.33% <0.00%> (-14.67%) ⬇️ |
| .../hudi/common/table/checkpoint/CheckpointUtils.java | 73.84% <0.00%> (-15.05%) ⬇️ |
| ...di/utilities/streamer/StreamerCheckpointUtils.java | 72.63% <18.18%> (-16.41%) ⬇️ |
| .../apache/hudi/common/table/TableSchemaResolver.java | 79.79% <42.85%> (-7.85%) ⬇️ |
| .../java/org/apache/hudi/client/BaseHoodieClient.java | 83.59% <65.62%> (-10.95%) ⬇️ |

... and 35 files with indirect coverage changes


@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the follow-up! The new change adds an --ignore-checkpoint guard to the clean-instants fallback so the user's explicit "fresh start" intent is respected. That partially addresses the prior question about resolveCheckpointBetweenConfigAndPrevCommit precedence (the --checkpoint override is already covered by the primary path, which returns before the fallback runs). The source-class-mismatch concern raised earlier is still not handled, but it's a reasonable follow-up rather than a blocker for this delta. None of the other prior findings were touched in this incremental diff. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.

@kbuci
Contributor Author

kbuci commented Apr 23, 2026

The LSM timeline access is incremental and fast
Oh, I thought that wasn't available on table version 6?

// Add found rolling metadata to current commit
for (Map.Entry<String, String> entry : foundRollingMetadata.entrySet()) {
metadata.addMetadata(entry.getKey(), entry.getValue());
// If some keys are still missing, also check clean instants' extraMetadata
Contributor

Why do we want to make another pass?
Why can't we do it in one for loop? That might be much simpler.
We just keep looking for the N rolling keys, up to lookbackLimit.
The first time a user enables this, they can set a high value for the lookback limit, and lower it after that.
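
The single-loop idea suggested above could look roughly like the following. This is a minimal sketch, not the PR's code: `InstantStub`, `SinglePassRollingLookup`, and `findRollingMetadata` are hypothetical names, and the timeline is modelled as a plain newest-first list rather than a Hudi timeline. Empty strings are treated as missing, as the PR description states.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a completed instant; not a Hudi class. */
final class InstantStub {
  final String action;                     // e.g. "commit", "replacecommit", "clean"
  final Map<String, String> extraMetadata; // key -> value as stored on the instant

  InstantStub(String action, Map<String, String> extraMetadata) {
    this.action = action;
    this.extraMetadata = extraMetadata;
  }
}

final class SinglePassRollingLookup {

  /**
   * Walks the instants newest-first exactly once. Stops as soon as every rolling key
   * has a value or lookbackLimit instants have been inspected.
   */
  static Map<String, String> findRollingMetadata(
      List<InstantStub> newestFirst, Set<String> rollingKeys, int lookbackLimit) {
    Map<String, String> found = new LinkedHashMap<>();
    int inspected = 0;
    for (InstantStub instant : newestFirst) {
      if (found.size() == rollingKeys.size() || inspected >= lookbackLimit) {
        break;
      }
      inspected++;
      for (String key : rollingKeys) {
        String value = instant.extraMetadata.get(key);
        // Empty strings count as missing for rolling purposes.
        if (value != null && !value.isEmpty()) {
          found.putIfAbsent(key, value); // the newest non-empty value wins
        }
      }
    }
    return found;
  }

  public static void main(String[] args) {
    List<InstantStub> timeline = new ArrayList<>();
    timeline.add(new InstantStub("clean", Map.of("schema", "")));
    timeline.add(new InstantStub("replacecommit", Map.of("schema", "s2")));
    timeline.add(new InstantStub("commit", Map.of("schema", "s1", "checkpoint", "c1")));
    System.out.println(findRollingMetadata(timeline, Set.of("schema", "checkpoint"), 10));
  }
}
```

With this shape, the lookback limit bounds the cost of the scan regardless of how many instant types are mixed into the list, which is why a high limit is only needed the first time the feature is enabled.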

// matching the same contract as mergeRollingMetadata for commit metadata.
Set<String> rollingKeys = config.getRollingMetadataKeys();
if (!rollingKeys.isEmpty() && !table.isMetadataTable()) {
table.getMetaClient().reloadActiveTimeline();
Contributor

Actually, we already reload the timeline at L230.
Can we move that to L230? Then we may not need L234.

* and any other commit type that carries a schema in its extraMetadata.
* Used as a fallback when no schema-evolving commits are found.
*/
Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithSchema();
Contributor

What's the difference between this and getLastCommitMetadataWithValidSchema (L152)?

Contributor

I see it now.
How about we rename getLastCommitMetadataWithValidSchema -> getLastCommitMetadataWithValidSchemaForMutableOprtns?

public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
// exclude clustering commits for returning user stored extra metadata
Option<String> result = metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
Contributor

I feel we should not change so many places for the rolling commit metadata feature.
That's an optional feature.
Let's try our best to keep the changes confined to the feature and not change out-of-the-box (OOB) behaviors.

if (result.isPresent()) {
return result;
}
return getExtraMetadataFromCleanInstants(metaClient, extraMetadataKey);
Contributor

same here

.orElse(Option.empty());
}

private Option<HoodieSchema> parseSchemaString(String schemaStr, boolean includeMetadataFields) {
Contributor

Rename: parseSchemaString -> generateHoodieSchema

// Fallback: if no checkpoint found in commits timeline, check clean instants' extraMetadata.
// Clean instants carry rolled-over metadata when rolling metadata is configured.
// Skip this fallback when --ignore-checkpoint is set, since the user explicitly wants a fresh start.
if (!checkpoint.isPresent() && streamerConfig.ignoreCheckpoint == null) {
Contributor

Should we also check for false for ignoreCheckpoint?
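
One way to read this question is that the guard could treat an explicit "false" the same as an unset flag. The sketch below assumes the flag is held as a nullable string (the `== null` check in the diff suggests a reference type) and that the literal "false" would mean "do not ignore"; both are assumptions, and `CheckpointFallbackGuard`, `ignoreCheckpointUnset`, and `shouldUseCleanFallback` are hypothetical names that are not part of Hudi.

```java
final class CheckpointFallbackGuard {

  /** True when the flag is unset, blank, or the literal "false" (assumed semantics). */
  static boolean ignoreCheckpointUnset(String ignoreCheckpoint) {
    if (ignoreCheckpoint == null) {
      return true;
    }
    String trimmed = ignoreCheckpoint.trim();
    return trimmed.isEmpty() || "false".equalsIgnoreCase(trimmed);
  }

  /** The clean-instant fallback runs only when no checkpoint was found and the flag is unset. */
  static boolean shouldUseCleanFallback(boolean checkpointPresent, String ignoreCheckpoint) {
    return !checkpointPresent && ignoreCheckpointUnset(ignoreCheckpoint);
  }

  public static void main(String[] args) {
    System.out.println(shouldUseCleanFallback(false, null));
    System.out.println(shouldUseCleanFallback(false, "false"));
    System.out.println(shouldUseCleanFallback(false, "2026-04-23"));
  }
}
```

If the flag only ever carries an opaque marker value, the extra "false" branch is unnecessary and the original null check is sufficient.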

? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields)
: getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
// Fallback: look in ANY commit type (clustering, compaction, delete_partition, etc.)
.or(() -> getTableSchemaFromAnyCommitMetadata(includeMetadataFields))
Contributor

Why can't we replace L173 with getTableSchemaFromAnyCommitMetadata and then fall back to getTableSchemaFromCleanMetadata?

Contributor

Why do we even need a fallback here? Each fallback incurs an iteration over the timeline instants, which can trigger an actual read of the commit metadata from files. Can we just include the clean instants inside getTableSchemaFromAnyCommitMetadata?
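
The cost concern can be illustrated with a small sketch that counts timeline scans. It uses `java.util.Optional` (Java 9+) instead of Hudi's own `Option`, and `SchemaLookupSketch`, `Instant`, `chained`, and `singlePass` are hypothetical names, so treat it as an illustration of the trade-off rather than the resolver's actual code.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class SchemaLookupSketch {

  /** Hypothetical instant: an action name plus an optional schema string. */
  static final class Instant {
    final String action;
    final String schema;

    Instant(String action, String schema) {
      this.action = action;
      this.schema = schema;
    }
  }

  static int scans = 0; // counts timeline iterations, for illustration only

  static Optional<String> latestSchema(List<Instant> newestFirst, Predicate<Instant> filter) {
    scans++;
    return newestFirst.stream()
        .filter(filter)
        .map(i -> i.schema)
        .filter(s -> s != null && !s.isEmpty())
        .findFirst();
  }

  /** Chained fallbacks: each miss triggers another scan of the timeline. */
  static Optional<String> chained(List<Instant> tl) {
    return latestSchema(tl, i -> i.action.equals("commit"))
        .or(() -> latestSchema(tl, i -> !i.action.equals("clean")))
        .or(() -> latestSchema(tl, i -> i.action.equals("clean")));
  }

  /** Single pass: every instant type is considered in one scan. */
  static Optional<String> singlePass(List<Instant> tl) {
    return latestSchema(tl, i -> true);
  }

  public static void main(String[] args) {
    List<Instant> onlyClean = List.of(new Instant("clean", "s1"));
    System.out.println(chained(onlyClean) + " after " + scans + " scans");
    scans = 0;
    System.out.println(singlePass(onlyClean) + " after " + scans + " scans");
  }
}
```

The two variants are not equivalent: the chained form always prefers an ingestion commit's schema even when a newer instant of another type carries one, while the single pass returns whichever instant is newest. Folding clean instants into one lookup therefore changes precedence as well as cost.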

// Fallback: look in ANY commit type (clustering, compaction, delete_partition, etc.)
.or(() -> getTableSchemaFromAnyCommitMetadata(includeMetadataFields))
// Fallback: look in clean instants' extraMetadata (populated by rolling metadata)
.or(() -> getTableSchemaFromCleanMetadata(includeMetadataFields))
Contributor

I am trying to find ways to keep OOB behavior unchanged.

Can we look up clean commit metadata only if there are no completed commits in the active timeline?
If we have completed commits, we should never be looking into clean commits.

Also, a clarification on the empty cleans.
The only reason we might need to look into clean commits is when empty cleans are enabled and the entire active timeline contains only empty cleans. Is that a fair statement?

But since the table schema resolver runs on the read side, we can't rely on a writer property to confine this behavior to when empty cleans are enabled.
Is that why we are making a generic fix here?
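
The gating proposed in this comment could be sketched as follows: clean instants are consulted only when the active timeline has no completed commit of any other type. `CleanFallbackGate`, `Instant`, and `resolveSchema` are hypothetical names, and the timeline is a plain newest-first list, so this is an illustration under those assumptions rather than a proposed implementation.

```java
import java.util.List;
import java.util.Optional;

final class CleanFallbackGate {

  /** Hypothetical instant: action name, completion flag, optional schema string. */
  static final class Instant {
    final String action;
    final boolean completed;
    final String schema;

    Instant(String action, boolean completed, String schema) {
      this.action = action;
      this.completed = completed;
      this.schema = schema;
    }
  }

  static boolean isClean(Instant i) {
    return "clean".equals(i.action);
  }

  static Optional<String> resolveSchema(List<Instant> newestFirst) {
    boolean hasCompletedCommit =
        newestFirst.stream().anyMatch(i -> i.completed && !isClean(i));
    return newestFirst.stream()
        .filter(i -> i.completed)
        // Look at clean instants only when no completed commit exists at all.
        .filter(i -> hasCompletedCommit ? !isClean(i) : isClean(i))
        .map(i -> i.schema)
        .filter(s -> s != null && !s.isEmpty())
        .findFirst();
  }

  public static void main(String[] args) {
    List<Instant> mixed = List.of(
        new Instant("clean", true, "from-clean"),
        new Instant("commit", true, "from-commit"));
    List<Instant> cleanOnly = List.of(new Instant("clean", true, "from-clean"));
    System.out.println(resolveSchema(mixed));
    System.out.println(resolveSchema(cleanOnly));
  }
}
```

Because the gate depends only on what is present in the timeline, it works on the read side without knowing whether the writer had empty cleans enabled.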

Contributor

Yeah, I'm also confused about why clean is the critical fix here as opposed to other table services. If empty clean is the case that needs the fix, should we check for it first before putting rolling metadata into the clean write metadata?

@kbuci
Contributor Author

kbuci commented Apr 24, 2026

Thanks for the feedback. Based on it, there doesn't seem to be a strong use case for falling back to clean instants to infer or roll over ingestion metadata. Closing this PR and creating a separate one, #18576, that focuses only on making sure schema can be rolled over and that all places in Hudi that infer checkpoint/schema from the timeline also check non-ingestion writes.

@kbuci kbuci closed this Apr 24, 2026

Labels

size:L PR with lines of changes in (300, 1000]


6 participants