From 71f6cae440b684715411b52f31c4a7df948e9cb4 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 18 Mar 2026 12:43:31 +0100 Subject: [PATCH 1/3] Flink: Add branch support to RewriteDataFiles maintenance task The current code always compacts the main branch, regardless of which branch gets set via `IcebergSink#toBranch(branch)`. This change now compacts the correct branch by adding branch support to the RewriteDataFiles maintenance task. --- .../maintenance/api/RewriteDataFiles.java | 20 ++++++++- .../operator/DataFileRewriteCommitter.java | 18 ++++++-- .../operator/DataFileRewritePlanner.java | 23 +++++++--- .../operator/DataFileRewriteRunner.java | 10 +++-- .../iceberg/flink/sink/IcebergSink.java | 5 ++- .../maintenance/api/TestRewriteDataFiles.java | 35 +++++++++++++++ .../maintenance/operator/RewriteUtil.java | 3 +- .../TestDataFileRewriteCommitter.java | 3 +- .../operator/TestDataFileRewritePlanner.java | 44 ++++++++++++++++++- .../operator/TestDataFileRewriteRunner.java | 3 +- 10 files changed, 143 insertions(+), 21 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index 3b64a79eee89..9aeee75b1464 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.actions.BinPackRewriteFilePlanner; import org.apache.iceberg.actions.SizeBasedFileRewritePlanner; import org.apache.iceberg.expressions.Expression; @@ -59,6 +60,7 @@ public static class Builder extends 
MaintenanceTaskBuilder rewriteOptions = Maps.newHashMapWithExpectedSize(6); private long maxRewriteBytes = Long.MAX_VALUE; private Expression filter = Expressions.alwaysTrue(); + private String branch = SnapshotRef.MAIN_BRANCH; @Override String maintenanceTaskName() { @@ -218,6 +220,18 @@ public Builder filter(Expression newFilter) { return this; } + /** + * Sets the branch to compact. When set, the planner reads from the branch's snapshot and + * commits are made to this branch. + * + * @param newBranch the branch name + * @return this for method chaining + */ + public Builder branch(String newBranch) { + this.branch = newBranch; + return this; + } + /** * Configures the properties for the rewriter. * @@ -262,7 +276,8 @@ DataStream append(DataStream trigger) { partialProgressEnabled ? partialProgressMaxCommits : 1, maxRewriteBytes, rewriteOptions, - filter)) + filter, + branch)) .name(operatorName(PLANNER_TASK_NAME)) .uid(PLANNER_TASK_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) @@ -282,7 +297,8 @@ DataStream append(DataStream trigger) { .transform( operatorName(COMMIT_TASK_NAME), TypeInformation.of(Trigger.class), - new DataFileRewriteCommitter(tableName(), taskName(), index(), tableLoader())) + new DataFileRewriteCommitter( + tableName(), taskName(), index(), tableLoader(), branch)) .uid(COMMIT_TASK_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) .forceNonParallel(); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java index 135d3d9b42db..a1882edb0415 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java @@ -29,12 +29,14 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; 
import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFilesCommitManager; import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService; import org.apache.iceberg.actions.RewriteFileGroup; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,7 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator private final String taskName; private final int taskIndex; private final TableLoader tableLoader; + private final String branch; private transient Table table; private transient CommitService commitService; @@ -62,7 +65,7 @@ public class DataFileRewriteCommitter extends AbstractStreamOperator private transient Counter removedDataFileSizeCounter; public DataFileRewriteCommitter( - String tableName, String taskName, int taskIndex, TableLoader tableLoader) { + String tableName, String taskName, int taskIndex, TableLoader tableLoader, String branch) { Preconditions.checkNotNull(tableName, "Table name should no be null"); Preconditions.checkNotNull(taskName, "Task name should no be null"); Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); @@ -71,6 +74,7 @@ public DataFileRewriteCommitter( this.taskName = taskName; this.taskIndex = taskIndex; this.tableLoader = tableLoader; + this.branch = branch; } @Override @@ -103,7 +107,7 @@ public void processElement(StreamRecord str FlinkRewriteDataFilesCommitManager commitManager = new FlinkRewriteDataFilesCommitManager( - table, executedGroup.snapshotId(), streamRecord.getTimestamp()); + table, executedGroup.snapshotId(), streamRecord.getTimestamp(), branch); this.commitService = 
commitManager.service(executedGroup.groupsPerCommit()); commitService.start(); } @@ -164,8 +168,14 @@ public void close() throws IOException { private class FlinkRewriteDataFilesCommitManager extends RewriteDataFilesCommitManager { private final long timestamp; - FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId, long timestamp) { - super(table, startingSnapshotId); + FlinkRewriteDataFilesCommitManager( + Table table, long startingSnapshotId, long timestamp, String branch) { + super( + table, + startingSnapshotId, + RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER_DEFAULT, + ImmutableMap.of(), + branch); this.timestamp = timestamp; } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java index c50060e16a6c..f9a3c87ab726 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -29,6 +29,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.actions.BinPackRewriteFilePlanner; import org.apache.iceberg.actions.FileRewritePlan; import org.apache.iceberg.actions.RewriteDataFiles; @@ -62,6 +63,7 @@ public class DataFileRewritePlanner private final Map rewriterOptions; private transient Counter errorCounter; private final Expression filter; + private final String branch; public DataFileRewritePlanner( String tableName, @@ -71,7 +73,8 @@ public DataFileRewritePlanner( int newPartialProgressMaxCommits, long maxRewriteBytes, Map rewriterOptions, - Expression filter) { + Expression filter, + String branch) { Preconditions.checkNotNull(tableName, "Table name should no be null"); 
Preconditions.checkNotNull(taskName, "Task name should no be null"); @@ -86,6 +89,7 @@ public DataFileRewritePlanner( this.maxRewriteBytes = maxRewriteBytes; this.rewriterOptions = rewriterOptions; this.filter = filter; + this.branch = branch; } @Override @@ -108,7 +112,8 @@ public void processElement(Trigger value, Context ctx, Collector o try { SerializableTable table = (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()); - if (table.currentSnapshot() == null) { + Snapshot snapshot = branch != null ? table.snapshot(branch) : table.currentSnapshot(); + if (snapshot == null) { LOG.info( DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an empty table", tableName, @@ -118,7 +123,8 @@ public void processElement(Trigger value, Context ctx, Collector o return; } - BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table, filter); + BinPackRewriteFilePlanner planner = + new BinPackRewriteFilePlanner(table, filter, snapshot.snapshotId(), false); planner.init(rewriterOptions); FileRewritePlan @@ -164,7 +170,7 @@ public void processElement(Trigger value, Context ctx, Collector o taskIndex, ctx.timestamp(), group); - out.collect(new PlannedGroup(table, groupsPerCommit, group)); + out.collect(new PlannedGroup(table, groupsPerCommit, group, branch)); } } catch (Exception e) { LOG.warn( @@ -189,11 +195,14 @@ public static class PlannedGroup { private final SerializableTable table; private final int groupsPerCommit; private final RewriteFileGroup group; + private final String branch; - private PlannedGroup(SerializableTable table, int groupsPerCommit, RewriteFileGroup group) { + private PlannedGroup( + SerializableTable table, int groupsPerCommit, RewriteFileGroup group, String branch) { this.table = table; this.groupsPerCommit = groupsPerCommit; this.group = group; + this.branch = branch; } SerializableTable table() { @@ -207,5 +216,9 @@ int groupsPerCommit() { RewriteFileGroup group() { return group; } + + String branch() { + 
return branch; + } } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java index 57b0e53d86e6..ce459ed2cb49 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java @@ -35,6 +35,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableUtil; import org.apache.iceberg.actions.RewriteFileGroup; @@ -122,11 +123,12 @@ public void processElement(PlannedGroup value, Context ctx, Collector dataFiles = Sets.newHashSet(writer.dataFiles()); value.group().setOutputFiles(dataFiles); + Snapshot snapshot = + value.branch() != null + ? 
value.table().snapshot(value.branch()) + : value.table().currentSnapshot(); out.collect( - new ExecutedGroup( - value.table().currentSnapshot().snapshotId(), - value.groupsPerCommit(), - value.group())); + new ExecutedGroup(snapshot.snapshotId(), value.groupsPerCommit(), value.group())); if (LOG.isDebugEnabled()) { LOG.debug( DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten files {} from {} to {}", diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 9f90d8fd352d..eaaf4ea6e4e3 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -769,7 +769,10 @@ IcebergSink build() { if (flinkWriteConf.compactMode()) { RewriteDataFilesConfig rewriteDataFilesConfig = flinkMaintenanceConfig.createRewriteDataFilesConfig(); - maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig)); + maintenanceTasks.add( + RewriteDataFiles.builder() + .branch(flinkWriteConf.branch()) + .config(rewriteDataFilesConfig)); } if (flinkWriteConf.expireSnapshotsMode()) { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java index 707038c925d5..bb53b5265655 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -529,6 +529,41 @@ void testRewriteWithFilter() throws Exception { createRecord(4, "d"))); } + @Test + void testBranch() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + // Create branch based on above inserts + String branchName = "test-branch"; + 
table.manageSnapshots().createBranch(branchName).commit(); + + // Insert another file on main only (main has 3 files, branch stays at 2) + insert(table, 3, "c"); + + appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true).branch(branchName)); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + table.refresh(); + + // Branch should be compacted from 2 files to 1 + assertThat( + table.snapshot(branchName).dataManifests(table.io()).stream() + .flatMap( + m -> + StreamSupport.stream( + ManifestFiles.read(m, table.io(), table.specs()).spliterator(), false)) + .count()) + .isEqualTo(1); + SimpleDataUtil.assertTableRecords( + table, ImmutableList.of(createRecord(1, "a"), createRecord(2, "b")), branchName); + + // Main should be untouched with 3 files + assertFileNum(table, 3, 0); + } + private void appendRewriteDataFiles() { appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true)); } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java index 95992ccd979a..5676a5b06c20 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java @@ -55,7 +55,8 @@ static List planDataFileRewrite( 11, 10_000_000L, rewriterOptions, - Expressions.alwaysTrue()))) { + Expressions.alwaysTrue(), + null))) { testHarness.open(); OperatorTestBase.trigger(testHarness); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java index 9e8f2ec92162..2d740a13978c 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java +++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java @@ -225,7 +225,8 @@ private OneInputStreamOperatorTestHarness + testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new DataFileRewritePlanner( + OperatorTestBase.DUMMY_TABLE_NAME, + OperatorTestBase.DUMMY_TABLE_NAME, + 0, + tableLoader(), + 11, + 10_000_000L, + ImmutableMap.of(MIN_INPUT_FILES, "2"), + Expressions.alwaysTrue(), + branchName))) { + testHarness.open(); + + trigger(testHarness); + + assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull(); + List planned = testHarness.extractOutputValues(); + assertThat(planned).hasSize(1); + // Branch has 2 files, main has 3 + assertThat(planned.get(0).group().fileScanTasks()).hasSize(2); + assertThat(planned.get(0).branch()).isEqualTo(branchName); + } + } + void assertRewriteFileGroup( DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, Set files) { assertThat(plannedGroup.table().currentSnapshot().snapshotId()) diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java index 4e21c7a956e4..43f2e4010488 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java @@ -308,7 +308,8 @@ void testSplitSize() throws Exception { "2", TARGET_FILE_SIZE_BYTES, String.valueOf(targetFileSize)), - Expressions.alwaysTrue()))) { + Expressions.alwaysTrue(), + null))) { testHarness.open(); OperatorTestBase.trigger(testHarness); From febf1d72ba5488bc2fb7dbae6f50e3542bd28705 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 19 Mar 2026 10:57:50 +0100 Subject: [PATCH 2/3] fixup! 
Remove snapshot ref nullability --- .../maintenance/operator/DataFileRewriteCommitter.java | 1 + .../maintenance/operator/DataFileRewritePlanner.java | 2 +- .../maintenance/operator/DataFileRewriteRunner.java | 10 ++++------ .../flink/maintenance/operator/RewriteUtil.java | 3 ++- .../operator/TestDataFileRewriteCommitter.java | 3 ++- .../operator/TestDataFileRewritePlanner.java | 5 +++-- .../operator/TestDataFileRewriteRunner.java | 3 ++- 7 files changed, 15 insertions(+), 12 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java index a1882edb0415..1125e5d9b6a5 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java @@ -69,6 +69,7 @@ public DataFileRewriteCommitter( Preconditions.checkNotNull(tableName, "Table name should no be null"); Preconditions.checkNotNull(taskName, "Task name should no be null"); Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(branch, "Branch should not be null"); this.tableName = tableName; this.taskName = taskName; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java index f9a3c87ab726..7ca2284b68f1 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -112,7 +112,7 @@ public void processElement(Trigger value, Context ctx, Collector o try { SerializableTable table = 
(SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()); - Snapshot snapshot = branch != null ? table.snapshot(branch) : table.currentSnapshot(); + Snapshot snapshot = table.snapshot(branch); if (snapshot == null) { LOG.info( DataFileRewritePlanner.MESSAGE_PREFIX + "Nothing to plan for in an empty table", diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java index ce459ed2cb49..6fbfacf9f6c9 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java @@ -35,7 +35,6 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableUtil; import org.apache.iceberg.actions.RewriteFileGroup; @@ -123,12 +122,11 @@ public void processElement(PlannedGroup value, Context ctx, Collector dataFiles = Sets.newHashSet(writer.dataFiles()); value.group().setOutputFiles(dataFiles); - Snapshot snapshot = - value.branch() != null - ? 
value.table().snapshot(value.branch()) - : value.table().currentSnapshot(); out.collect( - new ExecutedGroup(snapshot.snapshotId(), value.groupsPerCommit(), value.group())); + new ExecutedGroup( + value.table().snapshot(value.branch()).snapshotId(), + value.groupsPerCommit(), + value.group())); if (LOG.isDebugEnabled()) { LOG.debug( DataFileRewritePlanner.MESSAGE_PREFIX + "Rewritten files {} from {} to {}", diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java index 5676a5b06c20..9b19a50b092d 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; import org.apache.iceberg.DataFile; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.TableLoader; @@ -56,7 +57,7 @@ static List planDataFileRewrite( 10_000_000L, rewriterOptions, Expressions.alwaysTrue(), - null))) { + SnapshotRef.MAIN_BRANCH))) { testHarness.open(); OperatorTestBase.trigger(testHarness); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java index 2d740a13978c..cdd40eb2a2bf 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteCommitter.java @@ -31,6 +31,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; +import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -226,7 +227,7 @@ private OneInputStreamOperatorTestHarness Date: Thu, 19 Mar 2026 11:18:04 +0100 Subject: [PATCH 3/3] fixup! Null check in DataFileRewritePlanner --- .../flink/maintenance/operator/DataFileRewritePlanner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java index 7ca2284b68f1..a9360374df28 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -80,6 +80,7 @@ public DataFileRewritePlanner( Preconditions.checkNotNull(taskName, "Task name should no be null"); Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); Preconditions.checkNotNull(rewriterOptions, "Options map should no be null"); + Preconditions.checkNotNull(branch, "Branch should no be null"); this.tableName = tableName; this.taskName = taskName;