diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index f1832f2787a8..fc91d42fcc96 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -31,8 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; @@ -65,7 +65,6 @@ enum TransactionType { private final TransactionTable transactionTable; private final TableOperations transactionOps; private final List updates; - private final Set intermediateSnapshotIds; private final Set deletedFiles = Sets.newHashSet(); // keep track of files deleted in the most recent commit private final Consumer enqueueDelete = deletedFiles::add; @@ -92,7 +91,6 @@ enum TransactionType { this.current = start; this.transactionOps = new TransactionTableOperations(); this.updates = Lists.newArrayList(); - this.intermediateSnapshotIds = Sets.newHashSet(); this.base = ops.current(); this.type = type; this.hasLastOpCommitted = true; @@ -395,9 +393,8 @@ private void commitSimpleTransaction() { return; } - // this is always set to the latest commit attempt's snapshot id. - AtomicLong currentSnapshotId = new AtomicLong(-1L); - + Set preTxnSnapshots = + base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); try { Tasks.foreach(ops) .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) @@ -411,11 +408,6 @@ private void commitSimpleTransaction() { underlyingOps -> { applyUpdates(underlyingOps); - if (current.currentSnapshot() != null) { - currentSnapshotId.set(current.currentSnapshot().snapshotId()); - } - - // fix up the snapshot log, which should not contain intermediate snapshots underlyingOps.commit(base, current); }); @@ -433,17 +425,18 @@ private void commitSimpleTransaction() { // the commit succeeded try { - if (currentSnapshotId.get() != -1) { - intermediateSnapshotIds.add(currentSnapshotId.get()); + // clean up the data files that were deleted by each operation. first, get the list of + // committed manifests to ensure that no committed manifest is deleted. + // A manifest could be deleted in one successful operation commit, but reused in another + // successful commit of that operation if the whole transaction is retried. + Set currentSnapshots = Sets.newHashSet(); + for (Snapshot snapshot : current.snapshots()) { + if (!preTxnSnapshots.contains(snapshot.snapshotId())) { + currentSnapshots.add(snapshot.snapshotId()); + } } - // clean up the data files that were deleted by each operation. first, get the list of - // committed manifests to - // ensure that no committed manifest is deleted. a manifest could be deleted in one successful - // operation - // commit, but reused in another successful commit of that operation if the whole transaction - // is retried. - Set committedFiles = committedFiles(ops, intermediateSnapshotIds); + Set committedFiles = committedFiles(ops, currentSnapshots); if (committedFiles != null) { // delete all of the files that were deleted in the most recent set of operation commits Tasks.foreach(deletedFiles) @@ -520,15 +513,6 @@ private static Set committedFiles(TableOperations ops, Set snapsho return committedFiles; } - private static Long currentId(TableMetadata meta) { - if (meta != null) { - if (meta.currentSnapshot() != null) { - return meta.currentSnapshot().snapshotId(); - } - } - return null; - } - public class TransactionTableOperations implements TableOperations { private TableOperations tempOps = ops.temp(current); @@ -550,13 +534,6 @@ public void commit(TableMetadata underlyingBase, TableMetadata metadata) { throw new CommitFailedException("Table metadata refresh is required"); } - // track the intermediate snapshot ids for rewriting the snapshot log - // an id is intermediate if it isn't the base snapshot id and it is replaced by a new current - Long oldId = currentId(current); - if (oldId != null && !oldId.equals(currentId(metadata)) && !oldId.equals(currentId(base))) { - intermediateSnapshotIds.add(oldId); - } - BaseTransaction.this.current = metadata; this.tempOps = ops.temp(metadata);