diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index e5fd93d56a601..842dc38177a59 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -353,9 +353,9 @@ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata meta Map foundRollingMetadata = new HashMap<>(); Set remainingKeys = new HashSet<>(rollingKeys); - // Remove keys that are already present in current commit (current values take precedence) + // Remove keys that are already present with non-empty values in current commit (current values take precedence) for (String key : rollingKeys) { - if (existingExtraMetadata.containsKey(key)) { + if (existingExtraMetadata.containsKey(key) && !StringUtils.isNullOrEmpty(existingExtraMetadata.get(key))) { remainingKeys.remove(key); } } @@ -387,7 +387,7 @@ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata meta // Check for remaining keys in this commit for (String key : new HashSet<>(remainingKeys)) { String value = commitMetadata.getMetadata(key); - if (value != null) { + if (!StringUtils.isNullOrEmpty(value)) { foundRollingMetadata.put(key, value); remainingKeys.remove(key); log.debug("Found rolling metadata key '{}' in commit {} with value: {}", diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java index d123dcd86fd6c..0afccea84d56a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java @@ -574,4 +574,57 @@ public void testRollingMetadataWithEmptyAndWhitespaceKeys() throws IOException { client.close(); } + + /** + * Test that an empty-string value for a rolling metadata key is treated as missing, + * so the walkback still finds the most recent non-empty value. + */ + @Test + public void testRollingMetadataEmptyStringTreatedAsMissing() throws IOException { + HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties()); + HoodieWriteConfig config = getConfigBuilder(true) + .withPath(metaClient.getBasePath()) + .withRollingMetadataKeys("checkpoint.offset") + .build(); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + SparkRDDWriteClient client = getHoodieWriteClient(config); + + // First commit with valid rolling metadata value + String instant1 = client.createNewInstantTime(false); + List records1 = dataGen.generateInserts(instant1, 10); + JavaRDD writeRecords1 = jsc().parallelize(records1, 2); + + WriteClientTestUtils.startCommitWithTime(client, instant1); + List writeStatuses1 = client.insert(writeRecords1, instant1).collect(); + assertNoWriteErrors(writeStatuses1); + + Map extraMetadata1 = new HashMap<>(); + extraMetadata1.put("checkpoint.offset", "1000"); + client.commitStats(instant1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.of(extraMetadata1), metaClient.getCommitActionType()); + + // Second commit explicitly sets the rolling key to empty string + String instant2 = client.createNewInstantTime(false); + List records2 = dataGen.generateInserts(instant2, 10); + JavaRDD writeRecords2 = jsc().parallelize(records2, 2); + + WriteClientTestUtils.startCommitWithTime(client, instant2); + List writeStatuses2 = client.insert(writeRecords2, instant2).collect(); + assertNoWriteErrors(writeStatuses2); + + Map extraMetadata2 = new HashMap<>(); + extraMetadata2.put("checkpoint.offset", ""); + client.commitStats(instant2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.of(extraMetadata2), metaClient.getCommitActionType()); + + // The empty string should be treated as missing, so walkback finds "1000" from commit1 + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieInstant commit2 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get(); + HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, metaClient.getActiveTimeline()); + assertEquals("1000", metadata2.getMetadata("checkpoint.offset"), + "Empty string value should be treated as missing; walkback should find the previous non-empty value"); + + client.close(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 095a4ffef7170..ee2cfbf0d362c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -380,8 +380,9 @@ public boolean hasOperationField() { private Option> getLatestCommitMetadataWithValidSchema() { if (latestCommitWithValidSchema == null) { + boolean filterForSchemaMutableOperations = false; Option> instantAndCommitMetadata = - metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(); + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(filterForSchemaMutableOperations); if (instantAndCommitMetadata.isPresent()) { HoodieInstant instant = instantAndCommitMetadata.get().getLeft(); HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 7d65b0aa5f609..012ffdec8c1c9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -146,10 +146,20 @@ public interface HoodieActiveTimeline extends HoodieTimeline { void deleteInstantFileIfExists(HoodieInstant instant); /** - * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata} + * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata}, + * restricted to operations where {@link WriteOperationType#canUpdateSchema} is true. */ Option> getLastCommitMetadataWithValidSchema(); + /** + * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata}. + * + * @param filterForSchemaMutableOperations if true, only considers commits where + * {@link WriteOperationType#canUpdateSchema} is true (original behavior). + * If false, considers any commit type with a non-empty schema. + */ + Option> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations); + /** * Get the last instant with valid data, and convert this to HoodieCommitMetadata */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java index e69561fafb2a8..c670404ac7694 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java @@ -276,10 +276,15 @@ public HoodieInstantReader getInstantReader() { @Override public Option> getLastCommitMetadataWithValidSchema() { + return getLastCommitMetadataWithValidSchema(true); + } + + @Override + public Option> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations) { return Option.fromJavaOptional( getCommitMetadataStream() .filter(instantCommitMetadataPair -> - WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType()) + (!filterForSchemaMutableOperations || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) .findFirst() ); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java index 90352bed48b51..69e36596e48da 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java @@ -291,10 +291,15 @@ public HoodieInstantReader getInstantReader() { @Override public Option> getLastCommitMetadataWithValidSchema() { + return getLastCommitMetadataWithValidSchema(true); + } + + @Override + public Option> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations) { return Option.fromJavaOptional( getCommitMetadataStream() .filter(instantCommitMetadataPair -> - WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType()) + (!filterForSchemaMutableOperations || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())) && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) .findFirst()); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 1175913420b81..9472cc7cc3cd7 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1; import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2; import org.apache.hudi.common.table.timeline.versioning.v2.BaseTimelineV2; import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; @@ -662,4 +663,91 @@ void testGetDroppedPartitions() throws Exception { droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, Option.empty(), Option.empty()); assertTrue(droppedPartitions.isEmpty()); } + + @Test + void testGetLastCommitMetadataWithSchemaIgnoresOperationType() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + String schemaStr = "{\"type\":\"record\",\"name\":\"test\",\"fields\":[]}"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(HoodieCommitMetadata.SCHEMA_KEY, schemaStr); + HoodieInstant clusterInstant = new HoodieInstant(INFLIGHT, CLUSTERING_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(clusterInstant); + activeTimeline.transitionClusterInflightToComplete(true, clusterInstant, + getReplaceCommitMetadata(basePath, "1", "p1", 0, "p1", 3, extraMetadata, WriteOperationType.CLUSTER)); + + metaClient.reloadActiveTimeline(); + + // getLastCommitMetadataWithValidSchema() should NOT find it (filtered by canUpdateSchema) + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema().isPresent(), + "canUpdateSchema filter should exclude clustering"); + + // getLastCommitMetadataWithValidSchema(false) SHOULD find it (no operation type filter) + assertTrue(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).isPresent(), + "getLastCommitMetadataWithValidSchema(false) should find schema in clustering commit"); + assertEquals(schemaStr, + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).get().getRight() + .getMetadata(HoodieCommitMetadata.SCHEMA_KEY)); + } + + @Test + void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, getCommitMetadata(basePath, "1", "1", 2, Collections.emptyMap())); + + metaClient.reloadActiveTimeline(); + + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).isPresent(), + "Should return empty when no commits have schema"); + } + + @Test + void testGetLastCommitMetadataWithSchemaIgnoresOperationType_V1() throws Exception { + cleanMetaClient(); + initMetaClient(true); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + String schemaStr = "{\"type\":\"record\",\"name\":\"test\",\"fields\":[]}"; + Map extraMetadata = new HashMap<>(); + extraMetadata.put(HoodieCommitMetadata.SCHEMA_KEY, schemaStr); + HoodieInstant clusterInstant = new HoodieInstant(INFLIGHT, REPLACE_COMMIT_ACTION, "1", + InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(clusterInstant); + activeTimeline.transitionClusterInflightToComplete(true, clusterInstant, + getReplaceCommitMetadata(basePath, "1", "p1", 0, "p1", 3, extraMetadata, WriteOperationType.CLUSTER)); + + metaClient.reloadActiveTimeline(); + + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema().isPresent(), + "canUpdateSchema filter should exclude clustering in V1 timeline"); + + assertTrue(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).isPresent(), + "getLastCommitMetadataWithValidSchema(false) should find schema in V1 clustering commit"); + assertEquals(schemaStr, + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).get().getRight() + .getMetadata(HoodieCommitMetadata.SCHEMA_KEY)); + } + + @Test + void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema_V1() throws Exception { + cleanMetaClient(); + initMetaClient(true); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, getCommitMetadata(basePath, "1", "1", 2, Collections.emptyMap())); + + metaClient.reloadActiveTimeline(); + + assertFalse(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(false).isPresent(), + "Should return empty when no V1 commits have schema"); + } + } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 6f873d2d5059d..2a4fa731e04bb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -55,8 +55,10 @@ import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.InstantGenerator; import org.apache.hudi.common.table.timeline.TimelineFactory; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -1992,6 +1994,78 @@ protected void autoCleanOnCommit() { } } + @Test + public void testRollingMetadataPreservedAcrossClusteringAfterArchival() throws Exception { + String schemaKey = HoodieCommitMetadata.SCHEMA_KEY; + dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH}); + + HoodieWriteConfig writeConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .compactionSmallFileSize(0).build()) + .withRollingMetadataKeys(schemaKey) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withAutoClean(false).build()) + .build(); + + SparkRDDWriteClient client = getHoodieWriteClient(writeConfig); + + // Insert multiple batches to create file groups for clustering + for (int i = 0; i < 5; i++) { + insertCommitWithSchema(client, dataGen, 20, TRIP_EXAMPLE_SCHEMA); + } + + HoodieWriteConfig clusterConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA) + .withClusteringConfig(createClusteringBuilder(true, 1).build()) + .withRollingMetadataKeys(schemaKey) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withAutoClean(false).build()) + .build(); + + for (int round = 0; round < 2; round++) { + SparkRDDWriteClient clusterWriter = getHoodieWriteClient(clusterConfig); + Option clusteringInstant = clusterWriter.scheduleClustering(Option.empty()); + assertTrue(clusteringInstant.isPresent(), + "Clustering plan should be created (round " + round + ")"); + clusterWriter.cluster(clusteringInstant.get()); + + // Only insert after the first round so that the second clustering instant + // remains on the active timeline after archival + if (round < 1) { + for (int i = 0; i < 3; i++) { + insertCommitWithSchema(client, dataGen, 20, TRIP_EXAMPLE_SCHEMA); + } + } + } + + client.archive(); + + HoodieTableMetaClient freshMeta = HoodieTableMetaClient.reload(metaClient); + HoodieTimeline completedTimeline = freshMeta.getActiveTimeline() + .getCommitsTimeline().filterCompletedInstants(); + + boolean foundSchemaInClustering = false; + for (HoodieInstant instant : completedTimeline.getInstants()) { + HoodieCommitMetadata metadata = completedTimeline.readCommitMetadata(instant); + if (metadata.getOperationType() == WriteOperationType.CLUSTER) { + String schema = metadata.getMetadata(schemaKey); + if (schema != null && !schema.isEmpty()) { + foundSchemaInClustering = true; + break; + } + } + } + assertTrue(foundSchemaInClustering, + "Schema should be rolled over into clustering commits via rolling metadata"); + + TableSchemaResolver resolver = new TableSchemaResolver(freshMeta); + assertTrue(resolver.getTableSchemaIfPresent(false).isPresent(), + "TableSchemaResolver should find schema even with clustering-only timeline"); + } + /** * Disabling row writer here as clustering tests will throw the error below if it is used. * java.util.concurrent.CompletionException: java.lang.ClassNotFoundException diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java index 230e1a1189bef..462839ebcb4b9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java @@ -22,13 +22,14 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.checkpoint.CheckpointUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hadoop.conf.Configuration; import java.io.IOException; -import java.util.Objects; /** * This is used to set a checkpoint from latest commit of another (mirror) hudi dataset. @@ -52,7 +53,11 @@ public void init(Configuration config) throws HoodieException { @Override public String getCheckpoint() throws HoodieException { - return anotherDsHoodieMetaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants() + // Use getWriteTimeline() to include compaction/logcompaction in addition to + // commit/deltacommit/replacecommit, so checkpoint metadata rolled into any + // non-ingestion commit type is discoverable after archival. + return anotherDsHoodieMetaClient.getActiveTimeline().getWriteTimeline() + .filterCompletedInstants().getReverseOrderedInstants() .map(instant -> { try { HoodieCommitMetadata commitMetadata = @@ -63,9 +68,11 @@ public String getCheckpoint() throws HoodieException { // No checkpoint found in this commit return null; } catch (IOException e) { - return null; + throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e); } - }).filter(Objects::nonNull).findFirst() + // Filter out null (from HoodieException) and empty strings (from commits + // that don't have checkpoint metadata, e.g. when rollover is not configured) + }).filter(key -> !StringUtils.isNullOrEmpty(key)).findFirst() .orElseThrow(() -> new HoodieException("Unable to find checkpoint in source table at: " + path + ". This table may not have been created with checkpoint tracking enabled.")); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestInitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestInitialCheckpointFromAnotherHoodieTimelineProvider.java new file mode 100644 index 0000000000000..a9ef19ef58924 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestInitialCheckpointFromAnotherHoodieTimelineProvider.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.checkpointing; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.exception.HoodieException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests for {@link InitialCheckpointFromAnotherHoodieTimelineProvider}. + */ +class TestInitialCheckpointFromAnotherHoodieTimelineProvider extends HoodieCommonTestHarness { + + @BeforeEach + void setUp() throws Exception { + initMetaClient(); + } + + @AfterEach + void tearDown() throws Exception { + cleanMetaClient(); + } + + private InitialCheckpointFromAnotherHoodieTimelineProvider createProvider() { + TypedProperties props = new TypedProperties(); + props.put("hoodie.streamer.checkpoint.provider.path", basePath); + InitialCheckpointFromAnotherHoodieTimelineProvider provider = + new InitialCheckpointFromAnotherHoodieTimelineProvider(props); + provider.init(HoodieTestUtils.getDefaultStorageConf().unwrap()); + return provider; + } + + @Test + void testGetCheckpointFromCommitWithValidCheckpoint() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant); + Map extraMetadata = new HashMap<>(); + extraMetadata.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "topic:100"); + activeTimeline.saveAsComplete(instant, getCommitMetadata(basePath, "p1", "1", 2, extraMetadata)); + + assertEquals("topic:100", createProvider().getCheckpoint()); + } + + @Test + void testGetCheckpointSkipsCommitsWithoutCheckpoint() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant1); + Map metadata1 = new HashMap<>(); + metadata1.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "topic:50"); + activeTimeline.saveAsComplete(instant1, getCommitMetadata(basePath, "p1", "1", 2, metadata1)); + + HoodieInstant instant2 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "2", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant2); + activeTimeline.saveAsComplete(instant2, getCommitMetadata(basePath, "p1", "2", 2, Collections.emptyMap())); + + assertEquals("topic:50", createProvider().getCheckpoint()); + } + + @Test + void testGetCheckpointSkipsEmptyCheckpointStrings() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant1); + Map metadata1 = new HashMap<>(); + metadata1.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "topic:50"); + activeTimeline.saveAsComplete(instant1, getCommitMetadata(basePath, "p1", "1", 2, metadata1)); + + // Newer commit has a reset key but empty checkpoint key — getCheckpointKey() returns "" + HoodieInstant instant2 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "2", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant2); + Map metadata2 = new HashMap<>(); + metadata2.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, ""); + metadata2.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1, "reset-value"); + activeTimeline.saveAsComplete(instant2, getCommitMetadata(basePath, "p1", "2", 2, metadata2)); + + assertEquals("topic:50", createProvider().getCheckpoint()); + } + + @Test + void testGetCheckpointThrowsWhenNoCheckpointExists() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant instant = new HoodieInstant(INFLIGHT, COMMIT_ACTION, "1", + InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, getCommitMetadata(basePath, "p1", "1", 2, Collections.emptyMap())); + + assertThrows(HoodieException.class, () -> createProvider().getCheckpoint()); + } +}