From 4ed84b216db335521bd4c51169c40e63dc86f27d Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Mon, 28 Mar 2022 11:54:31 -0700 Subject: [PATCH] [HUDI-3720] Fix the logic of reattempting pending rollback (#5148) --- .../hudi/client/BaseHoodieWriteClient.java | 20 ++++++--- .../hudi/client/TestClientRollback.java | 45 ++++++++++++++----- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index a6a7e18b1f6a..5eb8e270a143 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -672,14 +672,24 @@ public boolean rollback(final String commitInstantTime, Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) .findFirst()); - if (commitInstantOpt.isPresent()) { - LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime); + if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) { + LOG.info(String.format("Scheduling Rollback at instant time : %s " + + "(exists in active timeline: %s), with rollback plan: %s", + rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent())); Option rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())) .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); if (rollbackPlanOption.isPresent()) { - // execute rollback - HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, - skipLocking); + // There can be a case where the inflight rollback failed after the instant files + // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is + // not present in the timeline. In such a case, the hoodie instant instance + // is reconstructed to allow the rollback to be reattempted, and the deleteInstants + // is set to false since they are already deleted. + // Execute rollback + HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent() + ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking) + : table.rollback(context, rollbackInstantTime, new HoodieInstant( + true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime), + false, skipLocking); if (timerContext != null) { long durationInMs = metrics.getDurationInMs(timerContext.stop()); metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 3b5393527fd7..c06b0a0621c7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRollbackPlan; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCleaningPolicy; @@ -48,6 +49,9 @@ import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; @@ -55,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -269,11 +274,19 @@ public void testRollbackCommit() throws Exception { } } + private static Stream testFailedRollbackCommitParams() { + return Arrays.stream(new Boolean[][] { + {true, true}, {true, false}, {false, true}, {false, false}, + }).map(Arguments::of); + } + /** * Test Cases for effects of rollbacking completed/inflight commits. */ - @Test - public void testFailedRollbackCommit() throws Exception { + @ParameterizedTest + @MethodSource("testFailedRollbackCommitParams") + public void testFailedRollbackCommit( + boolean enableMetadataTable, boolean instantToRollbackExists) throws Exception { // Let's create some commit files and base files final String p1 = "2016/05/01"; final String p2 = "2016/05/02"; @@ -302,21 +315,27 @@ public void testFailedRollbackCommit() throws Exception { put(p3, "id33"); } }; - HoodieTestTable testTable = HoodieTestTable.of(metaClient) - .withPartitionMetaFiles(p1, p2, p3) - .addCommit(commitTime1) - .withBaseFilesInPartitions(partitionAndFileId1) - .addCommit(commitTime2) - .withBaseFilesInPartitions(partitionAndFileId2) - .addInflightCommit(commitTime3) - .withBaseFilesInPartitions(partitionAndFileId3); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) .withRollbackUsingMarkers(false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); + HoodieTestTable testTable = enableMetadataTable + ? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create( + metaClient.getHadoopConf(), config, context)) + : HoodieTestTable.of(metaClient); + + testTable.withPartitionMetaFiles(p1, p2, p3) + .addCommit(commitTime1) + .withBaseFilesInPartitions(partitionAndFileId1) + .addCommit(commitTime2) + .withBaseFilesInPartitions(partitionAndFileId2) + .addInflightCommit(commitTime3) + .withBaseFilesInPartitions(partitionAndFileId3); + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { // Rollback commit3 @@ -333,8 +352,10 @@ public void testFailedRollbackCommit() throws Exception { // delete rollback completed meta file and retry rollback. FileCreateUtils.deleteRollbackCommit(basePath, rollbackInstant.getTimestamp()); - // recreate actual commit files so that we can retry the rollback - testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + if (instantToRollbackExists) { + // recreate actual commit files if needed + testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + } // retry rolling back the commit again. client.rollback(commitTime3);