Describe the bug
FairSpillPool divides spillable memory by the number of currently-registered spillable MemoryConsumers. Combined with two facts about how DataFusion plans execute, this means the operator actually doing work can be capped far below its fair share:
- SortExec::execute (and other operators that build an ExternalSorter) registers its MemoryConsumer synchronously during execute(), before the returned stream is ever polled (sort.rs:1248-1259). Parent operators call their children's execute() synchronously too, so all spillable consumers in a plan typically register before any data flows.
- SortExec is a blocking operator: the first poll on its output stream runs the entire input-consumption phase before yielding. So in a chain of blocking spillable operators, only the bottom-most operator is doing work at any given moment, even though every operator above it is already registered with the pool.
The result is that the operator currently doing the work is artificially capped at pool_size / num_spill, where num_spill includes operators that have not yet started.
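A rough sketch of the policy described above (illustrative only, not DataFusion's actual FairSpillPool code): the cap each spillable consumer can grow to is the spillable capacity divided by the number of *registered* consumers, regardless of how many are actively doing work.

```rust
// Illustrative sketch of a FairSpillPool-style fair share. `num_spill`
// counts registered spillable consumers, including ones that have not
// started working yet -- which is exactly the problem described above.
fn fair_share(spillable_memory: usize, num_spill: usize) -> usize {
    spillable_memory / num_spill.max(1)
}

fn main() {
    let pool_size = 100;
    // Five consumers registered, but only one actually running:
    assert_eq!(fair_share(pool_size, 5), 20);
    // If the divisor reflected only actively-competing consumers:
    assert_eq!(fair_share(pool_size, 1), 100);
}
```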
Example
Consider a plan with five stacked SortExecs sharing a FairSpillPool sized
so that a single sort fits comfortably at cap == pool_size, but not at
cap == pool_size / 5:
SortExec (top, level 4)
└── SortExec (level 3)
└── SortExec (level 2)
└── SortExec (level 1)
└── SortExec (bottom, level 0)
└── <source>
When the root stream is polled:
- All five execute() calls run recursively and synchronously, so all five ExternalSorter consumers register up-front: num_spill == 5.
- The first poll drives the bottom sort's input phase. It is the only operator doing real work, but its per-reservation cap is pool_size / 5, so it spills repeatedly.
- As the bottom sort finishes its input phase and unregisters, the next level up starts its input phase, now under num_spill == 4, with a larger fair share.
- This continues up the chain. The top sort runs last under num_spill == 1 and gets the entire pool, never spilling at all.
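The shrinking divisor in the steps above can be sketched numerically (a hypothetical helper, assuming each level unregisters before the next starts, with a made-up pool size of 500):

```rust
/// Per-level cap when levels run and unregister strictly in order:
/// level `i` executes while `n - i` consumers are still registered,
/// so its cap is pool_size / (n - i) (integer division, as a sketch).
fn caps_by_level(pool_size: u64, n: u64) -> Vec<u64> {
    (0..n).map(|level| pool_size / (n - level)).collect()
}

fn main() {
    // Bottom sort (level 0) runs under the smallest cap; the top sort
    // (level 4) effectively owns the whole pool.
    assert_eq!(caps_by_level(500, 5), vec![100, 125, 166, 250, 500]);
}
```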
A representative spill-count distribution from bottom to top shows the bottom sort spilling heavily and the top sort not at all: even though every level processes the same data shape against the same pool, the operator that runs first is forced to spill the most, and the operator that runs last does not spill at all.
Expected behavior
For operators in a pipeline of blocking spillable operators processing
identically-shaped inputs against the same pool, the spill behaviour should
not depend on execution order. The per-reservation fair share should
reflect operators that are currently competing for memory, not all
operators that have been registered but are still waiting their turn.
Possible directions for discussion
(Not a proposal — just to start the conversation.)
One reference point worth considering is Velox's hierarchical memory pool
structure. Instead of a single flat pool with a counter of registered
consumers, Velox organises memory pools as a tree: a root pool per query,
with child pools per task / pipeline / operator. Each pool tracks its own
reservation and propagates usage up to its parent, and memory arbitration
is performed by walking the tree rather than by dividing a global capacity
by a global consumer count.
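A minimal sketch of the tree-shaped idea (assumed names and structure, not Velox's actual API and not a proposed DataFusion design): each child pool reserves against its parent, so a reservation is arbitrated by walking up the tree rather than by dividing a global capacity by a global consumer count.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// A toy hierarchical pool: children propagate reservations to the root.
struct Pool {
    capacity: u64,
    used: Cell<u64>,
    parent: Option<Rc<Pool>>,
}

impl Pool {
    fn root(capacity: u64) -> Rc<Pool> {
        Rc::new(Pool { capacity, used: Cell::new(0), parent: None })
    }
    fn child(self: &Rc<Pool>, capacity: u64) -> Rc<Pool> {
        Rc::new(Pool { capacity, used: Cell::new(0), parent: Some(self.clone()) })
    }
    /// Reserve `bytes` if both this pool and every ancestor have room.
    /// `used` is only updated after the parent chain has succeeded.
    fn try_reserve(&self, bytes: u64) -> bool {
        if self.used.get() + bytes > self.capacity {
            return false;
        }
        if let Some(parent) = &self.parent {
            if !parent.try_reserve(bytes) {
                return false;
            }
        }
        self.used.set(self.used.get() + bytes);
        true
    }
}

fn main() {
    let root = Pool::root(100);
    let op_a = root.child(80);
    let op_b = root.child(80);
    assert!(op_a.try_reserve(60));  // fits in the child and the root
    assert!(!op_b.try_reserve(60)); // under the child cap, but the root is short
    assert!(op_b.try_reserve(40));  // smaller request still succeeds
}
```

The point of the sketch: each operator's limit comes from its position in the tree and actual usage above it, not from a flat count of registered consumers.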
I've created a PR that explores a solution using a SubPool for blocking operators; it shows how a MemoryPool hierarchy can support more complex scenarios than the current MemoryPools allow.
To Reproduce
No response
Expected behavior
No response
Additional context
No response