diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 18f93faf245f..286e3adbb782 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -82,6 +82,7 @@ import java.text.ParseException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -898,21 +899,22 @@ private Option getPendingRollbackInfo(HoodieTableMeta } /** - * Fetch map of pending commits to be rolledback to {@link HoodiePendingRollbackInfo}. + * Fetch map of pending commits to be rolled-back to {@link HoodiePendingRollbackInfo}. * @param metaClient instance of {@link HoodieTableMetaClient} to use. - * @return map of pending commits to be rolledback instants to Rollback Instant and Rollback plan Pair. + * @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair. */ protected Map> getPendingRollbackInfos(HoodieTableMetaClient metaClient) { - return metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().map( - entry -> { - try { - HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, entry); - return Pair.of(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(entry, rollbackPlan))); - } catch (IOException e) { - throw new HoodieIOException("Fetching rollback plan failed for " + entry, e); - } - } - ).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + List instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList()); + Map> infoMap = new HashMap<>(); + for (HoodieInstant instant : instants) { + try { + HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant); + infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan))); + } catch (IOException e) { + LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e); + } + } + return infoMap; } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index 1de1a1363887..a4b59a88b92c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -65,8 +65,9 @@ public class RollbackUtils { public static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant) throws IOException { // TODO: add upgrade step if required. + final HoodieInstant requested = HoodieTimeline.getRollbackRequestedInstant(rollbackInstant); return TimelineMetadataUtils.deserializeAvroMetadata( - metaClient.getActiveTimeline().readRollbackInfoAsBytes(rollbackInstant).get(), HoodieRollbackPlan.class); + metaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class); } static Map generateHeader(String instantToRollback, String rollbackInstantTime) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 0898498be63b..d5d322384c19 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -334,6 +334,10 @@ static HoodieInstant getReplaceCommitInflightInstant(final String timestamp) { return new HoodieInstant(State.INFLIGHT, REPLACE_COMMIT_ACTION, timestamp); } + static HoodieInstant getRollbackRequestedInstant(HoodieInstant instant) { + return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant); + } + /** * Returns the inflight instant corresponding to the instant being passed. Takes care of changes in action names * between inflight and completed instants (compaction <=> commit).