From 82357c78a5f23ff3077f963c736d69068cb8bdd5 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 23 Nov 2022 17:26:53 +0100 Subject: [PATCH 1/5] feat: add `TDigest::size` --- datafusion/physical-expr/src/aggregate/tdigest.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs index 977678e5b05b..c25566077e00 100644 --- a/datafusion/physical-expr/src/aggregate/tdigest.rs +++ b/datafusion/physical-expr/src/aggregate/tdigest.rs @@ -187,6 +187,12 @@ impl TDigest { pub(crate) fn max_size(&self) -> usize { self.max_size } + + /// Size in bytes including `Self`. + pub(crate) fn size(&self) -> usize { + std::mem::size_of_val(self) + + (std::mem::size_of::() * self.centroids.capacity()) + } } impl Default for TDigest { @@ -741,4 +747,12 @@ mod tests { assert_error_bounds!(t, quantile = 0.5, want = 500.0); assert_state_roundtrip!(t); } + + #[test] + fn test_size() { + let t = TDigest::new(10); + let t = t.merge_unsorted_f64(vec![0.0, 1.0]); + + assert_eq!(t.size(), 96); + } } From 9e6fb88f709c7bbd0e7392e5d41fc858411e1ea9 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 23 Nov 2022 17:27:32 +0100 Subject: [PATCH 2/5] feat: add `ScalarValue::size` --- datafusion/common/src/scalar.rs | 70 +++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 96d1ab6722f7..893d7ef41bb7 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2290,6 +2290,76 @@ impl ScalarValue { ScalarValue::Null => array.data().is_null(index), } } + + /// Estimate size if bytes including `Self` + pub fn size(&self) -> usize { + std::mem::size_of_val(&self) + + match self { + ScalarValue::Null + | ScalarValue::Boolean(_) + | ScalarValue::Float32(_) + | ScalarValue::Float64(_) + | ScalarValue::Decimal128(_, _, _) + | ScalarValue::Int8(_) + | ScalarValue::Int16(_) + | ScalarValue::Int32(_) + | ScalarValue::Int64(_) + | ScalarValue::UInt8(_) + | ScalarValue::UInt16(_) + | ScalarValue::UInt32(_) + | ScalarValue::UInt64(_) + | ScalarValue::Date32(_) + | ScalarValue::Date64(_) + | ScalarValue::Time32Second(_) + | ScalarValue::Time32Millisecond(_) + | ScalarValue::Time64Microsecond(_) + | ScalarValue::Time64Nanosecond(_) + | ScalarValue::IntervalYearMonth(_) + | ScalarValue::IntervalDayTime(_) + | ScalarValue::IntervalMonthDayNano(_) => 0, + ScalarValue::Utf8(s) + | ScalarValue::LargeUtf8(s) + | ScalarValue::TimestampSecond(_, s) + | ScalarValue::TimestampMillisecond(_, s) + | ScalarValue::TimestampMicrosecond(_, s) + | ScalarValue::TimestampNanosecond(_, s) => { + s.as_ref().map(|s| s.capacity()).unwrap_or_default() + } + ScalarValue::Binary(b) + | ScalarValue::FixedSizeBinary(_, b) + | ScalarValue::LargeBinary(b) => { + b.as_ref().map(|b| b.capacity()).unwrap_or_default() + } + // TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + ScalarValue::List(vals, field) => { + vals.as_ref() + .map(|vals| { + vals.iter() + .map(|sv| sv.size() - std::mem::size_of_val(sv)) + .sum::() + + (std::mem::size_of::() * vals.capacity()) + }) + .unwrap_or_default() + + std::mem::size_of_val(field) + } + // TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + ScalarValue::Struct(vals, fields) => { + vals.as_ref() + .map(|vals| { + vals.iter() + .map(|sv| sv.size() - std::mem::size_of_val(sv)) + .sum::() + + (std::mem::size_of::() * vals.capacity()) + }) + .unwrap_or_default() + + (std::mem::size_of::() * fields.capacity()) + } + // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + ScalarValue::Dictionary(dt, sv) => { + std::mem::size_of_val(dt.as_ref()) + sv.size() + } + } + } } macro_rules! impl_scalar { From 0a28451a7117d17d05c2c3c9cc1c8540e9457f8e Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 23 Nov 2022 17:28:42 +0100 Subject: [PATCH 3/5] feat: add `Accumulator::size` --- datafusion-examples/examples/simple_udaf.rs | 4 ++++ .../core/tests/user_defined_aggregates.rs | 4 ++++ datafusion/expr/src/accumulator.rs | 3 +++ .../src/aggregate/approx_distinct.rs | 5 +++++ .../src/aggregate/approx_percentile_cont.rs | 6 ++++++ .../approx_percentile_cont_with_weight.rs | 6 ++++++ .../physical-expr/src/aggregate/array_agg.rs | 11 +++++++++++ .../src/aggregate/array_agg_distinct.rs | 11 +++++++++++ .../physical-expr/src/aggregate/average.rs | 4 ++++ .../src/aggregate/correlation.rs | 9 +++++++++ .../physical-expr/src/aggregate/count.rs | 4 ++++ .../src/aggregate/count_distinct.rs | 19 +++++++++++++++++++ .../physical-expr/src/aggregate/covariance.rs | 4 ++++ .../physical-expr/src/aggregate/median.rs | 14 ++++++++++++++ .../physical-expr/src/aggregate/min_max.rs | 8 ++++++++ .../physical-expr/src/aggregate/stddev.rs | 5 +++++ datafusion/physical-expr/src/aggregate/sum.rs | 4 ++++ .../src/aggregate/sum_distinct.rs | 7 +++++++ .../physical-expr/src/aggregate/variance.rs | 4 ++++ datafusion/proto/src/lib.rs | 4 ++++ 20 files changed, 136 insertions(+) diff --git a/datafusion-examples/examples/simple_udaf.rs b/datafusion-examples/examples/simple_udaf.rs index 416d514e3197..bb3a42b8bf81 100644 --- a/datafusion-examples/examples/simple_udaf.rs +++ b/datafusion-examples/examples/simple_udaf.rs @@ -153,6 +153,10 @@ impl Accumulator for GeometricMean { self.merge(&v) }) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } } #[tokio::main] diff --git a/datafusion/core/tests/user_defined_aggregates.rs b/datafusion/core/tests/user_defined_aggregates.rs index ea6838e70dba..2903d42720f2 100644 --- a/datafusion/core/tests/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined_aggregates.rs @@ -244,4 +244,8 @@ impl Accumulator for FirstSelector { // same logic is needed as in update_batch self.update_batch(states) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } } diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs index d0540f50195a..93ad8359af01 100644 --- a/datafusion/expr/src/accumulator.rs +++ b/datafusion/expr/src/accumulator.rs @@ -54,6 +54,9 @@ pub trait Accumulator: Send + Sync + Debug { /// returns its value based on its current state. fn evaluate(&self) -> Result; + + /// Size in bytes including `Self`. + fn size(&self) -> usize; } /// Representation of internal accumulator state. Accumulators can potentially have a mix of diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs index 7302008ba825..a2ac7f093005 100644 --- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs @@ -239,6 +239,11 @@ macro_rules! default_accumulator_impl { fn evaluate(&self) -> Result { Ok(ScalarValue::UInt64(Some(self.hll.count() as u64))) } + + fn size(&self) -> usize { + // HLL has static size + std::mem::size_of_val(self) + } }; } diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index 7506757199e2..2b1a5da85692 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -355,6 +355,7 @@ impl ApproxPercentileAccumulator { } } } + impl Accumulator for ApproxPercentileAccumulator { fn state(&self) -> Result> { Ok(self @@ -413,4 +414,9 @@ impl Accumulator for ApproxPercentileAccumulator { Ok(()) } + + fn size(&self) -> usize { + // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + std::mem::size_of_val(self) + self.digest.size() + } } diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs index 85426015e3a0..41f195f382b9 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs @@ -150,4 +150,10 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator { Ok(()) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + - std::mem::size_of_val(&self.approx_percentile_cont_accumulator) + + self.approx_percentile_cont_accumulator.size() + } } diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index 160e4477b102..b69acdc861bc 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -153,6 +153,17 @@ impl Accumulator for ArrayAggAccumulator { self.datatype.clone(), )) } + + fn size(&self) -> usize { + // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + std::mem::size_of_val(self) + + (std::mem::size_of::() * self.values.capacity()) + + self + .values + .iter() + .map(|sv| sv.size() - std::mem::size_of_val(sv)) + .sum::() + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index a0ef021b807c..5a1857916087 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -156,6 +156,17 @@ impl Accumulator for DistinctArrayAggAccumulator { self.datatype.clone(), )) } + + fn size(&self) -> usize { + // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + std::mem::size_of_val(self) + + (std::mem::size_of::() * self.values.capacity()) + + self + .values + .iter() + .map(|sv| sv.size() - std::mem::size_of_val(sv)) + .sum::() + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index f034e3d56897..da70252d4b44 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -200,6 +200,10 @@ impl Accumulator for AvgAccumulator { )), } } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size() + } } #[derive(Debug)] diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs index 8645bd5497af..a013bd43acc4 100644 --- a/datafusion/physical-expr/src/aggregate/correlation.rs +++ b/datafusion/physical-expr/src/aggregate/correlation.rs @@ -193,6 +193,15 @@ impl Accumulator for CorrelationAccumulator { Ok(ScalarValue::Float64(None)) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.covar) + + self.covar.size() + - std::mem::size_of_val(&self.stddev1) + + self.stddev1.size() + - std::mem::size_of_val(&self.stddev2) + + self.stddev2.size() + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index b64328aa3a78..4721bf8f2301 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -149,6 +149,10 @@ impl Accumulator for CountAccumulator { fn evaluate(&self) -> Result { Ok(ScalarValue::Int64(Some(self.count))) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } } #[derive(Debug)] diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index d4c0b4406adc..da90f660de35 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -218,6 +218,25 @@ impl Accumulator for DistinctCountAccumulator { ))), } } + + fn size(&self) -> usize { + // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + std::mem::size_of_val(self) + + (std::mem::size_of::() * self.values.capacity()) + + self + .values + .iter() + .map(|vals| { + (std::mem::size_of::() * vals.0.capacity()) + + vals + .0 + .iter() + .map(|sv| sv.size() - std::mem::size_of_val(sv)) + .sum::() + }) + .sum::() + + (std::mem::size_of::() * self.state_data_types.capacity()) + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs index 7797e008d9fe..b5343059e585 100644 --- a/datafusion/physical-expr/src/aggregate/covariance.rs +++ b/datafusion/physical-expr/src/aggregate/covariance.rs @@ -373,6 +373,10 @@ impl Accumulator for CovarianceAccumulator { Ok(ScalarValue::Float64(Some(self.algo_const / count as f64))) } } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs index 968cd6986601..deef5dec21cb 100644 --- a/datafusion/physical-expr/src/aggregate/median.rs +++ b/datafusion/physical-expr/src/aggregate/median.rs @@ -183,6 +183,20 @@ impl Accumulator for MedianAccumulator { )), } } + + fn size(&self) -> usize { + // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + std::mem::align_of_val(self) + + (std::mem::size_of::() * self.all_values.capacity()) + + self + .all_values + .iter() + .map(|array_ref| { + std::mem::size_of_val(array_ref.as_ref()) + + array_ref.get_array_memory_size() + }) + .sum::() + } } /// Create an empty array diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 2d6961bfcb88..73f898c426b1 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -571,6 +571,10 @@ impl Accumulator for MaxAccumulator { fn evaluate(&self) -> Result { Ok(self.max.clone()) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size() + } } #[derive(Debug)] @@ -735,6 +739,10 @@ impl Accumulator for MinAccumulator { fn evaluate(&self) -> Result { Ok(self.min.clone()) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size() + } } #[derive(Debug)] diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs b/datafusion/physical-expr/src/aggregate/stddev.rs index 639971811f51..94ec2418b5a6 100644 --- a/datafusion/physical-expr/src/aggregate/stddev.rs +++ b/datafusion/physical-expr/src/aggregate/stddev.rs @@ -211,6 +211,11 @@ impl Accumulator for StddevAccumulator { )), } } + + fn size(&self) -> usize { + std::mem::align_of_val(self) - std::mem::align_of_val(&self.variance) + + self.variance.size() + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 112856988129..32259bc3061d 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -279,6 +279,10 @@ impl Accumulator for SumAccumulator { Ok(self.sum.clone()) } } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size() + } } #[derive(Debug)] diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs index 73c4828e887f..3e2119eac6b4 100644 --- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs @@ -175,6 +175,13 @@ impl Accumulator for DistinctSumAccumulator { } Ok(sum_value) } + + fn size(&self) -> usize { + // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) + std::mem::size_of_val(self) + + (std::mem::size_of::() * self.hash_values.capacity()) + + self.hash_values.iter().map(|sv| sv.size()).sum::() + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs index d6ed8c95778b..8af810a9ef27 100644 --- a/datafusion/physical-expr/src/aggregate/variance.rs +++ b/datafusion/physical-expr/src/aggregate/variance.rs @@ -306,6 +306,10 @@ impl Accumulator for VarianceAccumulator { Ok(ScalarValue::Float64(Some(self.m2 / count as f64))) } } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } } #[cfg(test)] diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 315fbd0dda0e..da61188f0a9f 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -1226,6 +1226,10 @@ mod roundtrip_tests { fn evaluate(&self) -> datafusion::error::Result { Ok(ScalarValue::Float64(None)) } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } } let dummy_agg = create_udaf( From 0df6f40e50f35fd372c78b783c09f2308902973f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 23 Nov 2022 18:23:24 +0100 Subject: [PATCH 4/5] docs: improve Co-authored-by: Andrew Lamb --- datafusion/common/src/scalar.rs | 3 ++- datafusion/expr/src/accumulator.rs | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 893d7ef41bb7..838aadfb9d2a 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2291,7 +2291,8 @@ impl ScalarValue { } } - /// Estimate size if bytes including `Self` + /// Estimate size if bytes including `Self`. For values with internal containers such as `String` + /// includes the allocated size (`capacity`) rather than the current length (`len`) pub fn size(&self) -> usize { std::mem::size_of_val(&self) + match self { diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs index 93ad8359af01..3d107c0c4eae 100644 --- a/datafusion/expr/src/accumulator.rs +++ b/datafusion/expr/src/accumulator.rs @@ -55,7 +55,9 @@ pub trait Accumulator: Send + Sync + Debug { /// returns its value based on its current state. fn evaluate(&self) -> Result; - /// Size in bytes including `Self`. + /// Allocated size required for this accumulator, in bytes, including `Self`. + /// Allocated means that for internal containers such as `Vec`, the `capacity` should be used + /// not the `len` fn size(&self) -> usize; } From a5d0b03a54f65367a2f34aa3d52be1b2ad89879c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 23 Nov 2022 18:32:06 +0100 Subject: [PATCH 5/5] refactor: simplify memory accounting --- datafusion/common/src/scalar.rs | 30 +++++++++++++++---- .../physical-expr/src/aggregate/array_agg.rs | 9 ++---- .../src/aggregate/array_agg_distinct.rs | 9 ++---- .../src/aggregate/count_distinct.rs | 7 +---- .../src/aggregate/sum_distinct.rs | 5 ++-- 5 files changed, 31 insertions(+), 29 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 838aadfb9d2a..9a1119469a59 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -19,6 +19,7 @@ use std::borrow::Borrow; use std::cmp::{max, Ordering}; +use std::collections::HashSet; use std::convert::{Infallible, TryInto}; use std::ops::{Add, Sub}; use std::str::FromStr; @@ -2334,12 +2335,7 @@ impl ScalarValue { // TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) ScalarValue::List(vals, field) => { vals.as_ref() - .map(|vals| { - vals.iter() - .map(|sv| sv.size() - std::mem::size_of_val(sv)) - .sum::() - + (std::mem::size_of::() * vals.capacity()) - }) + .map(|vals| Self::size_of_vec(vals) - std::mem::size_of_val(vals)) .unwrap_or_default() + std::mem::size_of_val(field) } @@ -2361,6 +2357,28 @@ impl ScalarValue { } } } + + /// Estimates [size](Self::size) of [`Vec`] in bytes. + /// + /// Includes the size of the [`Vec`] container itself. + pub fn size_of_vec(vec: &Vec) -> usize { + (std::mem::size_of::() * vec.capacity()) + + vec + .iter() + .map(|sv| sv.size() - std::mem::size_of_val(sv)) + .sum::() + } + + /// Estimates [size](Self::size) of [`HashSet`] in bytes. + /// + /// Includes the size of the [`HashSet`] container itself. + pub fn size_of_hashset(set: &HashSet) -> usize { + (std::mem::size_of::() * set.capacity()) + + set + .iter() + .map(|sv| sv.size() - std::mem::size_of_val(sv)) + .sum::() + } } macro_rules! impl_scalar { diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index b69acdc861bc..1eff48ce8147 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -156,13 +156,8 @@ impl Accumulator for ArrayAggAccumulator { fn size(&self) -> usize { // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) - std::mem::size_of_val(self) - + (std::mem::size_of::() * self.values.capacity()) - + self - .values - .iter() - .map(|sv| sv.size() - std::mem::size_of_val(sv)) - .sum::() + std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.values) + - std::mem::size_of_val(&self.values) } } diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 5a1857916087..bb32a9ffd666 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -159,13 +159,8 @@ impl Accumulator for DistinctArrayAggAccumulator { fn size(&self) -> usize { // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) - std::mem::size_of_val(self) - + (std::mem::size_of::() * self.values.capacity()) - + self - .values - .iter() - .map(|sv| sv.size() - std::mem::size_of_val(sv)) - .sum::() + std::mem::size_of_val(self) + ScalarValue::size_of_hashset(&self.values) + - std::mem::size_of_val(&self.values) } } diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index da90f660de35..ead64ebd809d 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -227,12 +227,7 @@ impl Accumulator for DistinctCountAccumulator { .values .iter() .map(|vals| { - (std::mem::size_of::() * vals.0.capacity()) - + vals - .0 - .iter() - .map(|sv| sv.size() - std::mem::size_of_val(sv)) - .sum::() + ScalarValue::size_of_vec(&vals.0) - std::mem::size_of_val(&vals.0) }) .sum::() + (std::mem::size_of::() * self.state_data_types.capacity()) diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs index 3e2119eac6b4..95823c5175fa 100644 --- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs @@ -178,9 +178,8 @@ impl Accumulator for DistinctSumAccumulator { fn size(&self) -> usize { // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147) - std::mem::size_of_val(self) - + (std::mem::size_of::() * self.hash_values.capacity()) - + self.hash_values.iter().map(|sv| sv.size()).sum::() + std::mem::size_of_val(self) + ScalarValue::size_of_hashset(&self.hash_values) + - std::mem::size_of_val(&self.hash_values) } }