Description
Is your feature request related to a problem or challenge?
During the initialization of its cursors, SPM (SortPreservingMerge) must get Poll::Ready() from each partition before it moves on to initiate the next partition's cursor.
datafusion/datafusion/physical-plan/src/sorts/merge.rs
Lines 161 to 166 in 8db30e2
    for i in 0..self.streams.partitions() {
        if let Err(e) = ready!(self.maybe_poll_stream(cx, i)) {
            self.aborted = true;
            return Poll::Ready(Some(Err(e)));
        }
    }
We have hit a scenario in our downstream where SPM polls the 1st partition continuously but never gets a Poll::Ready(). Those repeated polls make the buffers and channels of all upstream operators grow. However, when SPM skipped that partition and tried to initiate the other partitions before all those retries, we did not experience this problem (the congestion in partition 1 clears once the other partitions are pulled).
Describe the solution you'd like
SPM should not wait on the same partition while initiating its cursor. Instead, it should keep iterating over all partitions, whether or not a given poll returns ready, until all cursors are successfully initiated.
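A minimal sketch of that idea, using only the standard library rather than DataFusion's actual types. `MockPartition`, `poll_init`, `poll_init_round`, `rounds_until_initialized`, and the `uninitiated` queue are all hypothetical names made up for illustration; the real change would live in `SortPreservingMergeStream` and may look quite different.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for one input partition: it reports `Pending`
/// for a fixed number of polls before its cursor can be initiated.
struct MockPartition {
    pending_polls_left: usize,
}

impl MockPartition {
    fn poll_init(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), String>> {
        if self.pending_polls_left == 0 {
            Poll::Ready(Ok(()))
        } else {
            self.pending_polls_left -= 1;
            Poll::Pending
        }
    }
}

/// One initialization pass: poll every not-yet-initiated partition once.
/// A pending partition is moved to the back of the queue instead of
/// blocking the partitions behind it.
fn poll_init_round(
    partitions: &mut [MockPartition],
    uninitiated: &mut VecDeque<usize>,
    cx: &mut Context<'_>,
) -> Poll<Result<(), String>> {
    for _ in 0..uninitiated.len() {
        if let Some(idx) = uninitiated.pop_front() {
            match partitions[idx].poll_init(cx) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => uninitiated.push_back(idx),
            }
        }
    }
    if uninitiated.is_empty() {
        Poll::Ready(Ok(()))
    } else {
        Poll::Pending
    }
}

/// Waker that does nothing; enough to build a `Context` for this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives the passes in a busy loop and counts them. A real executor would
/// only re-poll after a wake-up; the busy loop is a simplification.
fn rounds_until_initialized(pending_counts: &[usize]) -> Result<usize, String> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut partitions: Vec<MockPartition> = pending_counts
        .iter()
        .map(|&n| MockPartition { pending_polls_left: n })
        .collect();
    let mut uninitiated: VecDeque<usize> = (0..partitions.len()).collect();
    let mut rounds = 0;
    loop {
        rounds += 1;
        match poll_init_round(&mut partitions, &mut uninitiated, &mut cx) {
            Poll::Ready(Ok(())) => return Ok(rounds),
            Poll::Ready(Err(e)) => return Err(e),
            Poll::Pending => {}
        }
    }
}
```

With pending counts `[2, 0, 1]`, partition 1 is initiated in the first pass even though partition 0 is still pending, which is the behavior requested above.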
Describe alternatives you've considered
No response
Additional context
I have a test which actually needs to pass:
https://github.com/synnada-ai/datafusion-upstream/blob/eb068a07020c44f5212e087269390652bd47e397/datafusion/core/tests/fuzz_cases/merge_fuzz.rs#L333
It simply tries to collect the results of a plan that consists of an SPM and a mock source exec. The mock source returns pending from the 1st partition until the 2nd partition is polled. Once the 2nd is polled, the 1st one also starts to send its results. In the current version of poll_next_inner() of SortPreservingMergeStream, once maybe_poll_stream() returns pending for the first partition, poll_next_inner() is never entered again.
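The mock source's behavior can be modeled roughly as below. This is a simplified, standard-library-only sketch and not the code of the linked test: `GatedSource`, `poll_first_batch`, and `init_passes` are hypothetical names, and the model assumes the merge is re-polled on every pass, which a real executor only does after a wake-up.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;

/// Hypothetical model of one partition of the mock source. The gated
/// partition stays `Pending` until the gate-opening partition is polled.
struct GatedSource {
    opens_gate: bool,
    gate: Rc<Cell<bool>>,
}

impl GatedSource {
    fn poll_first_batch(&mut self) -> Poll<()> {
        if self.opens_gate {
            // Polling this partition unblocks the other one.
            self.gate.set(true);
            Poll::Ready(())
        } else if self.gate.get() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Runs up to `max_passes` initialization passes over two partitions.
/// With `skip_pending == false` a pass stops at the first pending
/// partition, like the `ready!` loop; with `true` it moves on to the next.
/// Returns the pass in which every partition became ready, if any.
fn init_passes(skip_pending: bool, max_passes: usize) -> Option<usize> {
    let gate = Rc::new(Cell::new(false));
    let mut sources = vec![
        GatedSource { opens_gate: false, gate: Rc::clone(&gate) }, // 1st partition
        GatedSource { opens_gate: true, gate: Rc::clone(&gate) },  // 2nd partition
    ];
    let mut initialized = vec![false; sources.len()];
    for pass in 1..=max_passes {
        for i in 0..sources.len() {
            if initialized[i] {
                continue;
            }
            match sources[i].poll_first_batch() {
                Poll::Ready(()) => initialized[i] = true,
                Poll::Pending if skip_pending => continue,
                Poll::Pending => break,
            }
        }
        if initialized.iter().all(|&done| done) {
            return Some(pass);
        }
    }
    None
}
```

In this model the stop-at-first-pending strategy never polls the 2nd partition, so `init_passes(false, n)` returns `None` for any `n`, while the skipping strategy finishes in the second pass.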