Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,37 +177,34 @@ private void validateRollbackCommitSequence() {
// since with LAZY rollback we support parallel writing which can allow a new inflight while rollback is ongoing
// Remove this once we support LAZY rollback of failed writes by default as parallel writing becomes the default
// writer mode.
if (config.getFailedWritesCleanPolicy().isEager() && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
final String instantTimeToRollback = instantToRollback.requestedTime();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
HoodieTimeline pendingCommitsTimeline = table.getPendingCommitsTimeline();
// Check validity of completed commit timeline.
// Make sure only the last n commits are being rolled back
// If there is a commit in-between or after that is not rolled back, then abort
// this condition may not hold good for metadata table. since the order of commits applied to MDT in data table commits and the ordering could be different.
if ((instantTimeToRollback != null) && !commitTimeline.empty()
&& !commitTimeline.findInstantsAfter(instantTimeToRollback, Integer.MAX_VALUE).empty()) {
// check if remnants are from a previous LAZY rollback config, if yes, let out of order rollback continue
try {
if (!HoodieHeartbeatClient.heartbeatExists(table.getStorage(),
config.getBasePath(), instantTimeToRollback)) {
throw new HoodieRollbackException(
"Found commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
}
} catch (IOException io) {
throw new HoodieRollbackException("Unable to rollback commits ", io);
final String instantTimeToRollback = instantToRollback.requestedTime();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
HoodieTimeline pendingCommitsTimeline = table.getPendingCommitsTimeline();
// Check validity of completed commit timeline.
// Make sure only the last n commits are being rolled back
// If there is a commit in-between or after that is not rolled back, then abort
// this condition may not hold good for metadata table. since the order of commits applied to MDT in data table commits and the ordering could be different.
if ((instantTimeToRollback != null) && !commitTimeline.empty()
&& !commitTimeline.findInstantsAfter(instantTimeToRollback, Integer.MAX_VALUE).empty()) {
// check if remnants are from a previous LAZY rollback config, if yes, let out of order rollback continue
try {
if (!HoodieHeartbeatClient.heartbeatExists(table.getStorage(), config.getBasePath(), instantTimeToRollback)) {
throw new HoodieRollbackException(
"Found commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
}
} catch (IOException io) {
throw new HoodieRollbackException("Unable to rollback commits ", io);
}
}

List<String> inflights = pendingCommitsTimeline.getInstantsAsStream()
.filter(instant -> !ClusteringUtils.isClusteringInstant(table.getActiveTimeline(), instant, instantGenerator))
.map(HoodieInstant::requestedTime)
.collect(Collectors.toList());
if ((instantTimeToRollback != null) && !inflights.isEmpty()
&& (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) {
throw new HoodieRollbackException(
"Found in-flight commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
}
List<String> inflights = pendingCommitsTimeline.getInstantsAsStream()
.filter(instant -> !ClusteringUtils.isClusteringInstant(table.getActiveTimeline(), instant, instantGenerator))
.map(HoodieInstant::requestedTime)
.collect(Collectors.toList());
if ((instantTimeToRollback != null) && !inflights.isEmpty()
&& (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) {
throw new HoodieRollbackException(
"Found in-flight commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
}
}

Expand All @@ -220,15 +217,19 @@ private void rollBackIndex() {

public List<HoodieRollbackStat> doRollbackAndGetStats(HoodieRollbackPlan hoodieRollbackPlan) {
final String instantTimeToRollback = instantToRollback.requestedTime();
validateSavepointRollbacks();
final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
&& !instantToRollback.isCompleted();

final boolean isPendingClustering = !instantToRollback.isCompleted()
&& ClusteringUtils.isClusteringInstant(
table.getMetaClient().getActiveTimeline(), instantToRollback, instantGenerator);
validateSavepointRollbacks();
if (!isPendingCompaction && !isPendingClustering) {
validateRollbackCommitSequence();
// Validate commit sequence on main tables when clean policy is eager.
if (config.getFailedWritesCleanPolicy().isEager() && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
final boolean isPendingClustering = !instantToRollback.isCompleted()
&& ClusteringUtils.isClusteringInstant(
table.getMetaClient().getActiveTimeline(), instantToRollback, instantGenerator);

if (!isPendingCompaction && !isPendingClustering) {
validateRollbackCommitSequence();
}
}

backupRollbackInstantsIfNeeded();
Expand Down
Loading