diff --git a/docs/changelog/123197.yaml b/docs/changelog/123197.yaml new file mode 100644 index 0000000000000..ffb4bab79fe8c --- /dev/null +++ b/docs/changelog/123197.yaml @@ -0,0 +1,5 @@ +pr: 123197 +summary: Fix early termination in `LuceneSourceOperator` +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index 3d34067e1a839..61a7cbad3e8af 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -140,7 +140,7 @@ public void collect(int doc) throws IOException { @Override public boolean isFinished() { - return doneCollecting; + return doneCollecting || remainingDocs <= 0; } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index b7114bb4e9b54..cba8722a384e3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.compute.test.AnyOperatorTestCase; import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.TestResultPageSinkOperator; @@ -117,6 +118,27 @@ public void testShardDataPartitioning() { testSimple(driverContext(), size, limit); } + public void testEarlyTermination() { + int size = between(1_000, 20_000); + int limit = between(10, size); + LuceneSourceOperator.Factory factory = simple(randomFrom(DataPartitioning.values()), size, limit, scoring); + try (SourceOperator sourceOperator = factory.get(driverContext())) { + assertFalse(sourceOperator.isFinished()); + int collected = 0; + while (sourceOperator.isFinished() == false) { + Page page = sourceOperator.getOutput(); + if (page != null) { + collected += page.getPositionCount(); + page.releaseBlocks(); + } + if (collected >= limit) { + assertTrue("source operator is not finished after reaching limit", sourceOperator.isFinished()); + assertThat(collected, equalTo(limit)); + } + } + } + } + public void testEmpty() { testSimple(driverContext(), 0, between(10, 10_000)); }