
[Flink] Fix ReadOperator to stop reading after LIMIT on dedicated split path.#7991

Open
wwj6591812 wants to merge 1 commit into apache:master from wwj6591812:fix_readoperator_bug_0527

Contributor

wwj6591812 commented May 27, 2026

Problem

With scan.dedicated-split-generation=true, a LIMIT N query returns correct results but scans the entire ORC file. In the Flink UI, the Limit stage takes ~19s to produce 10 records.

/*+ config(job_name=hl_0527, cluster=hongli-duibi-0528) */ 
select
  *
from
  `PAIMON-MAINSE-DUMP-EA-20250514`.`PAIMON-MAINSE-DUMP-EA-20250514_MAINSE_DUMP`.`APP10000004119_MAINSE_PAIMON_D20260526194436`
  /*+ `OPTIONS`('scan.dedicated-split-generation'='true', 'parallelism' = '1') */
limit
  10;

A related core-layer fix is in #7994: ApplyBitmapIndexRecordReader does not signal reader exhaustion after the bitmap selection is consumed, so RecordReaderIterator keeps calling readBatch() until EOF. That PR fixes the root cause for all toCloseableIterator() / forEachRemaining() callers. This PR (#7991) fixes the same symptom on the Flink dedicated split path by stopping ReadOperator from calling hasNext() after the limit is reached.

Root Cause --- Short Summary

When scan.dedicated-split-generation=true, ReadOperator.processElement() uses while (iterator.hasNext()) and checks reachLimit() inside the loop body. Because the while condition is evaluated before the body runs, the limit check cannot prevent the extra hasNext() after N rows. Even after 10 rows are emitted, the loop starts a new iteration and calls hasNext() first, which triggers RecordReaderIterator.advanceIfNeeded(), and that in turn calls RecordReader.readBatch() repeatedly until EOF.

The old reachLimit() also could not reliably stop at N: it used numRecordsIn > limit (not >=), and skipped numRecordsIn.inc() on the first row — so after emitting 10 rows, numRecordsIn was still 9 and reachLimit() returned false anyway.

Although ApplyBitmapIndexRecordReader (from ReadBuilder.withLimit(N)) stops yielding rows after N, the iterator layer keeps calling readBatch(). The FLIP-27 path avoids this via RecordLimiter in FileStoreSourceSplitReader.fetch(). This PR applies the same pattern in ReadOperator.

Root Cause --- Detailed explanation

1. Read path when dedicated-split-generation=true

Flink SQL LIMIT N is pushed down to Paimon in two places:

  • ReadBuilder.withLimit(N): may wrap the storage reader (e.g. FileIndexEvaluator → ApplyBitmapIndexRecordReader). Because file-index.read.enabled defaults to true, a query with only LIMIT N (no filter) is pushed down as a bitmap selection, and the ORC reader is wrapped by ApplyBitmapIndexRecordReader.
  • ReadOperator(limit=N): reads splits on the dedicated path (MonitorSource → shuffle → ReadOperator).

When the wrapper returns null after row N, RecordReaderIterator treats it as batch exhaustion and keeps calling readBatch() until EOF.

The slow case is in ReadOperator.processElement(), which reads via:

TableRead.createReader(split).toCloseableIterator()  →  RecordReaderIterator

2. What the old loop did

Previously, ReadOperator.processElement() used:

while (iterator.hasNext()) {          // (A) hasNext() runs FIRST
    if (firstRecord) { ... }
    else { numRecordsIn.inc(); }      // (B) first row is NOT counted

    if (reachLimit()) return;         // (C) reachLimit() runs AFTER (A)

    output.collect(iterator.next());  // (D) emit
}

Execution order on each iteration: (A) → (B) → (C) → (D).
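
The ordering above can be reproduced outside Flink with a minimal sketch. It assumes the upstream reader has already been limited to N rows (as withLimit(N) does) and models it as an in-memory list; CountingIterator, rows and runOldLoop are hypothetical names for illustration, not Paimon or Flink classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class OldLoopOrderDemo {

    /** Hypothetical helper (not a Paimon class): counts how often hasNext() is called. */
    static final class CountingIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        int hasNextCalls = 0;

        CountingIterator(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            hasNextCalls++;
            return delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }
    }

    /** Builds the rows 1..n that the (already limited) reader is assumed to yield. */
    static List<Integer> rows(int n) {
        List<Integer> result = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            result.add(i);
        }
        return result;
    }

    /** Runs the old loop shape, appends emitted rows to out, returns the hasNext() call count. */
    static int runOldLoop(List<Integer> source, long limit, List<Integer> out) {
        CountingIterator<Integer> iterator = new CountingIterator<>(source.iterator());
        long numRecordsIn = 0;
        boolean firstRecord = true;
        while (iterator.hasNext()) {          // (A) always evaluated first
            if (firstRecord) {
                firstRecord = false;          // (B) first row is not counted
            } else {
                numRecordsIn++;
            }
            if (numRecordsIn > limit) {       // (C) old check, strictly greater
                break;
            }
            out.add(iterator.next());         // (D) emit
        }
        return iterator.hasNextCalls;
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        int calls = runOldLoop(rows(10), 10, out);
        System.out.println("emitted=" + out.size() + ", hasNext() calls=" + calls);
    }
}
```

With 10 rows and limit = 10 this emits 10 rows but calls hasNext() 11 times. In this toy model the 11th call is cheap; in ReadOperator it is the call that reaches advanceIfNeeded().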


3. Why reachLimit() inside the loop does not prevent the extra hasNext()

A common question: after 10 rows are emitted and numRecordsIn.inc() has run, shouldn't reachLimit() return true and stop the loop?

No — for two reasons:

① hasNext() runs before reachLimit()

The limit check is in the loop body, but hasNext() is in the while condition. After the 10th row is emitted, the loop starts iteration 11:

Step 1: while (iterator.hasNext())   ← runs FIRST, triggers advanceIfNeeded()
Step 2: numRecordsIn.inc()
Step 3: if (reachLimit()) return     ← too late; hasNext() already ran
Step 4: output.collect(...)

So even if reachLimit() would eventually return true, hasNext() has already been called and the expensive scan may have started.

② The old reachLimit() condition was wrong for this purpose

// old ReadOperator.reachLimit()
return limit != null && numRecordsIn.getCount() > limit;  // '>', not '>='

For limit = 10, after emitting 10 rows:

| Row emitted | numRecordsIn after inc() | numRecordsIn > 10 |
|-------------|--------------------------|-------------------|
| 1st         | 0 (first row skipped)    | false             |
| 2nd–10th    | 1–9                      | false             |

After the 10th row, numRecordsIn is 9, not 10 — so reachLimit() is still false. The loop proceeds to call hasNext() again.
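
The counter arithmetic can be checked with a small sketch. It assumes a single split and a counter that starts at 0; counterOnIteration, oldReachLimit and firstIterationThatStops are hypothetical helper names that only model the old logic described above.

```java
final class OldCounterTrace {

    /** Value of numRecordsIn after step (B) on the given 1-based loop iteration. */
    static long counterOnIteration(int iteration) {
        // The first iteration takes the firstRecord branch and does not increment.
        return iteration - 1;
    }

    /** The old check as quoted in the PR description: strictly greater than the limit. */
    static boolean oldReachLimit(long counter, long limit) {
        return counter > limit;
    }

    /** First loop iteration on which the old check would return true. */
    static int firstIterationThatStops(long limit) {
        int iteration = 1;
        while (!oldReachLimit(counterOnIteration(iteration), limit)) {
            iteration++;
        }
        return iteration;
    }

    public static void main(String[] args) {
        long limit = 10;
        for (int i = 1; i <= 12; i++) {
            long counter = counterOnIteration(i);
            System.out.println("iteration " + i
                    + ": numRecordsIn=" + counter
                    + ", reachLimit=" + oldReachLimit(counter, limit));
        }
        System.out.println("first stopping iteration: " + firstIterationThatStops(limit));
    }
}
```

For limit = 10 the old check first fires on iteration 12, and only if hasNext() has returned true 12 times. The limited reader yields just 10 rows, so the check never gets the chance to stop the loop.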


4. Why one extra hasNext() scans the whole file

RecordReaderIterator.advanceIfNeeded() does:

currentResult = currentIterator.next();
if (currentResult == null) {
    currentIterator.releaseBatch();
    currentIterator = reader.readBatch();  // repeat until EOF
}

With withLimit(N), ApplyBitmapIndexFileRecordIterator.next() returns null once position > last. RecordReaderIterator treats that as batch exhaustion and keeps calling readBatch() until EOF.
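
A simplified model of this behavior is sketched below. Batch, FakeReader and IteratorAdapter are hypothetical stand-ins, not the Paimon classes; the model assumes all selected rows sit in the first batch and every later batch yields nothing. The batch count of 96 is only an illustrative input.

```java
final class DrainToEofDemo {

    /** Stand-in for a batch iterator: next() returns null when the batch is exhausted. */
    interface Batch {
        Integer next();
    }

    /** Stand-in for a reader: readBatch() returns null at end of file. */
    static final class FakeReader {
        private final int totalBatches;
        private final int rowsToYield;
        private int batchesHandedOut = 0;
        private int yielded = 0;
        int readBatchCalls = 0;

        FakeReader(int totalBatches, int rowsToYield) {
            this.totalBatches = totalBatches;
            this.rowsToYield = rowsToYield;
        }

        Batch readBatch() {
            readBatchCalls++; // counts every call, including the one that reports EOF
            if (batchesHandedOut == totalBatches) {
                return null;
            }
            batchesHandedOut++;
            // Once rowsToYield rows are out, every batch looks empty to the caller.
            return () -> yielded < rowsToYield ? Integer.valueOf(yielded++) : null;
        }
    }

    /** Stand-in for the iterator adapter: a null row means "try the next batch". */
    static final class IteratorAdapter {
        private final FakeReader reader;
        private Batch current;
        private Integer pending;
        private boolean advanced = false;

        IteratorAdapter(FakeReader reader) {
            this.reader = reader;
            this.current = reader.readBatch();
        }

        boolean hasNext() {
            advanceIfNeeded();
            return pending != null;
        }

        Integer next() {
            advanceIfNeeded();
            advanced = false;
            return pending;
        }

        private void advanceIfNeeded() {
            if (advanced) {
                return;
            }
            advanced = true;
            while (current != null) {
                pending = current.next();
                if (pending != null) {
                    return;
                }
                current = reader.readBatch(); // batch exhausted: keep going until EOF
            }
            pending = null;
        }
    }

    /** Number of readBatch() calls caused by one extra hasNext() after the limited rows. */
    static int readBatchCallsAfterLimit(int totalBatches, int limit) {
        FakeReader reader = new FakeReader(totalBatches, limit);
        IteratorAdapter iterator = new IteratorAdapter(reader);
        for (int i = 0; i < limit; i++) {
            iterator.hasNext();
            iterator.next();
        }
        int before = reader.readBatchCalls;
        iterator.hasNext(); // the single extra call
        return reader.readBatchCalls - before;
    }

    public static void main(String[] args) {
        System.out.println(readBatchCallsAfterLimit(96, 10));
    }
}
```

In this model, one extra hasNext() on a 96-batch input triggers 96 readBatch() calls: the 95 remaining batches plus the final call that returns EOF.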

Arthas on LIMIT 10 (full trace under Evidence):

RecordReaderIterator.advanceIfNeeded()  cost ≈ 18.7s

This accounts for nearly all of the ~19s Limit stage in the Flink UI.


5. Why the non-dedicated path does not hit this

FileStoreSourceSplitReader.fetch() checks the limit before readBatch():

nextBatch = reachLimit() ? null : currentReader.recordReader().readBatch();

ReadOperator had no equivalent guard before hasNext().


Evidence

Flink UI: Limit operator ~19s, 10 records.

Arthas:

[arthas@1]$ trace org.apache.paimon.reader.RecordReaderIterator advanceIfNeeded '#cost>500' -n 5
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 73 ms, listenerId: 40
`---ts=2026-05-27 16:52:28.536;thread_name=app10000004119_mainse_paimon_d20260526194436[127764] -> Limit[127765] (1/1)#0;id=778;is_daemon=false;priority=5;TCCL=org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader@5d9e511d
    `---[18665.861763ms] org.apache.paimon.reader.RecordReaderIterator:advanceIfNeeded()
        +---[0.00% min=0.003656ms,max=0.017421ms,total=0.61503ms,count=96] org.apache.paimon.reader.RecordReader$RecordIterator:next() #76
        +---[0.00% min=0.002026ms,max=0.059614ms,total=0.502581ms,count=96] org.apache.paimon.reader.RecordReader$RecordIterator:releaseBatch() #80
        `---[99.99% min=0.060719ms,max=1588.668747ms,total=18664.03164ms,count=96] org.apache.paimon.reader.RecordReader:readBatch() #84

The trace of RecordReaderIterator.advanceIfNeeded() shows a single call (~18.7s) containing 96 invocations of RecordReader.readBatch() (and 96 next()/releaseBatch() pairs), confirming that the iterator loops until EOF after the limit bitmap stops yielding rows.

Fix

Align ReadOperator with FileStoreSourceSplitReader:

  1. Use RecordLimiter: increment after each emitted row (1 row = 1 count) and treat the limit as reached when counter >= limit.
  2. Check limit before hasNext() via && short-circuit:
while (!reachLimit() && iterator.hasNext()) {
    output.collect(...);
    recordLimiter.increment();   // after emit; reachLimit() true on next iteration
}

private boolean reachLimit() {
    return recordLimiter != null && recordLimiter.reachLimit();
}

When recordLimiter.reachLimit() is true, iterator.hasNext() is not evaluated, so RecordReaderIterator.advanceIfNeeded() is never called and the full-file ORC scan is avoided.
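
The short-circuit can be exercised in isolation with the sketch below. SimpleLimiter is a stand-in written for this example (it is not the Paimon RecordLimiter class, whose exact API is assumed here), and ProbeIterator is a hypothetical iterator that counts hasNext() calls.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

final class FixedLoopDemo {

    /** Stand-in for the limiter described above; counts emitted rows against a limit. */
    static final class SimpleLimiter {
        private final long limit;
        private long count = 0;

        SimpleLimiter(long limit) {
            this.limit = limit;
        }

        boolean reachLimit() {
            return count >= limit;
        }

        void increment() {
            count++;
        }
    }

    /** Iterator over 0..rows-1 that records how many times hasNext() was called. */
    static final class ProbeIterator implements Iterator<Integer> {
        private final int rows;
        private int position = 0;
        int hasNextCalls = 0;

        ProbeIterator(int rows) {
            this.rows = rows;
        }

        @Override
        public boolean hasNext() {
            hasNextCalls++;
            return position < rows;
        }

        @Override
        public Integer next() {
            if (position >= rows) {
                throw new NoSuchElementException();
            }
            return position++;
        }
    }

    /** Returns {emittedRows, hasNextCalls}; a null limit means "no limit". */
    static int[] run(int rows, Long limit) {
        SimpleLimiter limiter = limit == null ? null : new SimpleLimiter(limit);
        ProbeIterator iterator = new ProbeIterator(rows);
        int emitted = 0;
        // The limit check comes first, so hasNext() is skipped once the limit is reached.
        while (!(limiter != null && limiter.reachLimit()) && iterator.hasNext()) {
            iterator.next();
            emitted++;
            if (limiter != null) {
                limiter.increment(); // after emit: 1 row = 1 count
            }
        }
        return new int[] {emitted, iterator.hasNextCalls};
    }

    public static void main(String[] args) {
        int[] limited = run(100, 10L);
        System.out.println("emitted=" + limited[0] + ", hasNext() calls=" + limited[1]);
    }
}
```

For a 100-row input and limit = 10, the loop emits 10 rows and calls hasNext() exactly 10 times, so there is no 11th call that could trigger a scan. Without a limit, all 100 rows are read as before.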

Testing

  • DedicatedSplitReadLimitTest — 100-row split, LIMIT 10 → 10 rows, readBatch called once
  • OperatorSourceTest.testReadOperatorWithLimit — limit=2 → exactly 2 rows
  • BatchFileStoreITCase.testDedicatedPathLimitTenOnManyRows — 100 rows INSERT, LIMIT 10 → 10 rows
mvn test -pl paimon-flink/paimon-flink-common \
  -Dtest=DedicatedSplitReadLimitTest,OperatorSourceTest#testReadOperatorWithLimit,BatchFileStoreITCase#testDedicatedPathLimitTenOnManyRows

Contributor

leaves12138 left a comment


LGTM from code review.

Moving the limiter state to the operator level and checking it before iterator.hasNext() fixes the dedicated split path correctly: once the emitted-record limit is reached, the short-circuit prevents RecordReaderIterator.advanceIfNeeded() from opening more batches. The updated OperatorSourceTest expectation also matches the intended semantics that the limit is enforced on emitted rows, not on the input split counter.

I tried to run the focused Flink tests locally, but this environment could not resolve the current 1.5-SNAPSHOT artifacts (paimon-bundle, paimon-service-client, paimon-service-runtime), so this approval is based on code review only.

@@ -68,6 +69,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED;
private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
private transient Counter numRecordsIn;

Remove numRecordsIn?

3 participants