Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata meta
Map<String, String> foundRollingMetadata = new HashMap<>();
Set<String> remainingKeys = new HashSet<>(rollingKeys);

// Remove keys that are already present in current commit (current values take precedence)
// Remove keys that are already present with non-empty values in current commit (current values take precedence)
for (String key : rollingKeys) {
if (existingExtraMetadata.containsKey(key)) {
if (existingExtraMetadata.containsKey(key) && !StringUtils.isNullOrEmpty(existingExtraMetadata.get(key))) {
remainingKeys.remove(key);
}
}
Expand Down Expand Up @@ -387,7 +387,7 @@ protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata meta
// Check for remaining keys in this commit
for (String key : new HashSet<>(remainingKeys)) {
String value = commitMetadata.getMetadata(key);
if (value != null) {
if (!StringUtils.isNullOrEmpty(value)) {
foundRollingMetadata.put(key, value);
remainingKeys.remove(key);
log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,4 +574,57 @@ public void testRollingMetadataWithEmptyAndWhitespaceKeys() throws IOException {

client.close();
}

/**
 * Verifies that an empty-string value for a rolling metadata key is treated as missing,
 * so the walkback still finds the most recent non-empty value.
 *
 * <p>Commit 1 stores {@code checkpoint.offset=1000}; commit 2 explicitly stores an empty
 * string for the same key. The metadata read back from commit 2 is expected to carry
 * {@code "1000"}.
 *
 * @throws IOException if creating the meta client or the write client fails
 */
@Test
public void testRollingMetadataEmptyStringTreatedAsMissing() throws IOException {
  HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties());
  HoodieWriteConfig config = getConfigBuilder(true)
      .withPath(metaClient.getBasePath())
      .withRollingMetadataKeys("checkpoint.offset")
      .build();

  HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

  // try-with-resources: the client is closed even when a write or an assertion fails,
  // instead of only on the success path.
  try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
    // First commit with valid rolling metadata value
    String instant1 = client.createNewInstantTime(false);
    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);

    WriteClientTestUtils.startCommitWithTime(client, instant1);
    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, instant1).collect();
    assertNoWriteErrors(writeStatuses1);

    Map<String, String> extraMetadata1 = new HashMap<>();
    extraMetadata1.put("checkpoint.offset", "1000");
    client.commitStats(instant1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
        Option.of(extraMetadata1), metaClient.getCommitActionType());

    // Second commit explicitly sets the rolling key to empty string
    String instant2 = client.createNewInstantTime(false);
    List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 10);
    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);

    WriteClientTestUtils.startCommitWithTime(client, instant2);
    List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, instant2).collect();
    assertNoWriteErrors(writeStatuses2);

    Map<String, String> extraMetadata2 = new HashMap<>();
    extraMetadata2.put("checkpoint.offset", "");
    client.commitStats(instant2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
        Option.of(extraMetadata2), metaClient.getCommitActionType());

    // The empty string should be treated as missing, so walkback finds "1000" from commit1
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieInstant commit2 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
    HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, metaClient.getActiveTimeline());
    assertEquals("1000", metadata2.getMetadata("checkpoint.offset"),
        "Empty string value should be treated as missing; walkback should find the previous non-empty value");
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,9 @@ public boolean hasOperationField() {

private Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLatestCommitMetadataWithValidSchema() {
if (latestCommitWithValidSchema == null) {
boolean filterForSchemaMutableOperations = false;
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata =
metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema();
metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(filterForSchemaMutableOperations);
if (instantAndCommitMetadata.isPresent()) {
HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,20 @@ public interface HoodieActiveTimeline extends HoodieTimeline {
void deleteInstantFileIfExists(HoodieInstant instant);

/**
 * Returns the most recent instant having a valid schema in its {@link HoodieCommitMetadata},
 * restricted to operations where {@link WriteOperationType#canUpdateSchema} is true.
 *
 * @return the matching instant paired with its commit metadata, or an empty {@code Option}
 *         when no commit qualifies
 */
Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidSchema();

/**
 * Returns the most recent instant having a valid (non-empty) schema in its
 * {@link HoodieCommitMetadata}.
 *
 * @param filterForSchemaMutableOperations if true, only considers commits where
 *        {@link WriteOperationType#canUpdateSchema} is true (the behavior of the
 *        no-argument overload). If false, considers any commit type with a non-empty schema.
 * @return the matching instant paired with its commit metadata, or an empty {@code Option}
 *         when no commit qualifies
 */
Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations);

/**
* Get the last instant with valid data, and convert this to HoodieCommitMetadata
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,15 @@ public HoodieInstantReader getInstantReader() {

@Override
public Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidSchema() {
// Delegates with the operation-type filter switched on, so only commits whose operation
// type passes WriteOperationType.canUpdateSchema are considered.
return getLastCommitMetadataWithValidSchema(true);
}

@Override
public Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations) {
return Option.fromJavaOptional(
getCommitMetadataStream()
.filter(instantCommitMetadataPair ->
WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())
(!filterForSchemaMutableOperations || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType()))
&& !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY)))
.findFirst()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,15 @@ public HoodieInstantReader getInstantReader() {

@Override
public Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidSchema() {
// Delegates with the operation-type filter switched on, so only commits whose operation
// type passes WriteOperationType.canUpdateSchema are considered.
return getLastCommitMetadataWithValidSchema(true);
}

@Override
public Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidSchema(boolean filterForSchemaMutableOperations) {
return Option.fromJavaOptional(
getCommitMetadataStream()
.filter(instantCommitMetadataPair ->
WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType())
(!filterForSchemaMutableOperations || WriteOperationType.canUpdateSchema(instantCommitMetadataPair.getRight().getOperationType()))
&& !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY)))
.findFirst());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
import org.apache.hudi.common.table.timeline.versioning.v2.BaseTimelineV2;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
Expand Down Expand Up @@ -662,4 +663,91 @@ void testGetDroppedPartitions() throws Exception {
droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, Option.empty(), Option.empty());
assertTrue(droppedPartitions.isEmpty());
}

/**
 * Completes a single clustering instant whose commit metadata carries a schema, then checks
 * that the no-argument lookup skips it while the lookup with the operation-type filter
 * disabled returns it together with the stored schema string.
 */
@Test
void testGetLastCommitMetadataWithSchemaIgnoresOperationType() throws Exception {
  String expectedSchema = "{\"type\":\"record\",\"name\":\"test\",\"fields\":[]}";
  Map<String, String> schemaMetadata = new HashMap<>();
  schemaMetadata.put(HoodieCommitMetadata.SCHEMA_KEY, expectedSchema);

  // Create the clustering instant in-flight and complete it with CLUSTER commit metadata.
  HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
  HoodieInstant inflightClustering = new HoodieInstant(
      INFLIGHT, CLUSTERING_ACTION, "1", InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
  timeline.createNewInstant(inflightClustering);
  timeline.transitionClusterInflightToComplete(
      true,
      inflightClustering,
      getReplaceCommitMetadata(basePath, "1", "p1", 0, "p1", 3, schemaMetadata, WriteOperationType.CLUSTER));

  metaClient.reloadActiveTimeline();
  HoodieActiveTimeline reloadedTimeline = metaClient.getActiveTimeline();

  // Default lookup applies the canUpdateSchema filter, so the clustering commit is invisible.
  assertFalse(reloadedTimeline.getLastCommitMetadataWithValidSchema().isPresent(),
      "canUpdateSchema filter should exclude clustering");

  // With the filter disabled, the clustering commit and its schema are returned.
  assertTrue(reloadedTimeline.getLastCommitMetadataWithValidSchema(false).isPresent(),
      "getLastCommitMetadataWithValidSchema(false) should find schema in clustering commit");
  assertEquals(expectedSchema,
      reloadedTimeline.getLastCommitMetadataWithValidSchema(false).get().getRight()
          .getMetadata(HoodieCommitMetadata.SCHEMA_KEY));
}

/**
 * Completes a single commit whose extra metadata is empty and checks that the lookup with
 * the operation-type filter disabled still returns nothing.
 */
@Test
void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema() throws Exception {
  HoodieActiveTimeline timeline = metaClient.getActiveTimeline();

  // A commit created in-flight and saved as complete with no extra metadata at all.
  HoodieInstant schemalessCommit = new HoodieInstant(
      INFLIGHT, COMMIT_ACTION, "1", InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
  timeline.createNewInstant(schemalessCommit);
  timeline.saveAsComplete(
      schemalessCommit,
      getCommitMetadata(basePath, "1", "1", 2, Collections.emptyMap()));

  metaClient.reloadActiveTimeline();
  HoodieActiveTimeline reloadedTimeline = metaClient.getActiveTimeline();

  assertFalse(reloadedTimeline.getLastCommitMetadataWithValidSchema(false).isPresent(),
      "Should return empty when no commits have schema");
}

/**
 * Variant of the operation-type test that first re-initialises the meta client with
 * {@code initMetaClient(true)} (presumably selecting the V1 timeline layout; confirm against
 * the helper) and uses a replace-commit instant with the V1 comparator.
 */
@Test
void testGetLastCommitMetadataWithSchemaIgnoresOperationType_V1() throws Exception {
  // The meta client must be rebuilt before the timeline is obtained.
  cleanMetaClient();
  initMetaClient(true);

  String expectedSchema = "{\"type\":\"record\",\"name\":\"test\",\"fields\":[]}";
  Map<String, String> schemaMetadata = new HashMap<>();
  schemaMetadata.put(HoodieCommitMetadata.SCHEMA_KEY, expectedSchema);

  // Create the replace-commit instant in-flight and complete it with CLUSTER commit metadata.
  HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
  HoodieInstant inflightClustering = new HoodieInstant(
      INFLIGHT, REPLACE_COMMIT_ACTION, "1", InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
  timeline.createNewInstant(inflightClustering);
  timeline.transitionClusterInflightToComplete(
      true,
      inflightClustering,
      getReplaceCommitMetadata(basePath, "1", "p1", 0, "p1", 3, schemaMetadata, WriteOperationType.CLUSTER));

  metaClient.reloadActiveTimeline();
  HoodieActiveTimeline reloadedTimeline = metaClient.getActiveTimeline();

  // Default lookup applies the canUpdateSchema filter, so the clustering commit is invisible.
  assertFalse(reloadedTimeline.getLastCommitMetadataWithValidSchema().isPresent(),
      "canUpdateSchema filter should exclude clustering in V1 timeline");

  // With the filter disabled, the clustering commit and its schema are returned.
  assertTrue(reloadedTimeline.getLastCommitMetadataWithValidSchema(false).isPresent(),
      "getLastCommitMetadataWithValidSchema(false) should find schema in V1 clustering commit");
  assertEquals(expectedSchema,
      reloadedTimeline.getLastCommitMetadataWithValidSchema(false).get().getRight()
          .getMetadata(HoodieCommitMetadata.SCHEMA_KEY));
}

/**
 * Variant of the no-schema test that first re-initialises the meta client with
 * {@code initMetaClient(true)} (presumably selecting the V1 timeline layout; confirm against
 * the helper) and uses the V1 comparator for the commit instant.
 */
@Test
void testGetLastCommitMetadataWithSchemaReturnsEmptyWhenNoSchema_V1() throws Exception {
  // The meta client must be rebuilt before the timeline is obtained.
  cleanMetaClient();
  initMetaClient(true);
  HoodieActiveTimeline timeline = metaClient.getActiveTimeline();

  // A commit created in-flight and saved as complete with no extra metadata at all.
  HoodieInstant schemalessCommit = new HoodieInstant(
      INFLIGHT, COMMIT_ACTION, "1", InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
  timeline.createNewInstant(schemalessCommit);
  timeline.saveAsComplete(
      schemalessCommit,
      getCommitMetadata(basePath, "1", "1", 2, Collections.emptyMap()));

  metaClient.reloadActiveTimeline();
  HoodieActiveTimeline reloadedTimeline = metaClient.getActiveTimeline();

  assertFalse(reloadedTimeline.getLastCommitMetadataWithValidSchema(false).isPresent(),
      "Should return empty when no V1 commits have schema");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineFactory;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand Down Expand Up @@ -1992,6 +1994,78 @@ protected void autoCleanOnCommit() {
}
}

@Test
public void testRollingMetadataPreservedAcrossClusteringAfterArchival() throws Exception {
String schemaKey = HoodieCommitMetadata.SCHEMA_KEY;
dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH});

HoodieWriteConfig writeConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(0).build())
.withRollingMetadataKeys(schemaKey)
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withAutoClean(false).build())
.build();

SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);

// Insert multiple batches to create file groups for clustering
for (int i = 0; i < 5; i++) {
insertCommitWithSchema(client, dataGen, 20, TRIP_EXAMPLE_SCHEMA);
}

HoodieWriteConfig clusterConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
.withClusteringConfig(createClusteringBuilder(true, 1).build())
.withRollingMetadataKeys(schemaKey)
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withAutoClean(false).build())
.build();

for (int round = 0; round < 2; round++) {
SparkRDDWriteClient clusterWriter = getHoodieWriteClient(clusterConfig);
Option<String> clusteringInstant = clusterWriter.scheduleClustering(Option.empty());
assertTrue(clusteringInstant.isPresent(),
"Clustering plan should be created (round " + round + ")");
clusterWriter.cluster(clusteringInstant.get());

// Only insert after the first round so that the second clustering instant
// remains on the active timeline after archival
if (round < 1) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: `round < 1` effectively means `round == 0` — could you use `round == 0` directly? The `< 1` form makes a reader pause to ask whether a negative `round` value is ever possible.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

for (int i = 0; i < 3; i++) {
insertCommitWithSchema(client, dataGen, 20, TRIP_EXAMPLE_SCHEMA);
}
}
}

client.archive();

HoodieTableMetaClient freshMeta = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline completedTimeline = freshMeta.getActiveTimeline()
.getCommitsTimeline().filterCompletedInstants();

boolean foundSchemaInClustering = false;
for (HoodieInstant instant : completedTimeline.getInstants()) {
HoodieCommitMetadata metadata = completedTimeline.readCommitMetadata(instant);
if (metadata.getOperationType() == WriteOperationType.CLUSTER) {
String schema = metadata.getMetadata(schemaKey);
if (schema != null && !schema.isEmpty()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you use `!StringUtils.isNullOrEmpty(schema)` here instead of `schema != null && !schema.isEmpty()`? The rest of this PR consistently uses `StringUtils.isNullOrEmpty` for the same check.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

foundSchemaInClustering = true;
break;
}
}
}
assertTrue(foundSchemaInClustering,
"Schema should be rolled over into clustering commits via rolling metadata");
Comment thread
kbuci marked this conversation as resolved.

TableSchemaResolver resolver = new TableSchemaResolver(freshMeta);
assertTrue(resolver.getTableSchemaIfPresent(false).isPresent(),
"TableSchemaResolver should find schema even with clustering-only timeline");
}

/**
* Disabling row writer here as clustering tests will throw the error below if it is used.
* java.util.concurrent.CompletionException: java.lang.ClassNotFoundException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.util.Objects;

/**
* This is used to set a checkpoint from latest commit of another (mirror) hudi dataset.
Expand All @@ -52,7 +53,11 @@ public void init(Configuration config) throws HoodieException {

@Override
public String getCheckpoint() throws HoodieException {
return anotherDsHoodieMetaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
// Use getWriteTimeline() to include compaction/logcompaction in addition to
// commit/deltacommit/replacecommit, so checkpoint metadata rolled into any
// non-ingestion commit type is discoverable after archival.
return anotherDsHoodieMetaClient.getActiveTimeline().getWriteTimeline()
.filterCompletedInstants().getReverseOrderedInstants()
.map(instant -> {
try {
HoodieCommitMetadata commitMetadata =
Expand All @@ -63,9 +68,11 @@ public String getCheckpoint() throws HoodieException {
// No checkpoint found in this commit
return null;
} catch (IOException e) {
return null;
throw new HoodieIOException("Failed to read commit metadata for instant " + instant.requestedTime(), e);
}
}).filter(Objects::nonNull).findFirst()
// Filter out null (from HoodieException) and empty strings (from commits
// that don't have checkpoint metadata, e.g. when rollover is not configured)
}).filter(key -> !StringUtils.isNullOrEmpty(key)).findFirst()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the lambda parameter `key` here is actually a checkpoint value string (e.g. `"topic:100"`), not a metadata key name — something like `checkpoint` or `checkpointValue` might read more accurately.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

.orElseThrow(() -> new HoodieException("Unable to find checkpoint in source table at: "
+ path + ". This table may not have been created with checkpoint tracking enabled."));
}
Expand Down
Loading
Loading