Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/spark-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ spark.read
| batch-size | As per table property | Overrides this table's read.parquet.vectorization.batch-size |
| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
| streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per microbatch |
| streaming-max-rows-per-micro-batch | INT_MAX | "Soft maximum" number of rows per microbatch; always includes all rows in next unprocessed file, excludes additional files if their inclusion would exceed the soft max limit |
| streaming-max-rows-per-micro-batch | (none) | Optional long-valued "soft maximum" number of rows per microbatch; always includes all rows in next unprocessed file, excludes additional files if their inclusion would exceed the soft max limit |
| async-micro-batch-planning-enabled | false | Enables asynchronous microbatch planning to reduce planning latency by pre-fetching file scan tasks |
| streaming-snapshot-polling-interval-ms | 30000 | Overrides the polling interval at which the async planner refreshes and detects new snapshots. Only takes effect when async-micro-batch-planning-enabled is set |
| async-queue-preload-file-limit | 100 | Overrides the number of files initially loaded into the background queue. Tune to prevent queue starvation. Only takes effect when async-micro-batch-planning-enabled is set |
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/spark-structured-streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ val df = spark.readStream
To control the size of micro-batches in the DataFrame API, Iceberg supports two read options:

* `streaming-max-files-per-micro-batch` Maximum number of files to be processed in every micro-batch.
* `streaming-max-rows-per-micro-batch` A "soft max" on the number of rows to be processed in every micro-batch. A batch will always include all the rows in the next unprocessed data file but additional files will not be included if doing so would exceed the soft max limit.
* `streaming-max-rows-per-micro-batch` An optional, long-valued "soft max" on the number of rows to be processed in every micro-batch. A batch will always include all the rows in the next unprocessed data file but additional files will not be included if doing so would exceed the soft max limit. Large values can be used to let one streaming job process a substantial initial backlog before continuing with ongoing incremental micro-batches.

If both options are set, the micro-batch size will be limited by whichever option is reached first.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ public int maxFilesPerMicroBatch() {
.parse();
}

/**
* @deprecated use {@link #maxRecordsPerMicroBatchLong()} instead. This method cannot represent
* values greater than {@link Integer#MAX_VALUE}.
*/
@Deprecated
public int maxRecordsPerMicroBatch() {
return confParser
.intConf()
Expand All @@ -262,6 +267,14 @@ public int maxRecordsPerMicroBatch() {
.parse();
}

/**
 * Returns the value of the {@link SparkReadOptions#STREAMING_MAX_ROWS_PER_MICRO_BATCH} read
 * option parsed as a {@code long}.
 *
 * <p>Only the read option is consulted in this lookup; {@link Long#MAX_VALUE} is supplied as the
 * default value.
 *
 * @return the parsed option value, or {@link Long#MAX_VALUE} when the default applies
 */
public long maxRecordsPerMicroBatchLong() {
return confParser
.longConf()
.option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
.defaultValue(Long.MAX_VALUE)
.parse();
}

public boolean asyncMicroBatchPlanningEnabled() {
return confParser
.booleanConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements
StreamingOffset lastOffsetForTriggerAvailableNow) {
super(table, readConf);
this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
this.minQueuedRows = readConf().maxRecordsPerMicroBatchLong();
this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
this.planFilesCache = Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build();
this.queue = new LinkedBlockingDeque<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected Snapshot nextValidSnapshot(Snapshot curSnapshot) {
}

static class UnpackedLimits {
private long maxRows = Integer.MAX_VALUE;
private long maxRows = Long.MAX_VALUE;
private long maxFiles = Integer.MAX_VALUE;

UnpackedLimits(ReadLimit limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
private final StreamingOffset initialOffset;
private final long fromTimestamp;
private final int maxFilesPerMicroBatch;
private final int maxRecordsPerMicroBatch;
private final long maxRecordsPerMicroBatch;
private final boolean cacheDeleteFilesOnExecutors;
private SparkMicroBatchPlanner planner;
private StreamingOffset lastOffsetForTriggerAvailableNow;
Expand All @@ -102,7 +102,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
this.splitOpenFileCost = readConf.splitOpenFileCost();
this.fromTimestamp = readConf.streamFromTimestamp();
this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatchLong();
this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors();

InitialOffsetStore initialOffsetStore =
Expand Down Expand Up @@ -232,15 +232,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {

@Override
public ReadLimit getDefaultReadLimit() {
if (maxFilesPerMicroBatch != Integer.MAX_VALUE
&& maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
if (maxFilesPerMicroBatch != Integer.MAX_VALUE && maxRecordsPerMicroBatch != Long.MAX_VALUE) {
ReadLimit[] readLimits = new ReadLimit[2];
readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
return ReadLimit.compositeLimit(readLimits);
} else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
return ReadLimit.maxFiles(maxFilesPerMicroBatch);
} else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
} else if (maxRecordsPerMicroBatch != Long.MAX_VALUE) {
return ReadLimit.maxRows(maxRecordsPerMicroBatch);
} else {
return ReadLimit.allAvailable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,43 @@ public void testSplitParallelismRejectsNegative() {
.withMessageContaining("Split parallelism must be > 0");
});
}

@TestTemplate
public void testMaxRecordsPerMicroBatchRetainsLegacyIntValue() {
  // An int-sized row limit must still be readable through the int-returning accessor.
  int expectedLimit = 1000;
  String optionValue = Integer.toString(expectedLimit);
  Table loadedTable = validationCatalog.loadTable(tableIdent);

  SparkReadConf readConf =
      new SparkReadConf(
          spark,
          loadedTable,
          ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, optionValue));

  assertThat(readConf.maxRecordsPerMicroBatch()).isEqualTo(expectedLimit);
}

@TestTemplate
public void testMaxRecordsPerMicroBatchLongUsesUnlimitedDefault() {
  // No read options are supplied, so each accessor must report its own default value.
  SparkReadConf readConf =
      new SparkReadConf(spark, validationCatalog.loadTable(tableIdent), ImmutableMap.of());

  // The int accessor keeps its int-sized default ...
  assertThat(readConf.maxRecordsPerMicroBatch()).isEqualTo(Integer.MAX_VALUE);
  // ... while the long accessor defaults to the full long range.
  assertThat(readConf.maxRecordsPerMicroBatchLong()).isEqualTo(Long.MAX_VALUE);
}

@TestTemplate
public void testMaxRecordsPerMicroBatchAllowsLongValue() {
  // 3 billion does not fit in an int, so it is only representable through the long accessor.
  long expectedLimit = 3_000_000_000L;
  String optionValue = Long.toString(expectedLimit);
  Table loadedTable = validationCatalog.loadTable(tableIdent);

  SparkReadConf readConf =
      new SparkReadConf(
          spark,
          loadedTable,
          ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, optionValue));

  assertThat(readConf.maxRecordsPerMicroBatchLong()).isEqualTo(expectedLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,30 @@ public void dropTable() {
public void testUnpackedLimitsCompositeChoosesMinimum() {
ReadLimit[] limits =
new ReadLimit[] {
ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), ReadLimit.maxFiles(2)
ReadLimit.maxRows(4_000_000_000L),
ReadLimit.maxRows(3_000_000_000L),
ReadLimit.maxFiles(8),
ReadLimit.maxFiles(2)
};

ReadLimit composite = ReadLimit.compositeLimit(limits);

BaseSparkMicroBatchPlanner.UnpackedLimits unpacked =
new BaseSparkMicroBatchPlanner.UnpackedLimits(composite);

assertThat(unpacked.getMaxRows()).isEqualTo(4);
assertThat(unpacked.getMaxRows()).isEqualTo(3_000_000_000L);
assertThat(unpacked.getMaxFiles()).isEqualTo(2);
}

@TestTemplate
public void testUnpackedLimitsDoNotApplyDefaultRowCap() {
  // Unpacking an "all available" limit must leave both bounds at their initial maximums.
  ReadLimit unbounded = ReadLimit.allAvailable();
  BaseSparkMicroBatchPlanner.UnpackedLimits limits =
      new BaseSparkMicroBatchPlanner.UnpackedLimits(unbounded);

  assertThat(limits.getMaxRows()).isEqualTo(Long.MAX_VALUE);
  assertThat(limits.getMaxFiles()).isEqualTo(Integer.MAX_VALUE);
}

@TestTemplate
public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() {
sql("INSERT INTO %s VALUES (1, 'one')", tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
Expand Down Expand Up @@ -307,6 +308,61 @@ public void testReadStreamWithMaxRows4() throws Exception {
Trigger.AvailableNow());
}

@TestTemplate
public void testReadStreamWithNoRowLimitPlansMultiTrillionBackfill() throws Exception {
  // Three data files whose declared record counts add up to 5 trillion rows.
  appendDataFilesWithRecordCounts(1_500_000_000_000L, 1_500_000_000_000L, 2_000_000_000_000L);

  // No read options: the stream runs with its default read limit.
  SparkMicroBatchStream microBatchStream =
      newMicroBatchStream(ImmutableMap.of(), "multi-trillion-backfill-checkpoint");

  try {
    Offset start = microBatchStream.initialOffset();
    Offset end = microBatchStream.latestOffset(start, microBatchStream.getDefaultReadLimit());

    assertThat(end).isNotNull();
    StreamingOffset endPosition = (StreamingOffset) end;
    assertThat(endPosition.position()).isEqualTo(3L);

    // All three files are expected in this single batch.
    List<FileScanTask> planned = plannedTasks(microBatchStream, start, end);
    assertThat(planned).hasSize(3);
    long totalRecords = plannedRecordCount(planned);
    assertThat(totalRecords).isEqualTo(5_000_000_000_000L);
  } finally {
    microBatchStream.stop();
  }
}

// Verifies that a row limit larger than Integer.MAX_VALUE (3 trillion) is honored as a soft limit
// across two consecutive micro-batches planned from one appended set of four data files.
@TestTemplate
public void testReadStreamWithLongBackfillRowLimitUsesSoftLimit() throws Exception {
// Four files with declared record counts of 1T, 1T, 1.5T and 1T rows (4.5T in total).
appendDataFilesWithRecordCounts(
1_000_000_000_000L, 1_000_000_000_000L, 1_500_000_000_000L, 1_000_000_000_000L);

// The row limit is passed as a string option; 3 trillion only fits in a long.
SparkMicroBatchStream stream =
newMicroBatchStream(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "3000000000000"),
"long-backfill-row-limit-checkpoint");

try {
Offset startOffset = stream.initialOffset();
Offset firstEndOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit());

// First batch: expected to stop at position 3, i.e. after the third file.
assertThat(firstEndOffset).isNotNull();
assertThat(((StreamingOffset) firstEndOffset).position()).isEqualTo(3L);

// The expected 3.5T rows exceed the configured 3T limit, so the limit is asserted to be
// soft here rather than a hard cap.
List<FileScanTask> firstBatchTasks = plannedTasks(stream, startOffset, firstEndOffset);
assertThat(firstBatchTasks).hasSize(3);
assertThat(plannedRecordCount(firstBatchTasks)).isEqualTo(3_500_000_000_000L);

// Second batch: planning resumes from the first batch's end offset and picks up the one
// remaining file.
Offset secondEndOffset = stream.latestOffset(firstEndOffset, stream.getDefaultReadLimit());
assertThat(secondEndOffset).isNotNull();
assertThat(((StreamingOffset) secondEndOffset).position()).isEqualTo(4L);

List<FileScanTask> secondBatchTasks = plannedTasks(stream, firstEndOffset, secondEndOffset);
assertThat(secondBatchTasks).hasSize(1);
assertThat(plannedRecordCount(secondBatchTasks)).isEqualTo(1_000_000_000_000L);
} finally {
// Always stop the stream, even when an assertion above fails.
stream.stop();
}
}

@TestTemplate
public void testReadStreamWithCompositeReadLimit() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
Expand Down Expand Up @@ -1114,6 +1170,24 @@ private void appendData(List<SimpleRecord> data, String format) {
.save(tableName);
}

/**
 * Commits a single fast append that registers one data file per entry in {@code recordCounts}.
 *
 * <p>Each file path points at a freshly created temp file; the record count is taken from the
 * argument and the file size is declared as 10 bytes in the file metadata.
 *
 * @param recordCounts declared record count for each data file, in append order
 * @throws IOException if a temp file cannot be created
 */
private void appendDataFilesWithRecordCounts(long... recordCounts) throws IOException {
  AppendFiles pendingAppend = table.newFastAppend();

  for (int i = 0; i < recordCounts.length; i++) {
    String location = File.createTempFile("junit", null, temp.toFile()).getPath();

    pendingAppend.appendFile(
        DataFiles.builder(table.spec())
            .withPath(location)
            .withFileSizeInBytes(10)
            .withRecordCount(recordCounts[i])
            .withFormat(FileFormat.PARQUET)
            .build());
  }

  // All files land in one commit.
  pendingAppend.commit();
}

private static final String MEMORY_TABLE = "_stream_view_mem";

private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
Expand Down Expand Up @@ -1202,4 +1276,27 @@ private SparkMicroBatchStream newMicroBatchStream(
table.schema(),
temp.resolve(checkpointDirName).toString());
}

/**
 * Plans the input partitions between two offsets and flattens their task groups into one list.
 *
 * @param stream the micro-batch stream to plan against
 * @param startOffset offset the batch starts from
 * @param endOffset offset the batch ends at
 * @return every {@link FileScanTask} found in the planned partitions, in planning order
 */
private List<FileScanTask> plannedTasks(
    SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) {
  List<FileScanTask> collected = Lists.newArrayList();

  for (InputPartition partition : stream.planInputPartitions(startOffset, endOffset)) {
    // Every planned partition is cast to SparkInputPartition to reach its task group.
    ((SparkInputPartition) partition)
        .<FileScanTask>taskGroup()
        .tasks()
        .forEach(collected::add);
  }

  return collected;
}

/**
 * Sums the record counts of the data files referenced by the given tasks.
 *
 * @param tasks scan tasks whose files should be counted
 * @return total of {@code task.file().recordCount()} over all tasks; 0 for an empty list
 */
private long plannedRecordCount(List<FileScanTask> tasks) {
  return tasks.stream().mapToLong(task -> task.file().recordCount()).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ public int maxFilesPerMicroBatch() {
.parse();
}

/**
* @deprecated use {@link #maxRecordsPerMicroBatchLong()} instead. This method cannot represent
* values greater than {@link Integer#MAX_VALUE}.
*/
@Deprecated
public int maxRecordsPerMicroBatch() {
return confParser
.intConf()
Expand All @@ -262,6 +267,14 @@ public int maxRecordsPerMicroBatch() {
.parse();
}

/**
 * Returns the value of the {@link SparkReadOptions#STREAMING_MAX_ROWS_PER_MICRO_BATCH} read
 * option parsed as a {@code long}.
 *
 * <p>Only the read option is consulted in this lookup; {@link Long#MAX_VALUE} is supplied as the
 * default value.
 *
 * @return the parsed option value, or {@link Long#MAX_VALUE} when the default applies
 */
public long maxRecordsPerMicroBatchLong() {
return confParser
.longConf()
.option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
.defaultValue(Long.MAX_VALUE)
.parse();
}

public boolean asyncMicroBatchPlanningEnabled() {
return confParser
.booleanConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements
StreamingOffset lastOffsetForTriggerAvailableNow) {
super(table, readConf);
this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
this.minQueuedRows = readConf().maxRecordsPerMicroBatchLong();
this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
this.planFilesCache = Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build();
this.queue = new LinkedBlockingDeque<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected Snapshot nextValidSnapshot(Snapshot curSnapshot) {
}

static class UnpackedLimits {
private long maxRows = Integer.MAX_VALUE;
private long maxRows = Long.MAX_VALUE;
private long maxFiles = Integer.MAX_VALUE;

UnpackedLimits(ReadLimit limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
private final StreamingOffset initialOffset;
private final long fromTimestamp;
private final int maxFilesPerMicroBatch;
private final int maxRecordsPerMicroBatch;
private final long maxRecordsPerMicroBatch;
private final boolean cacheDeleteFilesOnExecutors;
private SparkMicroBatchPlanner planner;
private StreamingOffset lastOffsetForTriggerAvailableNow;
Expand All @@ -102,7 +102,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
this.splitOpenFileCost = readConf.splitOpenFileCost();
this.fromTimestamp = readConf.streamFromTimestamp();
this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatchLong();
this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors();

InitialOffsetStore initialOffsetStore =
Expand Down Expand Up @@ -232,15 +232,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {

@Override
public ReadLimit getDefaultReadLimit() {
if (maxFilesPerMicroBatch != Integer.MAX_VALUE
&& maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
if (maxFilesPerMicroBatch != Integer.MAX_VALUE && maxRecordsPerMicroBatch != Long.MAX_VALUE) {
ReadLimit[] readLimits = new ReadLimit[2];
readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
return ReadLimit.compositeLimit(readLimits);
} else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
return ReadLimit.maxFiles(maxFilesPerMicroBatch);
} else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
} else if (maxRecordsPerMicroBatch != Long.MAX_VALUE) {
return ReadLimit.maxRows(maxRecordsPerMicroBatch);
} else {
return ReadLimit.allAvailable();
Expand Down
Loading