diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java index 73509c15384f..98ff7086c715 100644 --- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java +++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java @@ -60,6 +60,22 @@ public interface SnapshotUpdate extends PendingUpdate { */ ThisT scanManifestsWith(ExecutorService executorService); + /** + * Use a particular executor to write manifests during commit with the specified parallelism. If + * this method is not called, the default worker pool is used. + * + *

<p>The parallelism parameter controls how many manifest writers are used, which determines the + * number of manifest files produced. The executor provides the threads for parallel execution. + * + * @param executorService the executor used to write manifests; must not be null + * @param parallelism the number of parallel manifest writers to use; must be greater than 0 + * @return this for method chaining + */ + default ThisT writeManifestsWith(ExecutorService executorService, int parallelism) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not support writeManifestsWith"); + } + /** * Perform operations on a particular branch * diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index e351009a9ea6..f77222d6a2a6 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -124,6 +124,8 @@ public void accept(String file) { SnapshotAncestryValidator.NON_VALIDATING; private ExecutorService workerPool; + private ExecutorService writePool; + private int writePoolParallelism = ThreadPools.WORKER_THREAD_POOL_SIZE; private String targetBranch = SnapshotRef.MAIN_BRANCH; private CommitMetrics commitMetrics; @@ -169,6 +171,16 @@ public ThisT scanManifestsWith(ExecutorService executorService) { return self(); } + @Override + public ThisT writeManifestsWith(ExecutorService executorService, int parallelism) { + Preconditions.checkArgument(executorService != null, "Executor service cannot be null"); + Preconditions.checkArgument( + parallelism > 0, "Parallelism must be greater than 0, but was: %s", parallelism); + this.writePool = executorService; + this.writePoolParallelism = parallelism; + return self(); + } + /** * Set a validator to check snapshot ancestry before committing changes. 
* @@ -227,6 +239,14 @@ protected ExecutorService workerPool() { return workerPool; } + protected ExecutorService writePool() { + if (writePool == null) { + this.writePool = ThreadPools.getWorkerPool(); + } + + return writePool; + } + @Override public ThisT deleteWith(Consumer deleteCallback) { Preconditions.checkArgument( @@ -780,9 +800,9 @@ private List writeDeleteFileGroup( return writer.toManifestFiles(); } - private static List writeManifests( + private List writeManifests( Collection files, Function, List> writeFunc) { - int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size()); + int parallelism = manifestWriterCount(writePoolParallelism, files.size()); List> groups = divide(files, parallelism); // Create a new list pairing each group with its index @@ -796,7 +816,7 @@ private static List writeManifests( Tasks.foreach(groupsWithIndex) .stopOnFailure() .throwFailureWhenFinished() - .executeWith(ThreadPools.getWorkerPool()) + .executeWith(writePool()) .run( indexedGroup -> { int index = indexedGroup.first(); diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index c5f3d88f46d9..eba412d52b46 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -41,7 +41,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.ThreadPools; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -91,11 +90,6 @@ public void testAddManyFilesWithConsistentOrdering() { assertThat(listManifestFiles()).as("Table should start empty").isEmpty(); int multiplier = 3; - assumeThat(ThreadPools.WORKER_THREAD_POOL_SIZE) - .as( - "Worker thread pool size should be at least 3 to test manifest file ordering 
with multiple threads") - .isGreaterThanOrEqualTo(multiplier); - int groupSize = SnapshotProducer.MIN_FILE_GROUP_SIZE; List dataFiles = Lists.newArrayList(); @@ -107,6 +101,7 @@ public void testAddManyFilesWithConsistentOrdering() { AppendFiles append = table.newAppend(); dataFiles.forEach(append::appendFile); + append.writeManifestsWith(Executors.newFixedThreadPool(multiplier), multiplier); append.commit(); Snapshot snapshot = table.currentSnapshot(); @@ -381,6 +376,83 @@ public void testAppendWithManifestScanExecutor() { assertThat(snapshot).isNotNull(); } + @TestTemplate + public void testAppendWithWriteManifestsExecutor() { + assertThat(listManifestFiles()).isEmpty(); + + TableMetadata base = readMetadata(); + assertThat(base.currentSnapshot()).isNull(); + assertThat(base.lastSequenceNumber()).isEqualTo(0); + AtomicInteger commitThreadsIndex = new AtomicInteger(0); + Snapshot snapshot = + commit( + table, + table + .newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .writeManifestsWith( + Executors.newFixedThreadPool( + 1, + runnable -> { + Thread thread = new Thread(runnable); + thread.setName("commit-" + commitThreadsIndex.getAndIncrement()); + thread.setDaemon(true); + return thread; + }), + 1), + branch); + assertThat(commitThreadsIndex.get()) + .as("Thread should be created in provided commit pool") + .isGreaterThan(0); + assertThat(snapshot).isNotNull(); + } + + @TestTemplate + public void testAppendWithSeparateScanAndWriteExecutors() { + assertThat(listManifestFiles()).isEmpty(); + + TableMetadata base = readMetadata(); + assertThat(base.currentSnapshot()).isNull(); + assertThat(base.lastSequenceNumber()).isEqualTo(0); + AtomicInteger scanThreadsIndex = new AtomicInteger(0); + AtomicInteger commitThreadsIndex = new AtomicInteger(0); + Snapshot snapshot = + commit( + table, + table + .newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .scanManifestsWith( + Executors.newFixedThreadPool( + 1, + runnable -> { + Thread thread = new 
Thread(runnable); + thread.setName("scan-" + scanThreadsIndex.getAndIncrement()); + thread.setDaemon(true); + return thread; + })) + .writeManifestsWith( + Executors.newFixedThreadPool( + 1, + runnable -> { + Thread thread = new Thread(runnable); + thread.setName("commit-" + commitThreadsIndex.getAndIncrement()); + thread.setDaemon(true); + return thread; + }), + 1), + branch); + assertThat(scanThreadsIndex.get()) + .as("Thread should be created in provided scan pool") + .isGreaterThan(0); + assertThat(commitThreadsIndex.get()) + .as("Thread should be created in provided commit pool") + .isGreaterThan(0); + assertThat(snapshot).isNotNull(); + } + @TestTemplate public void testMergeWithAppendFilesAndManifest() throws IOException { // merge all manifests for this test diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java index c6092f0238b9..1e6021a5c9fc 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.apache.iceberg.exceptions.ValidationException; @@ -252,4 +254,26 @@ public void testAvroManifestCompressionFromTableProperty() throws IOException { ManifestFile manifest = table.currentSnapshot().dataManifests(table.io()).get(0); assertThat(readAvroCodec(new File(manifest.path()))).isEqualTo("snappy"); } + + @TestTemplate + public void testWriteManifestsWithNullExecutorThrows() { + assertThatThrownBy(() -> table.newAppend().writeManifestsWith(null, 4)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Executor service cannot be null"); + } + + @TestTemplate + public void 
testWriteManifestsWithInvalidParallelismThrows() { + ExecutorService executor = Executors.newFixedThreadPool(4); + try { + assertThatThrownBy(() -> table.newAppend().writeManifestsWith(executor, 0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Parallelism must be greater than 0"); + assertThatThrownBy(() -> table.newAppend().writeManifestsWith(executor, -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Parallelism must be greater than 0"); + } finally { + executor.shutdownNow(); + } + } }