[HUDI-3805] Delete existing corrupted requested rollback plan during rollback #5245

Merged (1 commit) on Apr 7, 2022
@@ -18,7 +18,6 @@

package org.apache.hudi.client;

-import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -72,7 +71,6 @@
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
@@ -82,6 +80,7 @@
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.HoodieMetrics;
@@ -95,8 +94,9 @@
import org.apache.hudi.table.upgrade.UpgradeDowngrade;

import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -105,11 +105,11 @@
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
-import java.util.Set;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -1113,9 +1113,28 @@ protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos
  protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
    List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
    Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
-    for (HoodieInstant instant : instants) {
+    for (HoodieInstant rollbackInstant : instants) {
+      HoodieRollbackPlan rollbackPlan;
+      try {
+        rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
+      } catch (IOException e) {
Member:

Wouldn't this go ahead and delete even if, say, the error was a legit IO exception? Let's say cloud storage was inaccessible or the connection timed out. We should explicitly handle corruptions cleanly, IMO.

Contributor:

Yes, we should probably catch only the known exception, and check the stack trace as well. This is the stack trace when the Avro file is corrupt:

Caused by: java.io.IOException: Not an Avro data file
    at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50)

We will follow up with the right fix.
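
A minimal sketch of the narrower handling suggested here, not the merged fix: key the deletion off the known "Not an Avro data file" signature instead of any IOException. The class and helper names are hypothetical, and matching on the exception message is a heuristic that can vary across Avro versions.

import java.io.IOException;

final class RollbackPlanCorruptionCheck {
  // Heuristic: the signature quoted above is what Avro throws when the file
  // is missing or truncates the Avro magic header.
  static boolean isLikelyCorruptAvroPlan(IOException e) {
    String msg = e.getMessage();
    return msg != null && msg.contains("Not an Avro data file");
  }
}

// Possible use inside the catch block above (sketch):
//   if (rollbackInstant.isRequested() && RollbackPlanCorruptionCheck.isLikelyCorruptAvroPlan(e)) {
//     metaClient.getActiveTimeline().deletePending(rollbackInstant);
//   } else {
//     LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e);
//   }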

Contributor Author (@yihua, Jul 28, 2022):

The deletion only happens for the requested rollback plan; any inflight rollback is not affected. The assumption here is that even if the requested rollback plan is inaccessible and gets deleted, it can be requested again by a new writer, which is still safe. We don't want a hanging rollback plan to block metadata table compaction.

The corruption is mainly due to the write of a rollback plan not being atomic, with the job failing during that window.
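
To make the non-atomic write failure concrete, here is a small standalone reproduction using plain Avro (not Hudi code): opening a reader over bytes that lack the Avro magic header fails with exactly the message quoted in this thread. The bytes {0, 1, 2} mirror what the new test below writes as a corrupt plan; the exact message wording is version-dependent.

import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class CorruptAvroPlanDemo {
  public static void main(String[] args) {
    // Simulates a requested rollback plan whose write was interrupted:
    // the file exists but does not start with the Avro magic bytes.
    byte[] truncatedWrite = new byte[] {0, 1, 2};
    try {
      DataFileReader.openReader(
          new SeekableByteArrayInput(truncatedWrite), new GenericDatumReader<GenericRecord>());
    } catch (IOException e) {
      System.out.println(e.getMessage()); // prints "Not an Avro data file"
    }
  }
}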

Contributor:

Yeah, Ethan reminded me of the same discussion we had when the patch was put up; we found the existing fix to be the safest option compared to the alternatives. To add to what Ethan mentioned above: we do this only in the case of rollback.requested, not rollback.inflight. For an inflight rollback, it is safe to reuse the plan from the rollback.requested file.
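
A toy model of the decision in this thread, kept separate from Hudi's APIs so it runs standalone; the state names mirror Hudi's timeline states, and the class and method are illustrative only:

public class RollbackPlanRecoveryDemo {
  enum RollbackState { REQUESTED, INFLIGHT }

  // If the plan in <ts>.rollback.requested cannot be read:
  // - REQUESTED: safe to delete; a later writer can schedule the rollback again.
  // - INFLIGHT: the plan was readable when execution began, so keep the instant
  //   and treat the read failure as transient.
  static String onUnreadablePlan(RollbackState state) {
    return state == RollbackState.REQUESTED
        ? "delete the requested plan; it can be re-requested by a new writer"
        : "keep the instant; reuse the plan from <ts>.rollback.requested";
  }

  public static void main(String[] args) {
    System.out.println(onUnreadablePlan(RollbackState.REQUESTED));
    System.out.println(onUnreadablePlan(RollbackState.INFLIGHT));
  }
}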

+        if (rollbackInstant.isRequested()) {
+          LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e);
+          try {
+            metaClient.getActiveTimeline().deletePending(rollbackInstant);
+          } catch (HoodieIOException he) {
+            LOG.warn("Cannot delete " + rollbackInstant, he);
+            continue;
+          }
+        } else {
+          // Here we assume that if the rollback is inflight, the rollback plan is intact
+          // in instant.rollback.requested. The exception here can be due to other reasons.
+          LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e);
+        }
+        continue;
+      }

      try {
-        HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
        String action = rollbackPlan.getInstantToRollback().getAction();
        if (ignoreCompactionAndClusteringInstants) {
          if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
@@ -1124,14 +1143,14 @@ protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos
                rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
            if (!isClustering) {
              String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
-              infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+              infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
            }
          }
        } else {
-          infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+          infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
        }
-      } catch (IOException e) {
-        LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
+      } catch (Exception e) {
+        LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e);
      }
    }
    return infoMap;
@@ -20,6 +20,7 @@

import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -61,9 +62,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -481,4 +484,107 @@ public void testAutoRollbackInflightCommit() throws Exception {
      assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
    }
  }
+
+  private static Stream<Arguments> testRollbackWithRequestedRollbackPlanParams() {
+    return Arrays.stream(new Boolean[][] {
+        {true, true}, {true, false}, {false, true}, {false, false},
+    }).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testRollbackWithRequestedRollbackPlanParams")
+  public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, boolean isRollbackPlanCorrupted) throws Exception {
+    // Let's create some commit files and base files
+    final String p1 = "2022/04/05";
+    final String p2 = "2022/04/06";
+    final String commitTime1 = "20220406010101002";
+    final String commitTime2 = "20220406020601002";
+    final String commitTime3 = "20220406030611002";
+    final String rollbackInstantTime = "20220406040611002";
+    Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
+      {
+        put(p1, "id11");
+        put(p2, "id12");
+      }
+    };
+    Map<String, String> partitionAndFileId2 = new HashMap<String, String>() {
+      {
+        put(p1, "id21");
+        put(p2, "id22");
+      }
+    };
+    Map<String, String> partitionAndFileId3 = new HashMap<String, String>() {
+      {
+        put(p1, "id31");
+        put(p2, "id32");
+      }
+    };
+
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withRollbackUsingMarkers(false)
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder()
+                // Column Stats Index is disabled, since these tests construct tables which are
+                // not valid (empty commit metadata, invalid parquet files)
+                .withMetadataIndexColumnStats(false)
+                .enable(enableMetadataTable)
+                .build()
+        )
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+
+    HoodieTestTable testTable = enableMetadataTable
+        ? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
+        metaClient.getHadoopConf(), config, context))
+        : HoodieTestTable.of(metaClient);
+
+    testTable.withPartitionMetaFiles(p1, p2)
+        .addCommit(commitTime1)
+        .withBaseFilesInPartitions(partitionAndFileId1)
+        .addCommit(commitTime2)
+        .withBaseFilesInPartitions(partitionAndFileId2)
+        .addInflightCommit(commitTime3)
+        .withBaseFilesInPartitions(partitionAndFileId3);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+      if (isRollbackPlanCorrupted) {
+        // Add a corrupted requested rollback plan
+        FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, new byte[] {0, 1, 2});
+      } else {
+        // Add a valid requested rollback plan to roll back commitTime3
+        HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
+        List<HoodieRollbackRequest> rollbackRequestList = partitionAndFileId3.keySet().stream()
+            .map(partition -> new HoodieRollbackRequest(partition, EMPTY_STRING, EMPTY_STRING,
+                Collections.singletonList(metaClient.getBasePath() + "/" + partition + "/"
+                    + FileCreateUtils.baseFileName(commitTime3, partitionAndFileId3.get(p1))),
+                Collections.emptyMap()))
+            .collect(Collectors.toList());
+        rollbackPlan.setRollbackRequests(rollbackRequestList);
+        rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION));
+        FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, rollbackPlan);
+      }
+
+      // Rollback commit3
+      client.rollback(commitTime3);
+      assertFalse(testTable.inflightCommitExists(commitTime3));
+      assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
+      assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
+
+      metaClient.reloadActiveTimeline();
+      List<HoodieInstant> rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList());
+      // Corrupted requested rollback plan should be deleted before scheduling a new one
+      assertEquals(rollbackInstants.size(), 1);
+      HoodieInstant rollbackInstant = rollbackInstants.get(0);
+      assertTrue(rollbackInstant.isCompleted());
+
+      if (isRollbackPlanCorrupted) {
+        // Should create a new rollback instant
+        assertNotEquals(rollbackInstantTime, rollbackInstant.getTimestamp());
+      } else {
+        // Should reuse the rollback instant
+        assertEquals(rollbackInstantTime, rollbackInstant.getTimestamp());
+      }
+    }
+  }
}
@@ -244,6 +244,10 @@ public static void createRequestedRollbackFile(String basePath, String instantTi
    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
  }

+  public static void createRequestedRollbackFile(String basePath, String instantTime, byte[] content) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, content);
+  }
+
  public static void createInflightRollbackFile(String basePath, String instantTime) throws IOException {
    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
  }