Skip to content

Commit

Permalink
[HUDI-3720] Fix the logic of reattempting pending rollback (#5148)
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua committed Mar 28, 2022
1 parent 2e2d08c commit 4ed84b2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
Expand Up @@ -672,14 +672,24 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
LOG.info(String.format("Scheduling Rollback at instant time : %s "
+ "(exists in active timeline: %s), with rollback plan: %s",
rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
.orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
if (rollbackPlanOption.isPresent()) {
// execute rollback
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
skipLocking);
// There can be a case where the inflight rollback failed after the instant files
// are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
// not present in the timeline. In such a case, the hoodie instant instance
// is reconstructed to allow the rollback to be reattempted, and the deleteInstants
// is set to false since they are already deleted.
// Execute rollback
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
: table.rollback(context, rollbackInstantTime, new HoodieInstant(
true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
false, skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
Expand Down
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
Expand Down Expand Up @@ -48,13 +49,17 @@

import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -269,11 +274,19 @@ public void testRollbackCommit() throws Exception {
}
}

private static Stream<Arguments> testFailedRollbackCommitParams() {
return Arrays.stream(new Boolean[][] {
{true, true}, {true, false}, {false, true}, {false, false},
}).map(Arguments::of);
}

/**
* Test Cases for effects of rollbacking completed/inflight commits.
*/
@Test
public void testFailedRollbackCommit() throws Exception {
@ParameterizedTest
@MethodSource("testFailedRollbackCommitParams")
public void testFailedRollbackCommit(
boolean enableMetadataTable, boolean instantToRollbackExists) throws Exception {
// Let's create some commit files and base files
final String p1 = "2016/05/01";
final String p2 = "2016/05/02";
Expand Down Expand Up @@ -302,21 +315,27 @@ public void testFailedRollbackCommit() throws Exception {
put(p3, "id33");
}
};
HoodieTestTable testTable = HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(p1, p2, p3)
.addCommit(commitTime1)
.withBaseFilesInPartitions(partitionAndFileId1)
.addCommit(commitTime2)
.withBaseFilesInPartitions(partitionAndFileId2)
.addInflightCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();

HoodieTestTable testTable = enableMetadataTable
? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
metaClient.getHadoopConf(), config, context))
: HoodieTestTable.of(metaClient);

testTable.withPartitionMetaFiles(p1, p2, p3)
.addCommit(commitTime1)
.withBaseFilesInPartitions(partitionAndFileId1)
.addCommit(commitTime2)
.withBaseFilesInPartitions(partitionAndFileId2)
.addInflightCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);

try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {

// Rollback commit3
Expand All @@ -333,8 +352,10 @@ public void testFailedRollbackCommit() throws Exception {
// delete rollback completed meta file and retry rollback.
FileCreateUtils.deleteRollbackCommit(basePath, rollbackInstant.getTimestamp());

// recreate actual commit files so that we can retry the rollback
testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3);
if (instantToRollbackExists) {
// recreate actual commit files if needed
testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3);
}

// retry rolling back the commit again.
client.rollback(commitTime3);
Expand Down

0 comments on commit 4ed84b2

Please sign in to comment.