From 07d0b2bffe00997c270d2e39637f13453ed1daa3 Mon Sep 17 00:00:00 2001 From: Colin Reid Date: Mon, 25 May 2026 18:33:42 -0700 Subject: [PATCH 1/2] Spark: Support long-valued streaming max rows per micro-batch --- docs/docs/spark-configuration.md | 2 +- docs/docs/spark-structured-streaming.md | 2 +- .../apache/iceberg/spark/SparkReadConf.java | 6 +- .../source/BaseSparkMicroBatchPlanner.java | 2 +- .../spark/source/SparkMicroBatchStream.java | 7 +- .../iceberg/spark/TestSparkReadConf.java | 15 +++ .../source/TestMicroBatchPlanningUtils.java | 16 ++- .../source/TestStructuredStreamingRead3.java | 97 +++++++++++++++++++ .../apache/iceberg/spark/SparkReadConf.java | 6 +- .../source/BaseSparkMicroBatchPlanner.java | 2 +- .../spark/source/SparkMicroBatchStream.java | 7 +- .../iceberg/spark/TestSparkReadConf.java | 15 +++ .../source/TestMicroBatchPlanningUtils.java | 16 ++- .../source/TestStructuredStreamingRead3.java | 97 +++++++++++++++++++ .../apache/iceberg/spark/SparkReadConf.java | 6 +- .../source/BaseSparkMicroBatchPlanner.java | 2 +- .../spark/source/SparkMicroBatchStream.java | 7 +- .../iceberg/spark/TestSparkReadConf.java | 17 ++++ .../source/TestMicroBatchPlanningUtils.java | 16 ++- .../source/TestStructuredStreamingRead3.java | 97 +++++++++++++++++++ 20 files changed, 403 insertions(+), 32 deletions(-) diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md index 5972aafc3d39..54ff05281d0a 100644 --- a/docs/docs/spark-configuration.md +++ b/docs/docs/spark-configuration.md @@ -231,7 +231,7 @@ spark.read | batch-size | As per table property | Overrides this table's read.parquet.vectorization.batch-size | | stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used | | streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per microbatch | -| streaming-max-rows-per-micro-batch | INT_MAX | "Soft maximum" number of rows per microbatch; always includes all rows in next unprocessed file, excludes additional files if their inclusion would exceed the soft max limit | +| streaming-max-rows-per-micro-batch | (none) | Optional long-valued "soft maximum" number of rows per microbatch; always includes all rows in next unprocessed file, excludes additional files if their inclusion would exceed the soft max limit | | async-micro-batch-planning-enabled | false | Enables asynchronous microbatch planning to reduce planning latency by pre-fetching file scan tasks | | streaming-snapshot-polling-interval-ms | 30000 | Overrides the polling time for async planner to refresh and detect new snapshots. Only affects when async-micro-batch-planning-enabled is set | | async-queue-preload-file-limit | 100 | Overrides the number of files loaded to background queue initially. Tune to prevent queue starvation. Only affects when async-micro-batch-planning-enabled is set | diff --git a/docs/docs/spark-structured-streaming.md b/docs/docs/spark-structured-streaming.md index 3313f8150b73..ba96a4bac8d5 100644 --- a/docs/docs/spark-structured-streaming.md +++ b/docs/docs/spark-structured-streaming.md @@ -40,7 +40,7 @@ val df = spark.readStream To control the size of micro-batches in the DataFrame API, Iceberg supports two read options: * `streaming-max-files-per-micro-batch` Maximum number of files to be processed in every micro-batch. -* `streaming-max-rows-per-micro-batch` A "soft max" on the number of rows to be processed in every micro-batch. A batch will always include all the rows in the next unprocessed data file but additional files will not be included if doing so would exceed the soft max limit. +* `streaming-max-rows-per-micro-batch` An optional, long-valued "soft max" on the number of rows to be processed in every micro-batch. A batch will always include all the rows in the next unprocessed data file but additional files will not be included if doing so would exceed the soft max limit. Large values can be used to let one streaming job process a substantial initial backlog before continuing with ongoing incremental micro-batches. If both options are set, the micro-batch size will be limited by whichever option is reached first. diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index a5de652f8047..333481f701c8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -254,11 +254,11 @@ public int maxFilesPerMicroBatch() { .parse(); } - public int maxRecordsPerMicroBatch() { + public long maxRecordsPerMicroBatch() { return confParser - .intConf() + .longConf() .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) - .defaultValue(Integer.MAX_VALUE) + .defaultValue(Long.MAX_VALUE) .parse(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java index 9298c2bbdfcc..a131466475d0 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java @@ -118,7 +118,7 @@ protected Snapshot nextValidSnapshot(Snapshot curSnapshot) { } static class UnpackedLimits { - private long maxRows = Integer.MAX_VALUE; + private long maxRows = Long.MAX_VALUE; private long maxFiles = Integer.MAX_VALUE; UnpackedLimits(ReadLimit limit) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index a1ff767fe2a0..1e1c79a41086 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -76,7 +76,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final StreamingOffset initialOffset; private final long fromTimestamp; private final int maxFilesPerMicroBatch; - private final int maxRecordsPerMicroBatch; + private final long maxRecordsPerMicroBatch; private final boolean cacheDeleteFilesOnExecutors; private SparkMicroBatchPlanner planner; private StreamingOffset lastOffsetForTriggerAvailableNow; @@ -232,15 +232,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { @Override public ReadLimit getDefaultReadLimit() { - if (maxFilesPerMicroBatch != Integer.MAX_VALUE - && maxRecordsPerMicroBatch != Integer.MAX_VALUE) { + if (maxFilesPerMicroBatch != Integer.MAX_VALUE && maxRecordsPerMicroBatch != Long.MAX_VALUE) { ReadLimit[] readLimits = new ReadLimit[2]; readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch); readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch); return ReadLimit.compositeLimit(readLimits); } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) { return ReadLimit.maxFiles(maxFilesPerMicroBatch); - } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) { + } else if (maxRecordsPerMicroBatch != Long.MAX_VALUE) { return ReadLimit.maxRows(maxRecordsPerMicroBatch); } else { return ReadLimit.allAvailable(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java index c3fc69c8b25c..c8051b6b464d 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java @@ -92,4 +92,19 @@ public void testSplitParallelismRejectsNegative() { .withMessageContaining("Split parallelism must be > 0"); }); } + + @TestTemplate + public void testMaxRecordsPerMicroBatchAllowsLongValue() { + Table table = validationCatalog.loadTable(tableIdent); + long maxRecords = 3_000_000_000L; + + SparkReadConf conf = + new SparkReadConf( + spark, + table, + ImmutableMap.of( + SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, String.valueOf(maxRecords))); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java index a9ce340fd4ec..9d1728ebd26d 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java @@ -56,7 +56,10 @@ public void dropTable() { public void testUnpackedLimitsCompositeChoosesMinimum() { ReadLimit[] limits = new ReadLimit[] { - ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), ReadLimit.maxFiles(2) + ReadLimit.maxRows(4_000_000_000L), + ReadLimit.maxRows(3_000_000_000L), + ReadLimit.maxFiles(8), + ReadLimit.maxFiles(2) }; ReadLimit composite = ReadLimit.compositeLimit(limits); @@ -64,10 +67,19 @@ public void testUnpackedLimitsCompositeChoosesMinimum() { BaseSparkMicroBatchPlanner.UnpackedLimits unpacked = new BaseSparkMicroBatchPlanner.UnpackedLimits(composite); - assertThat(unpacked.getMaxRows()).isEqualTo(4); + assertThat(unpacked.getMaxRows()).isEqualTo(3_000_000_000L); assertThat(unpacked.getMaxFiles()).isEqualTo(2); } + @TestTemplate + public void testUnpackedLimitsDoNotApplyDefaultRowCap() { + BaseSparkMicroBatchPlanner.UnpackedLimits unpacked = + new BaseSparkMicroBatchPlanner.UnpackedLimits(ReadLimit.allAvailable()); + + assertThat(unpacked.getMaxRows()).isEqualTo(Long.MAX_VALUE); + assertThat(unpacked.getMaxFiles()).isEqualTo(Integer.MAX_VALUE); + } + @TestTemplate public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() { sql("INSERT INTO %s VALUES (1, 'one')", tableName); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index d97e6ec00d7f..f9f2d2d9a27e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.DataFile; @@ -307,6 +308,61 @@ public void testReadStreamWithMaxRows4() throws Exception { Trigger.AvailableNow()); } + @TestTemplate + public void testReadStreamWithNoRowLimitPlansMultiTrillionBackfill() throws Exception { + appendDataFilesWithRecordCounts(1_500_000_000_000L, 1_500_000_000_000L, 2_000_000_000_000L); + + SparkMicroBatchStream stream = + newMicroBatchStream(ImmutableMap.of(), "multi-trillion-backfill-checkpoint"); + + try { + Offset startOffset = stream.initialOffset(); + Offset endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + + assertThat(endOffset).isNotNull(); + assertThat(((StreamingOffset) endOffset).position()).isEqualTo(3L); + + List tasks = plannedTasks(stream, startOffset, endOffset); + assertThat(tasks).hasSize(3); + assertThat(plannedRecordCount(tasks)).isEqualTo(5_000_000_000_000L); + } finally { + stream.stop(); + } + } + + @TestTemplate + public void testReadStreamWithLongBackfillRowLimitUsesSoftLimit() throws Exception { + appendDataFilesWithRecordCounts( + 1_000_000_000_000L, 1_000_000_000_000L, 1_500_000_000_000L, 1_000_000_000_000L); + + SparkMicroBatchStream stream = + newMicroBatchStream( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "3000000000000"), + "long-backfill-row-limit-checkpoint"); + + try { + Offset startOffset = stream.initialOffset(); + Offset firstEndOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + + assertThat(firstEndOffset).isNotNull(); + assertThat(((StreamingOffset) firstEndOffset).position()).isEqualTo(3L); + + List firstBatchTasks = plannedTasks(stream, startOffset, firstEndOffset); + assertThat(firstBatchTasks).hasSize(3); + assertThat(plannedRecordCount(firstBatchTasks)).isEqualTo(3_500_000_000_000L); + + Offset secondEndOffset = stream.latestOffset(firstEndOffset, stream.getDefaultReadLimit()); + assertThat(secondEndOffset).isNotNull(); + assertThat(((StreamingOffset) secondEndOffset).position()).isEqualTo(4L); + + List secondBatchTasks = plannedTasks(stream, firstEndOffset, secondEndOffset); + assertThat(secondBatchTasks).hasSize(1); + assertThat(plannedRecordCount(secondBatchTasks)).isEqualTo(1_000_000_000_000L); + } finally { + stream.stop(); + } + } + @TestTemplate public void testReadStreamWithCompositeReadLimit() throws Exception { appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); @@ -1114,6 +1170,24 @@ private void appendData(List data, String format) { .save(tableName); } + private void appendDataFilesWithRecordCounts(long... recordCounts) throws IOException { + AppendFiles append = table.newFastAppend(); + + for (long recordCount : recordCounts) { + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(recordCount) + .withFormat(FileFormat.PARQUET) + .build(); + + append.appendFile(dataFile); + } + + append.commit(); + } + private static final String MEMORY_TABLE = "_stream_view_mem"; private StreamingQuery startStream(Map options) throws TimeoutException { @@ -1202,4 +1276,27 @@ private SparkMicroBatchStream newMicroBatchStream( table.schema(), temp.resolve(checkpointDirName).toString()); } + + private List plannedTasks( + SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) { + List tasks = Lists.newArrayList(); + + for (InputPartition partition : stream.planInputPartitions(startOffset, endOffset)) { + SparkInputPartition sparkInputPartition = (SparkInputPartition) partition; + for (FileScanTask task : sparkInputPartition.taskGroup().tasks()) { + tasks.add(task); + } + } + + return tasks; + } + + private long plannedRecordCount(List tasks) { + long recordCount = 0; + for (FileScanTask task : tasks) { + recordCount += task.file().recordCount(); + } + + return recordCount; + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index da1093627f8b..3dbbdd7623a9 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -254,11 +254,11 @@ public int maxFilesPerMicroBatch() { .parse(); } - public int maxRecordsPerMicroBatch() { + public long maxRecordsPerMicroBatch() { return confParser - .intConf() + .longConf() .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) - .defaultValue(Integer.MAX_VALUE) + .defaultValue(Long.MAX_VALUE) .parse(); } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java index 9298c2bbdfcc..a131466475d0 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java @@ -118,7 +118,7 @@ protected Snapshot nextValidSnapshot(Snapshot curSnapshot) { } static class UnpackedLimits { - private long maxRows = Integer.MAX_VALUE; + private long maxRows = Long.MAX_VALUE; private long maxFiles = Integer.MAX_VALUE; UnpackedLimits(ReadLimit limit) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index a1ff767fe2a0..1e1c79a41086 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -76,7 +76,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final StreamingOffset initialOffset; private final long fromTimestamp; private final int maxFilesPerMicroBatch; - private final int maxRecordsPerMicroBatch; + private final long maxRecordsPerMicroBatch; private final boolean cacheDeleteFilesOnExecutors; private SparkMicroBatchPlanner planner; private StreamingOffset lastOffsetForTriggerAvailableNow; @@ -232,15 +232,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { @Override public ReadLimit getDefaultReadLimit() { - if (maxFilesPerMicroBatch != Integer.MAX_VALUE - && maxRecordsPerMicroBatch != Integer.MAX_VALUE) { + if (maxFilesPerMicroBatch != Integer.MAX_VALUE && maxRecordsPerMicroBatch != Long.MAX_VALUE) { ReadLimit[] readLimits = new ReadLimit[2]; readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch); readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch); return ReadLimit.compositeLimit(readLimits); } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) { return ReadLimit.maxFiles(maxFilesPerMicroBatch); - } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) { + } else if (maxRecordsPerMicroBatch != Long.MAX_VALUE) { return ReadLimit.maxRows(maxRecordsPerMicroBatch); } else { return ReadLimit.allAvailable(); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java index c3fc69c8b25c..c8051b6b464d 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java @@ -92,4 +92,19 @@ public void testSplitParallelismRejectsNegative() { .withMessageContaining("Split parallelism must be > 0"); }); } + + @TestTemplate + public void testMaxRecordsPerMicroBatchAllowsLongValue() { + Table table = validationCatalog.loadTable(tableIdent); + long maxRecords = 3_000_000_000L; + + SparkReadConf conf = + new SparkReadConf( + spark, + table, + ImmutableMap.of( + SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, String.valueOf(maxRecords))); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java index a9ce340fd4ec..9d1728ebd26d 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java @@ -56,7 +56,10 @@ public void dropTable() { public void testUnpackedLimitsCompositeChoosesMinimum() { ReadLimit[] limits = new ReadLimit[] { - ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), ReadLimit.maxFiles(2) + ReadLimit.maxRows(4_000_000_000L), + ReadLimit.maxRows(3_000_000_000L), + ReadLimit.maxFiles(8), + ReadLimit.maxFiles(2) }; ReadLimit composite = ReadLimit.compositeLimit(limits); @@ -64,10 +67,19 @@ public void testUnpackedLimitsCompositeChoosesMinimum() { BaseSparkMicroBatchPlanner.UnpackedLimits unpacked = new BaseSparkMicroBatchPlanner.UnpackedLimits(composite); - assertThat(unpacked.getMaxRows()).isEqualTo(4); + assertThat(unpacked.getMaxRows()).isEqualTo(3_000_000_000L); assertThat(unpacked.getMaxFiles()).isEqualTo(2); } + @TestTemplate + public void testUnpackedLimitsDoNotApplyDefaultRowCap() { + BaseSparkMicroBatchPlanner.UnpackedLimits unpacked = + new BaseSparkMicroBatchPlanner.UnpackedLimits(ReadLimit.allAvailable()); + + assertThat(unpacked.getMaxRows()).isEqualTo(Long.MAX_VALUE); + assertThat(unpacked.getMaxFiles()).isEqualTo(Integer.MAX_VALUE); + } + @TestTemplate public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() { sql("INSERT INTO %s VALUES (1, 'one')", tableName); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 5f9b460f3707..d5c5cd1e738a 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.DataFile; @@ -313,6 +314,61 @@ public void testReadStreamWithMaxRows4() throws Exception { Trigger.AvailableNow()); } + @TestTemplate + public void testReadStreamWithNoRowLimitPlansMultiTrillionBackfill() throws Exception { + appendDataFilesWithRecordCounts(1_500_000_000_000L, 1_500_000_000_000L, 2_000_000_000_000L); + + SparkMicroBatchStream stream = + newMicroBatchStream(ImmutableMap.of(), "multi-trillion-backfill-checkpoint"); + + try { + Offset startOffset = stream.initialOffset(); + Offset endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + + assertThat(endOffset).isNotNull(); + assertThat(((StreamingOffset) endOffset).position()).isEqualTo(3L); + + List tasks = plannedTasks(stream, startOffset, endOffset); + assertThat(tasks).hasSize(3); + assertThat(plannedRecordCount(tasks)).isEqualTo(5_000_000_000_000L); + } finally { + stream.stop(); + } + } + + @TestTemplate + public void testReadStreamWithLongBackfillRowLimitUsesSoftLimit() throws Exception { + appendDataFilesWithRecordCounts( + 1_000_000_000_000L, 1_000_000_000_000L, 1_500_000_000_000L, 1_000_000_000_000L); + + SparkMicroBatchStream stream = + newMicroBatchStream( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "3000000000000"), + "long-backfill-row-limit-checkpoint"); + + try { + Offset startOffset = stream.initialOffset(); + Offset firstEndOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + + assertThat(firstEndOffset).isNotNull(); + assertThat(((StreamingOffset) firstEndOffset).position()).isEqualTo(3L); + + List firstBatchTasks = plannedTasks(stream, startOffset, firstEndOffset); + assertThat(firstBatchTasks).hasSize(3); + assertThat(plannedRecordCount(firstBatchTasks)).isEqualTo(3_500_000_000_000L); + + Offset secondEndOffset = stream.latestOffset(firstEndOffset, stream.getDefaultReadLimit()); + assertThat(secondEndOffset).isNotNull(); + assertThat(((StreamingOffset) secondEndOffset).position()).isEqualTo(4L); + + List secondBatchTasks = plannedTasks(stream, firstEndOffset, secondEndOffset); + assertThat(secondBatchTasks).hasSize(1); + assertThat(plannedRecordCount(secondBatchTasks)).isEqualTo(1_000_000_000_000L); + } finally { + stream.stop(); + } + } + @TestTemplate public void testReadStreamWithCompositeReadLimit() throws Exception { appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); @@ -1126,6 +1182,24 @@ private void appendData(List data, String format) { .save(tableName); } + private void appendDataFilesWithRecordCounts(long... recordCounts) throws IOException { + AppendFiles append = table.newFastAppend(); + + for (long recordCount : recordCounts) { + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(recordCount) + .withFormat(FileFormat.PARQUET) + .build(); + + append.appendFile(dataFile); + } + + append.commit(); + } + private static final String MEMORY_TABLE = "_stream_view_mem"; private StreamingQuery startStream(Map options) throws TimeoutException { @@ -1213,4 +1287,27 @@ private SparkMicroBatchStream newMicroBatchStream( table.schema(), temp.resolve(checkpointDirName).toString()); } + + private List plannedTasks( + SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) { + List tasks = Lists.newArrayList(); + + for (InputPartition partition : stream.planInputPartitions(startOffset, endOffset)) { + SparkInputPartition sparkInputPartition = (SparkInputPartition) partition; + for (FileScanTask task : sparkInputPartition.taskGroup().tasks()) { + tasks.add(task); + } + } + + return tasks; + } + + private long plannedRecordCount(List tasks) { + long recordCount = 0; + for (FileScanTask task : tasks) { + recordCount += task.file().recordCount(); + } + + return recordCount; + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 8128babfa340..7b314488c5ab 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -218,11 +218,11 @@ public int maxFilesPerMicroBatch() { .parse(); } - public int maxRecordsPerMicroBatch() { + public long maxRecordsPerMicroBatch() { return confParser - .intConf() + .longConf() .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) - .defaultValue(Integer.MAX_VALUE) + .defaultValue(Long.MAX_VALUE) .parse(); } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java index 9298c2bbdfcc..a131466475d0 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java @@ -118,7 +118,7 @@ protected Snapshot nextValidSnapshot(Snapshot curSnapshot) { } static class UnpackedLimits { - private long maxRows = Integer.MAX_VALUE; + private long maxRows = Long.MAX_VALUE; private long maxFiles = Integer.MAX_VALUE; UnpackedLimits(ReadLimit limit) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 7adf3c633cd0..0102bfc05c0b 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -75,7 +75,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final StreamingOffset initialOffset; private final long fromTimestamp; private final int maxFilesPerMicroBatch; - private final int maxRecordsPerMicroBatch; + private final long maxRecordsPerMicroBatch; private final boolean cacheDeleteFilesOnExecutors; private SparkMicroBatchPlanner planner; private StreamingOffset lastOffsetForTriggerAvailableNow; @@ -233,15 +233,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { @Override public ReadLimit getDefaultReadLimit() { - if (maxFilesPerMicroBatch != Integer.MAX_VALUE - && maxRecordsPerMicroBatch != Integer.MAX_VALUE) { + if (maxFilesPerMicroBatch != Integer.MAX_VALUE && maxRecordsPerMicroBatch != Long.MAX_VALUE) { ReadLimit[] readLimits = new ReadLimit[2]; readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch); readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch); return ReadLimit.compositeLimit(readLimits); } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) { return ReadLimit.maxFiles(maxFilesPerMicroBatch); - } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) { + } else if (maxRecordsPerMicroBatch != Long.MAX_VALUE) { return ReadLimit.maxRows(maxRecordsPerMicroBatch); } else { return ReadLimit.allAvailable(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java index c3fc69c8b25c..d5876d964db9 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java @@ -92,4 +92,21 @@ public void testSplitParallelismRejectsNegative() { .withMessageContaining("Split parallelism must be > 0"); }); } + + @TestTemplate + public void testMaxRecordsPerMicroBatchAllowsLongValue() { + Table table = validationCatalog.loadTable(tableIdent); + long maxRecords = 3_000_000_000L; + + SparkReadConf conf = + new SparkReadConf( + spark, + table, + new CaseInsensitiveStringMap( + ImmutableMap.of( + SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, + String.valueOf(maxRecords)))); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java index a9ce340fd4ec..9d1728ebd26d 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java @@ -56,7 +56,10 @@ public void dropTable() { public void testUnpackedLimitsCompositeChoosesMinimum() { ReadLimit[] limits = new ReadLimit[] { - ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), ReadLimit.maxFiles(2) + ReadLimit.maxRows(4_000_000_000L), + ReadLimit.maxRows(3_000_000_000L), + ReadLimit.maxFiles(8), + ReadLimit.maxFiles(2) }; ReadLimit composite = ReadLimit.compositeLimit(limits); @@ -64,10 +67,19 @@ public void testUnpackedLimitsCompositeChoosesMinimum() { BaseSparkMicroBatchPlanner.UnpackedLimits unpacked = new BaseSparkMicroBatchPlanner.UnpackedLimits(composite); - assertThat(unpacked.getMaxRows()).isEqualTo(4); + assertThat(unpacked.getMaxRows()).isEqualTo(3_000_000_000L); assertThat(unpacked.getMaxFiles()).isEqualTo(2); } + @TestTemplate + public void testUnpackedLimitsDoNotApplyDefaultRowCap() { + BaseSparkMicroBatchPlanner.UnpackedLimits unpacked = + new BaseSparkMicroBatchPlanner.UnpackedLimits(ReadLimit.allAvailable()); + + assertThat(unpacked.getMaxRows()).isEqualTo(Long.MAX_VALUE); + assertThat(unpacked.getMaxFiles()).isEqualTo(Integer.MAX_VALUE); + } + @TestTemplate public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() { sql("INSERT INTO %s VALUES (1, 'one')", tableName); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 3957872be721..3578e8ec30e9 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.DataFile; @@ -314,6 +315,61 @@ public void testReadStreamWithMaxRows4() throws Exception { Trigger.AvailableNow()); } + @TestTemplate + public void testReadStreamWithNoRowLimitPlansMultiTrillionBackfill() throws Exception { + appendDataFilesWithRecordCounts(1_500_000_000_000L, 1_500_000_000_000L, 2_000_000_000_000L); + + SparkMicroBatchStream stream = + newMicroBatchStream(ImmutableMap.of(), "multi-trillion-backfill-checkpoint"); + + try { + Offset startOffset = stream.initialOffset(); + Offset endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + + assertThat(endOffset).isNotNull(); + assertThat(((StreamingOffset) endOffset).position()).isEqualTo(3L); + + List tasks = plannedTasks(stream, startOffset, endOffset); + assertThat(tasks).hasSize(3); + assertThat(plannedRecordCount(tasks)).isEqualTo(5_000_000_000_000L); + } finally { + stream.stop(); + } + } + + @TestTemplate + public void testReadStreamWithLongBackfillRowLimitUsesSoftLimit() throws Exception { + appendDataFilesWithRecordCounts( + 1_000_000_000_000L, 1_000_000_000_000L, 1_500_000_000_000L, 1_000_000_000_000L); + + SparkMicroBatchStream stream = + newMicroBatchStream( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "3000000000000"), + "long-backfill-row-limit-checkpoint"); + + try { + Offset startOffset = stream.initialOffset(); + Offset firstEndOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + + assertThat(firstEndOffset).isNotNull(); + assertThat(((StreamingOffset) firstEndOffset).position()).isEqualTo(3L); + + List firstBatchTasks = plannedTasks(stream, startOffset, firstEndOffset); + assertThat(firstBatchTasks).hasSize(3); + assertThat(plannedRecordCount(firstBatchTasks)).isEqualTo(3_500_000_000_000L); + + Offset secondEndOffset = stream.latestOffset(firstEndOffset, stream.getDefaultReadLimit()); + assertThat(secondEndOffset).isNotNull(); + assertThat(((StreamingOffset) secondEndOffset).position()).isEqualTo(4L); + + List secondBatchTasks = plannedTasks(stream, firstEndOffset, secondEndOffset); + assertThat(secondBatchTasks).hasSize(1); + assertThat(plannedRecordCount(secondBatchTasks)).isEqualTo(1_000_000_000_000L); + } finally { + stream.stop(); + } + } + @TestTemplate public void testReadStreamWithCompositeReadLimit() throws Exception { appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); @@ -1127,6 +1183,24 @@ private void appendData(List data, String format) { .save(tableName); } + private void appendDataFilesWithRecordCounts(long... recordCounts) throws IOException { + AppendFiles append = table.newFastAppend(); + + for (long recordCount : recordCounts) { + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) + .withFileSizeInBytes(10) + .withRecordCount(recordCount) + .withFormat(FileFormat.PARQUET) + .build(); + + append.appendFile(dataFile); + } + + append.commit(); + } + private static final String MEMORY_TABLE = "_stream_view_mem"; private StreamingQuery startStream(Map options) throws TimeoutException { @@ -1214,4 +1288,27 @@ private SparkMicroBatchStream newMicroBatchStream( table.schema(), temp.resolve(checkpointDirName).toString()); } + + private List plannedTasks( + SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) { + List tasks = Lists.newArrayList(); + + for (InputPartition partition : stream.planInputPartitions(startOffset, endOffset)) { + SparkInputPartition sparkInputPartition = (SparkInputPartition) partition; + for (FileScanTask task : sparkInputPartition.taskGroup().tasks()) { + tasks.add(task); + } + } + + return tasks; + } + + private long plannedRecordCount(List tasks) { + long recordCount = 0; + for (FileScanTask task : tasks) { + recordCount += task.file().recordCount(); + } + + return recordCount; + } } From d84c1c693569b51d73ca2bf9a2a30d6326c4dadc Mon Sep 17 00:00:00 2001 From: Colin Reid Date: Tue, 26 May 2026 13:06:08 -0700 Subject: [PATCH 2/2] Spark: Preserve legacy streaming row-limit accessor --- .../apache/iceberg/spark/SparkReadConf.java | 15 +++++++++- .../source/AsyncSparkMicroBatchPlanner.java | 2 +- .../spark/source/SparkMicroBatchStream.java | 2 +- .../iceberg/spark/TestSparkReadConf.java | 26 ++++++++++++++++- .../apache/iceberg/spark/SparkReadConf.java | 15 +++++++++- .../source/AsyncSparkMicroBatchPlanner.java | 2 +- .../spark/source/SparkMicroBatchStream.java | 2 +- .../iceberg/spark/TestSparkReadConf.java | 26 ++++++++++++++++- .../apache/iceberg/spark/SparkReadConf.java | 15 +++++++++- .../source/AsyncSparkMicroBatchPlanner.java | 2 +- .../spark/source/SparkMicroBatchStream.java | 2 +- .../iceberg/spark/TestSparkReadConf.java | 28 ++++++++++++++++++- 12 files changed, 125 insertions(+), 12 deletions(-) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 333481f701c8..a9c233c5b235 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -254,7 +254,20 @@ public int maxFilesPerMicroBatch() { .parse(); } - public long maxRecordsPerMicroBatch() { + /** + * @deprecated use {@link #maxRecordsPerMicroBatchLong()} instead. This method cannot represent + * values greater than {@link Integer#MAX_VALUE}. + */ + @Deprecated + public int maxRecordsPerMicroBatch() { + return confParser + .intConf() + .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) + .defaultValue(Integer.MAX_VALUE) + .parse(); + } + + public long maxRecordsPerMicroBatchLong() { return confParser .longConf() .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java index 3e442f9917d4..2d38d638ebeb 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java @@ -87,7 +87,7 @@ class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements StreamingOffset lastOffsetForTriggerAvailableNow) { super(table, readConf); this.minQueuedFiles = readConf().maxFilesPerMicroBatch(); - this.minQueuedRows = readConf().maxRecordsPerMicroBatch(); + this.minQueuedRows = readConf().maxRecordsPerMicroBatchLong(); this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow; this.planFilesCache = Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build(); this.queue = new LinkedBlockingDeque<>(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 1e1c79a41086..fb2f3641960c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -102,7 +102,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA this.splitOpenFileCost = readConf.splitOpenFileCost(); this.fromTimestamp = readConf.streamFromTimestamp(); this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch(); - this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch(); + this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatchLong(); this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors(); InitialOffsetStore initialOffsetStore = diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java index c8051b6b464d..7a100c58e9bd 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java @@ -93,6 +93,30 @@ public void testSplitParallelismRejectsNegative() { }); } + @TestTemplate + public void testMaxRecordsPerMicroBatchRetainsLegacyIntValue() { + Table table = validationCatalog.loadTable(tableIdent); + int maxRecords = 1000; + + SparkReadConf conf = + new SparkReadConf( + spark, + table, + ImmutableMap.of( + SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, String.valueOf(maxRecords))); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + } + + @TestTemplate + public void testMaxRecordsPerMicroBatchLongUsesUnlimitedDefault() { + Table table = validationCatalog.loadTable(tableIdent); + SparkReadConf conf = new SparkReadConf(spark, table, ImmutableMap.of()); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(Integer.MAX_VALUE); + assertThat(conf.maxRecordsPerMicroBatchLong()).isEqualTo(Long.MAX_VALUE); + } + @TestTemplate public void testMaxRecordsPerMicroBatchAllowsLongValue() { Table table = validationCatalog.loadTable(tableIdent); @@ -105,6 +129,6 @@ public void testMaxRecordsPerMicroBatchAllowsLongValue() { ImmutableMap.of( SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, String.valueOf(maxRecords))); - assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + assertThat(conf.maxRecordsPerMicroBatchLong()).isEqualTo(maxRecords); } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 3dbbdd7623a9..ab574784c856 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -254,7 +254,20 @@ public int maxFilesPerMicroBatch() { .parse(); } - public long maxRecordsPerMicroBatch() { + /** + * @deprecated use {@link #maxRecordsPerMicroBatchLong()} instead. This method cannot represent + * values greater than {@link Integer#MAX_VALUE}. + */ + @Deprecated + public int maxRecordsPerMicroBatch() { + return confParser + .intConf() + .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) + .defaultValue(Integer.MAX_VALUE) + .parse(); + } + + public long maxRecordsPerMicroBatchLong() { return confParser .longConf() .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java index 3e442f9917d4..2d38d638ebeb 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java @@ -87,7 +87,7 @@ class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements StreamingOffset lastOffsetForTriggerAvailableNow) { super(table, readConf); this.minQueuedFiles = readConf().maxFilesPerMicroBatch(); - this.minQueuedRows = readConf().maxRecordsPerMicroBatch(); + this.minQueuedRows = readConf().maxRecordsPerMicroBatchLong(); this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow; this.planFilesCache = Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build(); this.queue = new LinkedBlockingDeque<>(); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 1e1c79a41086..fb2f3641960c 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -102,7 +102,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA this.splitOpenFileCost = readConf.splitOpenFileCost(); this.fromTimestamp = readConf.streamFromTimestamp(); this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch(); - this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch(); + this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatchLong(); this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors(); InitialOffsetStore initialOffsetStore = diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java index c8051b6b464d..7a100c58e9bd 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java @@ -93,6 +93,30 @@ public void testSplitParallelismRejectsNegative() { }); } + @TestTemplate + public void testMaxRecordsPerMicroBatchRetainsLegacyIntValue() { + Table table = validationCatalog.loadTable(tableIdent); + int maxRecords = 1000; + + SparkReadConf conf = + new SparkReadConf( + spark, + table, + ImmutableMap.of( + SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, String.valueOf(maxRecords))); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + } + + @TestTemplate + public void testMaxRecordsPerMicroBatchLongUsesUnlimitedDefault() { + Table table = validationCatalog.loadTable(tableIdent); + SparkReadConf conf = new SparkReadConf(spark, table, ImmutableMap.of()); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(Integer.MAX_VALUE); + assertThat(conf.maxRecordsPerMicroBatchLong()).isEqualTo(Long.MAX_VALUE); + } + @TestTemplate public void testMaxRecordsPerMicroBatchAllowsLongValue() { Table table = validationCatalog.loadTable(tableIdent); @@ -105,6 +129,6 @@ public void testMaxRecordsPerMicroBatchAllowsLongValue() { ImmutableMap.of( SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, String.valueOf(maxRecords))); - assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + assertThat(conf.maxRecordsPerMicroBatchLong()).isEqualTo(maxRecords); } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 7b314488c5ab..5b562d64c0b5 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -218,7 +218,20 @@ public int maxFilesPerMicroBatch() { .parse(); } - public long maxRecordsPerMicroBatch() { + /** + * @deprecated use {@link #maxRecordsPerMicroBatchLong()} instead. This method cannot represent + * values greater than {@link Integer#MAX_VALUE}. + */ + @Deprecated + public int maxRecordsPerMicroBatch() { + return confParser + .intConf() + .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) + .defaultValue(Integer.MAX_VALUE) + .parse(); + } + + public long maxRecordsPerMicroBatchLong() { return confParser .longConf() .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java index 3e442f9917d4..2d38d638ebeb 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java @@ -87,7 +87,7 @@ class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements StreamingOffset lastOffsetForTriggerAvailableNow) { super(table, readConf); this.minQueuedFiles = readConf().maxFilesPerMicroBatch(); - this.minQueuedRows = readConf().maxRecordsPerMicroBatch(); + this.minQueuedRows = readConf().maxRecordsPerMicroBatchLong(); this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow; this.planFilesCache = Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build(); this.queue = new LinkedBlockingDeque<>(); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 0102bfc05c0b..5a49757709d2 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -100,7 +100,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA this.splitOpenFileCost = readConf.splitOpenFileCost(); this.fromTimestamp = readConf.streamFromTimestamp(); this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch(); - this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch(); + this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatchLong(); this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors(); InitialOffsetStore initialOffsetStore = diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java index d5876d964db9..ff91519c3cc4 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java @@ -93,6 +93,32 @@ public void testSplitParallelismRejectsNegative() { }); } + @TestTemplate + public void testMaxRecordsPerMicroBatchRetainsLegacyIntValue() { + Table table = validationCatalog.loadTable(tableIdent); + int maxRecords = 1000; + + SparkReadConf conf = + new SparkReadConf( + spark, + table, + new CaseInsensitiveStringMap( + ImmutableMap.of( + SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, + String.valueOf(maxRecords)))); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + } + + @TestTemplate + public void testMaxRecordsPerMicroBatchLongUsesUnlimitedDefault() { + Table table = validationCatalog.loadTable(tableIdent); + SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty()); + + assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(Integer.MAX_VALUE); + assertThat(conf.maxRecordsPerMicroBatchLong()).isEqualTo(Long.MAX_VALUE); + } + @TestTemplate public void testMaxRecordsPerMicroBatchAllowsLongValue() { Table table = validationCatalog.loadTable(tableIdent); @@ -107,6 +133,6 @@ public void testMaxRecordsPerMicroBatchAllowsLongValue() { SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, String.valueOf(maxRecords)))); - assertThat(conf.maxRecordsPerMicroBatch()).isEqualTo(maxRecords); + assertThat(conf.maxRecordsPerMicroBatchLong()).isEqualTo(maxRecords); } }