diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 8570f81700c5..3734cfbe313c 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -143,6 +143,26 @@ fn criterion_benchmark(c: &mut Criterion) { ) }) }); + + c.bench_function("aggregate_query_approx_percentile_cont_on_u64", |b| { + b.iter(|| { + query( + ctx.clone(), + "SELECT utf8, approx_percentile_cont(u64_wide, 0.5, 2500) \ + FROM t GROUP BY utf8", + ) + }) + }); + + c.bench_function("aggregate_query_approx_percentile_cont_on_f32", |b| { + b.iter(|| { + query( + ctx.clone(), + "SELECT utf8, approx_percentile_cont(f32, 0.5, 2500) \ + FROM t GROUP BY utf8", + ) + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index ee32b0a6a433..fc5cc920efc2 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -370,9 +370,10 @@ impl Accumulator for ApproxPercentileAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &values[0]; - let unsorted_values = - ApproxPercentileAccumulator::convert_to_ordered_float(values)?; - self.digest = self.digest.merge_unsorted_f64(unsorted_values); + let sorted_values = &arrow::compute::sort(values, None)?; + let sorted_values = + ApproxPercentileAccumulator::convert_to_ordered_float(sorted_values)?; + self.digest = self.digest.merge_sorted_f64(&sorted_values); Ok(()) } diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs index fa937d3e159b..6314a2af6ca3 100644 --- a/datafusion/physical-expr/src/aggregate/tdigest.rs +++ b/datafusion/physical-expr/src/aggregate/tdigest.rs @@ -260,6 +260,7 @@ impl TDigest { } } + #[cfg(test)] 
pub(crate) fn merge_unsorted_f64( &self, unsorted_values: Vec<OrderedFloat<f64>>, @@ -269,7 +270,10 @@ impl TDigest { self.merge_sorted_f64(&values) } - fn merge_sorted_f64(&self, sorted_values: &[OrderedFloat<f64>]) -> TDigest { + pub(crate) fn merge_sorted_f64( + &self, + sorted_values: &[OrderedFloat<f64>], + ) -> TDigest { #[cfg(debug_assertions)] debug_assert!(is_sorted(sorted_values), "unsorted input to TDigest");