diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index b38c041507bb..61b1db160457 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -261,6 +261,39 @@ public int maxRecordsPerMicroBatch() { .parse(); } + public boolean asyncMicroBatchPlanningEnabled() { + return confParser + .booleanConf() + .option(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED) + .sessionConf(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED) + .defaultValue(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT) + .parse(); + } + + public long streamingSnapshotPollingIntervalMs() { + return confParser + .longConf() + .option(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS) + .defaultValue(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT) + .parse(); + } + + public long asyncQueuePreloadFileLimit() { + return confParser + .longConf() + .option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT) + .defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT) + .parse(); + } + + public long asyncQueuePreloadRowLimit() { + return confParser + .longConf() + .option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT) + .defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT) + .parse(); + } + public boolean preserveDataGrouping() { return confParser .booleanConf() diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index 17f2bfee69b8..5262310e2c5e 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -87,6 +87,21 @@ private SparkReadOptions() {} public static final String 
STREAMING_MAX_ROWS_PER_MICRO_BATCH = "streaming-max-rows-per-micro-batch"; + // Enable async micro batch planning + public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED = + "async-micro-batch-planning-enabled"; + + // Polling interval for async planner to refresh table metadata (ms) + public static final String STREAMING_SNAPSHOT_POLLING_INTERVAL_MS = + "streaming-snapshot-polling-interval-ms"; + public static final long STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT = 30000L; + + // Initial queue preload limits for async micro batch planner + public static final String ASYNC_QUEUE_PRELOAD_FILE_LIMIT = "async-queue-preload-file-limit"; + public static final long ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT = 100L; + public static final String ASYNC_QUEUE_PRELOAD_ROW_LIMIT = "async-queue-preload-row-limit"; + public static final long ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT = 100000L; + // Table path public static final String PATH = "path"; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index e3ee288affbe..74adb0bc95da 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -103,4 +103,9 @@ private SparkSQLProperties() {} // Controls whether to report available column statistics to Spark for query optimization. 
public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats"; public static final boolean REPORT_COLUMN_STATS_DEFAULT = true; + + // Controls whether to enable async micro batch planning for session + public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED = + "spark.sql.iceberg.async-micro-batch-planning-enabled"; + public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java new file mode 100644 index 000000000000..3e442f9917d4 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.connector.read.streaming.ReadAllAvailable; +import org.apache.spark.sql.connector.read.streaming.ReadLimit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(AsyncSparkMicroBatchPlanner.class); + private static final int PLAN_FILES_CACHE_MAX_SIZE = 10; + private static final long QUEUE_POLL_TIMEOUT_MS = 100L; // 100 ms + + private final long minQueuedFiles; + private final long minQueuedRows; + + // Cache for planFiles results to handle duplicate calls + private final Cache, List> planFilesCache; + + // Queue to buffer pre-fetched file scan tasks + private final LinkedBlockingDeque> queue; + + // Background executor for async operations + private final ScheduledExecutorService executor; + + // Error tracking + private volatile Throwable refreshFailedThrowable; + private volatile Throwable fillQueueFailedThrowable; + + // Tracking queue state + private final AtomicLong 
queuedFileCount = new AtomicLong(0); + private final AtomicLong queuedRowCount = new AtomicLong(0); + private Snapshot lastQueuedSnapshot; + private boolean stopped; + + // Cap for Trigger.AvailableNow - don't process beyond this offset + private final StreamingOffset lastOffsetForTriggerAvailableNow; + + /** + * This class manages a queue of FileScanTask + StreamingOffset. On creation, it starts up an + * asynchronous polling process which populates the queue when a new snapshot arrives or the + * minimum amount of queued data is too low. + * + *

Note: this will capture the state of the table when snapshots are added to the queue. If a + * snapshot is expired after being added to the queue, the job will still process it. + */ + AsyncSparkMicroBatchPlanner( + Table table, + SparkReadConf readConf, + StreamingOffset initialOffset, + StreamingOffset maybeEndOffset, + StreamingOffset lastOffsetForTriggerAvailableNow) { + super(table, readConf); + this.minQueuedFiles = readConf().maxFilesPerMicroBatch(); + this.minQueuedRows = readConf().maxRecordsPerMicroBatch(); + this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow; + this.planFilesCache = Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build(); + this.queue = new LinkedBlockingDeque<>(); + + table().refresh(); + + // Synchronously add data to the queue to meet our initial constraints. + // For Trigger.AvailableNow, constructor-time preload is normally initialized from + // latestOffset(...) with no explicit end offset, so bounded preload must stop at + // Trigger.AvailableNow snapshot. 
+ fillQueue(initialOffset, maybeEndOffset); + + this.executor = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread thread = new Thread(r, "iceberg-async-planner-" + table().name()); + thread.setDaemon(true); + return thread; + }); + // Schedule table refresh at configured interval + long pollingIntervalMs = readConf().streamingSnapshotPollingIntervalMs(); + this.executor.scheduleWithFixedDelay( + this::refreshAndTrapException, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS); + // Schedule queue fill to run frequently (use polling interval for tests, cap at 100ms for + // production) + long queueFillIntervalMs = Math.min(QUEUE_POLL_TIMEOUT_MS, pollingIntervalMs); + executor.scheduleWithFixedDelay( + () -> fillQueueAndTrapException(lastQueuedSnapshot), + 0, + queueFillIntervalMs, + TimeUnit.MILLISECONDS); + + LOG.info( + "Started AsyncSparkMicroBatchPlanner for {} from initialOffset: {}", + table().name(), + initialOffset); + } + + @Override + public synchronized void stop() { + Preconditions.checkArgument( + !stopped, "AsyncSparkMicroBatchPlanner for %s was already stopped", table().name()); + stopped = true; + LOG.info("Stopping AsyncSparkMicroBatchPlanner for table: {}", table().name()); + executor.shutdownNow(); + boolean terminated = false; + try { + terminated = + executor.awaitTermination( + readConf().streamingSnapshotPollingIntervalMs() * 2, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { + // Restore interrupt status + Thread.currentThread().interrupt(); + } + LOG.info("AsyncSparkMicroBatchPlanner for table: {}, stopped: {}", table().name(), terminated); + } + + @Override + public void close() { + stop(); + } + + /** + * Spark can call this multiple times; it should produce the same answer every time. 
+ * + * @param startOffset the starting offset of this microbatch, position is inclusive + * @param endOffset the end offset of this microbatch, position is exclusive + * @return the list of files to scan between these offsets + */ + @Override + public synchronized List planFiles( + StreamingOffset startOffset, StreamingOffset endOffset) { + return planFilesCache.get( + Pair.of(startOffset, endOffset), + key -> { + LOG.info( + "running planFiles for {}, startOffset: {}, endOffset: {}", + table().name(), + startOffset, + endOffset); + List result = new LinkedList<>(); + Pair elem; + StreamingOffset currentOffset; + boolean shouldTerminate = false; + long filesInPlan = 0; + long rowsInPlan = 0; + + do { + try { + elem = queue.pollFirst(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while polling queue", e); + } + + if (elem != null) { + currentOffset = elem.first(); + LOG.debug("planFiles consumed: {}", currentOffset); + FileScanTask currentTask = elem.second(); + filesInPlan += 1; + long elemRows = currentTask.file().recordCount(); + rowsInPlan += elemRows; + queuedFileCount.decrementAndGet(); + queuedRowCount.addAndGet(-elemRows); + result.add(currentTask); + + // try to peek at the next entry of the queue and see if we should stop + Pair nextElem = queue.peekFirst(); + boolean endOffsetPeek = false; + if (nextElem != null) { + endOffsetPeek = endOffset.equals(nextElem.first()); + } + // end offset may be synthetic and not exist in the queue + boolean endOffsetSynthetic = + currentOffset.snapshotId() == endOffset.snapshotId() + && (currentOffset.position() + 1) == endOffset.position(); + shouldTerminate = endOffsetPeek || endOffsetSynthetic; + } else { + LOG.trace("planFiles hasn't reached {}, waiting", endOffset); + } + } while (!shouldTerminate + && refreshFailedThrowable == null + && fillQueueFailedThrowable == null); + + if 
(refreshFailedThrowable != null) { + throw new RuntimeException("Table refresh failed", refreshFailedThrowable); + } + + if (fillQueueFailedThrowable != null) { + throw new RuntimeException("Queue filling failed", fillQueueFailedThrowable); + } + + LOG.info( + "completed planFiles for {}, startOffset: {}, endOffset: {}, files: {}, rows: {}", + table().name(), + startOffset, + endOffset, + filesInPlan, + rowsInPlan); + return result; + }); + } + + /** + * This needs to be non destructive on the queue as spark could call this multiple times. Each + * time, depending on the table state it could return something different + * + * @param startOffset the starting offset of the next microbatch + * @param limit a limit for how many files/bytes/rows the next microbatch should include + * @return The end offset to use for the next microbatch, null signals that no data is available + */ + @Override + public synchronized StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit) { + LOG.info( + "running latestOffset for {}, startOffset: {}, limit: {}", + table().name(), + startOffset, + limit); + + if (table().currentSnapshot() == null) { + LOG.info("latestOffset returning START_OFFSET, currentSnapshot() is null"); + return StreamingOffset.START_OFFSET; + } + + if (table().currentSnapshot().timestampMillis() < readConf().streamFromTimestamp()) { + LOG.info("latestOffset returning START_OFFSET, currentSnapshot() < fromTimestamp"); + return StreamingOffset.START_OFFSET; + } + + // if any exceptions were encountered in the background process, raise them here + if (refreshFailedThrowable != null) { + throw new RuntimeException(refreshFailedThrowable); + } + if (fillQueueFailedThrowable != null) { + throw new RuntimeException(fillQueueFailedThrowable); + } + + // if we want to read all available we don't need to scan files, just snapshots + if (limit instanceof ReadAllAvailable) { + // If Trigger.AvailableNow cap is set, return it directly + if 
(this.lastOffsetForTriggerAvailableNow != null) { + return this.lastOffsetForTriggerAvailableNow; + } + Snapshot lastValidSnapshot = table().snapshot(startOffset.snapshotId()); + Snapshot nextValidSnapshot; + do { + nextValidSnapshot = nextValidSnapshot(lastValidSnapshot); + if (nextValidSnapshot != null) { + lastValidSnapshot = nextValidSnapshot; + } + } while (nextValidSnapshot != null); + return new StreamingOffset( + lastValidSnapshot.snapshotId(), + MicroBatchUtils.addedFilesCount(table(), lastValidSnapshot), + false); + } + + return computeLimitedOffset(limit); + } + + private StreamingOffset computeLimitedOffset(ReadLimit limit) { + UnpackedLimits unpackedLimits = new UnpackedLimits(limit); + long rowsSeen = 0; + long filesSeen = 0; + LOG.debug( + "latestOffset queue status, queuedFiles: {}, queuedRows: {}", + queuedFileCount.get(), + queuedRowCount.get()); + + List> queueSnapshot = Lists.newArrayList(queue); + Pair queueTail = + queueSnapshot.isEmpty() ? null : queueSnapshot.get(queueSnapshot.size() - 1); + + for (int i = 0; i < queueSnapshot.size(); i++) { + Pair elem = queueSnapshot.get(i); + long fileRows = elem.second().file().recordCount(); + + // Hard limit on files - stop BEFORE exceeding + if (filesSeen + 1 > unpackedLimits.getMaxFiles()) { + if (filesSeen == 0) { + return null; + } + LOG.debug( + "latestOffset hit file limit at {}, rows: {}, files: {}", + elem.first(), + rowsSeen, + filesSeen); + return elem.first(); + } + + // Soft limit on rows - include file FIRST, then check + rowsSeen += fileRows; + filesSeen += 1; + + // Check if we've hit the row limit after including this file + if (rowsSeen >= unpackedLimits.getMaxRows()) { + if (filesSeen == 1 && rowsSeen > unpackedLimits.getMaxRows()) { + LOG.warn( + "File {} at offset {} contains {} records, exceeding maxRecordsPerMicroBatch limit of {}. " + + "This file will be processed entirely to guarantee forward progress. 
" + + "Consider increasing the limit or writing smaller files to avoid unexpected memory usage.", + elem.second().file().location(), + elem.first(), + fileRows, + unpackedLimits.getMaxRows()); + } + // Return the offset of the NEXT element (or synthesize tail+1) + if (i + 1 < queueSnapshot.size()) { + LOG.debug( + "latestOffset hit row limit at {}, rows: {}, files: {}", + queueSnapshot.get(i + 1).first(), + rowsSeen, + filesSeen); + return queueSnapshot.get(i + 1).first(); + } else { + // This is the last element - return tail+1 + StreamingOffset current = elem.first(); + StreamingOffset result = + new StreamingOffset( + current.snapshotId(), current.position() + 1, current.shouldScanAllFiles()); + LOG.debug( + "latestOffset hit row limit at tail {}, rows: {}, files: {}", + result, + rowsSeen, + filesSeen); + return result; + } + } + } + + // if we got here there aren't enough files to exceed our limits + if (queueTail != null) { + StreamingOffset tailOffset = queueTail.first(); + // we have to increment the position by 1 since we want to include the tail in the read and + // position is non-inclusive + StreamingOffset latestOffset = + new StreamingOffset( + tailOffset.snapshotId(), tailOffset.position() + 1, tailOffset.shouldScanAllFiles()); + LOG.debug("latestOffset returning all queued data {}", latestOffset); + return latestOffset; + } + + // if we got here the queue is empty + LOG.debug("latestOffset no data, returning null"); + return null; + } + + // Background task wrapper that traps exceptions + private void refreshAndTrapException() { + try { + table().refresh(); + } catch (Throwable t) { + LOG.error("Failed to refresh table {}", table().name(), t); + refreshFailedThrowable = t; + } + } + + // Background task wrapper that traps exceptions + private void fillQueueAndTrapException(Snapshot snapshot) { + try { + fillQueue(snapshot); + } catch (Throwable t) { + LOG.error("Failed to fill queue for table {}", table().name(), t); + fillQueueFailedThrowable = t; 
+ } + } + + /** Generate a MicroBatch based on input parameters and add to the queue */ + private void addMicroBatchToQueue( + Snapshot snapshot, long startFileIndex, long endFileIndex, boolean shouldScanAllFile) { + LOG.info("Adding MicroBatch for snapshot: {} to the queue", snapshot.snapshotId()); + MicroBatches.MicroBatch microBatch = + MicroBatches.from(snapshot, table().io()) + .caseSensitive(readConf().caseSensitive()) + .specsById(table().specs()) + .generate(startFileIndex, endFileIndex, Long.MAX_VALUE, shouldScanAllFile); + + long position = startFileIndex; + for (FileScanTask task : microBatch.tasks()) { + Pair elem = + Pair.of(new StreamingOffset(microBatch.snapshotId(), position, shouldScanAllFile), task); + queuedFileCount.incrementAndGet(); + queuedRowCount.addAndGet(task.file().recordCount()); + queue.addLast(elem); + position += 1; + } + if (LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder("\n"); + for (Pair elem : queue) { + sb.append(elem.first()).append("\n"); + } + LOG.debug(sb.toString()); + } + lastQueuedSnapshot = snapshot; + } + + private void fillQueue(StreamingOffset fromOffset, StreamingOffset toOffset) { + LOG.debug("filling queue from {}, to: {}", fromOffset, toOffset); + Snapshot currentSnapshot = table().snapshot(fromOffset.snapshotId()); + // this could be a partial snapshot so add it outside the loop + if (currentSnapshot != null) { + addMicroBatchToQueue( + currentSnapshot, + fromOffset.position(), + MicroBatchUtils.addedFilesCount(table(), currentSnapshot), + fromOffset.shouldScanAllFiles()); + } + if (toOffset != null) { + if (currentSnapshot != null) { + while (currentSnapshot.snapshotId() != toOffset.snapshotId()) { + currentSnapshot = nextValidSnapshot(currentSnapshot); + if (currentSnapshot != null) { + addMicroBatchToQueue( + currentSnapshot, + 0, + MicroBatchUtils.addedFilesCount(table(), currentSnapshot), + false); + } else { + break; + } + } + } + // toOffset snapshot already added in loop when 
currentSnapshot == toOffset + } else { + fillQueueInitialBuffer(currentSnapshot); + } + } + + private void fillQueueInitialBuffer(Snapshot startSnapshot) { + // toOffset is null - fill initial buffer to prevent queue starvation before background + // thread starts. Use configured limits to avoid loading all snapshots + // (which could cause OOM on tables with thousands of snapshots). + long targetRows = readConf().asyncQueuePreloadRowLimit(); + long targetFiles = readConf().asyncQueuePreloadFileLimit(); + + Snapshot preloadEndSnapshot = initialPreloadEndSnapshot(); + if (preloadEndSnapshot == null) { + return; // Empty table + } + + // START_OFFSET case: initialize using nextValidSnapshot which respects timestamp filtering + Snapshot current = startSnapshot; + if (current == null) { + current = nextValidSnapshot(null); + if (current != null) { + addMicroBatchToQueue(current, 0, MicroBatchUtils.addedFilesCount(table(), current), false); + } + } + + // Continue loading more snapshots within safety limits + if (current != null) { + while ((queuedRowCount.get() < targetRows || queuedFileCount.get() < targetFiles) + && current.snapshotId() != preloadEndSnapshot.snapshotId()) { + current = nextValidSnapshot(current); + if (current != null) { + addMicroBatchToQueue( + current, 0, MicroBatchUtils.addedFilesCount(table(), current), false); + } else { + break; + } + } + } + } + + private Snapshot initialPreloadEndSnapshot() { + if (lastOffsetForTriggerAvailableNow != null) { + return table().snapshot(lastOffsetForTriggerAvailableNow.snapshotId()); + } + + return table().currentSnapshot(); + } + + @VisibleForTesting + static boolean reachedAvailableNowCap( + Snapshot readFrom, StreamingOffset lastOffsetForTriggerAvailableNow) { + return lastOffsetForTriggerAvailableNow != null + && readFrom != null + && readFrom.snapshotId() == lastOffsetForTriggerAvailableNow.snapshotId(); + } + + /** Try to populate the queue with data from unread snapshots */ + private void 
fillQueue(Snapshot readFrom) { + // Don't add beyond cap for Trigger.AvailableNow + if (reachedAvailableNowCap(readFrom, lastOffsetForTriggerAvailableNow)) { + LOG.debug( + "Reached cap snapshot {}, not adding more", + this.lastOffsetForTriggerAvailableNow.snapshotId()); + return; + } + + if ((queuedRowCount.get() > minQueuedRows) || (queuedFileCount.get() > minQueuedFiles)) { + // we have enough data buffered, check back shortly + LOG.debug( + "Buffer is full, {} > {} or {} > {}", + queuedRowCount.get(), + minQueuedRows, + queuedFileCount.get(), + minQueuedFiles); + } else { + // add an entire snapshot to the queue + Snapshot nextValidSnapshot = nextValidSnapshot(readFrom); + if (nextValidSnapshot != null) { + addMicroBatchToQueue( + nextValidSnapshot, + 0, + MicroBatchUtils.addedFilesCount(table(), nextValidSnapshot), + false); + } else { + LOG.debug("No snapshots ready to be read"); + } + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java new file mode 100644 index 000000000000..9298c2bbdfcc --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Locale; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit; +import org.apache.spark.sql.connector.read.streaming.ReadLimit; +import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles; +import org.apache.spark.sql.connector.read.streaming.ReadMaxRows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class BaseSparkMicroBatchPlanner implements SparkMicroBatchPlanner { + private static final Logger LOG = LoggerFactory.getLogger(BaseSparkMicroBatchPlanner.class); + private final Table table; + private final SparkReadConf readConf; + + BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf) { + this.table = table; + this.readConf = readConf; + } + + protected Table table() { + return table; + } + + protected SparkReadConf readConf() { + return readConf; + } + + protected boolean shouldProcess(Snapshot snapshot) { + String op = snapshot.operation(); + switch (op) { + case DataOperations.APPEND: + return true; + case DataOperations.REPLACE: + return false; + case DataOperations.DELETE: + Preconditions.checkState( + readConf.streamingSkipDeleteSnapshots(), + "Cannot process delete snapshot: %s, to 
ignore deletes, set %s=true", + snapshot.snapshotId(), + SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS); + return false; + case DataOperations.OVERWRITE: + Preconditions.checkState( + readConf.streamingSkipOverwriteSnapshots(), + "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true", + snapshot.snapshotId(), + SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS); + return false; + default: + throw new IllegalStateException( + String.format( + "Cannot process unknown snapshot operation: %s (snapshot id %s)", + op.toLowerCase(Locale.ROOT), snapshot.snapshotId())); + } + } + + /** + * Get the next snapshot skipping over rewrite and delete snapshots. Async must handle nulls. + * + * @param curSnapshot the current snapshot + * @return the next valid snapshot (not a rewrite or delete snapshot), returns null if all + * remaining snapshots should be skipped. + */ + protected Snapshot nextValidSnapshot(Snapshot curSnapshot) { + Snapshot nextSnapshot; + // if there were no valid snapshots, check for an initialOffset again + if (curSnapshot == null) { + StreamingOffset startingOffset = + MicroBatchUtils.determineStartingOffset(table, readConf.streamFromTimestamp()); + LOG.debug("determineStartingOffset picked startingOffset: {}", startingOffset); + if (StreamingOffset.START_OFFSET.equals(startingOffset)) { + return null; + } + nextSnapshot = table.snapshot(startingOffset.snapshotId()); + } else { + if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) { + return null; + } + nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId()); + } + // skip over rewrite and delete snapshots + while (!shouldProcess(nextSnapshot)) { + LOG.debug("Skipping snapshot: {}", nextSnapshot); + // if the currentSnapShot was also the mostRecentSnapshot then break + // avoids snapshotAfter throwing exception since there are no more snapshots to process + if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) { + return null; + } 
+ nextSnapshot = SnapshotUtil.snapshotAfter(table, nextSnapshot.snapshotId()); + } + return nextSnapshot; + } + + static class UnpackedLimits { + private long maxRows = Integer.MAX_VALUE; + private long maxFiles = Integer.MAX_VALUE; + + UnpackedLimits(ReadLimit limit) { + if (limit instanceof CompositeReadLimit) { + ReadLimit[] compositeLimits = ((CompositeReadLimit) limit).getReadLimits(); + for (ReadLimit individualLimit : compositeLimits) { + if (individualLimit instanceof ReadMaxRows) { + ReadMaxRows readMaxRows = (ReadMaxRows) individualLimit; + this.maxRows = Math.min(this.maxRows, readMaxRows.maxRows()); + } else if (individualLimit instanceof ReadMaxFiles) { + ReadMaxFiles readMaxFiles = (ReadMaxFiles) individualLimit; + this.maxFiles = Math.min(this.maxFiles, readMaxFiles.maxFiles()); + } + } + } else if (limit instanceof ReadMaxRows) { + this.maxRows = ((ReadMaxRows) limit).maxRows(); + } else if (limit instanceof ReadMaxFiles) { + this.maxFiles = ((ReadMaxFiles) limit).maxFiles(); + } + } + + public long getMaxRows() { + return maxRows; + } + + public long getMaxFiles() { + return maxFiles; + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java new file mode 100644 index 000000000000..7c73e3f416e3 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotChanges; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; + +class MicroBatchUtils { + + private MicroBatchUtils() {} + + static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) { + if (table.currentSnapshot() == null) { + return StreamingOffset.START_OFFSET; + } + + if (fromTimestamp == Long.MIN_VALUE) { + // start from the oldest snapshot, since default value is MIN_VALUE + // avoids looping to find first snapshot + return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); + } + + if (table.currentSnapshot().timestampMillis() < fromTimestamp) { + return StreamingOffset.START_OFFSET; + } + + try { + Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp); + if (snapshot != null) { + return new StreamingOffset(snapshot.snapshotId(), 0, false); + } else { + return StreamingOffset.START_OFFSET; + } + } catch (IllegalStateException e) { + // could not determine the first snapshot after the timestamp. 
use the oldest ancestor instead + return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); + } + } + + static long addedFilesCount(Table table, Snapshot snapshot) { + long addedFilesCount = + PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1); + return addedFilesCount == -1 + ? Iterables.size( + SnapshotChanges.builderFor(table).snapshot(snapshot).build().addedDataFiles()) + : addedFilesCount; + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java new file mode 100644 index 000000000000..1986ddac5d8e --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import org.apache.iceberg.FileScanTask; +import org.apache.spark.sql.connector.read.streaming.ReadLimit; + +interface SparkMicroBatchPlanner { + /** + * Return the {@link FileScanTask}s for data added between the start and end offsets. 
+ * + * @param startOffset the offset to start planning from + * @param endOffset the offset to plan up to + * @return file scan tasks for data in the offset range + */ + List planFiles(StreamingOffset startOffset, StreamingOffset endOffset); + + /** + * Return the latest offset the stream can advance to from {@code startOffset}, respecting the + * given {@link ReadLimit}. + * + * @param startOffset the current offset of the stream + * @param limit the read limit bounding how far ahead to advance + * @return the latest available offset, or {@code null} if no new data is available + */ + StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit); + + /** Stop the planner and release any resources. */ + void stop(); +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index a82583747a64..a1ff767fe2a0 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -26,48 +26,32 @@ import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.Locale; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataOperations; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.MicroBatches; -import org.apache.iceberg.MicroBatches.MicroBatch; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SnapshotChanges; -import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.CloseableIterable; -import 
org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadConf; -import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.Pair; -import org.apache.iceberg.util.PropertyUtil; -import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReaderFactory; -import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.connector.read.streaming.ReadLimit; -import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles; -import org.apache.spark.sql.connector.read.streaming.ReadMaxRows; import org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +63,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final Table table; private final Supplier fileIO; + private final SparkReadConf readConf; private final String branch; private final boolean caseSensitive; private final String expectedSchema; @@ -89,12 +74,11 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA private final long splitOpenFileCost; private final boolean 
localityPreferred; private final StreamingOffset initialOffset; - private final boolean skipDelete; - private final boolean skipOverwrite; private final long fromTimestamp; private final int maxFilesPerMicroBatch; private final int maxRecordsPerMicroBatch; private final boolean cacheDeleteFilesOnExecutors; + private SparkMicroBatchPlanner planner; private StreamingOffset lastOffsetForTriggerAvailableNow; SparkMicroBatchStream( @@ -106,6 +90,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA String checkpointLocation) { this.table = table; this.fileIO = fileIO; + this.readConf = readConf; this.branch = readConf.branch(); this.caseSensitive = readConf.caseSensitive(); this.expectedSchema = SchemaParser.toJson(expectedSchema); @@ -124,9 +109,6 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA new InitialOffsetStore( table, checkpointLocation, fromTimestamp, sparkContext.hadoopConfiguration()); this.initialOffset = initialOffsetStore.initialOffset(); - - this.skipDelete = readConf.streamingSkipDeleteSnapshots(); - this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots(); } @Override @@ -141,8 +123,8 @@ public Offset latestOffset() { } Snapshot latestSnapshot = table.currentSnapshot(); - - return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount(latestSnapshot), false); + return new StreamingOffset( + latestSnapshot.snapshotId(), MicroBatchUtils.addedFilesCount(table, latestSnapshot), false); } @Override @@ -161,7 +143,11 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { StreamingOffset endOffset = (StreamingOffset) end; StreamingOffset startOffset = (StreamingOffset) start; - List fileScanTasks = planFiles(startOffset, endOffset); + if (planner == null) { + initializePlanner(startOffset, endOffset); + } + + List fileScanTasks = planner.planFiles(startOffset, endOffset); CloseableIterable splitTasks = 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), splitSize); @@ -171,7 +157,6 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { String[][] locations = computePreferredLocations(combinedScanTasks); InputPartition[] partitions = new InputPartition[combinedScanTasks.size()]; - for (int index = 0; index < combinedScanTasks.size(); index++) { partitions[index] = new SparkInputPartition( @@ -214,318 +199,35 @@ public Offset deserializeOffset(String json) { public void commit(Offset end) {} @Override - public void stop() {} - - private List planFiles(StreamingOffset startOffset, StreamingOffset endOffset) { - List fileScanTasks = Lists.newArrayList(); - StreamingOffset batchStartOffset = - StreamingOffset.START_OFFSET.equals(startOffset) - ? determineStartingOffset(table, fromTimestamp) - : startOffset; - - StreamingOffset currentOffset = null; - - // [(startOffset : startFileIndex), (endOffset : endFileIndex) ) - do { - long endFileIndex; - if (currentOffset == null) { - currentOffset = batchStartOffset; - } else { - Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId()); - // it may happen that we need to read this snapshot partially in case it's equal to - // endOffset. 
- if (currentOffset.snapshotId() != endOffset.snapshotId()) { - currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false); - } else { - currentOffset = endOffset; - } - } - - Snapshot snapshot = table.snapshot(currentOffset.snapshotId()); - - validateCurrentSnapshotExists(snapshot, currentOffset); - - if (!shouldProcess(snapshot)) { - LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name()); - continue; - } - - Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId()); - if (currentOffset.snapshotId() == endOffset.snapshotId()) { - endFileIndex = endOffset.position(); - } else { - endFileIndex = addedFilesCount(currentSnapshot); - } - - MicroBatch latestMicroBatch = - MicroBatches.from(currentSnapshot, table.io()) - .caseSensitive(caseSensitive) - .specsById(table.specs()) - .generate( - currentOffset.position(), - endFileIndex, - Long.MAX_VALUE, - currentOffset.shouldScanAllFiles()); - - fileScanTasks.addAll(latestMicroBatch.tasks()); - } while (currentOffset.snapshotId() != endOffset.snapshotId()); - - return fileScanTasks; - } - - private boolean shouldProcess(Snapshot snapshot) { - String op = snapshot.operation(); - switch (op) { - case DataOperations.APPEND: - return true; - case DataOperations.REPLACE: - return false; - case DataOperations.DELETE: - Preconditions.checkState( - skipDelete, - "Cannot process delete snapshot: %s, to ignore deletes, set %s=true", - snapshot.snapshotId(), - SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS); - return false; - case DataOperations.OVERWRITE: - Preconditions.checkState( - skipOverwrite, - "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true", - snapshot.snapshotId(), - SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS); - return false; - default: - throw new IllegalStateException( - String.format( - "Cannot process unknown snapshot operation: %s (snapshot id %s)", - op.toLowerCase(Locale.ROOT), snapshot.snapshotId())); - } - } - - 
private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) { - if (table.currentSnapshot() == null) { - return StreamingOffset.START_OFFSET; - } - - if (fromTimestamp == null) { - // match existing behavior and start from the oldest snapshot - return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); - } - - if (table.currentSnapshot().timestampMillis() < fromTimestamp) { - return StreamingOffset.START_OFFSET; - } - - try { - Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp); - if (snapshot != null) { - return new StreamingOffset(snapshot.snapshotId(), 0, false); - } else { - return StreamingOffset.START_OFFSET; - } - } catch (IllegalStateException e) { - // could not determine the first snapshot after the timestamp. use the oldest ancestor instead - return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false); + public void stop() { + if (planner != null) { + planner.stop(); } } - private static int getMaxFiles(ReadLimit readLimit) { - if (readLimit instanceof ReadMaxFiles) { - return ((ReadMaxFiles) readLimit).maxFiles(); - } - - if (readLimit instanceof CompositeReadLimit) { - // We do not expect a CompositeReadLimit to contain a nested CompositeReadLimit. - // In fact, it should only be a composite of two or more of ReadMinRows, ReadMaxRows and - // ReadMaxFiles, with no more than one of each. 
- ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits(); - for (ReadLimit limit : limits) { - if (limit instanceof ReadMaxFiles) { - return ((ReadMaxFiles) limit).maxFiles(); - } - } - } - - // there is no ReadMaxFiles, so return the default - return Integer.MAX_VALUE; - } - - private static int getMaxRows(ReadLimit readLimit) { - if (readLimit instanceof ReadMaxRows) { - long maxRows = ((ReadMaxRows) readLimit).maxRows(); - return Math.toIntExact(maxRows); - } - - if (readLimit instanceof CompositeReadLimit) { - ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits(); - for (ReadLimit limit : limits) { - if (limit instanceof ReadMaxRows) { - long maxRows = ((ReadMaxRows) limit).maxRows(); - return Math.toIntExact(maxRows); - } - } + private void initializePlanner(StreamingOffset startOffset, StreamingOffset endOffset) { + if (readConf.asyncMicroBatchPlanningEnabled()) { + this.planner = + new AsyncSparkMicroBatchPlanner( + table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow); + } else { + this.planner = + new SyncSparkMicroBatchPlanner(table, readConf, lastOffsetForTriggerAvailableNow); } - - // There is no ReadMaxRows, so return the default - return Integer.MAX_VALUE; } @Override - @SuppressWarnings("checkstyle:CyclomaticComplexity") public Offset latestOffset(Offset startOffset, ReadLimit limit) { - // calculate end offset get snapshotId from the startOffset Preconditions.checkArgument( startOffset instanceof StreamingOffset, "Invalid start offset: %s is not a StreamingOffset", startOffset); - table.refresh(); - if (table.currentSnapshot() == null) { - return StreamingOffset.START_OFFSET; - } - - if (table.currentSnapshot().timestampMillis() < fromTimestamp) { - return StreamingOffset.START_OFFSET; + if (planner == null) { + initializePlanner((StreamingOffset) startOffset, null); } - // end offset can expand to multiple snapshots - StreamingOffset startingOffset = (StreamingOffset) startOffset; - - if 
(startOffset.equals(StreamingOffset.START_OFFSET)) { - startingOffset = determineStartingOffset(table, fromTimestamp); - } - - Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId()); - validateCurrentSnapshotExists(curSnapshot, startingOffset); - - // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled. - long latestSnapshotId = - lastOffsetForTriggerAvailableNow != null - ? lastOffsetForTriggerAvailableNow.snapshotId() - : table.currentSnapshot().snapshotId(); - - int startPosOfSnapOffset = (int) startingOffset.position(); - - boolean scanAllFiles = startingOffset.shouldScanAllFiles(); - - boolean shouldContinueReading = true; - int curFilesAdded = 0; - long curRecordCount = 0; - int curPos = 0; - - // Note : we produce nextOffset with pos as non-inclusive - while (shouldContinueReading) { - // generate manifest index for the curSnapshot - List> indexedManifests = - MicroBatches.skippedManifestIndexesFromSnapshot( - table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles); - // this is under assumption we will be able to add at-least 1 file in the new offset - for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) { - // be rest assured curPos >= startFileIndex - curPos = indexedManifests.get(idx).second(); - try (CloseableIterable taskIterable = - MicroBatches.openManifestFile( - table.io(), - table.specs(), - caseSensitive, - curSnapshot, - indexedManifests.get(idx).first(), - scanAllFiles); - CloseableIterator taskIter = taskIterable.iterator()) { - while (taskIter.hasNext()) { - FileScanTask task = taskIter.next(); - if (curPos >= startPosOfSnapOffset) { - if ((curFilesAdded + 1) > getMaxFiles(limit)) { - // On including the file it might happen that we might exceed, the configured - // soft limit on the number of records, since this is a soft limit its acceptable. 
- shouldContinueReading = false; - break; - } - - curFilesAdded += 1; - curRecordCount += task.file().recordCount(); - - if (curRecordCount >= getMaxRows(limit)) { - // we included the file, so increment the number of files - // read in the current snapshot. - ++curPos; - shouldContinueReading = false; - break; - } - } - ++curPos; - } - } catch (IOException ioe) { - LOG.warn("Failed to close task iterable", ioe); - } - } - // if the currentSnapShot was also the latestSnapshot then break - if (curSnapshot.snapshotId() == latestSnapshotId) { - break; - } - - // if everything was OK and we consumed complete snapshot then move to next snapshot - if (shouldContinueReading) { - Snapshot nextValid = nextValidSnapshot(curSnapshot); - if (nextValid == null) { - // nextValid implies all the remaining snapshots should be skipped. - break; - } - // we found the next available snapshot, continue from there. - curSnapshot = nextValid; - startPosOfSnapOffset = -1; - // if anyhow we are moving to next snapshot we should only scan addedFiles - scanAllFiles = false; - } - } - - StreamingOffset latestStreamingOffset = - new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles); - - // if no new data arrived, then return null. - return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset; - } - - /** - * Get the next snapshot skiping over rewrite and delete snapshots. - * - * @param curSnapshot the current snapshot - * @return the next valid snapshot (not a rewrite or delete snapshot), returns null if all - * remaining snapshots should be skipped. 
- */ - private Snapshot nextValidSnapshot(Snapshot curSnapshot) { - Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId()); - // skip over rewrite and delete snapshots - while (!shouldProcess(nextSnapshot)) { - LOG.debug("Skipping snapshot: {} of table {}", nextSnapshot.snapshotId(), table.name()); - // if the currentSnapShot was also the mostRecentSnapshot then break - if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) { - return null; - } - nextSnapshot = SnapshotUtil.snapshotAfter(table, nextSnapshot.snapshotId()); - } - return nextSnapshot; - } - - private long addedFilesCount(Snapshot snapshot) { - long addedFilesCount = - PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1); - // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP, - // iterate through addedFiles iterator to find addedFilesCount. - return addedFilesCount == -1 - ? Iterables.size( - SnapshotChanges.builderFor(table).snapshot(snapshot).build().addedDataFiles()) - : addedFilesCount; - } - - private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) { - if (snapshot == null) { - throw new IllegalStateException( - String.format( - Locale.ROOT, - "Cannot load current offset at snapshot %d, the snapshot was expired or removed", - currentOffset.snapshotId())); - } + return planner.latestOffset((StreamingOffset) startOffset, limit); } @Override @@ -553,6 +255,11 @@ public void prepareForTriggerAvailableNow() { (StreamingOffset) latestOffset(initialOffset, ReadLimit.allAvailable()); LOG.info("lastOffset for Trigger.AvailableNow is {}", lastOffsetForTriggerAvailableNow.json()); + + if (planner != null) { + planner.stop(); + planner = null; + } } private static class InitialOffsetStore { @@ -576,7 +283,7 @@ public StreamingOffset initialOffset() { } table.refresh(); - StreamingOffset offset = determineStartingOffset(table, fromTimestamp); + StreamingOffset offset = 
MicroBatchUtils.determineStartingOffset(table, fromTimestamp); OutputFile outputFile = io.newOutputFile(initialOffsetLocation); writeOffset(offset, outputFile); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java new file mode 100644 index 000000000000..f1b0029c5432 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.connector.read.streaming.ReadLimit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner { + private static final Logger LOG = LoggerFactory.getLogger(SyncSparkMicroBatchPlanner.class); + + private final boolean caseSensitive; + private final long fromTimestamp; + private final StreamingOffset lastOffsetForTriggerAvailableNow; + + SyncSparkMicroBatchPlanner( + Table table, SparkReadConf readConf, StreamingOffset lastOffsetForTriggerAvailableNow) { + super(table, readConf); + this.caseSensitive = readConf().caseSensitive(); + this.fromTimestamp = readConf().streamFromTimestamp(); + this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow; + } + + @Override + public List planFiles(StreamingOffset startOffset, StreamingOffset endOffset) { + List fileScanTasks = Lists.newArrayList(); + StreamingOffset batchStartOffset = + StreamingOffset.START_OFFSET.equals(startOffset) + ? 
MicroBatchUtils.determineStartingOffset(table(), fromTimestamp) + : startOffset; + + StreamingOffset currentOffset = null; + + // [(startOffset : startFileIndex), (endOffset : endFileIndex) ) + do { + long endFileIndex; + if (currentOffset == null) { + currentOffset = batchStartOffset; + } else { + Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table(), currentOffset.snapshotId()); + // it may happen that we need to read this snapshot partially in case it's equal to + // endOffset. + if (currentOffset.snapshotId() != endOffset.snapshotId()) { + currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false); + } else { + currentOffset = endOffset; + } + } + + Snapshot snapshot = table().snapshot(currentOffset.snapshotId()); + + validateCurrentSnapshotExists(snapshot, currentOffset); + + if (!shouldProcess(snapshot)) { + LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table().name()); + continue; + } + + Snapshot currentSnapshot = table().snapshot(currentOffset.snapshotId()); + if (currentOffset.snapshotId() == endOffset.snapshotId()) { + endFileIndex = endOffset.position(); + } else { + endFileIndex = MicroBatchUtils.addedFilesCount(table(), currentSnapshot); + } + + MicroBatch latestMicroBatch = + MicroBatches.from(currentSnapshot, table().io()) + .caseSensitive(caseSensitive) + .specsById(table().specs()) + .generate( + currentOffset.position(), + endFileIndex, + Long.MAX_VALUE, + currentOffset.shouldScanAllFiles()); + + fileScanTasks.addAll(latestMicroBatch.tasks()); + } while (currentOffset.snapshotId() != endOffset.snapshotId()); + + return fileScanTasks; + } + + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit) { + table().refresh(); + if (table().currentSnapshot() == null) { + return StreamingOffset.START_OFFSET; + } + + if (table().currentSnapshot().timestampMillis() < fromTimestamp) { + return 
StreamingOffset.START_OFFSET; + } + + // end offset can expand to multiple snapshots + StreamingOffset startingOffset = startOffset; + + if (startOffset.equals(StreamingOffset.START_OFFSET)) { + startingOffset = MicroBatchUtils.determineStartingOffset(table(), fromTimestamp); + } + + Snapshot curSnapshot = table().snapshot(startingOffset.snapshotId()); + validateCurrentSnapshotExists(curSnapshot, startingOffset); + + // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled. + long latestSnapshotId = + lastOffsetForTriggerAvailableNow != null + ? lastOffsetForTriggerAvailableNow.snapshotId() + : table().currentSnapshot().snapshotId(); + + int startPosOfSnapOffset = (int) startingOffset.position(); + + boolean scanAllFiles = startingOffset.shouldScanAllFiles(); + + boolean shouldContinueReading = true; + int curFilesAdded = 0; + long curRecordCount = 0; + int curPos = 0; + + // Extract limits once to avoid repeated calls in tight loop + UnpackedLimits unpackedLimits = new UnpackedLimits(limit); + long maxFiles = unpackedLimits.getMaxFiles(); + long maxRows = unpackedLimits.getMaxRows(); + + // Note : we produce nextOffset with pos as non-inclusive + while (shouldContinueReading) { + // generate manifest index for the curSnapshot + List> indexedManifests = + MicroBatches.skippedManifestIndexesFromSnapshot( + table().io(), curSnapshot, startPosOfSnapOffset, scanAllFiles); + // this is under assumption we will be able to add at-least 1 file in the new offset + for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) { + // be rest assured curPos >= startFileIndex + curPos = indexedManifests.get(idx).second(); + try (CloseableIterable taskIterable = + MicroBatches.openManifestFile( + table().io(), + table().specs(), + caseSensitive, + curSnapshot, + indexedManifests.get(idx).first(), + scanAllFiles); + CloseableIterator taskIter = taskIterable.iterator()) { + while (taskIter.hasNext()) { + FileScanTask task = taskIter.next(); + if 
(curPos >= startPosOfSnapOffset) { + if ((curFilesAdded + 1) > maxFiles) { + // On including the file it might happen that we might exceed the configured + // soft limit on the number of records; since this is a soft limit, it's acceptable. + shouldContinueReading = false; + break; + } + + curFilesAdded += 1; + curRecordCount += task.file().recordCount(); + + if (curRecordCount >= maxRows) { + // we included the file, so increment the number of files + // read in the current snapshot. + if (curFilesAdded == 1 && curRecordCount > maxRows) { + LOG.warn( + "File {} contains {} records, exceeding maxRecordsPerMicroBatch limit of {}. " + + "This file will be processed entirely to guarantee forward progress. " + + "Consider increasing the limit or writing smaller files to avoid unexpected memory usage.", + task.file().location(), + task.file().recordCount(), + maxRows); + } + ++curPos; + shouldContinueReading = false; + break; + } + } + ++curPos; + } + } catch (IOException ioe) { + LOG.warn("Failed to close task iterable", ioe); + } + } + // if the currentSnapShot was also the latestSnapshot then break + if (curSnapshot.snapshotId() == latestSnapshotId) { + break; + } + + // if everything was OK and we consumed complete snapshot then move to next snapshot + if (shouldContinueReading) { + Snapshot nextValid = nextValidSnapshot(curSnapshot); + if (nextValid == null) { + // nextValid implies all the remaining snapshots should be skipped. + break; + } + // we found the next available snapshot, continue from there. + curSnapshot = nextValid; + startPosOfSnapOffset = -1; + // if anyhow we are moving to next snapshot we should only scan addedFiles + scanAllFiles = false; + } + } + + StreamingOffset latestStreamingOffset = + new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles); + + // if no new data arrived, then return null. + return latestStreamingOffset.equals(startingOffset) ?
null : latestStreamingOffset; + } + + @Override + public void stop() {} + + private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) { + if (snapshot == null) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Cannot load current offset at snapshot %d, the snapshot was expired or removed", + currentOffset.snapshotId())); + } + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java new file mode 100644 index 000000000000..b6017e2001e7 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.iceberg.Snapshot; +import org.junit.jupiter.api.Test; + +class TestAsyncSparkMicroBatchPlanner { + + @Test + void reachedAvailableNowCapReturnsTrueOnlyForExactCapSnapshot() { + Snapshot capSnapshot = mockSnapshot(10L); + Snapshot laterSnapshotWithHigherId = mockSnapshot(20L); + Snapshot laterSnapshotWithLowerId = mockSnapshot(5L); + StreamingOffset capOffset = new StreamingOffset(10L, 3L, false); + + assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(capSnapshot, capOffset)).isTrue(); + assertThat( + AsyncSparkMicroBatchPlanner.reachedAvailableNowCap( + laterSnapshotWithHigherId, capOffset)) + .isFalse(); + assertThat( + AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(laterSnapshotWithLowerId, capOffset)) + .isFalse(); + } + + @Test + void reachedAvailableNowCapReturnsFalseWhenCapOrSnapshotIsMissing() { + Snapshot readFrom = mockSnapshot(10L); + StreamingOffset capOffset = new StreamingOffset(10L, 1L, false); + + assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(readFrom, null)).isFalse(); + assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(null, capOffset)).isFalse(); + } + + private Snapshot mockSnapshot(long snapshotId) { + Snapshot snapshot = mock(Snapshot.class); + when(snapshot.snapshotId()).thenReturn(snapshotId); + return snapshot; + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java new file mode 100644 index 000000000000..a9ce340fd4ec --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.spark.sql.connector.read.streaming.ReadLimit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestMicroBatchPlanningUtils extends CatalogTestBase { + + private Table table; + + @BeforeEach + public void setupTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql( + "CREATE TABLE %s " + + "(id INT, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(3, id))", + tableName); + this.table = validationCatalog.loadTable(tableIdent); + } + + @AfterEach + public void dropTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @TestTemplate + public void testUnpackedLimitsCompositeChoosesMinimum() { + ReadLimit[] limits = + new ReadLimit[] { + ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), ReadLimit.maxFiles(2) 
+ }; + + ReadLimit composite = ReadLimit.compositeLimit(limits); + + BaseSparkMicroBatchPlanner.UnpackedLimits unpacked = + new BaseSparkMicroBatchPlanner.UnpackedLimits(composite); + + assertThat(unpacked.getMaxRows()).isEqualTo(4); + assertThat(unpacked.getMaxFiles()).isEqualTo(2); + } + + @TestTemplate + public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() { + sql("INSERT INTO %s VALUES (1, 'one')", tableName); + table.refresh(); + long snapshot1Time = table.currentSnapshot().timestampMillis(); + + sql("INSERT INTO %s VALUES (2, 'two')", tableName); + table.refresh(); + long snapshot2Id = table.currentSnapshot().snapshotId(); + + StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, snapshot1Time + 1); + + assertThat(offset.snapshotId()).isEqualTo(snapshot2Id); + assertThat(offset.position()).isEqualTo(0L); + assertThat(offset.shouldScanAllFiles()).isFalse(); + } + + @TestTemplate + public void testAddedFilesCountUsesSummaryWhenPresent() { + sql("INSERT INTO %s VALUES (1, 'one')", tableName); + table.refresh(); + + long expectedAddedFiles = + Long.parseLong(table.currentSnapshot().summary().get(SnapshotSummary.ADDED_FILES_PROP)); + + long actual = MicroBatchUtils.addedFilesCount(table, table.currentSnapshot()); + + assertThat(actual).isEqualTo(expectedAddedFiles); + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 5f54c832aa93..d97e6ec00d7f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -31,13 +31,17 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; import 
org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DataOperations; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Files; +import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -50,15 +54,22 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; @@ -73,10 +84,73 @@ @ExtendWith(ParameterizedTestExtension.class) public final class TestStructuredStreamingRead3 extends CatalogTestBase { + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, async = {3}") + public static Object[][] parameters() { + return new Object[][] { + { + 
SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties(), + false + }, + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties(), + true + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + false + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + true + }, + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build(), + false + }, + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build(), + true + }, + { + SparkCatalogConfig.SPARK_SESSION.catalogName(), + SparkCatalogConfig.SPARK_SESSION.implementation(), + SparkCatalogConfig.SPARK_SESSION.properties(), + false + }, + { + SparkCatalogConfig.SPARK_SESSION.catalogName(), + SparkCatalogConfig.SPARK_SESSION.implementation(), + SparkCatalogConfig.SPARK_SESSION.properties(), + true + } + }; + } + private Table table; private final AtomicInteger microBatches = new AtomicInteger(); + @Parameter(index = 3) + private Boolean async; + /** * test data to be used by multiple writes each write creates a snapshot and writes a list of * records @@ -250,15 +324,41 @@ public void testReadStreamWithCompositeReadLimit() throws Exception { Trigger.AvailableNow()); } + @TestTemplate + public void testReadStreamWithLowAsyncQueuePreload() throws Exception { + appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); + // Set low preload 
limits to test async queue behavior - background thread should load + // remaining data + + StreamingQuery query = + startStream( + ImmutableMap.of( + SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT, + "5", + SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT, + "5")); + + List<SimpleRecord> actual = rowsAvailable(query); + assertThat(actual) + .containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS)); + } + @TestTemplate public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws Exception { File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile(); File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint"); File output = temp.resolve("junit").toFile(); + Map<String, String> options = Maps.newHashMap(); + options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, async.toString()); + if (async) { + options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, "1"); + } + DataStreamWriter<Row> querySource = spark .readStream() + .options(options) .format("iceberg") .load(tableName) .writeStream() @@ -313,10 +413,17 @@ public void testTriggerAvailableNowDoesNotProcessNewDataWhileRunning() throws Ex long expectedSnapshotId = table.currentSnapshot().snapshotId(); String sinkTable = "availablenow_sink"; + Map<String, String> options = Maps.newHashMap(); + options.put(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"); + options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, async.toString()); + if (async) { + options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, "1"); + } + StreamingQuery query = spark .readStream() - .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1") + .options(options) .format("iceberg") .load(tableName) .writeStream() @@ -358,6 +465,142 @@ public void testTriggerAvailableNowDoesNotProcessNewDataWhileRunning() throws Ex assertThat(actualResults).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedData)); } + @TestTemplate + public void
testTriggerAvailableNowCapsAsyncPreloadAfterPrepare() { + List<List<SimpleRecord>> initialData = + List.of(List.of(new SimpleRecord(1, "one")), List.of(new SimpleRecord(2, "two"))); + appendDataAsMultipleSnapshots(initialData); + + table.refresh(); + long expectedCapSnapshotId = table.currentSnapshot().snapshotId(); + + SparkMicroBatchStream stream = + new SparkMicroBatchStream( + JavaSparkContext.fromSparkContext(spark.sparkContext()), + table, + table::io, + new SparkReadConf( + spark, + table, + ImmutableMap.of( + SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, + async.toString(), + SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, + "1", + SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, + "1", + SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT, + "10", + SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT, + "10")), + table.schema(), + temp.resolve("available-now-cap-checkpoint").toString()); + + try { + stream.prepareForTriggerAvailableNow(); + + appendData(List.of(new SimpleRecord(3, "three"))); + + Offset startOffset = stream.initialOffset(); + Offset firstEndOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + assertThat(firstEndOffset).isNotNull(); + stream.planInputPartitions(startOffset, firstEndOffset); + + Offset secondEndOffset = stream.latestOffset(firstEndOffset, stream.getDefaultReadLimit()); + assertThat(secondEndOffset).isNotNull(); + stream.planInputPartitions(firstEndOffset, secondEndOffset); + + assertThat(stream.latestOffset(secondEndOffset, stream.getDefaultReadLimit())).isNull(); + assertThat(((StreamingOffset) secondEndOffset).snapshotId()).isEqualTo(expectedCapSnapshotId); + } finally { + stream.stop(); + } + } + + @TestTemplate + public void testLatestOffsetReturnsNullAfterFinalBatchIsConsumed() throws Exception { + appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); + + table.refresh(); + int expectedBatchCount; + try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) { + expectedBatchCount =
Iterables.size(tasks); + } + + SparkMicroBatchStream stream = + newMicroBatchStream( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"), + "drain-to-null-checkpoint"); + + try { + int plannedBatchCount = 0; + Offset startOffset = stream.initialOffset(); + Offset endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + while (endOffset != null) { + InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset); + assertThat(partitions).isNotEmpty(); + plannedBatchCount += 1; + startOffset = endOffset; + endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + } + + assertThat(endOffset).isNull(); + assertThat(plannedBatchCount).isEqualTo(expectedBatchCount); + } finally { + stream.stop(); + } + } + + @TestTemplate + public void testPlanInputPartitionsIsIdempotentForSameOffsets() { + appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); + + SparkMicroBatchStream stream = + newMicroBatchStream( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"), + "idempotent-plan-files-checkpoint"); + + try { + Offset startOffset = stream.initialOffset(); + Offset endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + + assertThat(endOffset).isNotNull(); + + InputPartition[] firstPartitions = stream.planInputPartitions(startOffset, endOffset); + InputPartition[] secondPartitions = stream.planInputPartitions(startOffset, endOffset); + + List<String> firstFileLocations = Lists.newArrayList(); + for (InputPartition partition : firstPartitions) { + SparkInputPartition sparkInputPartition = (SparkInputPartition) partition; + for (FileScanTask task : sparkInputPartition.taskGroup().tasks()) { + firstFileLocations.add(task.file().location()); + } + } + + List<String> secondFileLocations = Lists.newArrayList(); + for (InputPartition partition : secondPartitions) { + SparkInputPartition sparkInputPartition = (SparkInputPartition) partition; + for (FileScanTask task :
sparkInputPartition.taskGroup().tasks()) { + secondFileLocations.add(task.file().location()); + } + } + + assertThat(firstFileLocations).containsExactlyInAnyOrderElementsOf(secondFileLocations); + + startOffset = endOffset; + endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + while (endOffset != null) { + assertThat(stream.planInputPartitions(startOffset, endOffset)).isNotEmpty(); + startOffset = endOffset; + endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit()); + } + + assertThat(endOffset).isNull(); + } finally { + stream.stop(); + } + } + @TestTemplate public void testReadStreamOnIcebergThenAddData() throws Exception { List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; @@ -425,6 +668,8 @@ public void testReadingStreamFromFutureTimetsamp() throws Exception { // Data appended after the timestamp should appear appendData(data); + // Allow async background thread to refresh, else test sometimes fails + Thread.sleep(50); actual = rowsAvailable(query); assertThat(actual).containsExactlyInAnyOrderElementsOf(data); } @@ -872,13 +1117,18 @@ private void appendData(List<SimpleRecord> data, String format) { private static final String MEMORY_TABLE = "_stream_view_mem"; private StreamingQuery startStream(Map<String, String> options) throws TimeoutException { + Map<String, String> allOptions = Maps.newHashMap(options); + allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, async.toString()); + if (async) { + allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, "1"); + } return spark .readStream() - .options(options) + .options(allOptions) .format("iceberg") .load(tableName) .writeStream() - .options(options) + .options(allOptions) .format("memory") .queryName(MEMORY_TABLE) .outputMode(OutputMode.Append()) @@ -903,11 +1153,17 @@ private void assertMicroBatchRecordSizes( private void assertMicroBatchRecordSizes( Map<String, String> options, List expectedMicroBatchRecordSize, Trigger trigger) throws TimeoutException { - Dataset<Row> ds =
spark.readStream().options(options).format("iceberg").load(tableName); + Map<String, String> allOptions = Maps.newHashMap(options); + allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, async.toString()); + if (async) { + allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, "1"); + } + + Dataset<Row> ds = spark.readStream().options(allOptions).format("iceberg").load(tableName); List syncList = Collections.synchronizedList(Lists.newArrayList()); ds.writeStream() - .options(options) + .options(allOptions) .trigger(trigger) .foreachBatch( (VoidFunction2<Dataset<Row>, Long>) @@ -929,4 +1185,21 @@ private List<SimpleRecord> rowsAvailable(StreamingQuery query) { .as(Encoders.bean(SimpleRecord.class)) .collectAsList(); } + + private SparkMicroBatchStream newMicroBatchStream( + Map<String, String> options, String checkpointDirName) { + Map<String, String> allOptions = Maps.newHashMap(options); + allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, async.toString()); + if (async) { + allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, "1"); + } + + return new SparkMicroBatchStream( + JavaSparkContext.fromSparkContext(spark.sparkContext()), + table, + table::io, + new SparkReadConf(spark, table, allOptions), + table.schema(), + temp.resolve(checkpointDirName).toString()); + } }