Skip to content

Commit

Permalink
Core: Fix for deleting files when commiting transactions with multipl…
Browse files Browse the repository at this point in the history
…e branches
  • Loading branch information
amogh-jahagirdar committed Jan 23, 2023
1 parent b008af5 commit 3eb6f3c
Showing 1 changed file with 13 additions and 36 deletions.
49 changes: 13 additions & 36 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Expand Up @@ -31,8 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
Expand Down Expand Up @@ -65,7 +65,6 @@ enum TransactionType {
private final TransactionTable transactionTable;
private final TableOperations transactionOps;
private final List<PendingUpdate> updates;
private final Set<Long> intermediateSnapshotIds;
private final Set<String> deletedFiles =
Sets.newHashSet(); // keep track of files deleted in the most recent commit
private final Consumer<String> enqueueDelete = deletedFiles::add;
Expand All @@ -92,7 +91,6 @@ enum TransactionType {
this.current = start;
this.transactionOps = new TransactionTableOperations();
this.updates = Lists.newArrayList();
this.intermediateSnapshotIds = Sets.newHashSet();
this.base = ops.current();
this.type = type;
this.hasLastOpCommitted = true;
Expand Down Expand Up @@ -395,9 +393,8 @@ private void commitSimpleTransaction() {
return;
}

// this is always set to the latest commit attempt's snapshot id.
AtomicLong currentSnapshotId = new AtomicLong(-1L);

Set<Long> preTxnSnapshots =
base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
try {
Tasks.foreach(ops)
.retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
Expand All @@ -411,11 +408,6 @@ private void commitSimpleTransaction() {
underlyingOps -> {
applyUpdates(underlyingOps);

if (current.currentSnapshot() != null) {
currentSnapshotId.set(current.currentSnapshot().snapshotId());
}

// fix up the snapshot log, which should not contain intermediate snapshots
underlyingOps.commit(base, current);
});

Expand All @@ -433,17 +425,18 @@ private void commitSimpleTransaction() {
// the commit succeeded

try {
if (currentSnapshotId.get() != -1) {
intermediateSnapshotIds.add(currentSnapshotId.get());
// clean up the data files that were deleted by each operation. first, get the list of
// committed manifests to ensure that no committed manifest is deleted.
// A manifest could be deleted in one successful operation commit, but reused in another
// successful commit of that operation if the whole transaction is retried.
Set<Long> currentSnapshots = Sets.newHashSet();
for (Snapshot snapshot : current.snapshots()) {
if (!preTxnSnapshots.contains(snapshot.snapshotId())) {
currentSnapshots.add(snapshot.snapshotId());
}
}

// clean up the data files that were deleted by each operation. first, get the list of
// committed manifests to
// ensure that no committed manifest is deleted. a manifest could be deleted in one successful
// operation
// commit, but reused in another successful commit of that operation if the whole transaction
// is retried.
Set<String> committedFiles = committedFiles(ops, intermediateSnapshotIds);
Set<String> committedFiles = committedFiles(ops, currentSnapshots);
if (committedFiles != null) {
// delete all of the files that were deleted in the most recent set of operation commits
Tasks.foreach(deletedFiles)
Expand Down Expand Up @@ -520,15 +513,6 @@ private static Set<String> committedFiles(TableOperations ops, Set<Long> snapsho
return committedFiles;
}

private static Long currentId(TableMetadata meta) {
if (meta != null) {
if (meta.currentSnapshot() != null) {
return meta.currentSnapshot().snapshotId();
}
}
return null;
}

public class TransactionTableOperations implements TableOperations {
private TableOperations tempOps = ops.temp(current);

Expand All @@ -550,13 +534,6 @@ public void commit(TableMetadata underlyingBase, TableMetadata metadata) {
throw new CommitFailedException("Table metadata refresh is required");
}

// track the intermediate snapshot ids for rewriting the snapshot log
// an id is intermediate if it isn't the base snapshot id and it is replaced by a new current
Long oldId = currentId(current);
if (oldId != null && !oldId.equals(currentId(metadata)) && !oldId.equals(currentId(base))) {
intermediateSnapshotIds.add(oldId);
}

BaseTransaction.this.current = metadata;

this.tempOps = ops.temp(metadata);
Expand Down

0 comments on commit 3eb6f3c

Please sign in to comment.