Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,10 @@ config_namespace! {
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

/// Should partial hash aggregation repartition and coalesce output locally
/// before sending it to the upstream repartition operator.
pub enable_partial_aggregation_local_repartition: bool, default = true

/// Should DataFusion use row number estimates at the input to decide
/// whether increasing parallelism is beneficial or not. By default,
/// only exact row numbers (not estimates) are used for this decision.
Expand Down
22 changes: 18 additions & 4 deletions datafusion/physical-plan/benches/dictionary_group_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,17 @@ fn bench_intern_emit(c: &mut Criterion) {
new_group_values(schema.clone(), &GroupOrdering::None)
.unwrap(),
Vec::<usize>::with_capacity(size),
Vec::<u64>::with_capacity(size),
)
},
|(gv, groups)| {
gv.intern(std::slice::from_ref(&array), groups).unwrap();
|(gv, groups, hashes)| {
gv.intern(
std::slice::from_ref(&array),
groups,
hashes,
&mut vec![],
)
.unwrap();
black_box(&*groups);
black_box(gv.emit(EmitTo::All).unwrap());
},
Expand Down Expand Up @@ -154,11 +161,18 @@ fn bench_repeated_intern_emit(c: &mut Criterion) {
new_group_values(schema.clone(), &GroupOrdering::None)
.unwrap(),
Vec::<usize>::with_capacity(size),
Vec::<u64>::with_capacity(size),
)
},
|(gv, groups)| {
|(gv, groups, hashes)| {
for arr in &batches {
gv.intern(std::slice::from_ref(arr), groups).unwrap();
gv.intern(
std::slice::from_ref(arr),
groups,
hashes,
&mut vec![],
)
.unwrap();
black_box(&*groups);
}
black_box(gv.emit(EmitTo::All).unwrap());
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-plan/benches/multi_group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ fn bench_intern(
batches: &[Vec<ArrayRef>],
groups: &mut Vec<usize>,
) {
let mut hashes = vec![];
for batch in batches {
groups.clear();
gv.intern(batch, groups).unwrap();
gv.intern(batch, groups, &mut hashes, &mut vec![]).unwrap();
}
black_box(&*groups);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ impl<AggrMode> AggregateHashTable<AggrMode> {
group_by: Arc::clone(&agg.group_by),
group_values,
batch_group_indices: Default::default(),
batch_hashes: Default::default(),
group_hashes: Default::default(),
new_group_rows: Default::default(),
accumulators,
}),
_mode: PhantomData,
Expand Down Expand Up @@ -181,8 +184,11 @@ impl<AggrMode> AggregateHashTable<AggrMode> {

acc + state.group_values.size()
+ state.batch_group_indices.allocated_size()
+ state.batch_hashes.allocated_size()
+ state.group_hashes.allocated_size()
+ state.new_group_rows.allocated_size()
}
AggregateHashTableState::OutputtingMaterializedFinal(output) => {
AggregateHashTableState::OutputtingMaterialized(output) => {
output.memory_size()
}
AggregateHashTableState::Done => 0,
Expand Down Expand Up @@ -212,14 +218,19 @@ impl<AggrMode> AggregateHashTable<AggrMode> {
state.batch_group_indices = Vec::new();
self.state = AggregateHashTableState::Outputting(state);
}
}

pub(super) fn emit_to_for_batch_size(batch_size: usize, group_count: usize) -> EmitTo {
debug_assert!(batch_size > 0);
if group_count <= batch_size {
EmitTo::All
} else {
EmitTo::First(batch_size)
pub(super) fn emit_next_materialized_batch(
&mut self,
mut output: MaterializedOutput,
batch_size: usize,
) -> Option<RecordBatch> {
let batch = output.next_batch(batch_size);
if output.is_exhausted() {
self.state = AggregateHashTableState::Done;
} else {
self.state = AggregateHashTableState::OutputtingMaterialized(output);
}
batch
}
}

Expand Down Expand Up @@ -292,6 +303,15 @@ pub(super) struct AggregateHashTableBuffer {
/// accumulator to update that group's aggregate state.
pub(super) batch_group_indices: Vec<usize>,

/// Hash for each row in the current input batch.
pub(super) batch_hashes: Vec<u64>,

/// Hash for each accumulated group.
pub(super) group_hashes: Vec<u64>,

/// Input rows that created new groups in the current input batch.
pub(super) new_group_rows: Vec<usize>,

/// One item per aggregate expression.
///
/// Example: `COUNT(x), SUM(y)` creates two items. Each item owns the input
Expand All @@ -304,24 +324,21 @@ pub(super) enum AggregateHashTableState {
Building(AggregateHashTableBuffer),
/// Emitting results directly from group keys and aggregate state.
Outputting(AggregateHashTableBuffer),
/// Materialize all the output results, and then incrementally output in the `OutputtingMaterializedFinal` state.
///
/// Note this is a temporary solution until the `GroupValues` issue is solved:
/// Issue: <https://github.com/apache/datafusion/issues/23178>
OutputtingMaterializedFinal(MaterializedFinalOutput),
/// Materialized output rows sliced across output polls.
OutputtingMaterialized(MaterializedOutput),
Done,
}

/// Fully evaluated final aggregate output and the next row offset to emit.
/// Materialized aggregate output and the next row offset to emit.
///
/// Final aggregate evaluation consumes accumulator state, so final output is
/// materialized once and then sliced to honor `batch_size` across output polls.
pub(super) struct MaterializedFinalOutput {
/// Some output paths can only emit all rows from their backing state at once.
/// The materialized batch is sliced to honor `batch_size` across output polls.
pub(super) struct MaterializedOutput {
batch: RecordBatch,
offset: usize,
}

impl MaterializedFinalOutput {
impl MaterializedOutput {
pub(super) fn new(batch: RecordBatch) -> Self {
Self { batch, offset: 0 }
}
Expand Down Expand Up @@ -495,8 +512,10 @@ mod tests {

use super::*;

// Covers materialized output slicing until all rows are emitted.
// Example: a five-row batch with batch size two emits 2, 2, then 1 row.
#[test]
fn materialized_final_output_slices_batches_until_exhausted() -> Result<()> {
fn test_materialized_output_slices_batches_until_exhausted() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new(
"group_col",
DataType::Int32,
Expand All @@ -506,7 +525,7 @@ mod tests {
schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
)?;
let mut output = MaterializedFinalOutput::new(batch);
let mut output = MaterializedOutput::new(batch);

assert_eq!(int32_values(&output.next_batch(2).unwrap(), 0), vec![1, 2]);
assert_eq!(int32_values(&output.next_batch(2).unwrap(), 0), vec![3, 4]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::aggregates::AggregateExec;

use super::common::{
AggregateHashTable, AggregateHashTableBuffer, AggregateHashTableState, FinalMarker,
MaterializedFinalOutput,
MaterializedOutput,
};

/// Methods specific to the aggregate hash table used in the final aggregation stage.
Expand Down Expand Up @@ -68,7 +68,7 @@ impl AggregateHashTable<FinalMarker> {
let output = self.materialize_final_output(state, output_schema)?;
Ok(self.emit_next_materialized_batch(output, batch_size))
}
AggregateHashTableState::OutputtingMaterializedFinal(output) => {
AggregateHashTableState::OutputtingMaterialized(output) => {
Ok(self.emit_next_materialized_batch(output, batch_size))
}
AggregateHashTableState::Done => Ok(None),
Expand All @@ -82,7 +82,7 @@ impl AggregateHashTable<FinalMarker> {
&self,
mut state: AggregateHashTableBuffer,
output_schema: SchemaRef,
) -> Result<MaterializedFinalOutput> {
) -> Result<MaterializedOutput> {
// Final aggregate evaluation consumes accumulator state. Evaluate all
// groups once, then slice the materialized batch on subsequent polls.
let emit_to = EmitTo::All;
Expand All @@ -96,21 +96,7 @@ impl AggregateHashTable<FinalMarker> {

let batch = RecordBatch::try_new(output_schema, output)?;
debug_assert!(batch.num_rows() > 0);
Ok(MaterializedFinalOutput::new(batch))
}

fn emit_next_materialized_batch(
&mut self,
mut output: MaterializedFinalOutput,
batch_size: usize,
) -> Option<RecordBatch> {
let batch = output.next_batch(batch_size);
if output.is_exhausted() {
self.state = AggregateHashTableState::Done;
} else {
self.state = AggregateHashTableState::OutputtingMaterializedFinal(output);
}
batch
Ok(MaterializedOutput::new(batch))
}

pub(in crate::aggregates) fn aggregate_batch(
Expand All @@ -122,9 +108,12 @@ impl AggregateHashTable<FinalMarker> {

let timer = self.group_by_metrics.aggregation_time.timer();
for group_values in &evaluated_batch.grouping_set_args {
state
.group_values
.intern(group_values, &mut state.batch_group_indices)?;
state.group_values.intern(
group_values,
&mut state.batch_group_indices,
&mut state.batch_hashes,
&mut state.new_group_rows,
)?;
let group_indices = &state.batch_group_indices;
let total_num_groups = state.group_values.len();

Expand Down
Loading
Loading