From 3f4c06ee3f5a5833c22a8341c1c268f2dc86907e Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Thu, 23 Apr 2026 18:34:34 -0700 Subject: [PATCH 01/11] feat(common): When inferring checkpoint/schema from timeline, check non-ingestion write commits (in case they have metadata rolled-over --- .../apache/hudi/client/BaseHoodieClient.java | 94 +++++++++++-------- .../common/table/TableSchemaResolver.java | 53 +++++------ .../table/checkpoint/CheckpointUtils.java | 20 ++++ .../checkpoint/StreamerCheckpointV1.java | 7 ++ .../checkpoint/StreamerCheckpointV2.java | 7 ++ .../table/timeline/HoodieActiveTimeline.java | 11 ++- .../common/table/timeline/TimelineUtils.java | 22 +++-- .../versioning/v1/ActiveTimelineV1.java | 10 ++ .../versioning/v2/ActiveTimelineV2.java | 9 ++ .../hudi/common/table/TestTimelineUtils.java | 80 ++++++++++++++++ .../TestHoodieClientOnCopyOnWriteStorage.java | 70 ++++++++++++++ ...ointFromAnotherHoodieTimelineProvider.java | 11 ++- 12 files changed, 313 insertions(+), 81 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index e5fd93d56a601..b7c0c08227106 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -326,49 +326,65 @@ protected boolean isStreamingWriteToMetadataEnabled(HoodieTable table) { * @param metadata Current commit metadata to be augmented with rolling metadata */ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) { - // Skip for metadata table - rolling metadata is only for data tables + // IMPORTANT: We're inside the lock here. The timeline in 'table' is either: + // 1. Fresh from createTable() if no conflict resolution happened + // 2. Reloaded during resolveWriteConflict() if conflicts were checked + // In both cases, we have the latest view of the timeline. + if (table.isMetadataTable()) { return; } Set rollingKeys = config.getRollingMetadataKeys(); if (rollingKeys.isEmpty()) { - return; // No rolling metadata configured + return; } - // IMPORTANT: We're inside the lock here. The timeline in 'table' is either: - // 1. Fresh from createTable() if no conflict resolution happened - // 2. Reloaded during resolveWriteConflict() if conflicts were checked - // In both cases, we have the latest view of the timeline. + Map foundRollingMetadata = + findRollingMetadataFromTimeline(table, config, rollingKeys, metadata.getExtraMetadata()); + for (Map.Entry entry : foundRollingMetadata.entrySet()) { + metadata.addMetadata(entry.getKey(), entry.getValue()); + } + } + + /** + * Walks back the active commits timeline to find values for the given rolling metadata keys. + * Keys that already have non-empty values in {@code existingExtraMetadata} are skipped — + * empty strings are treated as "missing". + * + * @param table HoodieTable with a valid active timeline + * @param config HoodieWriteConfig with rolling metadata settings + * @param rollingKeys the set of keys to look for + * @param existingExtraMetadata existing metadata from the current commit + * @return a map of key to value for keys that were found in prior commits + */ + public static Map findRollingMetadataFromTimeline( + HoodieTable table, HoodieWriteConfig config, + Set rollingKeys, Map existingExtraMetadata) { + + Map foundRollingMetadata = new HashMap<>(); + Set remainingKeys = new HashSet<>(rollingKeys); + + for (String key : rollingKeys) { + if (existingExtraMetadata.containsKey(key) && !StringUtils.isNullOrEmpty(existingExtraMetadata.get(key))) { + remainingKeys.remove(key); + } + } + + if (remainingKeys.isEmpty()) { + log.debug("All rolling metadata keys are present in current commit. No walkback needed."); + return foundRollingMetadata; + } HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits(); if (commitsTimeline.empty()) { log.info("No previous commits found. Rolling metadata will start with current commit."); - return; // First commit - nothing to roll forward + return foundRollingMetadata; } try { - Map existingExtraMetadata = metadata.getExtraMetadata(); - Map foundRollingMetadata = new HashMap<>(); - Set remainingKeys = new HashSet<>(rollingKeys); - - // Remove keys that are already present in current commit (current values take precedence) - for (String key : rollingKeys) { - if (existingExtraMetadata.containsKey(key)) { - remainingKeys.remove(key); - } - } - - if (remainingKeys.isEmpty()) { - log.debug("All rolling metadata keys are present in current commit. No walkback needed."); - return; - } - - int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits(); - int commitsWalkedBack = 0; - - // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys List recentCommits = commitsTimeline.getReverseOrderedInstantsByCompletionTime() .limit(lookbackLimit) .collect(Collectors.toList()); @@ -376,18 +392,18 @@ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata meta log.debug("Walking back up to {} commits to find rolling metadata for keys: {}", lookbackLimit, remainingKeys); + int commitsWalkedBack = 0; for (HoodieInstant instant : recentCommits) { if (remainingKeys.isEmpty()) { - break; // Found all keys + break; } - commitsWalkedBack++; - HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline().readInstantContent(instant, HoodieCommitMetadata.class); + HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline() + .readInstantContent(instant, HoodieCommitMetadata.class); - // Check for remaining keys in this commit for (String key : new HashSet<>(remainingKeys)) { String value = commitMetadata.getMetadata(key); - if (value != null) { + if (!StringUtils.isNullOrEmpty(value)) { foundRollingMetadata.put(key, value); remainingKeys.remove(key); log.debug("Found rolling metadata key '{}' in commit {} with value: {}", @@ -396,28 +412,24 @@ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata meta } } - // Add found rolling metadata to current commit - for (Map.Entry entry : foundRollingMetadata.entrySet()) { - metadata.addMetadata(entry.getKey(), entry.getValue()); - } - int rolledForwardCount = foundRollingMetadata.size(); int updatedCount = rollingKeys.size() - remainingKeys.size() - rolledForwardCount; if (rolledForwardCount > 0 || updatedCount > 0 || !remainingKeys.isEmpty()) { - log.info("Rolling metadata merge completed. Walked back {} commits. " - + "Rolled forward: {}, Updated in current: {}, Not found: {}, Total rolling keys: {}", + log.info("Rolling metadata: walked back {} commits. " + + "Rolled forward: {}, Already present: {}, Not found: {}, Total rolling keys: {}", commitsWalkedBack, rolledForwardCount, updatedCount, remainingKeys.size(), rollingKeys.size()); } if (!remainingKeys.isEmpty()) { log.warn("Rolling metadata keys not found in last {} commits: {}. " - + "These keys will not be included in the current commit.", lookbackLimit, remainingKeys); + + "These keys will not be included in the current commit.", commitsWalkedBack, remainingKeys); } - } catch (IOException e) { log.error("Failed to read previous commit metadata for rolling metadata keys: {}.", rollingKeys, e); throw new HoodieIOException("Failed to read previous commit metadata for rolling metadata keys: " + rollingKeys, e); } + + return foundRollingMetadata; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 095a4ffef7170..608facbf049f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -170,7 +170,7 @@ private Option getTableSchemaInternal(boolean includeMetadataField Option schema = (instantOpt.isPresent() ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) - : getTableSchemaFromLatestCommitMetadata(includeMetadataFields)) + : getTableSchemaFromAnyCommitMetadata(includeMetadataFields)) .or(() -> metaClient.getTableConfig().getTableCreateSchema() .map(tableSchema -> @@ -196,44 +196,43 @@ private Option getTableSchemaInternal(boolean includeMetadataField return schema; } - private Option getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) { - Option> instantAndCommitMetadata = getLatestCommitMetadataWithValidSchema(); + /** + * Find schema from ANY completed commit type (including clustering, compaction, delete_partition) + * regardless of {@link WriteOperationType#canUpdateSchema}. This ensures schema is discoverable + * even when the active timeline only contains non-ingestion commits (e.g. after archival). + */ + private Option getTableSchemaFromAnyCommitMetadata(boolean includeMetadataFields) { + Option> instantAndCommitMetadata = + metaClient.getActiveTimeline().getLastCommitMetadataWithSchema(); if (instantAndCommitMetadata.isPresent()) { - HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); - String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); - HoodieSchema schema = HoodieSchema.parse(schemaStr); - if (includeMetadataFields) { - schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); - } else { - schema = HoodieSchemaUtils.removeMetadataFields(schema); - } - return Option.of(schema); - } else { - return Option.empty(); + String schemaStr = instantAndCommitMetadata.get().getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY); + return generateHoodieSchema(schemaStr, includeMetadataFields); } + return Option.empty(); } private Option getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) { try { HoodieCommitMetadata metadata = getCachedCommitMetadata(instant); - String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); - - if (StringUtils.isNullOrEmpty(existingSchemaStr)) { - return Option.empty(); - } - - HoodieSchema schema = HoodieSchema.parse(existingSchemaStr); - if (includeMetadataFields) { - schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); - } else { - schema = HoodieSchemaUtils.removeMetadataFields(schema); - } - return Option.of(schema); + return generateHoodieSchema(metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY), includeMetadataFields); } catch (Exception e) { throw new HoodieException("Failed to read schema from commit metadata", e); } } + private Option generateHoodieSchema(String schemaStr, boolean includeMetadataFields) { + if (StringUtils.isNullOrEmpty(schemaStr)) { + return Option.empty(); + } + HoodieSchema schema = HoodieSchema.parse(schemaStr); + if (includeMetadataFields) { + schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); + } else { + schema = HoodieSchemaUtils.removeMetadataFields(schema); + } + return Option.of(schema); + } + /** * Fetches the schema for a table from any the table's data files */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java index 15084a74ae46d..fc60c22c0f872 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -71,6 +72,25 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata()); } + public static Checkpoint getCheckpoint(Map metadata) { + if (!StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_KEY_V2)) + || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2))) { + return new StreamerCheckpointV2(metadata); + } + if (!StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_KEY_V1)) + || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1))) { + return new StreamerCheckpointV1(metadata); + } + throw new HoodieException("Checkpoint is not found in the metadata: " + metadata); + } + + public static boolean hasCheckpointKeys(Map metadata) { + return !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_KEY_V2)) + || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2)) + || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_KEY_V1)) + || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1)); + } + public static Checkpoint buildCheckpointFromGeneralSource( String sourceClassName, int writeTableVersion, String checkpointToResume) { return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, sourceClassName) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java index d6f5510da5861..2b26521371a27 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java @@ -47,6 +47,13 @@ public StreamerCheckpointV1(HoodieCommitMetadata commitMetadata) { this.checkpointIgnoreKey = commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY); } + public StreamerCheckpointV1(Map metadata) { + Map safeMetadata = metadata == null ? java.util.Collections.emptyMap() : metadata; + this.checkpointKey = safeMetadata.get(STREAMER_CHECKPOINT_KEY_V1); + this.checkpointResetKey = safeMetadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1); + this.checkpointIgnoreKey = safeMetadata.get(CHECKPOINT_IGNORE_KEY); + } + @Override public Map getCheckpointCommitMetadata(String overrideResetKey, String overrideIgnoreKey) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java index 0326e715fc552..322d4af5812aa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java @@ -48,6 +48,13 @@ public StreamerCheckpointV2(HoodieCommitMetadata commitMetadata) { this.checkpointIgnoreKey = commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY); } + public StreamerCheckpointV2(Map metadata) { + Map safeMetadata = metadata == null ? java.util.Collections.emptyMap() : metadata; + this.checkpointKey = safeMetadata.get(STREAMER_CHECKPOINT_KEY_V2); + this.checkpointResetKey = safeMetadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2); + this.checkpointIgnoreKey = safeMetadata.get(CHECKPOINT_IGNORE_KEY); + } + public void addV1Props() { this.extraProps.put(STREAMER_CHECKPOINT_KEY_V1, checkpointKey); this.extraProps.put(STREAMER_CHECKPOINT_RESET_KEY_V1, checkpointResetKey); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 7d65b0aa5f609..92ca6bf8c0f22 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -146,10 +146,19 @@ public interface HoodieActiveTimeline extends HoodieTimeline { void deleteInstantFileIfExists(HoodieInstant instant); /** - * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata} + * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata}, + * restricted to operations where {@link WriteOperationType#canUpdateSchema} is true. */ Option> getLastCommitMetadataWithValidSchema(); + /** + * Returns most recent instant having a non-empty schema in its {@link HoodieCommitMetadata}, + * regardless of {@link WriteOperationType}. This includes clustering, compaction, delete_partition, + * and any other commit type that may carry a schema in its metadata. + * Used as a fallback when no schema-evolving commits are found. + */ + Option> getLastCommitMetadataWithSchema(); + /** * Get the last instant with valid data, and convert this to HoodieCommitMetadata */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index cfc1520b23933..9d8c30219ce7c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -233,24 +233,30 @@ public static List getAffectedPartitions(HoodieTimeline timeline) { } /** - * Get extra metadata for specified key from latest commit/deltacommit/replacecommit(eg. insert_overwrite) instant. + * Get extra metadata for specified key from the most recent commit/deltacommit/replacecommit + * (excluding clustering) that actually contains the key. Scans backwards through the timeline + * so that if the latest commit doesn't carry the key, older commits are still checked. */ public static Option getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) { return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants() - // exclude clustering commits for returning user stored extra metadata .filter(instant -> !isClusteringCommit(metaClient, instant)) - .findFirst().map(instant -> - getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty()); + .map(instant -> getMetadataValue(metaClient, extraMetadataKey, instant)) + .filter(Option::isPresent) + .findFirst() + .orElse(Option.empty()); } /** - * Get extra metadata for specified key from latest commit/deltacommit/replacecommit instant including internal commits - * such as clustering. + * Get extra metadata for specified key from the most recent commit/deltacommit/replacecommit + * instant (including clustering) that actually contains the key. Scans backwards through the + * timeline so that if the latest commit doesn't carry the key, older commits are still checked. */ public static Option getExtraMetadataFromLatestIncludeClustering(HoodieTableMetaClient metaClient, String extraMetadataKey) { return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants() - .findFirst().map(instant -> - getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty()); + .map(instant -> getMetadataValue(metaClient, extraMetadataKey, instant)) + .filter(Option::isPresent) + .findFirst() + .orElse(Option.empty()); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java index e69561fafb2a8..2e15f11618798 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java @@ -285,6 +285,16 @@ public Option> getLastCommitMetadataWi ); } + @Override + public Option> getLastCommitMetadataWithSchema() { + return Option.fromJavaOptional( + getCommitMetadataStream() + .filter(instantCommitMetadataPair -> + !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) + .findFirst() + ); + } + @Override public Option> getLastCommitMetadataWithValidData() { return Option.fromJavaOptional( diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java index 90352bed48b51..cd06000e38fa1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java @@ -299,6 +299,15 @@ public Option> getLastCommitMetadataWi .findFirst()); } + @Override + public Option> getLastCommitMetadataWithSchema() { + return Option.fromJavaOptional( + getCommitMetadataStream() + .filter(instantCommitMetadataPair -> + !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) + .findFirst()); + } + @Override public Option> getLastCommitMetadataWithValidData() { return Option.fromJavaOptional( diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 1175913420b81..9d314514885f5 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -662,4 +662,84 @@ void testGetDroppedPartitions() throws Exception { droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, Option.empty(), Option.empty()); assertTrue(droppedPartitions.isEmpty()); } + + @Test + void testGetLastCommitMetadataWithSchemaIgnoresOperationType() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + String schemaStr = "{\"type\":\"record\",\"name\":\"test\",\"fields\":[]}"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(HoodieCommitMetadata.SCHEMA_KEY, schemaStr); + HoodieInstant clusterInstant = new HoodieInstant(INFLIGHT, CLUSTERING_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(clusterInstant); + activeTimeline.transitionClusterInflightToComplete(true, clusterInstant, + getReplaceCommitMetadata(basePath, "1", "p1", 0, "p1", 3, extraMetadata, WriteOperationType.CLUSTER)); + + metaClient.reloadActiveTimeline(); + + // getLastCommitMetadataWithValidSchema should NOT find it (filtered by canUpdateSchema) + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema().isPresent(), + "canUpdateSchema filter should exclude clustering"); + + // getLastCommitMetadataWithSchema SHOULD find it (no operation type filter) + assertTrue(metaClient.getActiveTimeline().getLastCommitMetadataWithSchema().isPresent(), + "getLastCommitMetadataWithSchema should find schema in clustering commit"); + assertEquals(schemaStr, + metaClient.getActiveTimeline().getLastCommitMetadataWithSchema().get().getRight() + .getMetadata(HoodieCommitMetadata.SCHEMA_KEY)); + } + + @Test + void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, getCommitMetadata(basePath, "1", "1", 2, Collections.emptyMap())); + + metaClient.reloadActiveTimeline(); + + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithSchema().isPresent(), + "Should return empty when no commits have schema"); + } + + @Test + void testGetExtraMetadataScansMultipleInstants() throws Exception { + String extraMetadataKey = "checkpoint_key"; + String extraMetadataValue = "kafka:topic:0:100"; + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + // First commit: no checkpoint key + HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant1); + activeTimeline.saveAsComplete(instant1, getCommitMetadata(basePath, "p1", "1", 2, Collections.emptyMap())); + + // Second commit: has the checkpoint key + Map withCheckpoint = new HashMap<>(); + withCheckpoint.put(extraMetadataKey, extraMetadataValue); + HoodieInstant instant2 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "2", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant2); + activeTimeline.saveAsComplete(instant2, getCommitMetadata(basePath, "p1", "2", 2, withCheckpoint)); + + // Third commit: no checkpoint key (this is the latest) + HoodieInstant instant3 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "3", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant3); + activeTimeline.saveAsComplete(instant3, getCommitMetadata(basePath, "p1", "3", 2, Collections.emptyMap())); + + metaClient.reloadActiveTimeline(); + + // Should scan past instant3 and find the key in instant2 + Option result = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey); + assertTrue(result.isPresent(), "Should find metadata by scanning past latest commit"); + assertEquals(extraMetadataValue, result.get()); + + Option resultIncluding = TimelineUtils.getExtraMetadataFromLatestIncludeClustering(metaClient, extraMetadataKey); + assertTrue(resultIncluding.isPresent(), "Should find metadata by scanning past latest commit (include clustering)"); + assertEquals(extraMetadataValue, resultIncluding.get()); + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 6f873d2d5059d..2d605346ea0c3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -55,8 +55,10 @@ import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.InstantGenerator; import org.apache.hudi.common.table.timeline.TimelineFactory; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -1992,6 +1994,74 @@ protected void autoCleanOnCommit() { } } + @Test + public void testRollingMetadataPreservedAcrossClusteringAfterArchival() throws Exception { + String schemaKey = HoodieCommitMetadata.SCHEMA_KEY; + dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH}); + + HoodieWriteConfig writeConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .compactionSmallFileSize(0).build()) + .withRollingMetadataKeys(schemaKey) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withAutoClean(false).build()) + .build(); + + SparkRDDWriteClient client = getHoodieWriteClient(writeConfig); + + // Insert multiple batches to create file groups for clustering + for (int i = 0; i < 5; i++) { + insertCommitWithSchema(client, dataGen, 20, TRIP_EXAMPLE_SCHEMA); + } + + HoodieWriteConfig clusterConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA) + .withClusteringConfig(createClusteringBuilder(true, 1).build()) + .withRollingMetadataKeys(schemaKey) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withAutoClean(false).build()) + .build(); + + for (int round = 0; round < 2; round++) { + SparkRDDWriteClient clusterWriter = getHoodieWriteClient(clusterConfig); + Option clusteringInstant = clusterWriter.scheduleClustering(Option.empty()); + assertTrue(clusteringInstant.isPresent(), + "Clustering plan should be created (round " + round + ")"); + clusterWriter.cluster(clusteringInstant.get()); + + for (int i = 0; i < 3; i++) { + insertCommitWithSchema(client, dataGen, 20, TRIP_EXAMPLE_SCHEMA); + } + } + + client.archive(); + + HoodieTableMetaClient freshMeta = HoodieTableMetaClient.reload(metaClient); + HoodieTimeline completedTimeline = freshMeta.getActiveTimeline() + .getCommitsTimeline().filterCompletedInstants(); + + boolean foundSchemaInClustering = false; + for (HoodieInstant instant : completedTimeline.getInstants()) { + HoodieCommitMetadata metadata = completedTimeline.readCommitMetadata(instant); + if (metadata.getOperationType() == WriteOperationType.CLUSTER) { + String schema = metadata.getMetadata(schemaKey); + if (schema != null && !schema.isEmpty()) { + foundSchemaInClustering = true; + break; + } + } + } + assertTrue(foundSchemaInClustering, + "Schema should be rolled over into clustering commits via rolling metadata"); + + TableSchemaResolver resolver = new TableSchemaResolver(freshMeta); + assertTrue(resolver.getTableSchemaIfPresent(false).isPresent(), + "TableSchemaResolver should find schema even with clustering-only timeline"); + } + /** * Disabling row writer here as clustering tests will throw the error below if it is used. * java.util.concurrent.CompletionException: java.lang.ClassNotFoundException diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java index 230e1a1189bef..6d2e3a7323d04 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.checkpoint.CheckpointUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hadoop.conf.Configuration; @@ -52,18 +53,20 @@ public void init(Configuration config) throws HoodieException { @Override public String getCheckpoint() throws HoodieException { - return anotherDsHoodieMetaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants() + // Use getWriteTimeline() to include compaction/logcompaction in addition to + // commit/deltacommit/replacecommit, so checkpoint metadata rolled into any + // non-ingestion commit type is discoverable after archival. + return anotherDsHoodieMetaClient.getActiveTimeline().getWriteTimeline() + .filterCompletedInstants().getReverseOrderedInstants() .map(instant -> { try { HoodieCommitMetadata commitMetadata = anotherDsHoodieMetaClient.getActiveTimeline().readCommitMetadata(instant); - // Use CheckpointUtils to handle both V1 and V2 checkpoint keys return CheckpointUtils.getCheckpoint(commitMetadata).getCheckpointKey(); } catch (HoodieException e) { - // No checkpoint found in this commit return null; } catch (IOException e) { - return null; + throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e); } }).filter(Objects::nonNull).findFirst() .orElseThrow(() -> new HoodieException("Unable to find checkpoint in source table at: " From 2b3416431b6ee02db0e1650d17b607172d02a813 Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Thu, 23 Apr 2026 19:24:05 -0700 Subject: [PATCH 02/11] feat(common): When inferring checkpoint/schema from timeline, check non-ingestion write commits (in case they have metadata rolled-over --- .../InitialCheckpointFromAnotherHoodieTimelineProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java index 6d2e3a7323d04..194d1032db827 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.checkpoint.CheckpointUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; @@ -29,7 +30,6 @@ import org.apache.hadoop.conf.Configuration; import java.io.IOException; -import java.util.Objects; /** * This is used to set a checkpoint from latest commit of another (mirror) hudi dataset. @@ -68,7 +68,7 @@ public String getCheckpoint() throws HoodieException { } catch (IOException e) { throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e); } - }).filter(Objects::nonNull).findFirst() + }).filter(key -> !StringUtils.isNullOrEmpty(key)).findFirst() .orElseThrow(() -> new HoodieException("Unable to find checkpoint in source table at: " + path + ". This table may not have been created with checkpoint tracking enabled.")); } From 9cb152d8fda045f72aad28848c412921560d4b35 Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Thu, 23 Apr 2026 23:16:21 -0700 Subject: [PATCH 03/11] change table schema to get latest instant --- .../common/table/TableSchemaResolver.java | 69 +++++++++++-------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 608facbf049f1..5e2617d192234 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -166,11 +166,10 @@ public Option getTableSchemaIfPresent(boolean includeMetadataField return getTableSchemaInternal(includeMetadataFields, Option.empty()); } - private Option getTableSchemaInternal(boolean includeMetadataFields, Option instantOpt) { + private Option getTableSchemaInternal(boolean includeMetadataFields, + Option latestInstantOpt) { Option schema = - (instantOpt.isPresent() - ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) - : getTableSchemaFromAnyCommitMetadata(includeMetadataFields)) + getTableSchemaFromAnyCommitMetadata(includeMetadataFields, latestInstantOpt) .or(() -> metaClient.getTableConfig().getTableCreateSchema() .map(tableSchema -> @@ -200,39 +199,53 @@ private Option getTableSchemaInternal(boolean includeMetadataField * Find schema from ANY completed commit type (including clustering, compaction, delete_partition) * regardless of {@link WriteOperationType#canUpdateSchema}. This ensures schema is discoverable * even when the active timeline only contains non-ingestion commits (e.g. after archival). + * + * @param latestInstantOpt if present, only instants at or before this instant are considered */ - private Option getTableSchemaFromAnyCommitMetadata(boolean includeMetadataFields) { + private Option getTableSchemaFromAnyCommitMetadata(boolean includeMetadataFields, + Option latestInstantOpt) { + if (latestInstantOpt.isPresent()) { + HoodieTimeline boundedTimeline = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants() + .findInstantsBeforeOrEquals(latestInstantOpt.get().requestedTime()); + return Option.fromJavaOptional( + boundedTimeline.getReverseOrderedInstants() + .map(instant -> { + try { + HoodieCommitMetadata commitMetadata = metaClient.getActiveTimeline().readCommitMetadata(instant); + return Pair.of(instant, commitMetadata); + } catch (IOException e) { + throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e); + } + }) + .filter(pair -> !StringUtils.isNullOrEmpty(pair.getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) + .map(pair -> { + HoodieSchema schema = HoodieSchema.parse(pair.getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY)); + return includeMetadataFields + ? HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()) + : HoodieSchemaUtils.removeMetadataFields(schema); + }) + .findFirst() + ); + } + Option> instantAndCommitMetadata = metaClient.getActiveTimeline().getLastCommitMetadataWithSchema(); if (instantAndCommitMetadata.isPresent()) { String schemaStr = instantAndCommitMetadata.get().getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY); - return generateHoodieSchema(schemaStr, includeMetadataFields); + if (!StringUtils.isNullOrEmpty(schemaStr)) { + HoodieSchema schema = HoodieSchema.parse(schemaStr); + if (includeMetadataFields) { + schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); + } else { + schema = HoodieSchemaUtils.removeMetadataFields(schema); + } + return Option.of(schema); + } } return Option.empty(); } - private Option getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) { - try { - HoodieCommitMetadata metadata = getCachedCommitMetadata(instant); - return generateHoodieSchema(metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY), includeMetadataFields); - } catch (Exception e) { - throw new HoodieException("Failed to read schema from commit metadata", e); - } - } - - private Option generateHoodieSchema(String schemaStr, boolean includeMetadataFields) { - if (StringUtils.isNullOrEmpty(schemaStr)) { - return Option.empty(); - } - HoodieSchema schema = HoodieSchema.parse(schemaStr); - if (includeMetadataFields) { - schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); - } else { - schema = HoodieSchemaUtils.removeMetadataFields(schema); - } - return Option.of(schema); - } - /** * Fetches the schema for a table from any the table's data files */ From 57b6ac206a53d449a09286461b03e667d5d6e8cc Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Fri, 24 Apr 2026 11:16:02 -0700 Subject: [PATCH 04/11] change table schema to get latest instant --- .../apache/hudi/client/BaseHoodieClient.java | 92 ++++++++----------- .../table/checkpoint/CheckpointUtils.java | 20 ---- .../checkpoint/StreamerCheckpointV1.java | 7 -- .../checkpoint/StreamerCheckpointV2.java | 7 -- 4 files changed, 40 insertions(+), 86 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index b7c0c08227106..842dc38177a59 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -326,65 +326,49 @@ protected boolean isStreamingWriteToMetadataEnabled(HoodieTable table) { * @param metadata Current commit metadata to be augmented with rolling metadata */ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) { - // IMPORTANT: We're inside the lock here. The timeline in 'table' is either: - // 1. Fresh from createTable() if no conflict resolution happened - // 2. Reloaded during resolveWriteConflict() if conflicts were checked - // In both cases, we have the latest view of the timeline. - + // Skip for metadata table - rolling metadata is only for data tables if (table.isMetadataTable()) { return; } Set rollingKeys = config.getRollingMetadataKeys(); if (rollingKeys.isEmpty()) { - return; - } - - Map foundRollingMetadata = - findRollingMetadataFromTimeline(table, config, rollingKeys, metadata.getExtraMetadata()); - for (Map.Entry entry : foundRollingMetadata.entrySet()) { - metadata.addMetadata(entry.getKey(), entry.getValue()); - } - } - - /** - * Walks back the active commits timeline to find values for the given rolling metadata keys. - * Keys that already have non-empty values in {@code existingExtraMetadata} are skipped — - * empty strings are treated as "missing". - * - * @param table HoodieTable with a valid active timeline - * @param config HoodieWriteConfig with rolling metadata settings - * @param rollingKeys the set of keys to look for - * @param existingExtraMetadata existing metadata from the current commit - * @return a map of key to value for keys that were found in prior commits - */ - public static Map findRollingMetadataFromTimeline( - HoodieTable table, HoodieWriteConfig config, - Set rollingKeys, Map existingExtraMetadata) { - - Map foundRollingMetadata = new HashMap<>(); - Set remainingKeys = new HashSet<>(rollingKeys); - - for (String key : rollingKeys) { - if (existingExtraMetadata.containsKey(key) && !StringUtils.isNullOrEmpty(existingExtraMetadata.get(key))) { - remainingKeys.remove(key); - } + return; // No rolling metadata configured } - if (remainingKeys.isEmpty()) { - log.debug("All rolling metadata keys are present in current commit. No walkback needed."); - return foundRollingMetadata; - } + // IMPORTANT: We're inside the lock here. The timeline in 'table' is either: + // 1. Fresh from createTable() if no conflict resolution happened + // 2. Reloaded during resolveWriteConflict() if conflicts were checked + // In both cases, we have the latest view of the timeline. HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits(); if (commitsTimeline.empty()) { log.info("No previous commits found. Rolling metadata will start with current commit."); - return foundRollingMetadata; + return; // First commit - nothing to roll forward } try { + Map existingExtraMetadata = metadata.getExtraMetadata(); + Map foundRollingMetadata = new HashMap<>(); + Set remainingKeys = new HashSet<>(rollingKeys); + + // Remove keys that are already present with non-empty values in current commit (current values take precedence) + for (String key : rollingKeys) { + if (existingExtraMetadata.containsKey(key) && !StringUtils.isNullOrEmpty(existingExtraMetadata.get(key))) { + remainingKeys.remove(key); + } + } + + if (remainingKeys.isEmpty()) { + log.debug("All rolling metadata keys are present in current commit. No walkback needed."); + return; + } + + int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits(); + int commitsWalkedBack = 0; + + // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys List recentCommits = commitsTimeline.getReverseOrderedInstantsByCompletionTime() .limit(lookbackLimit) .collect(Collectors.toList()); @@ -392,15 +376,15 @@ public static Map findRollingMetadataFromTimeline( log.debug("Walking back up to {} commits to find rolling metadata for keys: {}", lookbackLimit, remainingKeys); - int commitsWalkedBack = 0; for (HoodieInstant instant : recentCommits) { if (remainingKeys.isEmpty()) { - break; + break; // Found all keys } + commitsWalkedBack++; - HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline() - .readInstantContent(instant, HoodieCommitMetadata.class); + HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline().readInstantContent(instant, HoodieCommitMetadata.class); + // Check for remaining keys in this commit for (String key : new HashSet<>(remainingKeys)) { String value = commitMetadata.getMetadata(key); if (!StringUtils.isNullOrEmpty(value)) { @@ -412,24 +396,28 @@ public static Map findRollingMetadataFromTimeline( } } + // Add found rolling metadata to current commit + for (Map.Entry entry : foundRollingMetadata.entrySet()) { + metadata.addMetadata(entry.getKey(), entry.getValue()); + } + int rolledForwardCount = foundRollingMetadata.size(); int updatedCount = rollingKeys.size() - remainingKeys.size() - rolledForwardCount; if (rolledForwardCount > 0 || updatedCount > 0 || !remainingKeys.isEmpty()) { - log.info("Rolling metadata: walked back {} commits. " - + "Rolled forward: {}, Already present: {}, Not found: {}, Total rolling keys: {}", + log.info("Rolling metadata merge completed. Walked back {} commits. " + + "Rolled forward: {}, Updated in current: {}, Not found: {}, Total rolling keys: {}", commitsWalkedBack, rolledForwardCount, updatedCount, remainingKeys.size(), rollingKeys.size()); } if (!remainingKeys.isEmpty()) { log.warn("Rolling metadata keys not found in last {} commits: {}. " - + "These keys will not be included in the current commit.", commitsWalkedBack, remainingKeys); + + "These keys will not be included in the current commit.", lookbackLimit, remainingKeys); } + } catch (IOException e) { log.error("Failed to read previous commit metadata for rolling metadata keys: {}.", rollingKeys, e); throw new HoodieIOException("Failed to read previous commit metadata for rolling metadata keys: " + rollingKeys, e); } - - return foundRollingMetadata; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java index fc60c22c0f872..15084a74ae46d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java @@ -32,7 +32,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Objects; import java.util.Set; @@ -72,25 +71,6 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata()); } - public static Checkpoint getCheckpoint(Map metadata) { - if (!StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_KEY_V2)) - || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2))) { - return new StreamerCheckpointV2(metadata); - } - if (!StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_KEY_V1)) - || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1))) { - return new StreamerCheckpointV1(metadata); - } - throw new HoodieException("Checkpoint is not found in the metadata: " + metadata); - } - - public static boolean hasCheckpointKeys(Map metadata) { - return !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_KEY_V2)) - || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2)) - || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_KEY_V1)) - || !StringUtils.isNullOrEmpty(metadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1)); - } - public static Checkpoint buildCheckpointFromGeneralSource( String sourceClassName, int writeTableVersion, String checkpointToResume) { return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, sourceClassName) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java index 2b26521371a27..d6f5510da5861 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV1.java @@ -47,13 +47,6 @@ public StreamerCheckpointV1(HoodieCommitMetadata commitMetadata) { this.checkpointIgnoreKey = commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY); } - public StreamerCheckpointV1(Map metadata) { - Map safeMetadata = metadata == null ? java.util.Collections.emptyMap() : metadata; - this.checkpointKey = safeMetadata.get(STREAMER_CHECKPOINT_KEY_V1); - this.checkpointResetKey = safeMetadata.get(STREAMER_CHECKPOINT_RESET_KEY_V1); - this.checkpointIgnoreKey = safeMetadata.get(CHECKPOINT_IGNORE_KEY); - } - @Override public Map getCheckpointCommitMetadata(String overrideResetKey, String overrideIgnoreKey) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java index 322d4af5812aa..0326e715fc552 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java @@ -48,13 +48,6 @@ public StreamerCheckpointV2(HoodieCommitMetadata commitMetadata) { this.checkpointIgnoreKey = commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY); } - public StreamerCheckpointV2(Map metadata) { - Map safeMetadata = metadata == null ? java.util.Collections.emptyMap() : metadata; - this.checkpointKey = safeMetadata.get(STREAMER_CHECKPOINT_KEY_V2); - this.checkpointResetKey = safeMetadata.get(STREAMER_CHECKPOINT_RESET_KEY_V2); - this.checkpointIgnoreKey = safeMetadata.get(CHECKPOINT_IGNORE_KEY); - } - public void addV1Props() { this.extraProps.put(STREAMER_CHECKPOINT_KEY_V1, checkpointKey); this.extraProps.put(STREAMER_CHECKPOINT_RESET_KEY_V1, checkpointResetKey); From ee3f2abd031afe859c6c19a3d4f2d8cf65b11e0f Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Fri, 24 Apr 2026 12:12:46 -0700 Subject: [PATCH 05/11] remove changes --- .../common/table/TableSchemaResolver.java | 86 ++++++++----------- .../table/timeline/HoodieActiveTimeline.java | 11 +-- .../versioning/v1/ActiveTimelineV1.java | 13 +-- .../versioning/v2/ActiveTimelineV2.java | 12 +-- .../hudi/common/table/TestTimelineUtils.java | 12 +-- 5 files changed, 57 insertions(+), 77 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 5e2617d192234..29ce50ef88dec 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -166,10 +166,11 @@ public Option getTableSchemaIfPresent(boolean includeMetadataField return getTableSchemaInternal(includeMetadataFields, Option.empty()); } - private Option getTableSchemaInternal(boolean includeMetadataFields, - Option latestInstantOpt) { + private Option getTableSchemaInternal(boolean includeMetadataFields, Option instantOpt) { Option schema = - getTableSchemaFromAnyCommitMetadata(includeMetadataFields, latestInstantOpt) + (instantOpt.isPresent() + ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) + : getTableSchemaFromLatestCommitMetadata(includeMetadataFields)) .or(() -> metaClient.getTableConfig().getTableCreateSchema() .map(tableSchema -> @@ -195,55 +196,42 @@ private Option getTableSchemaInternal(boolean includeMetadataField return schema; } - /** - * Find schema from ANY completed commit type (including clustering, compaction, delete_partition) - * regardless of {@link WriteOperationType#canUpdateSchema}. This ensures schema is discoverable - * even when the active timeline only contains non-ingestion commits (e.g. after archival). - * - * @param latestInstantOpt if present, only instants at or before this instant are considered - */ - private Option getTableSchemaFromAnyCommitMetadata(boolean includeMetadataFields, - Option latestInstantOpt) { - if (latestInstantOpt.isPresent()) { - HoodieTimeline boundedTimeline = metaClient.getActiveTimeline().getCommitsTimeline() - .filterCompletedInstants() - .findInstantsBeforeOrEquals(latestInstantOpt.get().requestedTime()); - return Option.fromJavaOptional( - boundedTimeline.getReverseOrderedInstants() - .map(instant -> { - try { - HoodieCommitMetadata commitMetadata = metaClient.getActiveTimeline().readCommitMetadata(instant); - return Pair.of(instant, commitMetadata); - } catch (IOException e) { - throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e); - } - }) - .filter(pair -> !StringUtils.isNullOrEmpty(pair.getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) - .map(pair -> { - HoodieSchema schema = HoodieSchema.parse(pair.getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY)); - return includeMetadataFields - ? HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()) - : HoodieSchemaUtils.removeMetadataFields(schema); - }) - .findFirst() - ); + private Option getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) { + Option> instantAndCommitMetadata = getLatestCommitMetadataWithValidSchema(); + if (instantAndCommitMetadata.isPresent()) { + HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); + String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); + HoodieSchema schema = HoodieSchema.parse(schemaStr); + if (includeMetadataFields) { + schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); + } else { + schema = HoodieSchemaUtils.removeMetadataFields(schema); + } + return Option.of(schema); + } else { + return Option.empty(); } + } - Option> instantAndCommitMetadata = - metaClient.getActiveTimeline().getLastCommitMetadataWithSchema(); - if (instantAndCommitMetadata.isPresent()) { - String schemaStr = instantAndCommitMetadata.get().getRight().getMetadata(HoodieCommitMetadata.SCHEMA_KEY); - if (!StringUtils.isNullOrEmpty(schemaStr)) { - HoodieSchema schema = HoodieSchema.parse(schemaStr); - if (includeMetadataFields) { - schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); - } else { - schema = HoodieSchemaUtils.removeMetadataFields(schema); - } - return Option.of(schema); + private Option getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) { + try { + HoodieCommitMetadata metadata = getCachedCommitMetadata(instant); + String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); + + if (StringUtils.isNullOrEmpty(existingSchemaStr)) { + return Option.empty(); + } + + HoodieSchema schema = HoodieSchema.parse(existingSchemaStr); + if (includeMetadataFields) { + schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); + } else { + schema = HoodieSchemaUtils.removeMetadataFields(schema); } + return Option.of(schema); + } catch (Exception e) { + throw new HoodieException("Failed to read schema from commit metadata", e); } - return Option.empty(); } /** @@ -393,7 +381,7 @@ public boolean hasOperationField() { private Option> getLatestCommitMetadataWithValidSchema() { if (latestCommitWithValidSchema == null) { Option> instantAndCommitMetadata = - metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(); + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false); if (instantAndCommitMetadata.isPresent()) { HoodieInstant instant = instantAndCommitMetadata.get().getLeft(); HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 92ca6bf8c0f22..70cc21c835c39 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -152,12 +152,13 @@ public interface HoodieActiveTimeline extends HoodieTimeline { Option> getLastCommitMetadataWithValidSchema(); /** - * Returns most recent instant having a non-empty schema in its {@link HoodieCommitMetadata}, - * regardless of {@link WriteOperationType}. This includes clustering, compaction, delete_partition, - * and any other commit type that may carry a schema in its metadata. - * Used as a fallback when no schema-evolving commits are found. + * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata}. + * + * @param filterByCanUpdateSchema if true, only considers commits where + * {@link WriteOperationType#canUpdateSchema} is true (original behavior). + * If false, considers any commit type with a non-empty schema. */ - Option> getLastCommitMetadataWithSchema(); + Option> getLastCommitMetadataWithValidSchema(boolean filterByCanUpdateSchema); /** * Get the last instant with valid data, and convert this to HoodieCommitMetadata diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java index 2e15f11618798..a4c9434a52ec0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java @@ -276,21 +276,16 @@ public HoodieInstantReader getInstantReader() { @Override public Option> getLastCommitMetadataWithValidSchema() { - return Option.fromJavaOptional( - getCommitMetadataStream() - .filter(instantCommitMetadataPair -> - WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType()) - && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) - .findFirst() - ); + return getLastCommitMetadataWithValidSchema(true); } @Override - public Option> getLastCommitMetadataWithSchema() { + public Option> getLastCommitMetadataWithValidSchema(boolean filterByCanUpdateSchema) { return Option.fromJavaOptional( getCommitMetadataStream() .filter(instantCommitMetadataPair -> - !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) + (!filterByCanUpdateSchema || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) + && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) .findFirst() ); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java index cd06000e38fa1..c8080302a422d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java @@ -291,20 +291,16 @@ public HoodieInstantReader getInstantReader() { @Override public Option> getLastCommitMetadataWithValidSchema() { - return Option.fromJavaOptional( - getCommitMetadataStream() - .filter(instantCommitMetadataPair -> - WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType()) - && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) - .findFirst()); + return getLastCommitMetadataWithValidSchema(true); } @Override - public Option> getLastCommitMetadataWithSchema() { + public Option> getLastCommitMetadataWithValidSchema(boolean filterByCanUpdateSchema) { return Option.fromJavaOptional( getCommitMetadataStream() .filter(instantCommitMetadataPair -> - !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) + (!filterByCanUpdateSchema || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) + && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) .findFirst()); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 9d314514885f5..be072898c2281 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -678,15 +678,15 @@ void testGetLastCommitMetadataWithSchemaIgnoresOperationType() throws Exception metaClient.reloadActiveTimeline(); - // getLastCommitMetadataWithValidSchema should NOT find it (filtered by canUpdateSchema) + // getLastCommitMetadataWithValidSchema() should NOT find it (filtered by canUpdateSchema) assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema().isPresent(), "canUpdateSchema filter should exclude clustering"); - // getLastCommitMetadataWithSchema SHOULD find it (no operation type filter) - assertTrue(metaClient.getActiveTimeline().getLastCommitMetadataWithSchema().isPresent(), - "getLastCommitMetadataWithSchema should find schema in clustering commit"); + // getLastCommitMetadataWithValidSchema(false) SHOULD find it (no operation type filter) + assertTrue(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).isPresent(), + "getLastCommitMetadataWithValidSchema(false) should find schema in clustering commit"); assertEquals(schemaStr, - metaClient.getActiveTimeline().getLastCommitMetadataWithSchema().get().getRight() + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).get().getRight() .getMetadata(HoodieCommitMetadata.SCHEMA_KEY)); } @@ -701,7 +701,7 @@ void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema() throws Except metaClient.reloadActiveTimeline(); - assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithSchema().isPresent(), + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).isPresent(), "Should return empty when no commits have schema"); } From a3e67f4ddfac255b0f4770b263260deee49b8f8a Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Fri, 24 Apr 2026 12:27:26 -0700 Subject: [PATCH 06/11] remove uneeded methods --- .../common/table/timeline/TimelineUtils.java | 22 ++++------- .../hudi/common/table/TestTimelineUtils.java | 37 ------------------- ...ointFromAnotherHoodieTimelineProvider.java | 3 ++ 3 files changed, 11 insertions(+), 51 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index 9d8c30219ce7c..cfc1520b23933 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -233,30 +233,24 @@ public static List getAffectedPartitions(HoodieTimeline timeline) { } /** - * Get extra metadata for specified key from the most recent commit/deltacommit/replacecommit - * (excluding clustering) that actually contains the key. Scans backwards through the timeline - * so that if the latest commit doesn't carry the key, older commits are still checked. + * Get extra metadata for specified key from latest commit/deltacommit/replacecommit(eg. insert_overwrite) instant. */ public static Option getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) { return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants() + // exclude clustering commits for returning user stored extra metadata .filter(instant -> !isClusteringCommit(metaClient, instant)) - .map(instant -> getMetadataValue(metaClient, extraMetadataKey, instant)) - .filter(Option::isPresent) - .findFirst() - .orElse(Option.empty()); + .findFirst().map(instant -> + getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty()); } /** - * Get extra metadata for specified key from the most recent commit/deltacommit/replacecommit - * instant (including clustering) that actually contains the key. Scans backwards through the - * timeline so that if the latest commit doesn't carry the key, older commits are still checked. + * Get extra metadata for specified key from latest commit/deltacommit/replacecommit instant including internal commits + * such as clustering. */ public static Option getExtraMetadataFromLatestIncludeClustering(HoodieTableMetaClient metaClient, String extraMetadataKey) { return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants() - .map(instant -> getMetadataValue(metaClient, extraMetadataKey, instant)) - .filter(Option::isPresent) - .findFirst() - .orElse(Option.empty()); + .findFirst().map(instant -> + getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty()); } /** diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index be072898c2281..8e7bd480e62a4 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -705,41 +705,4 @@ void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema() throws Except "Should return empty when no commits have schema"); } - @Test - void testGetExtraMetadataScansMultipleInstants() throws Exception { - String extraMetadataKey = "checkpoint_key"; - String extraMetadataValue = "kafka:topic:0:100"; - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - - // First commit: no checkpoint key - HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", - InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); - activeTimeline.createNewInstant(instant1); - activeTimeline.saveAsComplete(instant1, getCommitMetadata(basePath, "p1", "1", 2, Collections.emptyMap())); - - // Second commit: has the checkpoint key - Map withCheckpoint = new HashMap<>(); - withCheckpoint.put(extraMetadataKey, extraMetadataValue); - HoodieInstant instant2 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "2", - InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); - activeTimeline.createNewInstant(instant2); - activeTimeline.saveAsComplete(instant2, getCommitMetadata(basePath, "p1", "2", 2, withCheckpoint)); - - // Third commit: no checkpoint key (this is the latest) - HoodieInstant instant3 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "3", - InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); - activeTimeline.createNewInstant(instant3); - activeTimeline.saveAsComplete(instant3, getCommitMetadata(basePath, "p1", "3", 2, Collections.emptyMap())); - - metaClient.reloadActiveTimeline(); - - // Should scan past instant3 and find the key in instant2 - Option result = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey); - assertTrue(result.isPresent(), "Should find metadata by scanning past latest commit"); - assertEquals(extraMetadataValue, result.get()); - - Option resultIncluding = TimelineUtils.getExtraMetadataFromLatestIncludeClustering(metaClient, extraMetadataKey); - assertTrue(resultIncluding.isPresent(), "Should find metadata by scanning past latest commit (include clustering)"); - assertEquals(extraMetadataValue, resultIncluding.get()); - } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java index 194d1032db827..9010bbd2846dd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java @@ -64,10 +64,13 @@ public String getCheckpoint() throws HoodieException { anotherDsHoodieMetaClient.getActiveTimeline().readCommitMetadata(instant); return CheckpointUtils.getCheckpoint(commitMetadata).getCheckpointKey(); } catch (HoodieException e) { + // No checkpoint found in this commit, skip to older instants return null; } catch (IOException e) { throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e); } + // Filter out null (from HoodieException) and empty strings (from commits + // that don't have checkpoint metadata, e.g. when rollover is not configured) }).filter(key -> !StringUtils.isNullOrEmpty(key)).findFirst() .orElseThrow(() -> new HoodieException("Unable to find checkpoint in source table at: " + path + ". This table may not have been created with checkpoint tracking enabled.")); From c7dfd4484c2d6172744ebdad71e2cfbf086935b8 Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Fri, 24 Apr 2026 12:32:50 -0700 Subject: [PATCH 07/11] remove uneeded methods --- .../InitialCheckpointFromAnotherHoodieTimelineProvider.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java index 9010bbd2846dd..462839ebcb4b9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java @@ -62,9 +62,10 @@ public String getCheckpoint() throws HoodieException { try { HoodieCommitMetadata commitMetadata = anotherDsHoodieMetaClient.getActiveTimeline().readCommitMetadata(instant); + // Use CheckpointUtils to handle both V1 and V2 checkpoint keys return CheckpointUtils.getCheckpoint(commitMetadata).getCheckpointKey(); } catch (HoodieException e) { - // No checkpoint found in this commit, skip to older instants + // No checkpoint found in this commit return null; } catch (IOException e) { throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e); From 1c8b339e879b89b74320b5ce4f48dd06136df08f Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Fri, 24 Apr 2026 12:35:13 -0700 Subject: [PATCH 08/11] remove uneeded methods --- .../functional/TestHoodieClientOnCopyOnWriteStorage.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 2d605346ea0c3..2a4fa731e04bb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -2032,8 +2032,12 @@ public void testRollingMetadataPreservedAcrossClusteringAfterArchival() throws E "Clustering plan should be created (round " + round + ")"); clusterWriter.cluster(clusteringInstant.get()); - for (int i = 0; i < 3; i++) { - insertCommitWithSchema(client, dataGen, 20, TRIP_EXAMPLE_SCHEMA); + // Only insert after the first round so that the second clustering instant + // remains on the active timeline after archival + if (round < 1) { + for (int i = 0; i < 3; i++) { + insertCommitWithSchema(client, dataGen, 20, TRIP_EXAMPLE_SCHEMA); + } } } From 71a919b728d1edd72d12bd3ac61ade08139d91f6 Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Fri, 24 Apr 2026 15:18:17 -0700 Subject: [PATCH 09/11] address --- .../hudi/common/table/timeline/HoodieActiveTimeline.java | 4 ++-- .../common/table/timeline/versioning/v1/ActiveTimelineV1.java | 4 ++-- .../common/table/timeline/versioning/v2/ActiveTimelineV2.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 70cc21c835c39..012ffdec8c1c9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -154,11 +154,11 @@ public interface HoodieActiveTimeline extends HoodieTimeline { /** * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata}. * - * @param filterByCanUpdateSchema if true, only considers commits where + * @param filterForSchemaMutableOperations if true, only considers commits where * {@link WriteOperationType#canUpdateSchema} is true (original behavior). * If false, considers any commit type with a non-empty schema. */ - Option> getLastCommitMetadataWithValidSchema(boolean filterByCanUpdateSchema); + Option> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations); /** * Get the last instant with valid data, and convert this to HoodieCommitMetadata diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java index a4c9434a52ec0..c670404ac7694 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java @@ -280,11 +280,11 @@ public Option> getLastCommitMetadataWi } @Override - public Option> getLastCommitMetadataWithValidSchema(boolean filterByCanUpdateSchema) { + public Option> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations) { return Option.fromJavaOptional( getCommitMetadataStream() .filter(instantCommitMetadataPair -> - (!filterByCanUpdateSchema || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) + (!filterForSchemaMutableOperations || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) .findFirst() ); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java index c8080302a422d..69e36596e48da 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java @@ -295,11 +295,11 @@ public Option> getLastCommitMetadataWi } @Override - public Option> getLastCommitMetadataWithValidSchema(boolean filterByCanUpdateSchema) { + public Option> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations) { return Option.fromJavaOptional( getCommitMetadataStream() .filter(instantCommitMetadataPair -> - (!filterByCanUpdateSchema || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) + (!filterForSchemaMutableOperations || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) .findFirst()); } From 59bbf7afd60de5bf203a02fb968c4150b71dc57d Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Fri, 24 Apr 2026 15:49:01 -0700 Subject: [PATCH 10/11] UT coverage --- .../hudi/client/TestRollingMetadata.java | 53 +++++++ .../hudi/common/table/TestTimelineUtils.java | 45 ++++++ ...ointFromAnotherHoodieTimelineProvider.java | 134 ++++++++++++++++++ 3 files changed, 232 insertions(+) create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestInitialCheckpointFromAnotherHoodieTimelineProvider.java diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java index d123dcd86fd6c..0afccea84d56a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java @@ -574,4 +574,57 @@ public void testRollingMetadataWithEmptyAndWhitespaceKeys() throws IOException { client.close(); } + + /** + * Test that an empty-string value for a rolling metadata key is treated as missing, + * so the walkback still finds the most recent non-empty value. + */ + @Test + public void testRollingMetadataEmptyStringTreatedAsMissing() throws IOException { + HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties()); + HoodieWriteConfig config = getConfigBuilder(true) + .withPath(metaClient.getBasePath()) + .withRollingMetadataKeys("checkpoint.offset") + .build(); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + SparkRDDWriteClient client = getHoodieWriteClient(config); + + // First commit with valid rolling metadata value + String instant1 = client.createNewInstantTime(false); + List records1 = dataGen.generateInserts(instant1, 10); + JavaRDD writeRecords1 = jsc().parallelize(records1, 2); + + WriteClientTestUtils.startCommitWithTime(client, instant1); + List writeStatuses1 = client.insert(writeRecords1, instant1).collect(); + assertNoWriteErrors(writeStatuses1); + + Map extraMetadata1 = new HashMap<>(); + extraMetadata1.put("checkpoint.offset", "1000"); + client.commitStats(instant1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.of(extraMetadata1), metaClient.getCommitActionType()); + + // Second commit explicitly sets the rolling key to empty string + String instant2 = client.createNewInstantTime(false); + List records2 = dataGen.generateInserts(instant2, 10); + JavaRDD writeRecords2 = jsc().parallelize(records2, 2); + + WriteClientTestUtils.startCommitWithTime(client, instant2); + List writeStatuses2 = client.insert(writeRecords2, instant2).collect(); + assertNoWriteErrors(writeStatuses2); + + Map extraMetadata2 = new HashMap<>(); + extraMetadata2.put("checkpoint.offset", ""); + client.commitStats(instant2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.of(extraMetadata2), metaClient.getCommitActionType()); + + // The empty string should be treated as missing, so walkback finds "1000" from commit1 + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieInstant commit2 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get(); + HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, metaClient.getActiveTimeline()); + assertEquals("1000", metadata2.getMetadata("checkpoint.offset"), + "Empty string value should be treated as missing; walkback should find the previous non-empty value"); + + client.close(); + } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 8e7bd480e62a4..9472cc7cc3cd7 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1; import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2; import org.apache.hudi.common.table.timeline.versioning.v2.BaseTimelineV2; import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; @@ -705,4 +706,48 @@ void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema() throws Except "Should return empty when no commits have schema"); } + @Test + void testGetLastCommitMetadataWithSchemaIgnoresOperationType_V1() throws Exception { + cleanMetaClient(); + initMetaClient(true); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + String schemaStr = "{\"type\":\"record\",\"name\":\"test\",\"fields\":[]}"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(HoodieCommitMetadata.SCHEMA_KEY, schemaStr); + HoodieInstant clusterInstant = new HoodieInstant(INFLIGHT, REPLACE_COMMIT_ACTION, "1", + InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(clusterInstant); + activeTimeline.transitionClusterInflightToComplete(true, clusterInstant, + getReplaceCommitMetadata(basePath, "1", "p1", 0, "p1", 3, extraMetadata, WriteOperationType.CLUSTER)); + + metaClient.reloadActiveTimeline(); + + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema().isPresent(), + "canUpdateSchema filter should exclude clustering in V1 timeline"); + + assertTrue(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).isPresent(), + "getLastCommitMetadataWithValidSchema(false) should find schema in V1 clustering commit"); + assertEquals(schemaStr, + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).get().getRight() + .getMetadata(HoodieCommitMetadata.SCHEMA_KEY)); + } + + @Test + void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema_V1() throws Exception { + cleanMetaClient(); + initMetaClient(true); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, getCommitMetadata(basePath, "1", "1", 2, Collections.emptyMap())); + + metaClient.reloadActiveTimeline(); + + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).isPresent(), + "Should return empty when no V1 commits have schema"); + } + } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestInitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestInitialCheckpointFromAnotherHoodieTimelineProvider.java new file mode 100644 index 0000000000000..a9ef19ef58924 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestInitialCheckpointFromAnotherHoodieTimelineProvider.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.checkpointing; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.exception.HoodieException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests for {@link InitialCheckpointFromAnotherHoodieTimelineProvider}. + */ +class TestInitialCheckpointFromAnotherHoodieTimelineProvider extends HoodieCommonTestHarness { + + @BeforeEach + void setUp() throws Exception { + initMetaClient(); + } + + @AfterEach + void tearDown() throws Exception { + cleanMetaClient(); + } + + private InitialCheckpointFromAnotherHoodieTimelineProvider createProvider() { + TypedProperties props = new TypedProperties(); + props.put("hoodie.streamer.checkpoint.provider.path", basePath); + InitialCheckpointFromAnotherHoodieTimelineProvider provider = + new InitialCheckpointFromAnotherHoodieTimelineProvider(props); + provider.init(HoodieTestUtils.getDefaultStorageConf().unwrap()); + return provider; + } + + @Test + void testGetCheckpointFromCommitWithValidCheckpoint() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant); + Map extraMetadata = new HashMap<>(); + extraMetadata.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "topic:100"); + activeTimeline.saveAsComplete(instant, getCommitMetadata(basePath, "p1", "1", 2, extraMetadata)); + + assertEquals("topic:100", createProvider().getCheckpoint()); + } + + @Test + void testGetCheckpointSkipsCommitsWithoutCheckpoint() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant1); + Map metadata1 = new HashMap<>(); + metadata1.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "topic:50"); + activeTimeline.saveAsComplete(instant1, getCommitMetadata(basePath, "p1", "1", 2, metadata1)); + + HoodieInstant instant2 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "2", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant2); + activeTimeline.saveAsComplete(instant2, getCommitMetadata(basePath, "p1", "2", 2, Collections.emptyMap())); + + assertEquals("topic:50", createProvider().getCheckpoint()); + } + + @Test + void testGetCheckpointSkipsEmptyCheckpointStrings() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant1); + Map metadata1 = new HashMap<>(); + metadata1.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "topic:50"); + activeTimeline.saveAsComplete(instant1, getCommitMetadata(basePath, "p1", "1", 2, metadata1)); + + // Newer commit has a reset key but empty checkpoint key — getCheckpointKey() returns "" + HoodieInstant instant2 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "2", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant2); + Map metadata2 = new HashMap<>(); + metadata2.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, ""); + metadata2.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1, "reset-value"); + activeTimeline.saveAsComplete(instant2, getCommitMetadata(basePath, "p1", "2", 2, metadata2)); + + assertEquals("topic:50", createProvider().getCheckpoint()); + } + + @Test + void testGetCheckpointThrowsWhenNoCheckpointExists() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, getCommitMetadata(basePath, "p1", "1", 2, Collections.emptyMap())); + + assertThrows(HoodieException.class, () -> createProvider().getCheckpoint()); + } +} From 4b28bedc56e0721baae7f04d9c4bc63cd55476b4 Mon Sep 17 00:00:00 2001 From: Krishen Bhan <“bkrishen@uber.com”> Date: Fri, 24 Apr 2026 15:52:11 -0700 Subject: [PATCH 11/11] UT coverage --- .../java/org/apache/hudi/common/table/TableSchemaResolver.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 29ce50ef88dec..ee2cfbf0d362c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -380,8 +380,9 @@ public boolean hasOperationField() { private Option> getLatestCommitMetadataWithValidSchema() { if (latestCommitWithValidSchema == null) { + boolean filterForSchemaMutableOperations = false; Option> instantAndCommitMetadata = - metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false); + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(filterForSchemaMutableOperations); if (instantAndCommitMetadata.isPresent()) { HoodieInstant instant = instantAndCommitMetadata.get().getLeft(); HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight();