Skip to content

Flink: Source fetcher threads leak when Iceberg reader is cancelled #16426

@shanzi

Description

@shanzi

Apache Iceberg version

1.10.2

Query engine

Flink

Please describe the bug 🐞

Source Data Fetcher threads can leak when a Flink task using the Iceberg source is cancelled. Each leaked thread keeps its stack and object references alive, so memory reachable from the thread cannot be released. In our case, the leaked fetchers retained parquet buffers, S3 input streams, Iceberg reader state, and references back into the Flink operator chain. Repeated failovers accumulated these leaked threads until TaskManagers became GC-bound and eventually died.

Diagnosis

The leak happens when the fetcher is blocked waiting for an array-pool entry during cancellation. ArrayPoolDataIteratorBatcher.ArrayPoolBatchIterator#getCachedEntry() calls Pool#pollEntry(), which blocks indefinitely, while IcebergSourceSplitReader#wakeUp() is currently a no-op.

Flink’s shutdown path depends on cooperative wakeup. In Flink 1.19, SourceReaderBase#close() delegates to splitFetcherManager.close(options.sourceReaderCloseTimeout). SplitFetcherManager#close(long timeoutMs) calls SplitFetcher::shutdown, shuts down the executor, and waits up to the timeout. If the fetcher does not exit, it only logs a warning:

Failed to close the source reader in 30000 ms. There are still 1 split fetchers running

It does not forcefully stop or interrupt the stuck fetcher thread. Therefore, if SplitReader#wakeUp() does not unblock the active read, the Source Data Fetcher can remain alive indefinitely.

Example stuck stack:

"Source Data Fetcher ..." WAITING (parking)
  at java.util.concurrent.ArrayBlockingQueue.take
  at org.apache.flink.connector.file.src.util.Pool.pollEntry(Pool.java:82)
  at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.getCachedEntry(...)
  at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.next(...)
  at org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(...)
  at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(...)
  at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(...)
  at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(...)

Relevant Iceberg code:

@Override
public void wakeUp() {}

and:

private T[] getCachedEntry() {
  try {
    return pool.pollEntry();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while waiting for array pool entry", e);
  }
}

Proposed fix

Make the Iceberg Flink source reader wakeable while waiting for array-pool entries.

Apache Paimon appears to have fixed the same class of issue in apache/paimon#4098, titled [flink] fix the source data fetcher thread leak. That fix added a timed Pool#pollEntry(Duration) method, added a wakeup flag to the split reader, and changed the blocking pool wait into a timed poll loop that exits when wakeup is requested.

A similar Iceberg fix could:

  1. Add a wakeup flag to the source reader and/or current batch iterator.
  2. Implement IcebergSourceSplitReader#wakeUp() to set the flag and propagate it to the active iterator.
  3. Replace the indefinite pool.pollEntry() wait with a bounded poll loop that periodically checks the wakeup flag.
  4. On wakeup, throw or return control so the closed SplitFetcher can exit cleanly.

Conceptually:

while (!wakeUp) {
  T[] entry = pool.pollEntry(Duration.ofSeconds(10));
  if (entry != null) {
    return entry;
  }
}
throw new RuntimeException("Woken up while waiting for array pool entry");

If the supported Flink Pool API does not expose timed polling, Iceberg may need an equivalent wakeable wrapper or a bounded tryPollEntry() loop. The important property is that IcebergSourceSplitReader#wakeUp() must unblock a fetcher parked in the array-pool wait; otherwise Flink’s close path can only wait and warn, leaving the thread and its referenced memory alive.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions