diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/LimitableBulkFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/LimitableBulkFormat.java index 68539563351a6..db6c1018397a9 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/LimitableBulkFormat.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/LimitableBulkFormat.java @@ -122,8 +122,13 @@ public RecordAndPosition next() { if (reachLimit()) { return null; } - numRead.incrementAndGet(); - return iterator.next(); + + RecordAndPosition ret = iterator.next(); + if (ret != null) { + numRead.incrementAndGet(); + } + + return ret; } @Override diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java index 69f7288493abe..92a1d22074c4b 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java @@ -19,9 +19,11 @@ package org.apache.flink.table.filesystem; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.file.src.FileSourceSplit; import org.apache.flink.connector.file.src.impl.StreamFormatAdapter; import org.apache.flink.connector.file.src.reader.BulkFormat; +import org.apache.flink.connector.file.src.reader.StreamFormat; import org.apache.flink.connector.file.src.reader.TextLineFormat; import org.apache.flink.connector.file.src.util.Utils; import org.apache.flink.core.fs.Path; @@ -65,4 +67,36 @@ public void test() throws IOException { Utils.forEachRemaining(reader, s -> i.incrementAndGet()); Assert.assertEquals(22, i.get()); } + + @Test + public void testLimitOverBatches() throws IOException { + // prepare file + File file = TEMP_FOLDER.newFile(); + file.createNewFile(); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 10000; i++) { + builder.append(i).append("\n"); + } + FileUtils.writeFileUtf8(file, builder.toString()); + + // set limit + Long limit = 2048L; + + // configuration for small batches + Configuration conf = new Configuration(); + conf.set(StreamFormat.FETCH_IO_SIZE, MemorySize.parse("4k")); + + // read + BulkFormat format = + LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), limit); + + BulkFormat.Reader reader = + format.createReader( + conf, new FileSourceSplit("id", new Path(file.toURI()), 0, file.length())); + + // check + AtomicInteger i = new AtomicInteger(0); + Utils.forEachRemaining(reader, s -> i.incrementAndGet()); + Assert.assertEquals(limit.intValue(), i.get()); + } }