Adding Fetch Support to CoalesceBatchesExec #9792

Closed
berkaysynnada opened this issue Mar 25, 2024 · 11 comments · Fixed by #11652
Labels
enhancement New feature or request

Comments

@berkaysynnada (Contributor) commented Mar 25, 2024

Is your feature request related to a problem or challenge?

EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5;
----
logical_plan
Limit: skip=0, fetch=5
--Filter: sink_table.c3 > Int16(0)
----TableScan: sink_table projection=[c1, c2, c3]
physical_plan
GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec
----CoalesceBatchesExec: target_batch_size=8192
------FilterExec: c3@2 > 0
--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

The example query in repartition.slt waits until CoalesceBatchesExec accumulates target_batch_size rows, which delays the observation of the query result. We can push the limit down into CoalesceBatchesExec here.

Describe the solution you'd like

There is a similar rule in logical planning. We can add a physical optimizer rule that pushes the limit count down until it encounters a limit-breaking operator (joins, windows, sorts). If the limit reaches a CoalesceBatchesExec before that, it can set a new requested batch size.
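To illustrate the idea, here is a hypothetical standalone sketch (not DataFusion's actual implementation; the function name and types are invented for illustration) of a coalescer that buffers small input batches up to target_batch_size, but, when given a fetch limit, emits as soon as the limit is satisfied instead of waiting for the buffer to fill:

```rust
// Hypothetical sketch of coalescing with a fetch limit. Rows are modeled
// as i32 values and batches as Vec<i32>; the real operator works on Arrow
// RecordBatches.
fn coalesce_with_fetch(
    batches: impl IntoIterator<Item = Vec<i32>>,
    target_batch_size: usize,
    fetch: Option<usize>,
) -> Vec<Vec<i32>> {
    let mut out = Vec::new();
    let mut buf: Vec<i32> = Vec::new();
    let mut remaining = fetch.unwrap_or(usize::MAX);
    for mut batch in batches {
        if remaining == 0 {
            break;
        }
        // Never buffer more rows than the limit still needs.
        batch.truncate(remaining);
        remaining -= batch.len();
        buf.extend(batch);
        // With a fetch, flush as soon as the limit is satisfied instead of
        // waiting for target_batch_size rows to accumulate.
        if buf.len() >= target_batch_size || remaining == 0 {
            out.push(std::mem::take(&mut buf));
        }
    }
    if !buf.is_empty() {
        out.push(buf);
    }
    out
}

fn main() {
    let input = || vec![vec![1, 2], vec![3, 4], vec![5, 6]];
    // Without fetch: small batches buffer until the input ends (or the
    // target is reached), so the consumer waits for all six rows.
    assert_eq!(coalesce_with_fetch(input(), 1000, None), vec![vec![1, 2, 3, 4, 5, 6]]);
    // With fetch=5: a batch is emitted the moment 5 rows are buffered.
    assert_eq!(coalesce_with_fetch(input(), 1000, Some(5)), vec![vec![1, 2, 3, 4, 5]]);
}
```

The second assertion shows the point of the issue: with fetch set, the operator can emit after 5 rows rather than stalling until 8192 rows arrive from an infinite source.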

Describe alternatives you've considered

No response

Additional context

No response

berkaysynnada added the `enhancement` (New feature or request) label on Mar 25, 2024
@Lordworms (Contributor)
I can do this one

@alamb (Contributor) commented Apr 4, 2024

Cross posting from #9815 as I am not sure about this proposal

It seems like adding a limit to CoalesceBatches is somewhat of a workaround for a limit in StreamingTableExec -- if we handled the limit in StreamingTableExec then

  1. It could be more efficient as the StreamingTableExec could stop as soon as the limit was hit
  2. We would not need any changes to CoalesceBatches
EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5;
----
logical_plan
Limit: skip=0, fetch=5
--Filter: sink_table.c3 > Int16(0)
----TableScan: sink_table projection=[c1, c2, c3].     <--- Why not apply the LIMIT here?
physical_plan
GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec
----CoalesceBatchesExec: target_batch_size=8192
------FilterExec: c3@2 > 0
--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
----------StreamingTableExec: partition_sizes=1,      <---if the LIMIT was applied here there would be no need to apply it later?

@berkaysynnada (Author)

Cross posting from #9815 as I am not sure about this proposal

It seems like adding a limit to CoalesceBatches is somewhat of a workaround for a limit in StreamingTableExec -- if we handled the limit in StreamingTableExec then

  1. It could be more efficient as the StreamingTableExec could stop as soon as the limit was hit
  2. We would not need any changes to CoalesceBatches

We can of course inform StreamingTableExec about how many rows it needs to read for a Limit query. That helps, but CoalesceBatches may still need a fetch value: its fetch count is directly related to the Limit operator above it, whereas StreamingTableExec may have to read more rows than that limit's fetch count. Am I missing something?

Limit: fetch=5
--CoalesceBatches: target_size=1000
----Aggregate: to produce 5 rows, needs 500 rows
------StreamingTableExec

Given the plan above, I think CoalesceBatches must know the fetch count.

@alamb (Contributor) commented Apr 4, 2024

What I was suggesting is that

Limit: fetch=5
--CoalesceBatches: target_size=1000
----Aggregate: to produce 5 rows, needs 500 rows
------StreamingTableExec fetch=5.     <--- if there was a fetch=5 here, there is no need to limit in CoalesceBatches
                                           as it would see end of stream after `StreamingTableExec` is done

@berkaysynnada (Author)

<--- if there was a fetch=5 here, there is no need to limit in CoalesceBatches
as it would see end of stream after StreamingTableExec is done

Aggregate would need 500 rows to produce 5 rows, but we don't know that until the 501st row arrives at the Aggregate. So we cannot limit StreamingTableExec during planning, but we can limit CoalesceBatches.

@Lordworms (Contributor)

Yeah, in my implementation I was planning to pass fetch directly to StreamingTableExec, but found that the number should be passed via CoalesceBatchesExec.

@alamb (Contributor) commented Apr 4, 2024

I see -- thank you @berkaysynnada #9792 (comment) makes sense

Something still feels a little off about limiting in CoalesceBatches, as it seems it would always be better to apply the fetch below that ExecutionPlan

For example, in this plan it seems like it would be best to have the Aggregate stop after 5 rows:

Limit: fetch=5
--CoalesceBatches: target_size=1000
----Aggregate: to produce 5 rows, needs 500 rows <--- should stop after it has created 5 rows. 
------StreamingTableExec

This looks like there is something similar:
https://github.com/apache/arrow-datafusion/blob/63888e853b7b094f2f47f53192a94f38327f5f5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L272-L276
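As a side note on when an aggregate can stop early: if its input is sorted on the group keys, a group is known to be complete the moment the key changes, so an order-preserving aggregate could stop consuming input once `fetch` groups are finished. A hypothetical standalone sketch (not the linked row_hash.rs code; the function is invented for illustration):

```rust
// Hypothetical sketch: counting group sizes over input sorted by key.
// Because a group is complete when the key changes, the scan can stop as
// soon as `fetch` groups have been finished, without reading the rest of
// the input. (A hash aggregate over unsorted input cannot do this.)
fn count_groups_with_fetch(sorted_keys: &[i32], fetch: usize) -> Vec<(i32, usize)> {
    let mut out: Vec<(i32, usize)> = Vec::new();
    for &k in sorted_keys {
        match out.last_mut() {
            // Same key as the current group: just bump its count.
            Some((key, count)) if *key == k => *count += 1,
            // A new key starts, so the previous group is finished.
            _ => {
                if out.len() == fetch {
                    break; // `fetch` complete groups produced: stop reading
                }
                out.push((k, 1));
            }
        }
    }
    out
}

fn main() {
    let keys = [1, 1, 2, 3, 3, 3, 4, 5];
    // fetch=2: groups (1, 2) and (2, 1) are complete once key 3 appears,
    // so rows with keys 3, 4, 5 are never consumed.
    assert_eq!(count_groups_with_fetch(&keys, 2), vec![(1, 2), (2, 1)]);
    // A large fetch degenerates to a full aggregation.
    assert_eq!(
        count_groups_with_fetch(&keys, 10),
        vec![(1, 2), (2, 1), (3, 3), (4, 1), (5, 1)]
    );
}
```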

@Lordworms (Contributor)

So what would be a better design instead of passing fetch via CoalesceBatchesExec?

@Dandandan (Contributor) commented Apr 7, 2024

So what would be a better design instead of passing fetch via CoalesceBatchesExec?

Even better would be if every operator accepted fetch, as @alamb suggests for Aggregate.

I wonder whether, for the purpose of this ticket, we could also put the limit below CoalesceBatchesExec instead of above it:

CoalesceBatches: target_size=1000
--Limit fetch=5
----Aggregate:
------StreamingTableExec

@berkaysynnada (Author)

So what would be a better design instead of passing fetch via CoalesceBatchesExec?

Even better would be if every operator accepted fetch, as @alamb suggests for Aggregate.

If every operator accepted fetch, I guess there would be no need for LimitExecs in the final plan, but it might make plans more complicated. Only a few operators would be affected by an internal fetching mechanism, so adding that support to just those might be more straightforward.

@alamb (Contributor) commented Apr 15, 2024

If every operator accepted fetch, I guess there would be no need for LimitExecs in the final plan, but it might make plans more complicated. Only a few operators would be affected by an internal fetching mechanism, so adding that support to just those might be more straightforward.

well said: I think this is exactly the tradeoff
