From 9d744bb35ce54d347c5ef50adeba3f2a8840d043 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 7 Apr 2022 03:02:34 -0700 Subject: [PATCH] [HUDI-3805] Delete existing corrupted requested rollback plan during rollback (#5245) --- .../hudi/client/BaseHoodieWriteClient.java | 41 +++++-- .../hudi/client/TestClientRollback.java | 106 ++++++++++++++++++ .../common/testutils/FileCreateUtils.java | 4 + 3 files changed, 140 insertions(+), 11 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 70e3cebce4a1..32a8dee51738 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -18,7 +18,6 @@ package org.apache.hudi.client; -import org.apache.hadoop.fs.Path; import org.apache.hudi.async.AsyncArchiveService; import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.avro.HoodieAvroUtils; @@ -72,7 +71,6 @@ import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.Type; import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier; @@ -82,6 +80,7 @@ import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils; import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.metrics.HoodieMetrics; @@ -95,8 +94,9 @@ import 
org.apache.hudi.table.upgrade.UpgradeDowngrade; import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -105,11 +105,11 @@ import java.text.ParseException; import java.util.Collection; import java.util.Collections; -import java.util.List; -import java.util.Set; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1113,9 +1113,28 @@ protected Map> getPendingRollbackInfos protected Map> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) { List instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList()); Map> infoMap = new HashMap<>(); - for (HoodieInstant instant : instants) { + for (HoodieInstant rollbackInstant : instants) { + HoodieRollbackPlan rollbackPlan; + try { + rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant); + } catch (IOException e) { + if (rollbackInstant.isRequested()) { + LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e); + try { + metaClient.getActiveTimeline().deletePending(rollbackInstant); + } catch (HoodieIOException he) { + LOG.warn("Cannot delete " + rollbackInstant, he); + continue; + } + } else { + // Here we assume that if the rollback is inflight, the rollback plan is intact + // in instant.rollback.requested. The exception here can be due to other reasons. 
+ LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e); + } + continue; + } + try { - HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant); String action = rollbackPlan.getInstantToRollback().getAction(); if (ignoreCompactionAndClusteringInstants) { if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) { @@ -1124,14 +1143,14 @@ protected Map> getPendingRollbackInfos rollbackPlan.getInstantToRollback().getCommitTime())).isPresent(); if (!isClustering) { String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime(); - infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan))); + infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan))); } } } else { - infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan))); + infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan))); } - } catch (IOException e) { - LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e); + } catch (Exception e) { + LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e); } } return infoMap; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index d2dabc0792aa..f6315eec7d21 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRollbackPlan; +import org.apache.hudi.avro.model.HoodieRollbackRequest; 
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; @@ -61,9 +62,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -481,4 +484,107 @@ public void testAutoRollbackInflightCommit() throws Exception { assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); } } + + private static Stream testRollbackWithRequestedRollbackPlanParams() { + return Arrays.stream(new Boolean[][] { + {true, true}, {true, false}, {false, true}, {false, false}, + }).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("testRollbackWithRequestedRollbackPlanParams") + public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, boolean isRollbackPlanCorrupted) throws Exception { + // Let's create some commit files and base files + final String p1 = "2022/04/05"; + final String p2 = "2022/04/06"; + final String commitTime1 = "20220406010101002"; + final String commitTime2 = "20220406020601002"; + final String commitTime3 = "20220406030611002"; + final String rollbackInstantTime = "20220406040611002"; + Map partitionAndFileId1 = new HashMap() { + { + put(p1, "id11"); + put(p2, "id12"); + } + }; + Map partitionAndFileId2 = new HashMap() { + { + put(p1, "id21"); + put(p2, "id22"); + } + }; + Map partitionAndFileId3 = new HashMap() { + { + put(p1, "id31"); + put(p2, "id32"); + } + }; + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + 
.withRollbackUsingMarkers(false) + .withMetadataConfig( + HoodieMetadataConfig.newBuilder() + // Column Stats Index is disabled, since these tests construct tables which are + // not valid (empty commit metadata, invalid parquet files) + .withMetadataIndexColumnStats(false) + .enable(enableMetadataTable) + .build() + ) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); + + HoodieTestTable testTable = enableMetadataTable + ? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create( + metaClient.getHadoopConf(), config, context)) + : HoodieTestTable.of(metaClient); + + testTable.withPartitionMetaFiles(p1, p2) + .addCommit(commitTime1) + .withBaseFilesInPartitions(partitionAndFileId1) + .addCommit(commitTime2) + .withBaseFilesInPartitions(partitionAndFileId2) + .addInflightCommit(commitTime3) + .withBaseFilesInPartitions(partitionAndFileId3); + + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + if (isRollbackPlanCorrupted) { + // Add a corrupted requested rollback plan + FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, new byte[] {0, 1, 2}); + } else { + // Add a valid requested rollback plan to roll back commitTime3 + HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(); + List rollbackRequestList = partitionAndFileId3.keySet().stream() + .map(partition -> new HoodieRollbackRequest(partition, EMPTY_STRING, EMPTY_STRING, + Collections.singletonList(metaClient.getBasePath() + "/" + partition + "/" + + FileCreateUtils.baseFileName(commitTime3, partitionAndFileId3.get(partition))), + Collections.emptyMap())) + .collect(Collectors.toList()); + rollbackPlan.setRollbackRequests(rollbackRequestList); + rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, 
HoodieTimeline.COMMIT_ACTION)); + FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, rollbackPlan); + } + + // Rollback commit3 + client.rollback(commitTime3); + assertFalse(testTable.inflightCommitExists(commitTime3)); + assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3)); + assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2)); + + metaClient.reloadActiveTimeline(); + List rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList()); + // Corrupted requested rollback plan should be deleted before scheduling a new one + assertEquals(1, rollbackInstants.size()); + HoodieInstant rollbackInstant = rollbackInstants.get(0); + assertTrue(rollbackInstant.isCompleted()); + + if (isRollbackPlanCorrupted) { + // Should create a new rollback instant + assertNotEquals(rollbackInstantTime, rollbackInstant.getTimestamp()); + } else { + // Should reuse the rollback instant + assertEquals(rollbackInstantTime, rollbackInstant.getTimestamp()); + } + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 06f0ac49b63e..27dd9df5edd5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -244,6 +244,10 @@ public static void createRequestedRollbackFile(String basePath, String instantTi createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get()); } + public static void createRequestedRollbackFile(String basePath, String instantTime, byte[] content) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, content); + } + public static void createInflightRollbackFile(String basePath, String 
instantTime) throws IOException { createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION); }