diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 465fc86cfbee8..75f127ffee897 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -121,3 +121,7 @@ required-features = ["test_utils"] [[bench]] harness = false name = "dictionary_group_values" + +[[bench]] +harness = false +name = "single_group_by_primitive" diff --git a/datafusion/physical-plan/benches/single_group_by_primitive.rs b/datafusion/physical-plan/benches/single_group_by_primitive.rs new file mode 100644 index 0000000000000..e189065a2cbc1 --- /dev/null +++ b/datafusion/physical-plan/benches/single_group_by_primitive.rs @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks comparing hash-based `GroupValuesPrimitive` vs direct-indexed +//! `GroupValuesFlatPrimitive` for single-column integer GROUP BY. +//! +//! Measures only `GroupValues::intern()` — construction and emit are excluded +//! from timing to isolate the group lookup hot path. 
+
+use arrow::array::{ArrayRef, UInt64Array};
+use arrow::datatypes::{DataType, UInt64Type};
+use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main};
+use datafusion_physical_plan::aggregates::group_values::GroupValues;
+use datafusion_physical_plan::aggregates::group_values::single_group_by::{
+    flat_primitive::GroupValuesFlatPrimitive, primitive::GroupValuesPrimitive,
+};
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use std::hint::black_box;
+use std::sync::Arc;
+
+const SEED: u64 = 0xBEEF;
+const NUM_ROWS: usize = 1_000_000;
+const BATCH_SIZE: usize = 8192;
+
+/// Generates `num_rows` pseudo-random `u64` keys in `0..num_groups`, split into
+/// arrays of at most `batch_size` rows.
+///
+/// The generator is seeded with [`SEED`], so the same arguments always produce
+/// the same batches.
+///
+/// # Panics
+///
+/// Panics if `batch_size` is zero.
+fn generate_batches(
+    num_groups: u64,
+    num_rows: usize,
+    batch_size: usize,
+) -> Vec<ArrayRef> {
+    // A zero batch size could never consume any rows and would loop forever.
+    assert!(batch_size > 0, "batch_size must be non-zero");
+
+    let mut rng = StdRng::seed_from_u64(SEED);
+    let mut batches = Vec::with_capacity(num_rows.div_ceil(batch_size));
+    let mut produced = 0;
+
+    while produced < num_rows {
+        let len = batch_size.min(num_rows - produced);
+        let values: Vec<u64> = (0..len)
+            .map(|_| rng.random_range(0..num_groups))
+            .collect();
+        batches.push(Arc::new(UInt64Array::from(values)) as ArrayRef);
+        produced += len;
+    }
+    batches
+}
+
+/// Timed body of every benchmark: interns each batch into `gv`, reusing
+/// `groups` as the output buffer, and panics if any `intern` call fails.
+fn bench_intern(
+    gv: &mut Box<dyn GroupValues>,
+    batches: &[ArrayRef],
+    groups: &mut Vec<usize>,
+) {
+    for batch in batches {
+        gv.intern(std::slice::from_ref(batch), groups)
+            .expect("intern should succeed for in-range keys");
+    }
+    // Keep the optimizer from discarding the group ids written by the last call.
+    black_box(&*groups);
+}
+
+/// Group count sweep: measures intern() only, construction in setup.
+/// Tests how both implementations scale with increasing group cardinality.
+fn bench_group_count(c: &mut Criterion) {
+    let mut group = c.benchmark_group("single_group_by_u64");
+    group.sample_size(20);
+
+    for num_groups in [10_u64, 100, 1_000, 10_000, 100_000] {
+        let batches = generate_batches(num_groups, NUM_ROWS, BATCH_SIZE);
+
+        // Untimed setup: a fresh, empty hash-based `GroupValues` plus a reusable
+        // output buffer for every measured iteration.
+        let new_hash = || {
+            (
+                Box::new(GroupValuesPrimitive::<UInt64Type>::new(DataType::UInt64))
+                    as Box<dyn GroupValues>,
+                Vec::<usize>::with_capacity(BATCH_SIZE),
+            )
+        };
+        // Untimed setup for the flat variant. Keys are drawn from
+        // `0..num_groups`, so the key range is exactly `[0, num_groups - 1]`.
+        let new_flat = || {
+            (
+                Box::new(GroupValuesFlatPrimitive::<UInt64Type>::new(
+                    DataType::UInt64,
+                    0,
+                    num_groups - 1,
+                )) as Box<dyn GroupValues>,
+                Vec::<usize>::with_capacity(BATCH_SIZE),
+            )
+        };
+
+        group.bench_with_input(
+            BenchmarkId::new("hash", num_groups),
+            &batches,
+            |b, batches| {
+                b.iter_batched_ref(
+                    new_hash,
+                    |(gv, groups)| bench_intern(gv, batches, groups),
+                    BatchSize::LargeInput,
+                );
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("flat", num_groups),
+            &batches,
+            |b, batches| {
+                b.iter_batched_ref(
+                    new_flat,
+                    |(gv, groups)| bench_intern(gv, batches, groups),
+                    BatchSize::LargeInput,
+                );
+            },
+        );
+    }
+    group.finish();
+}
+
+/// Density sweep: fixed 10K distinct groups, varying the key range.
+/// Tests how flat degrades as the array becomes sparser.
+fn bench_density(c: &mut Criterion) {
+    let mut group = c.benchmark_group("single_group_by_u64_density");
+    group.sample_size(20);
+
+    let num_distinct: u64 = 10_000;
+    // (fraction of the key range that is occupied, width of the key range)
+    let densities: [(f64, u64); 3] = [
+        (1.0, num_distinct),
+        (0.5, num_distinct * 2),
+        (0.1, num_distinct * 10),
+    ];
+
+    for (density, range) in densities {
+        let mut rng = StdRng::seed_from_u64(SEED);
+
+        // Rejection-sample `num_distinct` unique keys from `0..range`.
+        let mut keys: Vec<u64> = Vec::with_capacity(num_distinct as usize);
+        let mut seen =
+            std::collections::HashSet::with_capacity(num_distinct as usize);
+        while keys.len() < num_distinct as usize {
+            let k = rng.random_range(0..range);
+            if seen.insert(k) {
+                keys.push(k);
+            }
+        }
+
+        // Every row picks uniformly among the distinct keys.
+        let mut batches = Vec::with_capacity(NUM_ROWS.div_ceil(BATCH_SIZE));
+        let mut remaining = NUM_ROWS;
+        while remaining > 0 {
+            let this_batch = remaining.min(BATCH_SIZE);
+            let values: Vec<u64> = (0..this_batch)
+                .map(|_| keys[rng.random_range(0..keys.len())])
+                .collect();
+            batches.push(Arc::new(UInt64Array::from(values)) as ArrayRef);
+            remaining -= this_batch;
+        }
+
+        let label = format!("density_{:.0}pct", density * 100.0);
+
+        // Untimed setup: a fresh, empty hash-based `GroupValues` plus a reusable
+        // output buffer for every measured iteration.
+        let new_hash = || {
+            (
+                Box::new(GroupValuesPrimitive::<UInt64Type>::new(DataType::UInt64))
+                    as Box<dyn GroupValues>,
+                Vec::<usize>::with_capacity(BATCH_SIZE),
+            )
+        };
+        // Untimed setup for the flat variant, sized for the whole key range
+        // `[0, range - 1]` even though only `num_distinct` slots get used.
+        let new_flat = || {
+            (
+                Box::new(GroupValuesFlatPrimitive::<UInt64Type>::new(
+                    DataType::UInt64,
+                    0,
+                    range - 1,
+                )) as Box<dyn GroupValues>,
+                Vec::<usize>::with_capacity(BATCH_SIZE),
+            )
+        };
+
+        group.bench_with_input(
+            BenchmarkId::new("hash", &label),
+            &batches,
+            |b, batches| {
+                b.iter_batched_ref(
+                    new_hash,
+                    |(gv, groups)| bench_intern(gv, batches, groups),
+                    BatchSize::LargeInput,
+                );
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("flat", &label),
+            &batches,
+            |b, batches| {
+                b.iter_batched_ref(
+                    new_flat,
+                    |(gv, groups)| bench_intern(gv, batches, groups),
+                    BatchSize::LargeInput,
+                );
+            },
+        );
+    }
+    group.finish();
+}
+
+/// Row count scaling: fixed 10K groups, varying input size.
+/// Shows how the per-row cost difference compounds.
+fn bench_row_scaling(c: &mut Criterion) {
+    let mut group = c.benchmark_group("single_group_by_u64_rows");
+    group.sample_size(10);
+
+    let num_groups: u64 = 10_000;
+
+    for num_rows in [1_000_000, 5_000_000, 10_000_000] {
+        let batches = generate_batches(num_groups, num_rows, BATCH_SIZE);
+        let rows_label = format!("{num_rows}_rows");
+
+        // Untimed setup: a fresh, empty hash-based `GroupValues` plus a reusable
+        // output buffer for every measured iteration.
+        let new_hash = || {
+            (
+                Box::new(GroupValuesPrimitive::<UInt64Type>::new(DataType::UInt64))
+                    as Box<dyn GroupValues>,
+                Vec::<usize>::with_capacity(BATCH_SIZE),
+            )
+        };
+        // Untimed setup for the flat variant. Keys are drawn from
+        // `0..num_groups`, so the key range is exactly `[0, num_groups - 1]`.
+        let new_flat = || {
+            (
+                Box::new(GroupValuesFlatPrimitive::<UInt64Type>::new(
+                    DataType::UInt64,
+                    0,
+                    num_groups - 1,
+                )) as Box<dyn GroupValues>,
+                Vec::<usize>::with_capacity(BATCH_SIZE),
+            )
+        };
+
+        group.bench_with_input(
+            BenchmarkId::new("hash", &rows_label),
+            &batches,
+            |b, batches| {
+                b.iter_batched_ref(
+                    new_hash,
+                    |(gv, groups)| bench_intern(gv, batches, groups),
+                    BatchSize::LargeInput,
+                );
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("flat", &rows_label),
+            &batches,
+            |b, batches| {
+                b.iter_batched_ref(
+                    new_flat,
+                    |(gv, groups)| bench_intern(gv, batches, groups),
+                    BatchSize::LargeInput,
+                );
+            },
+        );
+    }
+    group.finish();
+}
+
+criterion_group!(benches, bench_group_count, bench_density, bench_row_scaling);
+criterion_main!(benches);
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index 2f3b1a19e7d73..7d71f1b1464eb 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -31,7 +31,7 @@ use datafusion_expr::EmitTo;
 pub mod multi_group_by;
 
 mod row;
-mod single_group_by;
+pub mod single_group_by;
 use datafusion_physical_expr::binary_map::OutputType;
 use multi_group_by::GroupValuesColumn;
 use row::GroupValuesRows;
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/flat_primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/flat_primitive.rs
new file mode 100644
index 0000000000000..93eba11d9d707
--- /dev/null
+++ 
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/flat_primitive.rs
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregates::group_values::GroupValues;
+use arrow::array::{
+    Array, ArrayRef, ArrowPrimitiveType, NullBufferBuilder, PrimitiveArray, cast::AsArray,
+};
+use arrow::datatypes::DataType;
+use datafusion_common::{Result, internal_err};
+use datafusion_expr::EmitTo;
+use num_traits::AsPrimitive;
+use std::mem::size_of;
+use std::sync::Arc;
+
+/// Sentinel value indicating an unoccupied slot in the flat array.
+const EMPTY: u32 = u32::MAX;
+
+/// A [`GroupValues`] implementation using direct array indexing for integer-typed
+/// GROUP BY columns with a known, bounded value range.
+///
+/// Instead of hashing each key and probing a hash table, this computes
+/// `index = key_as_u64.wrapping_sub(offset)` to directly index into a flat array,
+/// yielding O(1) group lookups with no hashing, no collisions, and excellent
+/// cache locality for small ranges.
+///
+/// Keys outside the configured range are rejected with an internal error rather
+/// than indexed blindly, so a stale or wrong range can never corrupt memory.
+///
+/// Inspired by the `ArrayMap` used for perfect hash joins (see `joins/array_map.rs`).
+pub struct GroupValuesFlatPrimitive<T: ArrowPrimitiveType>
+where
+    T::Native: AsPrimitive<u64>,
+{
+    data_type: DataType,
+    /// Maps `key - offset` -> group_id (u32). EMPTY means no group assigned yet.
+    slots: Vec<u32>,
+    /// The minimum value (as u64 via two's complement) used as the base offset.
+    offset: u64,
+    /// The group index assigned to NULL values, if any.
+    null_group: Option<usize>,
+    /// Ordered group values for emit. `values[group_id]` is the original key.
+    values: Vec<T::Native>,
+}
+
+impl<T: ArrowPrimitiveType> GroupValuesFlatPrimitive<T>
+where
+    T::Native: AsPrimitive<u64>,
+{
+    /// Creates a new flat-indexed GroupValues.
+    ///
+    /// `min_val` and `max_val` define the key range (as u64 via two's complement cast).
+    /// The allocated flat array has `max_val - min_val + 1` slots.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `data_type` is not compatible with `T`, or if the range needs
+    /// `u32::MAX` or more slots: group ids are stored as `u32` with `u32::MAX`
+    /// reserved as the empty-slot sentinel.
+    pub fn new(data_type: DataType, min_val: u64, max_val: u64) -> Self {
+        assert!(PrimitiveArray::<T>::is_compatible(&data_type));
+        let range = max_val.wrapping_sub(min_val);
+        // `range + 1` slots are needed and the largest possible group id is
+        // `range + 1` as well (every key plus the NULL group), so requiring
+        // `range + 1 < EMPTY` rules out both the `+ 1` overflow and any
+        // collision between a real group id and the sentinel.
+        assert!(
+            range < u64::from(EMPTY) - 1,
+            "key range [{min_val}, {max_val}] is too large for flat group indexing"
+        );
+        let size = usize::try_from(range + 1)
+            .expect("slot count is below u32::MAX and must fit in usize");
+        Self {
+            data_type,
+            slots: vec![EMPTY; size],
+            offset: min_val,
+            null_group: None,
+            values: Vec::new(),
+        }
+    }
+
+    /// Returns the range (number of slots) of this flat map.
+    pub fn range(&self) -> usize {
+        self.slots.len()
+    }
+
+    /// Returns the group id for a non-null `key`, assigning the next free id the
+    /// first time the key is seen.
+    ///
+    /// Returns an internal error when the key lies outside the range passed to
+    /// [`Self::new`].
+    fn intern_key(&mut self, key: T::Native) -> Result<usize> {
+        let raw: u64 = key.as_();
+        // Wrapping subtraction maps in-range keys onto `0..slots.len()` and every
+        // other key onto a larger value, so a single bounds check rejects keys
+        // on either side of the range.
+        let idx = match usize::try_from(raw.wrapping_sub(self.offset)) {
+            Ok(idx) if idx < self.slots.len() => idx,
+            _ => {
+                return internal_err!(
+                    "GroupValuesFlatPrimitive: key {} is outside the configured range (offset {}, {} slots)",
+                    raw,
+                    self.offset,
+                    self.slots.len()
+                );
+            }
+        };
+        let slot = &mut self.slots[idx];
+        if *slot == EMPTY {
+            // Lossless: `new` guarantees every group id is below `EMPTY`.
+            *slot = self.values.len() as u32;
+            self.values.push(key);
+        }
+        Ok(*slot as usize)
+    }
+
+    /// Returns the group id shared by all NULL keys, creating it on first use.
+    fn intern_null(&mut self) -> usize {
+        *self.null_group.get_or_insert_with(|| {
+            // The NULL group still occupies a position in `values`; the
+            // placeholder stored there is masked out by the null buffer on emit.
+            self.values.push(T::Native::default());
+            self.values.len() - 1
+        })
+    }
+}
+
+impl<T: ArrowPrimitiveType> GroupValues for GroupValuesFlatPrimitive<T>
+where
+    T::Native: AsPrimitive<u64> + Default + Copy + Send,
+{
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
+        assert_eq!(cols.len(), 1);
+        groups.clear();
+
+        let arr = cols[0].as_primitive::<T>();
+        groups.reserve(arr.len());
+
+        if arr.null_count() == 0 {
+            // Fast path: no validity checks needed, walk the raw values.
+            for &key in arr.values().iter() {
+                groups.push(self.intern_key(key)?);
+            }
+        } else {
+            for key in arr.iter() {
+                let group_id = match key {
+                    Some(key) => self.intern_key(key)?,
+                    None => self.intern_null(),
+                };
+                groups.push(group_id);
+            }
+        }
+        Ok(())
+    }
+
+    fn size(&self) -> usize {
+        self.slots.len() * size_of::<u32>()
+            + self.values.capacity() * size_of::<T::Native>()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.values.is_empty()
+    }
+
+    fn len(&self) -> usize {
+        self.values.len()
+    }
+
+    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        let array: PrimitiveArray<T> = match emit_to {
+            EmitTo::All => {
+                self.slots.fill(EMPTY);
+                let null_idx = self.null_group.take();
+                let values = std::mem::take(&mut self.values);
+                build_primitive::<T>(values, null_idx)
+            }
+            EmitTo::First(n) => {
+                // Drop the first `n` groups and renumber the survivors so that
+                // old group `g` becomes `g - n`.
+                for slot in self.slots.iter_mut().filter(|slot| **slot != EMPTY) {
+                    let g = *slot as usize;
+                    *slot = if g < n { EMPTY } else { (g - n) as u32 };
+                }
+                // A NULL group at or beyond `n` stays behind (shifted); one
+                // among the first `n` is emitted and forgotten.
+                let null_group = match &mut self.null_group {
+                    Some(v) if *v >= n => {
+                        *v -= n;
+                        None
+                    }
+                    Some(_) => self.null_group.take(),
+                    None => None,
+                };
+                let remaining = self.values.split_off(n);
+                let emitted = std::mem::replace(&mut self.values, remaining);
+                build_primitive::<T>(emitted, null_group)
+            }
+        };
+
+        Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
+    }
+
+    fn clear_shrink(&mut self, num_rows: usize) {
+        self.slots.fill(EMPTY);
+        self.null_group = None;
+        self.values.clear();
+        // `slots` must keep covering the whole key range; only the per-group
+        // storage can give memory back.
+        self.values.shrink_to(num_rows);
+    }
+}
+
+/// Builds the emitted array from `values`, marking position `null_idx` (the
+/// NULL group, if any) as null.
+fn build_primitive<T: ArrowPrimitiveType>(
+    values: Vec<T::Native>,
+    null_idx: Option<usize>,
+) -> PrimitiveArray<T> {
+    let nulls = null_idx.map(|null_idx| {
+        let mut buffer = NullBufferBuilder::new(values.len());
+        buffer.append_n_non_nulls(null_idx);
+        buffer.append_null();
+        buffer.append_n_non_nulls(values.len() - null_idx - 1);
+        buffer
+            .finish()
+            .expect("a null was appended, so the builder has a buffer")
+    });
+    PrimitiveArray::<T>::new(values.into(), nulls)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, UInt64Array};
+    use arrow::datatypes::{Int32Type, UInt64Type};
+
+    #[test]
+    fn test_basic_interning() {
+        let mut gv = GroupValuesFlatPrimitive::<Int32Type>::new(DataType::Int32, 0, 9);
+        let arr: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 3, 7, 1]));
+        let mut groups = Vec::new();
+        gv.intern(&[arr], &mut groups).unwrap();
+        // First time seeing 3 -> group 0, first 1 -> group 1, 3 again -> 0, 7 -> group 2, 1 again -> 1
+        assert_eq!(groups, vec![0, 1, 0, 2, 1]);
+        assert_eq!(gv.len(), 3);
+    }
+
+    #[test]
+    fn test_with_nulls() {
+        let mut gv = GroupValuesFlatPrimitive::<Int32Type>::new(DataType::Int32, 0, 9);
+        let arr: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(2),
+            None,
+            Some(2),
+            None,
+            Some(5),
+        ]));
+        let mut groups = Vec::new();
+        gv.intern(&[arr], &mut groups).unwrap();
+        // 2 -> group 0, null -> group 1, 2 -> 0, null -> 1, 5 -> group 2
+        assert_eq!(groups, vec![0, 1, 0, 1, 2]);
+        assert_eq!(gv.len(), 3);
+    }
+
+    #[test]
+    fn test_negative_range() {
+        // Range [-5, 5] using wrapping arithmetic
+        let min_val = (-5_i32) as u64;
+        let max_val = 5_i32 as u64;
+        let mut gv =
+            GroupValuesFlatPrimitive::<Int32Type>::new(DataType::Int32, min_val, max_val);
+        let arr: ArrayRef = Arc::new(Int32Array::from(vec![-5, 0, 5, -5, 0]));
+        let mut groups = Vec::new();
+        gv.intern(&[arr], &mut groups).unwrap();
+        assert_eq!(groups, vec![0, 1, 2, 0, 1]);
+        assert_eq!(gv.len(), 3);
+    }
+
+    #[test]
+    fn test_emit_all() {
+        let mut gv = GroupValuesFlatPrimitive::<UInt64Type>::new(DataType::UInt64, 0, 9);
+        let arr: ArrayRef = Arc::new(UInt64Array::from(vec![3, 1, 7]));
+        let mut groups = Vec::new();
+        gv.intern(&[arr], &mut groups).unwrap();
+
+        let emitted = gv.emit(EmitTo::All).unwrap();
+        let result = emitted[0].as_primitive::<UInt64Type>();
+        assert_eq!(result.values().as_ref(), &[3, 1, 7]);
+        assert_eq!(gv.len(), 0);
+    }
+
+    #[test]
+    fn test_emit_first() {
+        let mut gv = GroupValuesFlatPrimitive::<Int32Type>::new(DataType::Int32, 0, 9);
+        let arr: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 7, 5]));
+        let mut groups = Vec::new();
+        gv.intern(&[arr], &mut groups).unwrap();
+        assert_eq!(groups, vec![0, 1, 2, 3]);
+
+        let emitted = gv.emit(EmitTo::First(2)).unwrap();
+        let result = emitted[0].as_primitive::<Int32Type>();
+        assert_eq!(result.values().as_ref(), &[3, 1]);
+        assert_eq!(gv.len(), 2);
+
+        // Intern more - existing groups should still work
+        let arr2: ArrayRef = Arc::new(Int32Array::from(vec![7, 5, 9]));
+        let mut groups2 = Vec::new();
+        gv.intern(&[arr2], &mut groups2).unwrap();
+        // 7 is now group 0 (was 2, shifted by 2), 5 is group 1, 9 is new -> group 2
+        assert_eq!(groups2, vec![0, 1, 2]);
+    }
+
+    #[test]
+    fn test_out_of_range_key_is_an_error() {
+        let mut gv = GroupValuesFlatPrimitive::<Int32Type>::new(DataType::Int32, 0, 9);
+        let mut groups = Vec::new();
+
+        // One past the upper bound.
+        let above: ArrayRef = Arc::new(Int32Array::from(vec![3, 10]));
+        assert!(gv.intern(&[above], &mut groups).is_err());
+
+        // Below the lower bound: wraps to a huge index instead of going negative.
+        let below: ArrayRef = Arc::new(Int32Array::from(vec![-1]));
+        assert!(gv.intern(&[below], &mut groups).is_err());
+    }
+
+    #[test]
+    #[should_panic(expected = "too large")]
+    fn test_range_too_large_is_rejected() {
+        let _ =
+            GroupValuesFlatPrimitive::<UInt64Type>::new(DataType::UInt64, 0, u64::MAX);
+    }
+}
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs 
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs index 89c6b624e8e0a..dd2c4f7b5ada5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs @@ -20,4 +20,5 @@ pub(crate) mod boolean; pub(crate) mod bytes; pub(crate) mod bytes_view; -pub(crate) mod primitive; +pub mod flat_primitive; +pub mod primitive;