diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 26282affd1771..1df2e451bdbac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) .findFirst()); - Option rollbackPlanOption; - String rollbackInstantTime; - if (pendingRollbackInfo.isPresent()) { + Option rollbackPlanOption = Option.empty(); + Option rollbackInstantTimeOpt; + if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) { + // Only case when lock can be skipped is if exclusive rollback is disabled and + // there is a pending rollback info available rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan()); - rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime(); + rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime()); } else { - if (commitInstantOpt.isEmpty()) { - log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); - return false; - } if (!skipLocking) { txnManager.beginStateChange(Option.empty(), Option.empty()); } try { - rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false)); - rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); + if (config.isExclusiveRollbackEnabled()) { + // Reload meta client within the lock so that the timeline is latest while executing pending rollback + table.getMetaClient().reloadActiveTimeline(); + Option pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); + rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime()); + if (pendingRollbackOpt.isPresent()) { + // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback + if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) { + LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt); + HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config); + heartbeatClient.start(rollbackInstantTimeOpt.get()); + rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan); + } else { + // Heartbeat is still active for another writer, ignore rollback for now + // TODO: ABCDEFGHI revisit return value + return false; + } + } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() + .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) + .findFirst()).isEmpty()) { + // Assume rollback is already executed since the commit is no longer present in the timeline + return false; + } + } else { + // Case where no pending rollback is present, + if (commitInstantOpt.isEmpty()) { + log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath()); + return false; + } + rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false))); + if (config.isExclusiveRollbackEnabled()) { + heartbeatClient.start(rollbackInstantTimeOpt.get()); + } + rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false); + } } finally { if (!skipLocking) { txnManager.endStateChange(Option.empty()); @@ -1279,14 +1311,17 @@ public boolean rollback(final String commitInstantTime, Option ENABLE_EXCLUSIVE_ROLLBACK = ConfigProperty + .key("hoodie.rollback.enforce.single.rollback.instant") + .defaultValue("false") + .markAdvanced() + .withDocumentation("Enables exclusive rollback so that rollback plan is generated and executed by only one writer at a time"); + public static final ConfigProperty FAIL_JOB_ON_DUPLICATE_DATA_FILE_DETECTION = ConfigProperty .key("hoodie.fail.job.on.duplicate.data.file.detection") .defaultValue("false") @@ -1578,6 +1584,10 @@ public boolean shouldRollbackUsingMarkers() { return getBoolean(ROLLBACK_USING_MARKERS_ENABLE); } + public boolean isExclusiveRollbackEnabled() { + return getBoolean(ENABLE_EXCLUSIVE_ROLLBACK) && getWriteConcurrencyMode().supportsMultiWriter(); + } + public boolean enableComplexKeygenValidation() { return getBoolean(ENABLE_COMPLEX_KEYGEN_VALIDATION); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 82fe9591de270..71507f1a92b6f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieRollbackRequest; +import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; @@ -66,6 +67,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -825,6 +827,202 @@ public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, b } } + /** + * Test exclusive rollback with multi-writer: when a pending rollback exists with an expired heartbeat + * (no heartbeat file present → returns 0L → always expired), the current writer should take ownership + * and execute the rollback. + */ + @Test + public void testExclusiveRollbackPendingRollbackHeartbeatExpired() throws Exception { + final String p1 = "2016/05/01"; + final String p2 = "2016/05/02"; + final String commitTime1 = "20160501010101"; + final String commitTime2 = "20160502020601"; + final String commitTime3 = "20160506030611"; + final String rollbackInstantTime = "20160506040611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + } + }; + Map partitionAndFileId3 = new HashMap() { + { + put(p1, "id31"); + put(p2, "id32"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1, p2) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1).getLeft() + .addCommit(commitTime2).withBaseFilesInPartitions(partitionAndFileId2).getLeft() + .addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + + // Create a valid pending rollback plan for commitTime3 + HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); + List rollbackRequestList = partitionAndFileId3.entrySet().stream() + .map(entry -> new HoodieRollbackRequest(entry.getKey(), EMPTY_STRING, EMPTY_STRING, + Collections.singletonList( + metaClient.getBasePath() + "/" + entry.getKey() + "/" + + FileCreateUtilsLegacy.baseFileName(commitTime3, entry.getValue())), + Collections.emptyMap())) + .collect(Collectors.toList()); + rollbackPlan.setRollbackRequests(rollbackRequestList); + rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION)); + FileCreateUtilsLegacy.createRequestedRollbackFile(metaClient.getBasePath().toString(), rollbackInstantTime, rollbackPlan); + // No heartbeat file → getLastHeartbeatTime returns 0L → heartbeat is always expired + + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + boolean result = client.rollback(commitTime3); + assertTrue(result, "Rollback should execute when pending rollback heartbeat is expired"); + + assertFalse(testTable.inflightCommitExists(commitTime3)); + assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); + assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2)); + + // Verify the pending rollback instant was reused and completed + metaClient.reloadActiveTimeline(); + List rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants(); + assertEquals(1, rollbackInstants.size()); + assertTrue(rollbackInstants.get(0).isCompleted()); + assertEquals(rollbackInstantTime, rollbackInstants.get(0).requestedTime()); + + // Verify heartbeat was cleaned up after rollback completion + assertFalse(HoodieHeartbeatClient.heartbeatExists(storage, basePath, rollbackInstantTime)); + } + } + + /** + * Test exclusive rollback with multi-writer: when a pending rollback exists with an active heartbeat + * (another writer is currently executing the rollback), the current writer should skip it and return false. + */ + @Test + public void testExclusiveRollbackPendingRollbackHeartbeatActive() throws Exception { + final String p1 = "2016/05/01"; + final String p2 = "2016/05/02"; + final String commitTime1 = "20160501010101"; + final String commitTime2 = "20160502020601"; + final String commitTime3 = "20160506030611"; + final String rollbackInstantTime = "20160506040611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + } + }; + Map partitionAndFileId3 = new HashMap() { + { + put(p1, "id31"); + put(p2, "id32"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1, p2) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1).getLeft() + .addCommit(commitTime2).withBaseFilesInPartitions(partitionAndFileId2).getLeft() + .addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + + // Create a pending rollback plan for commitTime3 + HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); + rollbackPlan.setRollbackRequests(Collections.emptyList()); + rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION)); + FileCreateUtilsLegacy.createRequestedRollbackFile(metaClient.getBasePath().toString(), rollbackInstantTime, rollbackPlan); + + // Simulate an active heartbeat by another writer for the rollback instant + try (HoodieHeartbeatClient otherWriterHeartbeat = new HoodieHeartbeatClient( + storage, basePath, config.getHoodieClientHeartbeatIntervalInMs(), + config.getHoodieClientHeartbeatTolerableMisses())) { + otherWriterHeartbeat.start(rollbackInstantTime); + // The heartbeat file is fresh → isHeartbeatExpired returns false + + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + boolean result = client.rollback(commitTime3); + assertFalse(result, "Rollback should be skipped when another writer holds an active heartbeat"); + + // Verify the inflight commit and data files are still present + assertTrue(testTable.inflightCommitExists(commitTime3)); + assertTrue(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); + + // Verify no completed rollback was created + metaClient.reloadActiveTimeline(); + List completedRollbacks = metaClient.getActiveTimeline() + .getRollbackTimeline().filterCompletedInstants().getInstants(); + assertEquals(0, completedRollbacks.size()); + } + } + } + + /** + * Test exclusive rollback with multi-writer: when the commit is no longer in the timeline + * (already rolled back by another writer) and no pending rollback exists, rollback should return false. + */ + @Test + public void testExclusiveRollbackWhenCommitNotInTimeline() throws Exception { + final String p1 = "2016/05/01"; + final String commitTime1 = "20160501010101"; + final String nonExistentCommitTime = "20160506030611"; + + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + } + }; + + HoodieWriteConfig config = buildExclusiveRollbackMultiWriterConfig(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + testTable.withPartitionMetaFiles(p1) + .addCommit(commitTime1).withBaseFilesInPartitions(partitionAndFileId1); + + // nonExistentCommitTime is not in the timeline and no pending rollback exists for it + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + boolean result = client.rollback(nonExistentCommitTime); + assertFalse(result, "Rollback should return false when commit is not in timeline (already rolled back)"); + + // Verify no rollback instant was created + metaClient.reloadActiveTimeline(); + assertTrue(metaClient.getActiveTimeline().getRollbackTimeline().empty()); + // Existing commit should be unaffected + assertTrue(testTable.baseFilesExist(partitionAndFileId1, commitTime1)); + } + } + + private HoodieWriteConfig buildExclusiveRollbackMultiWriterConfig() { + Properties props = new Properties(); + props.setProperty(HoodieWriteConfig.ENABLE_EXCLUSIVE_ROLLBACK.key(), "true"); + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withRollbackUsingMarkers(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(InProcessLockProvider.class) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withMetadataIndexColumnStats(false).enable(false).build()) + .withProperties(props) + .build(); + } + @Test public void testFallbackToListingBasedRollbackForCompletedInstant() throws Exception { // Let's create some commit files and base files