Make BatchPartitioner::partition_iter public for downstream async consumers #21311

@hcrosse

Is your feature request related to a problem or challenge?

BatchPartitioner::partition takes a sync FnMut closure, which means consumers that need to do I/O with the partitioned batches have to do it inline. In Ballista's shuffle writer, this blocks tokio worker threads because file I/O happens inside the closure.

The workaround is to move the entire partition call into spawn_blocking, but that also moves the CPU-bound partitioning work off the tokio workers and onto the blocking pool, even though only the file I/O needs to run there.

partition_iter already exists as a private method and returns an iterator over (partition_index, RecordBatch) pairs. DataFusion's own RepartitionExec uses it directly to iterate results and send them through async channels. The doc comment on the method says this separation was intentional:

"we need to have a variant of partition that works w/ sync functions, and one that works w/ async. Using an iterator as an intermediate representation was the best way to achieve this"

But since partition_iter is private, downstream crates can't use this pattern.
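To illustrate the "iterator as an intermediate representation" idea, here is a minimal std-only sketch. It is not DataFusion's code: `Batch` and `ToyPartitioner` are hypothetical stand-ins for `RecordBatch` and `BatchPartitioner`, the modulo bucketing is made up for the example, and errors are plain `String`s. It only shows how a sync closure variant can be layered on top of an iterator variant.

```rust
// Hypothetical stand-in for RecordBatch.
type Batch = Vec<i64>;

// Toy partitioner; NOT DataFusion's BatchPartitioner.
struct ToyPartitioner {
    num_partitions: usize,
}

impl ToyPartitioner {
    /// Iterator form: yields (partition_index, batch) pairs and skips
    /// partitions that received no rows.
    fn partition_iter(
        &mut self,
        batch: Batch,
    ) -> Result<impl Iterator<Item = Result<(usize, Batch), String>> + Send + '_, String> {
        if self.num_partitions == 0 {
            return Err("num_partitions must be > 0".to_string());
        }
        let n = self.num_partitions;
        let mut buckets: Vec<Batch> = vec![Vec::new(); n];
        for v in batch {
            // Made-up routing rule: bucket by value modulo partition count.
            buckets[v.rem_euclid(n as i64) as usize].push(v);
        }
        Ok(buckets
            .into_iter()
            .enumerate()
            .filter(|(_, b)| !b.is_empty())
            .map(|pair| Ok::<_, String>(pair)))
    }

    /// Sync closure form, implemented on top of the iterator form.
    fn partition<F>(&mut self, batch: Batch, mut f: F) -> Result<(), String>
    where
        F: FnMut(usize, Batch) -> Result<(), String>,
    {
        for res in self.partition_iter(batch)? {
            let (partition, out) = res?;
            f(partition, out)?;
        }
        Ok(())
    }
}
```

With only the closure form exposed, a caller has to do its work inside `f`. With the iterator form, the caller controls the loop and can await between items.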

Describe the solution you'd like

Make partition_iter public (or add a public equivalent). The signature is already suitable:

pub fn partition_iter(
    &mut self,
    batch: RecordBatch,
) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_>

This lets consumers partition on the async side and only push the I/O into spawn_blocking:

// async side: each item is a Result, so unwrap it before sending
for result in partitioner.partition_iter(input_batch)? {
    let (partition, batch) = result?;
    tx.send((partition, batch)).await?;
}

// blocking side
while let Some((partition, batch)) = rx.blocking_recv() {
    writers[partition].write(&batch)?;
}
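For anyone who wants to try the shape of this split without pulling in tokio or DataFusion, here is a rough std-only sketch. Everything in it is an assumption made for illustration: `Batch` stands in for `RecordBatch`, the free function `partition_iter` stands in for the method, a plain thread plays the role of `spawn_blocking`, `std::sync::mpsc::sync_channel` replaces the tokio channel, and the "writers" are in-memory vectors rather than files.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for RecordBatch.
type Batch = Vec<i64>;

/// Stand-in for `partition_iter`: buckets rows by value modulo `n`.
fn partition_iter(
    batch: Batch,
    n: usize,
) -> impl Iterator<Item = Result<(usize, Batch), String>> + Send {
    let mut buckets: Vec<Batch> = vec![Vec::new(); n];
    for v in batch {
        buckets[v.rem_euclid(n as i64) as usize].push(v);
    }
    buckets
        .into_iter()
        .enumerate()
        .filter(|(_, b)| !b.is_empty())
        .map(|pair| Ok::<_, String>(pair))
}

/// Partitions on the calling thread and hands only the "I/O" to a
/// dedicated thread. Returns the rows each simulated writer received.
fn run_pipeline(batches: Vec<Batch>, n: usize) -> Result<Vec<Vec<i64>>, String> {
    if n == 0 {
        return Err("n must be > 0".to_string());
    }
    // Bounded channel, so a slow writer applies backpressure to the producer.
    let (tx, rx) = mpsc::sync_channel::<(usize, Batch)>(4);

    // "Blocking side": owns the writers and performs the simulated I/O.
    let writer = thread::spawn(move || {
        let mut writers: Vec<Vec<i64>> = vec![Vec::new(); n];
        while let Ok((partition, batch)) = rx.recv() {
            writers[partition].extend(batch);
        }
        writers
    });

    // "Async side": the CPU-bound partitioning stays on this thread.
    for batch in batches {
        for res in partition_iter(batch, n) {
            let (partition, out) = res?;
            tx.send((partition, out)).map_err(|e| e.to_string())?;
        }
    }
    drop(tx); // close the channel so the writer loop terminates
    writer
        .join()
        .map_err(|_| "writer thread panicked".to_string())
}
```

Calling `run_pipeline(vec![vec![1, 2, 3], vec![4, 5, 6]], 3)` routes rows to three simulated writers. In the real version the producer loop would run in an async task and the consumer in `spawn_blocking`, as in the snippet above.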

Describe alternatives you've considered

In apache/datafusion-ballista#1537 we moved both partitioning and I/O into spawn_blocking together. It works, but it's leaving performance on the table by running CPU work on the blocking pool.

Additional context

Ballista PR: apache/datafusion-ballista#1537
