Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix uncommitted file clean-up in transactions #218

Merged
merged 3 commits into from Jun 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
Expand Up @@ -19,6 +19,8 @@

package org.apache.iceberg;

import java.util.function.Consumer;

/**
* API for table changes that produce snapshots. This interface contains common methods for all
* updates that create a new table {@link Snapshot}.
Expand All @@ -35,4 +37,12 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
*/
ThisT set(String property, String value);

/**
* Set a callback to delete files instead of the table's default.
*
* @param deleteFunc a String consumer used to delete locations.
* @return this for method chaining
*/
ThisT deleteWith(Consumer<String> deleteFunc);

}
20 changes: 19 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseTransaction.java
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -62,12 +63,13 @@ static Transaction newTransaction(TableOperations ops) {
return new BaseTransaction(ops, ops.refresh());
}

// exposed for testing
private final TableOperations ops;
private final TransactionTable transactionTable;
private final TableOperations transactionOps;
private final List<PendingUpdate> updates;
private final Set<Long> intermediateSnapshotIds;
private final Set<String> deletedFiles = Sets.newHashSet(); // keep track of files deleted in the most recent commit
private final Consumer<String> enqueueDelete = deletedFiles::add;
private TransactionType type;
private TableMetadata base;
private TableMetadata lastBase;
Expand Down Expand Up @@ -130,6 +132,7 @@ public UpdateLocation updateLocation() {
public AppendFiles newAppend() {
checkLastOperationCommitted("AppendFiles");
AppendFiles append = new MergeAppend(transactionOps);
append.deleteWith(enqueueDelete);
updates.add(append);
return append;
}
Expand All @@ -146,6 +149,7 @@ public AppendFiles newFastAppend() {
public RewriteFiles newRewrite() {
checkLastOperationCommitted("RewriteFiles");
RewriteFiles rewrite = new ReplaceFiles(transactionOps);
rewrite.deleteWith(enqueueDelete);
updates.add(rewrite);
return rewrite;
}
Expand All @@ -154,6 +158,7 @@ public RewriteFiles newRewrite() {
public RewriteManifests rewriteManifests() {
checkLastOperationCommitted("RewriteManifests");
RewriteManifests rewrite = new ReplaceManifests(transactionOps);
rewrite.deleteWith(enqueueDelete);
updates.add(rewrite);
return rewrite;
}
Expand All @@ -162,6 +167,7 @@ public RewriteManifests rewriteManifests() {
public OverwriteFiles newOverwrite() {
checkLastOperationCommitted("OverwriteFiles");
OverwriteFiles overwrite = new OverwriteData(transactionOps);
overwrite.deleteWith(enqueueDelete);
updates.add(overwrite);
return overwrite;
}
Expand All @@ -170,6 +176,7 @@ public OverwriteFiles newOverwrite() {
public ReplacePartitions newReplacePartitions() {
checkLastOperationCommitted("ReplacePartitions");
ReplacePartitionsOperation replacePartitions = new ReplacePartitionsOperation(transactionOps);
replacePartitions.deleteWith(enqueueDelete);
updates.add(replacePartitions);
return replacePartitions;
}
Expand All @@ -178,6 +185,7 @@ public ReplacePartitions newReplacePartitions() {
public DeleteFiles newDelete() {
checkLastOperationCommitted("DeleteFiles");
DeleteFiles delete = new StreamingDelete(transactionOps);
delete.deleteWith(enqueueDelete);
updates.add(delete);
return delete;
}
Expand All @@ -186,6 +194,7 @@ public DeleteFiles newDelete() {
public ExpireSnapshots expireSnapshots() {
checkLastOperationCommitted("ExpireSnapshots");
ExpireSnapshots expire = new RemoveSnapshots(transactionOps);
expire.deleteWith(enqueueDelete);
updates.add(expire);
return expire;
}
Expand Down Expand Up @@ -246,6 +255,7 @@ public void commitTransaction() {
if (base != underlyingOps.refresh()) {
this.base = underlyingOps.current(); // just refreshed
this.current = base;
this.deletedFiles.clear(); // clear deletes from the last set of operation commits
for (PendingUpdate update : updates) {
// re-commit each update in the chain to apply it and update current
update.commit();
Expand All @@ -255,6 +265,9 @@ public void commitTransaction() {
// fix up the snapshot log, which should not contain intermediate snapshots
underlyingOps.commit(base, current.removeSnapshotLogEntries(intermediateSnapshotIds));
});

// delete all of the files that were deleted in the most recent set of operation commits
deletedFiles.forEach(ops.io()::deleteFile);
break;
}
}
Expand Down Expand Up @@ -452,4 +465,9 @@ public LocationProvider locationProvider() {
TableOperations ops() {
return ops;
}

@VisibleForTesting
Set<String> deletedFiles() {
return deletedFiles;
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Expand Up @@ -50,6 +50,11 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
this.spec = ops.current().spec();
}

@Override
protected AppendFiles self() {
return this;
}

@Override
public AppendFiles set(String property, String value) {
summaryBuilder.set(property, value);
Expand Down
Expand Up @@ -116,8 +116,6 @@ public String partition() {
.propertyAsInt(MANIFEST_MIN_MERGE_COUNT, MANIFEST_MIN_MERGE_COUNT_DEFAULT);
}

protected abstract ThisT self();

@Override
public ThisT set(String property, String value) {
summaryBuilder.set(property, value);
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/ReplaceManifests.java
Expand Up @@ -73,6 +73,11 @@ public class ReplaceManifests extends SnapshotProducer<RewriteManifests> impleme
ops.current().propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
}

@Override
protected RewriteManifests self() {
return this;
}

@Override
protected String operation() {
return DataOperations.REPLACE;
Expand Down
28 changes: 25 additions & 3 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Expand Up @@ -21,6 +21,7 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand All @@ -32,6 +33,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
Expand All @@ -56,6 +58,16 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class);
static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();

/**
* Default callback used to delete files.
*/
private final Consumer<String> defaultDelete = new Consumer<String>() {
@Override
public void accept(String file) {
ops.io().deleteFile(file);
}
};

/**
* Cache used to enrich ManifestFile instances that are written to a ManifestListWriter.
*/
Expand All @@ -67,6 +79,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
private final List<String> manifestLists = Lists.newArrayList();
private Long snapshotId = null;
private TableMetadata base = null;
private Consumer<String> deleteFunc = defaultDelete;

protected SnapshotProducer(TableOperations ops) {
this.ops = ops;
Expand All @@ -81,6 +94,15 @@ protected SnapshotProducer(TableOperations ops) {
});
}

protected abstract ThisT self();

@Override
public ThisT deleteWith(Consumer<String> deleteCallback) {
Preconditions.checkArgument(this.deleteFunc == defaultDelete, "Cannot set delete callback more than once");
this.deleteFunc = deleteCallback;
return self();
}

/**
* Clean up any uncommitted manifests that were created.
* <p>
Expand Down Expand Up @@ -227,7 +249,7 @@ public void commit() {
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!saved.manifestListLocation().equals(manifestList)) {
ops.io().deleteFile(manifestList);
deleteFile(manifestList);
}
}
} else {
Expand All @@ -243,14 +265,14 @@ public void commit() {

protected void cleanAll() {
for (String manifestList : manifestLists) {
ops.io().deleteFile(manifestList);
deleteFile(manifestList);
}
manifestLists.clear();
cleanUncommitted(EMPTY_SET);
}

protected void deleteFile(String path) {
ops.io().deleteFile(path);
deleteFunc.accept(path);
}

protected OutputFile manifestListPath() {
Expand Down
87 changes: 87 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTransaction.java
Expand Up @@ -19,12 +19,15 @@

package org.apache.iceberg;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.File;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.OutputFile;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -460,6 +463,90 @@ public void testTransactionRetryMergeCleanup() {
Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
}

@Test
public void testTransactionRetryAndAppendManifests() throws Exception {
// use only one retry and aggressively merge manifests
table.updateProperties()
.set(TableProperties.COMMIT_NUM_RETRIES, "1")
.set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0")
.commit();

Assert.assertEquals("Table should be on version 1", 1, (int) version());

table.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.commit();

Assert.assertEquals("Table should be on version 2 after append", 2, (int) version());
Assert.assertEquals("Append should create one manifest", 1, table.currentSnapshot().manifests().size());
ManifestFile v1manifest = table.currentSnapshot().manifests().get(0);

TableMetadata base = readMetadata();

// create a manifest append
OutputFile manifestLocation = Files.localOutput("/tmp/" + UUID.randomUUID().toString() + ".avro");
ManifestWriter writer = ManifestWriter.write(table.spec(), manifestLocation);
try {
writer.add(FILE_D);
} finally {
writer.close();
}

Transaction txn = table.newTransaction();

txn.newAppend()
.appendManifest(writer.toManifestFile())
.commit();

Assert.assertSame("Base metadata should not change when commit is created", base, readMetadata());
Assert.assertEquals("Table should be on version 2 after txn create", 2, (int) version());

Assert.assertEquals("Append should have one merged manifest", 1, txn.table().currentSnapshot().manifests().size());
ManifestFile mergedManifest = txn.table().currentSnapshot().manifests().get(0);

// find the initial copy of the appended manifest
String copiedAppendManifest = Iterables.getOnlyElement(Iterables.filter(
Iterables.transform(listManifestFiles(), File::getPath),
path -> !v1manifest.path().contains(path) && !mergedManifest.path().contains(path)));

Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
Assert.assertTrue("Copied append manifest should not be deleted yet", new File(copiedAppendManifest).exists());

// cause the transaction commit to fail and retry
table.newAppend()
.appendFile(FILE_C)
.commit();

Assert.assertEquals("Table should be on version 3 after real append", 3, (int) version());

txn.commitTransaction();

Assert.assertEquals("Table should be on version 4 after commit", 4, (int) version());

Assert.assertTrue("Transaction should hijack the delete of the original copied manifest",
((BaseTransaction) txn).deletedFiles().contains(copiedAppendManifest));
Assert.assertFalse("Append manifest should be deleted", new File(copiedAppendManifest).exists());
Assert.assertTrue("Transaction should hijack the delete of the first merged manifest",
((BaseTransaction) txn).deletedFiles().contains(mergedManifest.path()));
Assert.assertFalse("Append manifest should be deleted", new File(mergedManifest.path()).exists());

Assert.assertEquals("Should merge all commit manifests into a single manifest",
1, table.currentSnapshot().manifests().size());
}

@Test
public void testTransactionNoCustomDeleteFunc() {
AssertHelpers.assertThrows("Should fail setting a custom delete function with a transaction",
IllegalArgumentException.class, "Cannot set delete callback more than once",
() -> table.newTransaction()
.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.deleteWith(file -> { }));
}

@Test
public void testTransactionFastAppends() {
table.updateProperties()
Expand Down