Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion-examples/examples/simple_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl Accumulator for GeometricMean {
self.merge(&v)
})
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}

#[tokio::main]
Expand Down
89 changes: 89 additions & 0 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::borrow::Borrow;
use std::cmp::{max, Ordering};
use std::collections::HashSet;
use std::convert::{Infallible, TryInto};
use std::ops::{Add, Sub};
use std::str::FromStr;
Expand Down Expand Up @@ -2290,6 +2291,94 @@ impl ScalarValue {
ScalarValue::Null => array.data().is_null(index),
}
}

/// Estimate size if bytes including `Self`. For values with internal containers such as `String`
/// includes the allocated size (`capacity`) rather than the current length (`len`)
pub fn size(&self) -> usize {
std::mem::size_of_val(&self)
+ match self {
ScalarValue::Null
| ScalarValue::Boolean(_)
| ScalarValue::Float32(_)
| ScalarValue::Float64(_)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Int8(_)
| ScalarValue::Int16(_)
| ScalarValue::Int32(_)
| ScalarValue::Int64(_)
| ScalarValue::UInt8(_)
| ScalarValue::UInt16(_)
| ScalarValue::UInt32(_)
| ScalarValue::UInt64(_)
| ScalarValue::Date32(_)
| ScalarValue::Date64(_)
| ScalarValue::Time32Second(_)
| ScalarValue::Time32Millisecond(_)
| ScalarValue::Time64Microsecond(_)
| ScalarValue::Time64Nanosecond(_)
| ScalarValue::IntervalYearMonth(_)
| ScalarValue::IntervalDayTime(_)
| ScalarValue::IntervalMonthDayNano(_) => 0,
ScalarValue::Utf8(s)
| ScalarValue::LargeUtf8(s)
| ScalarValue::TimestampSecond(_, s)
| ScalarValue::TimestampMillisecond(_, s)
| ScalarValue::TimestampMicrosecond(_, s)
| ScalarValue::TimestampNanosecond(_, s) => {
s.as_ref().map(|s| s.capacity()).unwrap_or_default()
}
ScalarValue::Binary(b)
| ScalarValue::FixedSizeBinary(_, b)
| ScalarValue::LargeBinary(b) => {
b.as_ref().map(|b| b.capacity()).unwrap_or_default()
}
// TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
ScalarValue::List(vals, field) => {
vals.as_ref()
.map(|vals| Self::size_of_vec(vals) - std::mem::size_of_val(vals))
.unwrap_or_default()
+ std::mem::size_of_val(field)
}
// TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
ScalarValue::Struct(vals, fields) => {
vals.as_ref()
.map(|vals| {
vals.iter()
.map(|sv| sv.size() - std::mem::size_of_val(sv))
.sum::<usize>()
+ (std::mem::size_of::<ScalarValue>() * vals.capacity())
})
.unwrap_or_default()
+ (std::mem::size_of::<Field>() * fields.capacity())
}
// TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
ScalarValue::Dictionary(dt, sv) => {
std::mem::size_of_val(dt.as_ref()) + sv.size()
}
}
}

/// Estimates [size](Self::size) of [`Vec`] in bytes.
///
/// Includes the size of the [`Vec`] container itself.
pub fn size_of_vec(vec: &Vec<Self>) -> usize {
(std::mem::size_of::<ScalarValue>() * vec.capacity())
+ vec
.iter()
.map(|sv| sv.size() - std::mem::size_of_val(sv))
.sum::<usize>()
}

/// Estimates [size](Self::size) of [`HashSet`] in bytes.
///
/// Includes the size of the [`HashSet`] container itself.
pub fn size_of_hashset<S>(set: &HashSet<Self, S>) -> usize {
(std::mem::size_of::<ScalarValue>() * set.capacity())
+ set
.iter()
.map(|sv| sv.size() - std::mem::size_of_val(sv))
.sum::<usize>()
}
}

macro_rules! impl_scalar {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/tests/user_defined_aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,8 @@ impl Accumulator for FirstSelector {
// same logic is needed as in update_batch
self.update_batch(states)
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}
5 changes: 5 additions & 0 deletions datafusion/expr/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ pub trait Accumulator: Send + Sync + Debug {

/// returns its value based on its current state.
fn evaluate(&self) -> Result<ScalarValue>;

/// Allocated size required for this accumulator, in bytes, including `Self`.
/// Allocated means that for internal containers such as `Vec`, the `capacity` should be used
/// not the `len`
fn size(&self) -> usize;
}

/// Representation of internal accumulator state. Accumulators can potentially have a mix of
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/aggregate/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ macro_rules! default_accumulator_impl {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::UInt64(Some(self.hll.count() as u64)))
}

fn size(&self) -> usize {
// HLL has static size
std::mem::size_of_val(self)
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ impl ApproxPercentileAccumulator {
}
}
}

impl Accumulator for ApproxPercentileAccumulator {
fn state(&self) -> Result<Vec<AggregateState>> {
Ok(self
Expand Down Expand Up @@ -413,4 +414,9 @@ impl Accumulator for ApproxPercentileAccumulator {

Ok(())
}

fn size(&self) -> usize {
// TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
std::mem::size_of_val(self) + self.digest.size()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,10 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator {

Ok(())
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
- std::mem::size_of_val(&self.approx_percentile_cont_accumulator)
+ self.approx_percentile_cont_accumulator.size()
}
}
6 changes: 6 additions & 0 deletions datafusion/physical-expr/src/aggregate/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ impl Accumulator for ArrayAggAccumulator {
self.datatype.clone(),
))
}

fn size(&self) -> usize {
// TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.values)
- std::mem::size_of_val(&self.values)
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ impl Accumulator for DistinctArrayAggAccumulator {
self.datatype.clone(),
))
}

fn size(&self) -> usize {
// TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
std::mem::size_of_val(self) + ScalarValue::size_of_hashset(&self.values)
- std::mem::size_of_val(&self.values)
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/aggregate/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ impl Accumulator for AvgAccumulator {
)),
}
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
}
}

#[derive(Debug)]
Expand Down
9 changes: 9 additions & 0 deletions datafusion/physical-expr/src/aggregate/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ impl Accumulator for CorrelationAccumulator {

Ok(ScalarValue::Float64(None))
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.covar)
+ self.covar.size()
- std::mem::size_of_val(&self.stddev1)
+ self.stddev1.size()
- std::mem::size_of_val(&self.stddev2)
+ self.stddev2.size()
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/aggregate/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ impl Accumulator for CountAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count)))
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}

#[derive(Debug)]
Expand Down
14 changes: 14 additions & 0 deletions datafusion/physical-expr/src/aggregate/count_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,20 @@ impl Accumulator for DistinctCountAccumulator {
))),
}
}

fn size(&self) -> usize {
// TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
std::mem::size_of_val(self)
+ (std::mem::size_of::<DistinctScalarValues>() * self.values.capacity())
+ self
.values
.iter()
.map(|vals| {
ScalarValue::size_of_vec(&vals.0) - std::mem::size_of_val(&vals.0)
})
.sum::<usize>()
+ (std::mem::size_of::<DataType>() * self.state_data_types.capacity())
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/aggregate/covariance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ impl Accumulator for CovarianceAccumulator {
Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
}
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}

#[cfg(test)]
Expand Down
14 changes: 14 additions & 0 deletions datafusion/physical-expr/src/aggregate/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,20 @@ impl Accumulator for MedianAccumulator {
)),
}
}

fn size(&self) -> usize {
// TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
std::mem::align_of_val(self)
+ (std::mem::size_of::<ArrayRef>() * self.all_values.capacity())
+ self
.all_values
.iter()
.map(|array_ref| {
std::mem::size_of_val(array_ref.as_ref())
+ array_ref.get_array_memory_size()
})
.sum::<usize>()
}
}

/// Create an empty array
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,10 @@ impl Accumulator for MaxAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(self.max.clone())
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size()
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -735,6 +739,10 @@ impl Accumulator for MinAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(self.min.clone())
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size()
}
}

#[derive(Debug)]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/aggregate/stddev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ impl Accumulator for StddevAccumulator {
)),
}
}

fn size(&self) -> usize {
std::mem::align_of_val(self) - std::mem::align_of_val(&self.variance)
+ self.variance.size()
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/aggregate/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ impl Accumulator for SumAccumulator {
Ok(self.sum.clone())
}
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
}
}

#[derive(Debug)]
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-expr/src/aggregate/sum_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ impl Accumulator for DistinctSumAccumulator {
}
Ok(sum_value)
}

fn size(&self) -> usize {
// TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
std::mem::size_of_val(self) + ScalarValue::size_of_hashset(&self.hash_values)
- std::mem::size_of_val(&self.hash_values)
}
}

#[cfg(test)]
Expand Down
14 changes: 14 additions & 0 deletions datafusion/physical-expr/src/aggregate/tdigest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ impl TDigest {
pub(crate) fn max_size(&self) -> usize {
self.max_size
}

/// Size in bytes including `Self`.
pub(crate) fn size(&self) -> usize {
std::mem::size_of_val(self)
+ (std::mem::size_of::<Centroid>() * self.centroids.capacity())
}
}

impl Default for TDigest {
Expand Down Expand Up @@ -741,4 +747,12 @@ mod tests {
assert_error_bounds!(t, quantile = 0.5, want = 500.0);
assert_state_roundtrip!(t);
}

#[test]
fn test_size() {
let t = TDigest::new(10);
let t = t.merge_unsorted_f64(vec![0.0, 1.0]);

assert_eq!(t.size(), 96);
}
}
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/aggregate/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ impl Accumulator for VarianceAccumulator {
Ok(ScalarValue::Float64(Some(self.m2 / count as f64)))
}
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,10 @@ mod roundtrip_tests {
fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
Ok(ScalarValue::Float64(None))
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}

let dummy_agg = create_udaf(
Expand Down