Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/coalesce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ impl LimitedBatchCoalescer {
Ok(())
}

pub(crate) fn is_finished(&self) -> bool {
self.finished
}

/// Return the next completed batch, if any
pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
self.inner.next_completed_batch()
Expand Down
73 changes: 28 additions & 45 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::coalesce::LimitedBatchCoalescer;
use crate::coalesce::PushBatchStatus::LimitReached;
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::common::can_project;
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
Expand Down Expand Up @@ -711,23 +710,6 @@ impl FilterExecMetrics {
}
}

impl FilterExecStream {
fn flush_remaining_batches(
&mut self,
) -> Poll<Option<std::result::Result<RecordBatch, DataFusionError>>> {
// Flush any remaining buffered batch
match self.batch_coalescer.finish() {
Ok(()) => {
Poll::Ready(self.batch_coalescer.next_completed_batch().map(|batch| {
self.metrics.selectivity.add_part(batch.num_rows());
Ok(batch)
}))
}
Err(e) => Poll::Ready(Some(Err(e))),
}
}
}

pub fn batch_filter(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
Expand Down Expand Up @@ -767,10 +749,26 @@ impl Stream for FilterExecStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll;
let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
loop {
// If there is a completed batch ready, return it
if let Some(batch) = self.batch_coalescer.next_completed_batch() {
self.metrics.selectivity.add_part(batch.num_rows());
let poll = Poll::Ready(Some(Ok(batch)));
return self.metrics.baseline_metrics.record_poll(poll);
}

if self.batch_coalescer.is_finished() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the key change -- specifically that on subsequent calls to poll_next() we just drain the coalescer and don't get the next input / try and put it in.

Previously, if the input returns another batch, the FilterExec will try and add it to the BatchCoalescer (which is what triggers the assert)

This new code correctly drains the coalescer state 👍 .

I suspect that we haven't seen this on other queries because most/all the ExecutionPlans that can feed a FilterExec also implement Limit. Thus when the FilterExec calls poll_next on the input, the input returns None and no additional batch is pushed to the BatchCoalescer.

The default batch size for the memory exec means that it will most often only return a single batch.

I can't really figure out why setting the number of target partitions to 1 makes any difference (though I verified it is required to trigger the reproducer)

Perhaps the reason @bert-beyondloops saw this in his system is that you have some custom execution plan (that doesn't implement limit pushdown 🤔 ). This is fine I am just trying to explain why we haven't hit this issue before / in our other tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We indeed specify target partition 1 for some specific case since we know upfront our input is sorted and we do not want overhead of a SortMergeExec.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the target partition set to 1, you get another physical plan (with multiple partitions) and where the limit is foreseen on another type of Exec.

// If input is done and no batches are ready, return None to signal end of stream.
return Poll::Ready(None);
}

// Attempt to pull the next batch from the input stream.
match ready!(self.input.poll_next_unpin(cx)) {
None => {
self.batch_coalescer.finish()?;
// continue draining the coalescer
}
Some(Ok(batch)) => {
let timer = elapsed_compute.timer();
let status = self.predicate.as_ref()
Expand Down Expand Up @@ -802,37 +800,22 @@ impl Stream for FilterExecStream {
})?;
timer.done();

if let LimitReached = status {
poll = self.flush_remaining_batches();
break;
}

if let Some(batch) = self.batch_coalescer.next_completed_batch() {
self.metrics.selectivity.add_part(batch.num_rows());
poll = Poll::Ready(Some(Ok(batch)));
break;
}
continue;
}
None => {
// Flush any remaining buffered batch
match self.batch_coalescer.finish() {
Ok(()) => {
poll = self.flush_remaining_batches();
match status {
PushBatchStatus::Continue => {
// Keep pushing more batches
}
Err(e) => {
poll = Poll::Ready(Some(Err(e)));
PushBatchStatus::LimitReached => {
// limit was reached, so stop early
self.batch_coalescer.finish()?;
// continue draining the coalescer
}
}
break;
}
value => {
poll = Poll::Ready(value);
break;
}

// Error case
other => return Poll::Ready(other),
}
}
self.metrics.baseline_metrics.record_poll(poll)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -871,4 +871,4 @@ DROP TABLE test_limit_with_partitions;

# Tear down src_table table:
statement ok
DROP TABLE src_table;
DROP TABLE src_table;
22 changes: 22 additions & 0 deletions datafusion/sqllogictest/test_files/limit_single_row_batches.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

# minimize batch size to 1 in order to trigger different code paths
statement ok
set datafusion.execution.batch_size = '1';

# ----
# tests with target partition set to 1
# ----
statement ok
set datafusion.execution.target_partitions = '1';


statement ok
CREATE TABLE filter_limit (i INT) as values (1), (2);

query I
SELECT COUNT(*) FROM (SELECT i FROM filter_limit WHERE i <> 0 LIMIT 1);
----
1

statement ok
DROP TABLE filter_limit;