Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions datafusion/physical-plan/benches/aggregate_vectorized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::ArrayRef;
use arrow::datatypes::{Int32Type, StringViewType};
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{Field, Int32Type, Schema, StringViewType};
use arrow::util::bench_util::{
create_primitive_array, create_string_view_array_with_len,
create_string_view_array_with_max_len,
Expand All @@ -30,6 +30,8 @@ use criterion::{
use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn;
use datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder;
use datafusion_physical_plan::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder;
use datafusion_physical_plan::aggregates::group_values::new_group_values;
use datafusion_physical_plan::aggregates::order::GroupOrdering;
use rand::distr::{Bernoulli, Distribution};
use std::hint::black_box;
use std::sync::Arc;
Expand All @@ -40,6 +42,7 @@ const NULL_DENSITIES: [f32; 3] = [0.0, 0.1, 0.5];
fn bench_vectorized_append(c: &mut Criterion) {
byte_view_vectorized_append(c);
primitive_vectorized_append(c);
single_group_by_primitive_intern(c);
}

fn byte_view_vectorized_append(c: &mut Criterion) {
Expand Down Expand Up @@ -179,6 +182,53 @@ fn primitive_vectorized_append(c: &mut Criterion) {
group.finish();
}

fn single_group_by_primitive_intern(c: &mut Criterion) {
const BATCH_SIZE: usize = 4096;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be batch size be like default?

const BATCHES: usize = 16;

let cases = [
("low_cardinality", 128),
("high_cardinality", BATCH_SIZE * BATCHES),
];

let schema = Arc::new(Schema::new(vec![Field::new(
"group_key",
DataType::UInt64,
true,
)]));
let mut group = c.benchmark_group("GroupValuesPrimitive_intern");

for (case_name, distinct) in cases {
let batches = (0..BATCHES)
.map(|batch| {
let start = batch * BATCH_SIZE;
let values = (start..start + BATCH_SIZE)
.map(|value| Some((value % distinct) as u64));
Arc::new(UInt64Array::from_iter(values)) as ArrayRef
})
.collect::<Vec<_>>();

group.bench_function(case_name, |b| {
b.iter(|| {
let mut group_values =
new_group_values(Arc::clone(&schema), &GroupOrdering::None).unwrap();
let mut groups = Vec::with_capacity(BATCH_SIZE);

for batch in &batches {
group_values
.intern(std::slice::from_ref(batch), &mut groups)
.unwrap();
black_box(&groups);
}

black_box(group_values.len());
});
});
}

group.finish();
}

fn bench_single_primitive<const NULLABLE: bool>(
group: &mut BenchmarkGroup<WallTime>,
size: usize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
use crate::aggregates::group_values::GroupValues;
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::{
ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, NullBufferBuilder, PrimitiveArray,
cast::AsArray,
Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, NullBufferBuilder,
PrimitiveArray, cast::AsArray,
};
use arrow::datatypes::{DataType, i256};
use datafusion_common::Result;
use datafusion_common::hash_utils::RandomState;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
use half::f16;
use hashbrown::hash_table::HashTable;
Expand Down Expand Up @@ -82,17 +81,12 @@ hash_float!(f16, f32, f64);
pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
/// The data type of the output array
data_type: DataType,
/// Stores the `(group_index, hash)` based on the hash of its value
///
/// We also store `hash` is for reducing cost of rehashing. Such cost
/// is obvious in high cardinality group by situation.
/// More details can see:
/// <https://github.com/apache/datafusion/issues/15961>
map: HashTable<(usize, u64)>,
/// Stores the group index and value, keyed by the hash of the value
map: HashTable<(usize, T::Native)>,
/// The group index of the null value if any
null_group: Option<usize>,
/// The values for each group index
values: Vec<T::Native>,
/// The number of groups, including the null group if present
num_groups: usize,
/// The random state used to generate hashes
random_state: RandomState,
}
Expand All @@ -103,7 +97,7 @@ impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
Self {
data_type,
map: HashTable::with_capacity(128),
values: Vec::with_capacity(128),
num_groups: 0,
null_group: None,
random_state: crate::aggregates::AGGREGATION_HASH_SEED,
}
Expand All @@ -118,50 +112,73 @@ where
assert_eq!(cols.len(), 1);
groups.clear();

for v in cols[0].as_primitive::<T>() {
let group_id = match v {
None => *self.null_group.get_or_insert_with(|| {
let group_id = self.values.len();
self.values.push(Default::default());
group_id
}),
Some(key) => {
let state = &self.random_state;
let hash = key.hash(state);
let insert = self.map.entry(
hash,
|&(g, h)| unsafe {
hash == h && self.values.get_unchecked(g).is_eq(key)
},
|&(_, h)| h,
);

match insert {
hashbrown::hash_table::Entry::Occupied(o) => o.get().0,
hashbrown::hash_table::Entry::Vacant(v) => {
let g = self.values.len();
v.insert((g, hash));
self.values.push(key);
g
}
let values = cols[0].as_primitive::<T>();

if values.null_count() == 0 {
let state = &self.random_state;
groups.extend(values.values().iter().map(|&key| {
let hash = key.hash(state);
let insert = self.map.entry(
hash,
|&(_, value)| value.is_eq(key),
|&(_, value)| value.hash(state),
);

match insert {
hashbrown::hash_table::Entry::Occupied(o) => o.get().0,
hashbrown::hash_table::Entry::Vacant(v) => {
let g = self.num_groups;
v.insert((g, key));
self.num_groups += 1;
g
}
}
};
groups.push(group_id)
}));
} else {
for v in values {
let group_id = match v {
None => *self.null_group.get_or_insert_with(|| {
let group_id = self.num_groups;
self.num_groups += 1;
group_id
}),
Some(key) => {
let state = &self.random_state;
let hash = key.hash(state);
let insert = self.map.entry(
hash,
|&(_, value)| value.is_eq(key),
|&(_, value)| value.hash(state),
);

match insert {
hashbrown::hash_table::Entry::Occupied(o) => o.get().0,
hashbrown::hash_table::Entry::Vacant(v) => {
let g = self.num_groups;
v.insert((g, key));
self.num_groups += 1;
g
}
}
}
};
groups.push(group_id)
}
}

Ok(())
}

fn size(&self) -> usize {
self.map.capacity() * size_of::<(usize, u64)>() + self.values.allocated_size()
self.map.capacity() * size_of::<(usize, T::Native)>()
}

fn is_empty(&self) -> bool {
self.values.is_empty()
self.num_groups == 0
}

fn len(&self) -> usize {
self.values.len()
self.num_groups
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
Expand All @@ -182,21 +199,30 @@ where

let array: PrimitiveArray<T> = match emit_to {
EmitTo::All => {
self.map.clear();
build_primitive(std::mem::take(&mut self.values), self.null_group.take())
let mut values = vec![T::default_value(); self.num_groups];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the `values` buffer be reused across emits to avoid reallocating it each time? 🤔

for (group_idx, value) in std::mem::take(&mut self.map) {
values[group_idx] = value;
}
self.num_groups = 0;

build_primitive(values, self.null_group.take())
}
EmitTo::First(n) => {
self.map.retain(|entry| {
let mut values = vec![T::default_value(); n];

self.map.retain(|(group_idx, value)| {
// Decrement group index by n
let group_idx = entry.0;
match group_idx.checked_sub(n) {
// Group index was >= n, shift value down
Some(sub) => {
entry.0 = sub;
*group_idx = sub;
true
}
// Group index was < n, so remove from table
None => false,
None => {
values[*group_idx] = *value;
false
}
}
});
let null_group = match &mut self.null_group {
Expand All @@ -207,19 +233,84 @@ where
Some(_) => self.null_group.take(),
None => None,
};
let mut split = self.values.split_off(n);
std::mem::swap(&mut self.values, &mut split);
build_primitive(split, null_group)
self.num_groups -= n;
build_primitive(values, null_group)
}
};

Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
}

fn clear_shrink(&mut self, num_rows: usize) {
self.values.clear();
self.values.shrink_to(num_rows);
self.num_groups = 0;
self.null_group = None;
self.map.clear();
self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, UInt64Array};
    use arrow::datatypes::UInt64Type;

    /// Collects a `UInt64` array into `Vec<Option<u64>>` for easy comparison.
    fn to_options(array: &ArrayRef) -> Vec<Option<u64>> {
        array.as_primitive::<UInt64Type>().iter().collect()
    }

    #[test]
    fn primitive_emit_all_reconstructs_group_order() -> Result<()> {
        let mut group_values = GroupValuesPrimitive::<UInt64Type>::new(DataType::UInt64);
        let mut group_ids = Vec::new();

        let keys: ArrayRef =
            Arc::new(UInt64Array::from(vec![Some(10), Some(20), None, Some(10)]));
        group_values.intern(&[keys], &mut group_ids)?;
        assert_eq!(group_ids, [0, 1, 2, 0]);

        // Emitting everything must return the group keys in first-seen order.
        let emitted = group_values.emit(EmitTo::All)?;
        assert_eq!(to_options(&emitted[0]), [Some(10), Some(20), None]);
        assert!(group_values.is_empty());

        Ok(())
    }

    #[test]
    fn primitive_emit_first_reindexes_remaining_groups() -> Result<()> {
        let mut group_values = GroupValuesPrimitive::<UInt64Type>::new(DataType::UInt64);
        let mut group_ids = Vec::new();

        let keys: ArrayRef = Arc::new(UInt64Array::from(vec![
            Some(10),
            None,
            Some(20),
            Some(30),
            Some(10),
        ]));
        group_values.intern(&[keys], &mut group_ids)?;
        assert_eq!(group_ids, [0, 1, 2, 3, 0]);

        // Emitting the first two groups shifts the surviving group indices
        // down by two.
        let emitted = group_values.emit(EmitTo::First(2))?;
        assert_eq!(to_options(&emitted[0]), [Some(10), None]);
        assert_eq!(group_values.len(), 2);

        // Existing key 20 resolves to its shifted index; new key 40 gets the
        // next fresh index.
        let keys: ArrayRef = Arc::new(UInt64Array::from(vec![Some(20), Some(40)]));
        group_values.intern(&[keys], &mut group_ids)?;
        assert_eq!(group_ids, [0, 2]);

        let emitted = group_values.emit(EmitTo::All)?;
        assert_eq!(to_options(&emitted[0]), [Some(20), Some(30), Some(40)]);

        Ok(())
    }
}
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4629,7 +4629,7 @@ mod tests {

// Pool must be large enough for accumulation to start but too small for
// sort_memory after clearing.
let task_ctx = new_spill_ctx(1, 500);
let task_ctx = new_spill_ctx(1, 400);
let result = collect(aggr.execute(0, Arc::clone(&task_ctx))?).await;

match &result {
Expand Down
Loading