Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
*/
ThisT scanManifestsWith(ExecutorService executorService);

/**
* Use a particular executor to write manifests during commit with the specified parallelism. The
* default worker pool will be used by default.
*
* <p>The parallelism parameter controls how many manifest writers are used, which determines the
* number of manifest files produced. The executor provides the threads for parallel execution.
*
* @param executorService the provided executor
* @param parallelism the number of parallel manifest writers to use
* @return this for method chaining
*/
default ThisT writeManifestsWith(ExecutorService executorService, int parallelism) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not support writeManifestsWith");
}

/**
* Perform operations on a particular branch
*
Expand Down
26 changes: 23 additions & 3 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ public void accept(String file) {
SnapshotAncestryValidator.NON_VALIDATING;

private ExecutorService workerPool;
private ExecutorService writePool;
private int writePoolParallelism = ThreadPools.WORKER_THREAD_POOL_SIZE;
private String targetBranch = SnapshotRef.MAIN_BRANCH;
private CommitMetrics commitMetrics;

Expand Down Expand Up @@ -169,6 +171,16 @@ public ThisT scanManifestsWith(ExecutorService executorService) {
return self();
}

@Override
public ThisT writeManifestsWith(ExecutorService executorService, int parallelism) {
Preconditions.checkArgument(executorService != null, "Executor service cannot be null");
Preconditions.checkArgument(
Comment thread
stevenzwu marked this conversation as resolved.
parallelism > 0, "Parallelism must be greater than 0, but was: %s", parallelism);
this.writePool = executorService;
this.writePoolParallelism = parallelism;
return self();
}

/**
* Set a validator to check snapshot ancestry before committing changes.
*
Expand Down Expand Up @@ -227,6 +239,14 @@ protected ExecutorService workerPool() {
return workerPool;
}

protected ExecutorService writePool() {
if (writePool == null) {
this.writePool = ThreadPools.getWorkerPool();
}

return writePool;
}

@Override
public ThisT deleteWith(Consumer<String> deleteCallback) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -780,9 +800,9 @@ private List<ManifestFile> writeDeleteFileGroup(
return writer.toManifestFiles();
}

private static <F> List<ManifestFile> writeManifests(
private <F> List<ManifestFile> writeManifests(
Copy link
Copy Markdown
Contributor Author

@dramaticlly dramaticlly May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed static as now require to access the commitPoolSized for instance variable, also this method is called during apply() single-threaded per snapshot operation.

Collection<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
int parallelism = manifestWriterCount(writePoolParallelism, files.size());
List<List<F>> groups = divide(files, parallelism);

// Create a new list pairing each group with its index
Expand All @@ -796,7 +816,7 @@ private static <F> List<ManifestFile> writeManifests(
Tasks.foreach(groupsWithIndex)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
.executeWith(writePool())
.run(
indexedGroup -> {
int index = indexedGroup.first();
Expand Down
84 changes: 78 additions & 6 deletions core/src/test/java/org/apache/iceberg/TestMergeAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ThreadPools;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -91,11 +90,6 @@ public void testAddManyFilesWithConsistentOrdering() {
assertThat(listManifestFiles()).as("Table should start empty").isEmpty();

int multiplier = 3;
assumeThat(ThreadPools.WORKER_THREAD_POOL_SIZE)
.as(
"Worker thread pool size should be at least 3 to test manifest file ordering with multiple threads")
.isGreaterThanOrEqualTo(multiplier);

int groupSize = SnapshotProducer.MIN_FILE_GROUP_SIZE;
List<DataFile> dataFiles = Lists.newArrayList();

Expand All @@ -107,6 +101,7 @@ public void testAddManyFilesWithConsistentOrdering() {

AppendFiles append = table.newAppend();
dataFiles.forEach(append::appendFile);
append.writeManifestsWith(Executors.newFixedThreadPool(multiplier), multiplier);
append.commit();

Snapshot snapshot = table.currentSnapshot();
Expand Down Expand Up @@ -381,6 +376,83 @@ public void testAppendWithManifestScanExecutor() {
assertThat(snapshot).isNotNull();
}

@TestTemplate
public void testAppendWithWriteManifestsExecutor() {
assertThat(listManifestFiles()).isEmpty();

TableMetadata base = readMetadata();
assertThat(base.currentSnapshot()).isNull();
assertThat(base.lastSequenceNumber()).isEqualTo(0);
AtomicInteger commitThreadsIndex = new AtomicInteger(0);
Snapshot snapshot =
commit(
table,
table
.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.writeManifestsWith(
Executors.newFixedThreadPool(
1,
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("commit-" + commitThreadsIndex.getAndIncrement());
thread.setDaemon(true);
return thread;
}),
1),
branch);
assertThat(commitThreadsIndex.get())
.as("Thread should be created in provided commit pool")
.isGreaterThan(0);
assertThat(snapshot).isNotNull();
}

@TestTemplate
public void testAppendWithSeparateScanAndWriteExecutors() {
assertThat(listManifestFiles()).isEmpty();

TableMetadata base = readMetadata();
assertThat(base.currentSnapshot()).isNull();
assertThat(base.lastSequenceNumber()).isEqualTo(0);
AtomicInteger scanThreadsIndex = new AtomicInteger(0);
AtomicInteger commitThreadsIndex = new AtomicInteger(0);
Snapshot snapshot =
commit(
table,
table
.newAppend()
.appendFile(FILE_A)
.appendFile(FILE_B)
.scanManifestsWith(
Executors.newFixedThreadPool(
1,
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("scan-" + scanThreadsIndex.getAndIncrement());
thread.setDaemon(true);
return thread;
}))
.writeManifestsWith(
Executors.newFixedThreadPool(
1,
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("commit-" + commitThreadsIndex.getAndIncrement());
thread.setDaemon(true);
return thread;
}),
1),
branch);
assertThat(scanThreadsIndex.get())
.as("Thread should be created in provided scan pool")
.isGreaterThan(0);
assertThat(commitThreadsIndex.get())
.as("Thread should be created in provided commit pool")
.isGreaterThan(0);
assertThat(snapshot).isNotNull();
}

@TestTemplate
public void testMergeWithAppendFilesAndManifest() throws IOException {
// merge all manifests for this test
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -252,4 +254,26 @@ public void testAvroManifestCompressionFromTableProperty() throws IOException {
ManifestFile manifest = table.currentSnapshot().dataManifests(table.io()).get(0);
assertThat(readAvroCodec(new File(manifest.path()))).isEqualTo("snappy");
}

@TestTemplate
public void testWriteManifestsWithNullExecutorThrows() {
assertThatThrownBy(() -> table.newAppend().writeManifestsWith(null, 4))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Executor service cannot be null");
}

@TestTemplate
public void testWriteManifestsWithInvalidParallelismThrows() {
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
assertThatThrownBy(() -> table.newAppend().writeManifestsWith(executor, 0))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Parallelism must be greater than 0");
assertThatThrownBy(() -> table.newAppend().writeManifestsWith(executor, -1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Parallelism must be greater than 0");
} finally {
executor.shutdownNow();
}
}
}