[HUDI-2997] Skip the corrupt meta file for pending rollback action #4296

Merged
merged 1 commit into from
Dec 14, 2021
@@ -82,6 +82,7 @@
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -898,21 +899,22 @@ private Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMeta
   }
 
   /**
-   * Fetch map of pending commits to be rolledback to {@link HoodiePendingRollbackInfo}.
+   * Fetch map of pending commits to be rolled-back to {@link HoodiePendingRollbackInfo}.
    * @param metaClient instance of {@link HoodieTableMetaClient} to use.
-   * @return map of pending commits to be rolledback instants to Rollback Instant and Rollback plan Pair.
+   * @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair.
    */
   protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
-    return metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().map(
-        entry -> {
-          try {
-            HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, entry);
-            return Pair.of(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(entry, rollbackPlan)));
-          } catch (IOException e) {
-            throw new HoodieIOException("Fetching rollback plan failed for " + entry, e);
-          }
-        }
-    ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
+    Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
+    for (HoodieInstant instant : instants) {
+      try {
+        HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
+        infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+      } catch (IOException e) {
+        LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
Member

This can also fail because of access issues or problems talking to cloud storage/HDFS, right? So does it make sense to just skip in all cases?

Contributor Author

In most cases the file was corrupt. Maybe we can retry for network issues.

Contributor

#5245 has revised this logic to delete the corrupted requested rollback plan if an IOException is thrown while trying to get the rollback plan.

+      }
+    }
+    return infoMap;
   }
 
   /**
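The change above swaps a stream pipeline that rethrows on the first unreadable plan for a loop that logs and skips it. A minimal, self-contained sketch of that pattern follows. `PendingRollbackSketch`, `PlanReader`, `collectPending`, and `demoReader` are hypothetical stand-ins rather than Hudi APIs, and plain strings replace `HoodieInstant` and `HoodieRollbackPlan`. Unlike the log message in the diff, which concatenates `infoMap`, the sketch logs the failing instant, on the assumption that this is the more useful value to report.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PendingRollbackSketch {

  /** Hypothetical stand-in for a rollback plan loader; not a Hudi interface. */
  interface PlanReader {
    String readCommitToRollback(String rollbackInstant) throws IOException;
  }

  /** Maps each commit to roll back to the first rollback instant whose plan is readable. */
  static Map<String, String> collectPending(List<String> rollbackInstants, PlanReader reader) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String instant : rollbackInstants) {
      try {
        String commit = reader.readCommitToRollback(instant);
        // putIfAbsent keeps the first plan seen for a commit instead of failing on a duplicate key.
        result.putIfAbsent(commit, instant);
      } catch (IOException e) {
        // Skip the unreadable plan, but name the instant that failed.
        System.err.println("Fetching rollback plan failed for " + instant + ", skipping: " + e.getMessage());
      }
    }
    return result;
  }

  /** Demo loader: "r2" simulates a corrupt plan file; "r1" and "r3" target the same commit. */
  static String demoReader(String rollbackInstant) throws IOException {
    switch (rollbackInstant) {
      case "r1": return "c1";
      case "r2": throw new IOException("corrupt plan file");
      case "r3": return "c1";
      default: return "c2";
    }
  }

  public static void main(String[] args) {
    Map<String, String> pending =
        collectPending(Arrays.asList("r1", "r2", "r3", "r4"), PendingRollbackSketch::demoReader);
    System.out.println(pending); // prints {c1=r1, c2=r4}
  }
}
```

Besides skipping the unreadable plan, the loop also tolerates two pending rollbacks that target the same commit. The two-argument `Collectors.toMap` used before the change throws `IllegalStateException` on a duplicate key. As the review thread notes, an `IOException` can also come from a transient storage problem, so a real implementation might retry before skipping.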
@@ -65,8 +65,9 @@ public class RollbackUtils {
   public static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant)
       throws IOException {
     // TODO: add upgrade step if required.
+    final HoodieInstant requested = HoodieTimeline.getRollbackRequestedInstant(rollbackInstant);
     return TimelineMetadataUtils.deserializeAvroMetadata(
-        metaClient.getActiveTimeline().readRollbackInfoAsBytes(rollbackInstant).get(), HoodieRollbackPlan.class);
+        metaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class);
   }
 
   static Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String instantToRollback, String rollbackInstantTime) {
@@ -334,6 +334,10 @@ static HoodieInstant getReplaceCommitInflightInstant(final String timestamp) {
     return new HoodieInstant(State.INFLIGHT, REPLACE_COMMIT_ACTION, timestamp);
   }
 
+  static HoodieInstant getRollbackRequestedInstant(HoodieInstant instant) {
+    return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant);
+  }
+
   /**
    * Returns the inflight instant corresponding to the instant being passed. Takes care of changes in action names
    * between inflight and completed instants (compaction <=> commit).
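The new helper normalizes a pending rollback instant to its requested form before the plan is read, which suggests the plan is stored under the requested instant even when the rollback is already inflight. The sketch below illustrates that normalization with a simplified, hypothetical `Instant` type. The class names and the `<timestamp>.<action>.<state>` file naming are assumptions for illustration, not Hudi's actual timeline layout.

```java
class RequestedInstantSketch {

  enum State { REQUESTED, INFLIGHT, COMPLETED }

  /** Hypothetical, simplified instant: state + action + timestamp. */
  static final class Instant {
    final State state;
    final String action;
    final String timestamp;

    Instant(State state, String action, String timestamp) {
      this.state = state;
      this.action = action;
      this.timestamp = timestamp;
    }

    boolean isRequested() {
      return state == State.REQUESTED;
    }

    /** Illustrative file naming only; the real timeline layout may differ. */
    String fileName() {
      String base = timestamp + "." + action;
      switch (state) {
        case REQUESTED: return base + ".requested";
        case INFLIGHT: return base + ".inflight";
        default: return base;
      }
    }
  }

  /** Returns the instant itself if already requested, else a requested copy with the same action and timestamp. */
  static Instant toRequested(Instant instant) {
    return instant.isRequested()
        ? instant
        : new Instant(State.REQUESTED, instant.action, instant.timestamp);
  }

  public static void main(String[] args) {
    Instant inflight = new Instant(State.INFLIGHT, "rollback", "20211214000000");
    // The plan lookup targets the requested file regardless of the instant's current state.
    System.out.println(toRequested(inflight).fileName()); // prints 20211214000000.rollback.requested
  }
}
```

Doing the normalization inside the reader means callers can pass whichever pending instant the timeline returns, requested or inflight, without having to know which file holds the plan.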