From bce1f1b7b441ca86ec8184fc9ce3f7763010a4ee Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 18 May 2024 15:34:32 +0800 Subject: [PATCH 1/8] ahash workspace Signed-off-by: jayzhan211 --- Cargo.toml | 3 +++ datafusion/common/Cargo.toml | 4 +--- datafusion/core/Cargo.toml | 2 +- datafusion/expr/Cargo.toml | 4 +--- datafusion/physical-expr/Cargo.toml | 4 +--- datafusion/physical-plan/Cargo.toml | 4 +--- 6 files changed, 8 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e78fe4767d80..6ff4ee99875d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,9 @@ version = "38.0.0" # for the inherited dependency but cannot do the reverse (override from true to false). # # See for more detaiils: https://github.com/rust-lang/cargo/issues/11329 +ahash = { version = "0.8", default-features = false, features = [ + "runtime-rng", +] } arrow = { version = "51.0.0", features = ["prettyprint"] } arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] } arrow-buffer = { version = "51.0.0", default-features = false } diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 2391b2f83087..d67db3b45cdc 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -41,9 +41,7 @@ backtrace = [] pyarrow = ["pyo3", "arrow/pyarrow", "parquet"] [dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { workspace = true } apache-avro = { version = "0.16", default-features = false, features = [ "bzip", "snappy", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 2bd552aacc44..b4874d922a03 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -77,7 +77,7 @@ unicode_expressions = [ ] [dependencies] -ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } +ahash = { workspace = true } apache-avro = { version = "0.16", optional = true } arrow = { workspace = true } arrow-array = { workspace = true } diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 2759572581ea..df91d9313746 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -38,9 +38,7 @@ path = "src/lib.rs" [features] [dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { workspace = true } arrow = { workspace = true } arrow-array = { workspace = true } chrono = { workspace = true } diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 5261f1c8968d..17770b28d83e 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -44,9 +44,7 @@ encoding_expressions = ["base64", "hex"] regex_expressions = ["regex"] [dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { workspace = true } arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 25e1a6ad5bd3..f1aae4cc9881 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -36,9 +36,7 @@ name = "datafusion_physical_plan" path = "src/lib.rs" [dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { workspace = true } arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } From 03388655597ea1d14fdea9f708a52112b9a62ef6 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 19 May 2024 13:22:45 +0800 Subject: [PATCH 2/8] move other utils Signed-off-by: jayzhan211 --- .../src/aggregate/utils.rs | 81 +++++++++++++++- .../physical-expr/src/aggregate/utils.rs | 93 ++----------------- 2 files changed, 86 insertions(+), 88 deletions(-) diff --git a/datafusion/physical-expr-common/src/aggregate/utils.rs b/datafusion/physical-expr-common/src/aggregate/utils.rs index 9821ba626b18..a39ef25763e3 100644 --- a/datafusion/physical-expr-common/src/aggregate/utils.rs +++ b/datafusion/physical-expr-common/src/aggregate/utils.rs @@ -18,9 +18,16 @@ use std::{any::Any, sync::Arc}; use arrow::{ + array::{ArrayRef, ArrowNativeTypeOp, AsArray}, compute::SortOptions, - datatypes::{DataType, Field}, + datatypes::{ + DataType, Decimal128Type, Field, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, + ToByteSlice, + }, }; +use datafusion_common::Result; +use datafusion_expr::Accumulator; use crate::sort_expr::PhysicalSortExpr; @@ -43,6 +50,60 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { } } +/// Convert scalar values from an accumulator into arrays. +pub fn get_accum_scalar_values_as_arrays( + accum: &mut dyn Accumulator, +) -> Result> { + accum + .state()? + .iter() + .map(|s| s.to_array_of_size(1)) + .collect() +} + +/// Adjust array type metadata if needed +/// +/// Since `Decimal128Arrays` created from `Vec` have +/// default precision and scale, this function adjusts the output to +/// match `data_type`, if necessary +pub fn adjust_output_array(data_type: &DataType, array: ArrayRef) -> Result { + let array = match data_type { + DataType::Decimal128(p, s) => Arc::new( + array + .as_primitive::() + .clone() + .with_precision_and_scale(*p, *s)?, + ) as ArrayRef, + DataType::Timestamp(TimeUnit::Nanosecond, tz) => Arc::new( + array + .as_primitive::() + .clone() + .with_timezone_opt(tz.clone()), + ), + DataType::Timestamp(TimeUnit::Microsecond, tz) => Arc::new( + array + .as_primitive::() + .clone() + .with_timezone_opt(tz.clone()), + ), + DataType::Timestamp(TimeUnit::Millisecond, tz) => Arc::new( + array + .as_primitive::() + .clone() + .with_timezone_opt(tz.clone()), + ), + DataType::Timestamp(TimeUnit::Second, tz) => Arc::new( + array + .as_primitive::() + .clone() + .with_timezone_opt(tz.clone()), + ), + // no adjustment needed for other arrays + _ => array, + }; + Ok(array) +} + /// Construct corresponding fields for lexicographical ordering requirement expression pub fn ordering_fields( ordering_req: &[PhysicalSortExpr], @@ -67,3 +128,21 @@ pub fn ordering_fields( pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec { ordering_req.iter().map(|item| item.options).collect() } + +/// A wrapper around a type to provide hash for floats +#[derive(Copy, Clone, Debug)] +pub struct Hashable(pub T); + +impl std::hash::Hash for Hashable { + fn hash(&self, state: &mut H) { + self.0.to_byte_slice().hash(state) + } +} + +impl PartialEq for Hashable { + fn eq(&self, other: &Self) -> bool { + self.0.is_eq(other.0) + } +} + +impl Eq for Hashable {} diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs index 6d97ad3da6de..7713b91b8bd4 100644 --- a/datafusion/physical-expr/src/aggregate/utils.rs +++ b/datafusion/physical-expr/src/aggregate/utils.rs @@ -17,35 +17,18 @@ //! Utilities used in aggregates -use std::sync::Arc; - // For backwards compatibility pub use datafusion_physical_expr_common::aggregate::utils::{ - down_cast_any_ref, get_sort_options, ordering_fields, + adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, + get_sort_options, ordering_fields, Hashable, }; -use arrow::array::{ArrayRef, ArrowNativeTypeOp}; -use arrow_array::cast::AsArray; -use arrow_array::types::{ - Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, -}; -use arrow_buffer::{ArrowNativeType, ToByteSlice}; -use arrow_schema::DataType; +use arrow::array::ArrowNativeTypeOp; +use arrow_array::types::DecimalType; +use arrow_buffer::ArrowNativeType; use datafusion_common::{exec_err, DataFusionError, Result}; -use datafusion_expr::Accumulator; - -/// Convert scalar values from an accumulator into arrays. -pub fn get_accum_scalar_values_as_arrays( - accum: &mut dyn Accumulator, -) -> Result> { - accum - .state()? - .iter() - .map(|s| s.to_array_of_size(1)) - .collect() -} +// TODO: Move to functions-aggregate crate /// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow /// /// This is needed because different precisions for Decimal128/Decimal256 can @@ -125,67 +108,3 @@ impl DecimalAverager { } } } - -/// Adjust array type metadata if needed -/// -/// Since `Decimal128Arrays` created from `Vec` have -/// default precision and scale, this function adjusts the output to -/// match `data_type`, if necessary -pub fn adjust_output_array( - data_type: &DataType, - array: ArrayRef, -) -> Result { - let array = match data_type { - DataType::Decimal128(p, s) => Arc::new( - array - .as_primitive::() - .clone() - .with_precision_and_scale(*p, *s)?, - ) as ArrayRef, - DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => Arc::new( - array - .as_primitive::() - .clone() - .with_timezone_opt(tz.clone()), - ), - DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => Arc::new( - array - .as_primitive::() - .clone() - .with_timezone_opt(tz.clone()), - ), - DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) => Arc::new( - array - .as_primitive::() - .clone() - .with_timezone_opt(tz.clone()), - ), - DataType::Timestamp(arrow_schema::TimeUnit::Second, tz) => Arc::new( - array - .as_primitive::() - .clone() - .with_timezone_opt(tz.clone()), - ), - // no adjustment needed for other arrays - _ => array, - }; - Ok(array) -} - -/// A wrapper around a type to provide hash for floats -#[derive(Copy, Clone, Debug)] -pub(crate) struct Hashable(pub T); - -impl std::hash::Hash for Hashable { - fn hash(&self, state: &mut H) { - self.0.to_byte_slice().hash(state) - } -} - -impl PartialEq for Hashable { - fn eq(&self, other: &Self) -> bool { - self.0.is_eq(other.0) - } -} - -impl Eq for Hashable {} From 076573040c84f87b1db1b9b9d74808d1886a047b Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 19 May 2024 13:31:09 +0800 Subject: [PATCH 3/8] move NullState Signed-off-by: jayzhan211 --- datafusion-cli/Cargo.lock | 1 + datafusion/physical-expr-common/Cargo.toml | 1 + .../aggregate/groups_accumulator/accumulate.rs | 8 ++++---- .../src/aggregate/groups_accumulator/mod.rs | 18 ++++++++++++++++++ .../physical-expr-common/src/aggregate/mod.rs | 1 + .../src/aggregate/groups_accumulator/mod.rs | 8 ++++++-- 6 files changed, 31 insertions(+), 6 deletions(-) rename datafusion/{physical-expr => physical-expr-common}/src/aggregate/groups_accumulator/accumulate.rs (99%) create mode 100644 datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index fd471e750194..52a2824e2944 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1366,6 +1366,7 @@ dependencies = [ "arrow", "datafusion-common", "datafusion-expr", + "rand", ] [[package]] diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index d1202c83d526..637b8775112e 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -39,3 +39,4 @@ path = "src/lib.rs" arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +rand = { workspace = true } diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs similarity index 99% rename from datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs rename to datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs index 9850b002e40e..f109079f6a26 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs @@ -19,9 +19,9 @@ //! //! [`GroupsAccumulator`]: datafusion_expr::GroupsAccumulator +use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray}; +use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::ArrowPrimitiveType; -use arrow_array::{Array, BooleanArray, PrimitiveArray}; -use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer}; use datafusion_expr::EmitTo; /// Track the accumulator null state per row: if any values for that @@ -462,9 +462,9 @@ fn initialize_builder( mod test { use super::*; - use arrow_array::UInt32Array; - use hashbrown::HashSet; + use arrow::array::UInt32Array; use rand::{rngs::ThreadRng, Rng}; + use std::collections::HashSet; #[test] fn accumulate() { diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs new file mode 100644 index 000000000000..5ce10cef67c5 --- /dev/null +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod accumulate; diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index da24f335b2f8..4ef0d58046f8 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +pub mod groups_accumulator; pub mod stats; pub mod utils; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index de090badd349..ed385aba514c 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -15,10 +15,14 @@ // specific language governing permissions and limitations // under the License. -pub(crate) mod accumulate; mod adapter; -pub use accumulate::NullState; + +pub(crate) mod accumulate { + pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::{accumulate_indices, NullState}; +} + pub use adapter::GroupsAccumulatorAdapter; +pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; pub(crate) mod bool_op; pub(crate) mod prim_op; From 48d39379423d2fb6761cc92a5e3654eee0944c50 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 19 May 2024 13:40:05 +0800 Subject: [PATCH 4/8] move PrimitiveGroupsAccumulator Signed-off-by: jayzhan211 --- .../src/aggregate/groups_accumulator/mod.rs | 1 + .../src/aggregate/groups_accumulator/prim_op.rs | 6 +++--- .../physical-expr/src/aggregate/groups_accumulator/mod.rs | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) rename datafusion/{physical-expr => physical-expr-common}/src/aggregate/groups_accumulator/prim_op.rs (96%) diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs index 5ce10cef67c5..fffdae11bec2 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs @@ -16,3 +16,4 @@ // under the License. pub mod accumulate; +pub mod prim_op; \ No newline at end of file diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs similarity index 96% rename from datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs rename to datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs index 994f5447d7c0..6ea603951360 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs @@ -17,9 +17,9 @@ use std::sync::Arc; -use arrow::{array::AsArray, datatypes::ArrowPrimitiveType}; -use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray}; -use arrow_schema::DataType; +use arrow::datatypes::ArrowPrimitiveType; +use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}; +use arrow::datatypes::DataType; use datafusion_common::Result; use datafusion_expr::{EmitTo, GroupsAccumulator}; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index ed385aba514c..d40dec634862 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -25,4 +25,6 @@ pub use adapter::GroupsAccumulatorAdapter; pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; pub(crate) mod bool_op; -pub(crate) mod prim_op; +pub(crate) mod prim_op { + pub use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +} From adb5d2bae394afe9eddc5f0df6b8683da220b618 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 19 May 2024 13:43:33 +0800 Subject: [PATCH 5/8] move boolop Signed-off-by: jayzhan211 --- .../src/aggregate/groups_accumulator/bool_op.rs | 5 ++--- .../src/aggregate/groups_accumulator/mod.rs | 3 ++- .../src/aggregate/groups_accumulator/prim_op.rs | 2 +- .../physical-expr/src/aggregate/groups_accumulator/mod.rs | 4 +++- 4 files changed, 8 insertions(+), 6 deletions(-) rename datafusion/{physical-expr => physical-expr-common}/src/aggregate/groups_accumulator/bool_op.rs (97%) diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs similarity index 97% rename from datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs rename to datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs index f40c661a7a2f..8498d69dd333 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs @@ -17,9 +17,8 @@ use std::sync::Arc; -use arrow::array::AsArray; -use arrow_array::{ArrayRef, BooleanArray}; -use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; +use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder}; +use arrow::buffer::BooleanBuffer; use datafusion_common::Result; use datafusion_expr::{EmitTo, GroupsAccumulator}; diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs index fffdae11bec2..f7c048871475 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs @@ -16,4 +16,5 @@ // under the License. pub mod accumulate; -pub mod prim_op; \ No newline at end of file +pub mod bool_op; +pub mod prim_op; diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs index 6ea603951360..debb36852b22 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs @@ -17,8 +17,8 @@ use std::sync::Arc; -use arrow::datatypes::ArrowPrimitiveType; use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}; +use arrow::datatypes::ArrowPrimitiveType; use arrow::datatypes::DataType; use datafusion_common::Result; use datafusion_expr::{EmitTo, GroupsAccumulator}; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index d40dec634862..983572c1b7b9 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -24,7 +24,9 @@ pub(crate) mod accumulate { pub use adapter::GroupsAccumulatorAdapter; pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; -pub(crate) mod bool_op; +pub(crate) mod bool_op { + pub use datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; +} pub(crate) mod prim_op { pub use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; } From 0405a37f3cea324d763c515e32b5b09037eb8427 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 19 May 2024 13:55:01 +0800 Subject: [PATCH 6/8] move deciamlavg Signed-off-by: jayzhan211 --- .../src/aggregate/utils.rs | 85 +++++++++++++- datafusion/physical-expr/src/aggregate/mod.rs | 7 +- .../physical-expr/src/aggregate/utils.rs | 110 ------------------ 3 files changed, 89 insertions(+), 113 deletions(-) delete mode 100644 datafusion/physical-expr/src/aggregate/utils.rs diff --git a/datafusion/physical-expr-common/src/aggregate/utils.rs b/datafusion/physical-expr-common/src/aggregate/utils.rs index a39ef25763e3..c59c29a139d8 100644 --- a/datafusion/physical-expr-common/src/aggregate/utils.rs +++ b/datafusion/physical-expr-common/src/aggregate/utils.rs @@ -17,16 +17,17 @@ use std::{any::Any, sync::Arc}; +use arrow::datatypes::ArrowNativeType; use arrow::{ array::{ArrayRef, ArrowNativeTypeOp, AsArray}, compute::SortOptions, datatypes::{ - DataType, Decimal128Type, Field, TimeUnit, TimestampMicrosecondType, + DataType, Decimal128Type, DecimalType, Field, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, ToByteSlice, }, }; -use datafusion_common::Result; +use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr::Accumulator; use crate::sort_expr::PhysicalSortExpr; @@ -146,3 +147,83 @@ impl PartialEq for Hashable { } impl Eq for Hashable {} + +/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow +/// +/// This is needed because different precisions for Decimal128/Decimal256 can +/// store different ranges of values and thus sum/count may not fit in +/// the target type. +/// +/// For example, the precision is 3, the max of value is `999` and the min +/// value is `-999` +pub struct DecimalAverager { + /// scale factor for sum values (10^sum_scale) + sum_mul: T::Native, + /// scale factor for target (10^target_scale) + target_mul: T::Native, + /// the output precision + target_precision: u8, +} + +impl DecimalAverager { + /// Create a new `DecimalAverager`: + /// + /// * sum_scale: the scale of `sum` values passed to [`Self::avg`] + /// * target_precision: the output precision + /// * target_scale: the output scale + /// + /// Errors if the resulting data can not be stored + pub fn try_new( + sum_scale: i8, + target_precision: u8, + target_scale: i8, + ) -> Result { + let sum_mul = T::Native::from_usize(10_usize) + .map(|b| b.pow_wrapping(sum_scale as u32)) + .ok_or(DataFusionError::Internal( + "Failed to compute sum_mul in DecimalAverager".to_string(), + ))?; + + let target_mul = T::Native::from_usize(10_usize) + .map(|b| b.pow_wrapping(target_scale as u32)) + .ok_or(DataFusionError::Internal( + "Failed to compute target_mul in DecimalAverager".to_string(), + ))?; + + if target_mul >= sum_mul { + Ok(Self { + sum_mul, + target_mul, + target_precision, + }) + } else { + // can't convert the lit decimal to the returned data type + exec_err!("Arithmetic Overflow in AvgAccumulator") + } + } + + /// Returns the `sum`/`count` as a i128/i256 Decimal128/Decimal256 with + /// target_scale and target_precision and reporting overflow. + /// + /// * sum: The total sum value stored as Decimal128 with sum_scale + /// (passed to `Self::try_new`) + /// * count: total count, stored as a i128/i256 (*NOT* a Decimal128/Decimal256 value) + #[inline(always)] + pub fn avg(&self, sum: T::Native, count: T::Native) -> Result { + if let Ok(value) = sum.mul_checked(self.target_mul.div_wrapping(self.sum_mul)) { + let new_value = value.div_wrapping(count); + + let validate = + T::validate_decimal_precision(new_value, self.target_precision); + + if validate.is_ok() { + Ok(new_value) + } else { + exec_err!("Arithmetic Overflow in AvgAccumulator") + } + } else { + // can't convert the lit decimal to the returned data type + exec_err!("Arithmetic Overflow in AvgAccumulator") + } + } +} diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index eff008e8f825..93ecf0655e51 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -54,7 +54,12 @@ pub(crate) mod variance; pub mod build_in; pub mod moving_min_max; -pub mod utils; +pub mod utils { + pub use datafusion_physical_expr_common::aggregate::utils::{ + adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, + get_sort_options, ordering_fields, DecimalAverager, Hashable, + }; +} /// Checks whether the given aggregate expression is order-sensitive. /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs. diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs deleted file mode 100644 index 7713b91b8bd4..000000000000 --- a/datafusion/physical-expr/src/aggregate/utils.rs +++ /dev/null @@ -1,110 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Utilities used in aggregates - -// For backwards compatibility -pub use datafusion_physical_expr_common::aggregate::utils::{ - adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, - get_sort_options, ordering_fields, Hashable, -}; - -use arrow::array::ArrowNativeTypeOp; -use arrow_array::types::DecimalType; -use arrow_buffer::ArrowNativeType; -use datafusion_common::{exec_err, DataFusionError, Result}; - -// TODO: Move to functions-aggregate crate -/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow -/// -/// This is needed because different precisions for Decimal128/Decimal256 can -/// store different ranges of values and thus sum/count may not fit in -/// the target type. -/// -/// For example, the precision is 3, the max of value is `999` and the min -/// value is `-999` -pub(crate) struct DecimalAverager { - /// scale factor for sum values (10^sum_scale) - sum_mul: T::Native, - /// scale factor for target (10^target_scale) - target_mul: T::Native, - /// the output precision - target_precision: u8, -} - -impl DecimalAverager { - /// Create a new `DecimalAverager`: - /// - /// * sum_scale: the scale of `sum` values passed to [`Self::avg`] - /// * target_precision: the output precision - /// * target_scale: the output scale - /// - /// Errors if the resulting data can not be stored - pub fn try_new( - sum_scale: i8, - target_precision: u8, - target_scale: i8, - ) -> Result { - let sum_mul = T::Native::from_usize(10_usize) - .map(|b| b.pow_wrapping(sum_scale as u32)) - .ok_or(DataFusionError::Internal( - "Failed to compute sum_mul in DecimalAverager".to_string(), - ))?; - - let target_mul = T::Native::from_usize(10_usize) - .map(|b| b.pow_wrapping(target_scale as u32)) - .ok_or(DataFusionError::Internal( - "Failed to compute target_mul in DecimalAverager".to_string(), - ))?; - - if target_mul >= sum_mul { - Ok(Self { - sum_mul, - target_mul, - target_precision, - }) - } else { - // can't convert the lit decimal to the returned data type - exec_err!("Arithmetic Overflow in AvgAccumulator") - } - } - - /// Returns the `sum`/`count` as a i128/i256 Decimal128/Decimal256 with - /// target_scale and target_precision and reporting overflow. - /// - /// * sum: The total sum value stored as Decimal128 with sum_scale - /// (passed to `Self::try_new`) - /// * count: total count, stored as a i128/i256 (*NOT* a Decimal128/Decimal256 value) - #[inline(always)] - pub fn avg(&self, sum: T::Native, count: T::Native) -> Result { - if let Ok(value) = sum.mul_checked(self.target_mul.div_wrapping(self.sum_mul)) { - let new_value = value.div_wrapping(count); - - let validate = - T::validate_decimal_precision(new_value, self.target_precision); - - if validate.is_ok() { - Ok(new_value) - } else { - exec_err!("Arithmetic Overflow in AvgAccumulator") - } - } else { - // can't convert the lit decimal to the returned data type - exec_err!("Arithmetic Overflow in AvgAccumulator") - } - } -} From b3dc90b08c337d58c20ae216538252596ed8dc28 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 22 May 2024 07:31:11 +0800 Subject: [PATCH 7/8] add comment Signed-off-by: jayzhan211 --- .../src/aggregate/groups_accumulator/mod.rs | 2 ++ .../physical-expr/src/aggregate/groups_accumulator/mod.rs | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs index f7c048871475..fd247b78a4ac 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! Utilities for implementing [`GroupsAccumulator`] + pub mod accumulate; pub mod bool_op; pub mod prim_op; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index 983572c1b7b9..65227b727be7 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -16,12 +16,13 @@ // under the License. mod adapter; +pub use adapter::GroupsAccumulatorAdapter; +// Backward compatibility pub(crate) mod accumulate { pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::{accumulate_indices, NullState}; } -pub use adapter::GroupsAccumulatorAdapter; pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; pub(crate) mod bool_op { From ad689df85da83297a98060a29a99b026abe3c27e Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 22 May 2024 07:46:37 +0800 Subject: [PATCH 8/8] fix doc Signed-off-by: jayzhan211 --- .../src/aggregate/groups_accumulator/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs index fd247b78a4ac..5b0182c5db8a 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Utilities for implementing [`GroupsAccumulator`] +//! Utilities for implementing GroupsAccumulator pub mod accumulate; pub mod bool_op;