Skip to content

Commit

Permalink
feat: Implement GetSize for array (risingwavelabs#8995)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Apr 7, 2023
1 parent 36dd239 commit 85980fd
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 25 deletions.
9 changes: 9 additions & 0 deletions src/common/src/array/bool_array.rs
Expand Up @@ -17,6 +17,7 @@ use risingwave_pb::data::{ArrayType, PbArray};
use super::{Array, ArrayBuilder, ArrayMeta};
use crate::array::ArrayBuilderImpl;
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BoolArray {
Expand Down Expand Up @@ -66,6 +67,12 @@ impl FromIterator<bool> for BoolArray {
}
}

impl EstimateSize for BoolArray {
fn estimated_heap_size(&self) -> usize {
self.bitmap.estimated_heap_size() + self.data.estimated_heap_size()
}
}

impl Array for BoolArray {
type Builder = BoolArrayBuilder;
type OwnedItem = bool;
Expand Down Expand Up @@ -194,6 +201,8 @@ mod tests {
})
.collect_vec();
let array = helper_test_builder(v.clone());
assert_eq!(256, array.estimated_heap_size());
assert_eq!(320, array.estimated_size());
let res = v.iter().zip_eq_fast(array.iter()).all(|(a, b)| *a == b);
assert!(res);
}
Expand Down
9 changes: 9 additions & 0 deletions src/common/src/array/bytes_array.rs
Expand Up @@ -22,6 +22,7 @@ use risingwave_pb::data::{ArrayType, PbArray};
use super::{Array, ArrayBuilder, ArrayMeta};
use crate::array::ArrayBuilderImpl;
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::util::iter_util::ZipEqDebug;

/// `BytesArray` is a collection of Rust `[u8]`s.
Expand All @@ -32,6 +33,14 @@ pub struct BytesArray {
data: Vec<u8>,
}

impl EstimateSize for BytesArray {
fn estimated_heap_size(&self) -> usize {
self.offset.capacity() * size_of::<u32>()
+ self.bitmap.estimated_heap_size()
+ self.data.capacity()
}
}

impl Array for BytesArray {
type Builder = BytesArrayBuilder;
type OwnedItem = Box<[u8]>;
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/array/jsonb_array.rs
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem::size_of;

use postgres_types::{FromSql as _, ToSql as _, Type};
use serde_json::Value;

use super::{Array, ArrayBuilder};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::types::{Scalar, ScalarRef};
use crate::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -470,3 +473,10 @@ impl serde_json::ser::Formatter for ToTextFormatter {
writer.write_all(b": ")
}
}

// TODO: We need to fix this later.
impl EstimateSize for JsonbArray {
fn estimated_heap_size(&self) -> usize {
self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::<Value>()
}
}
10 changes: 10 additions & 0 deletions src/common/src/array/list_array.rs
Expand Up @@ -16,6 +16,7 @@ use core::fmt;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::size_of;

use bytes::{Buf, BufMut};
use itertools::EitherOrBoth::{Both, Left, Right};
Expand All @@ -25,6 +26,7 @@ use serde::{Deserializer, Serializer};

use super::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayMeta, ArrayResult, RowRef};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::row::Row;
use crate::types::to_text::ToText;
use crate::types::{hash_datum, DataType, Datum, DatumRef, Scalar, ScalarRefImpl, ToDatumRef};
Expand Down Expand Up @@ -160,6 +162,14 @@ pub struct ListArray {
pub(super) value_type: DataType,
}

impl EstimateSize for ListArray {
fn estimated_heap_size(&self) -> usize {
self.bitmap.estimated_heap_size()
+ self.offsets.capacity() * size_of::<u32>()
+ self.value.estimated_heap_size()
}
}

impl Array for ListArray {
type Builder = ListArrayBuilder;
type OwnedItem = ListValue;
Expand Down
21 changes: 19 additions & 2 deletions src/common/src/array/mod.rs
Expand Up @@ -71,9 +71,10 @@ pub use crate::array::num256_array::{
Int256Array, Int256ArrayBuilder, Uint256Array, Uint256ArrayBuilder,
};
use crate::buffer::Bitmap;
use crate::collection::estimate_size::EstimateSize;
use crate::types::*;
use crate::util::iter_util::ZipEqFast;
pub type ArrayResult<T> = std::result::Result<T, ArrayError>;
pub type ArrayResult<T> = Result<T, ArrayError>;

pub type I64Array = PrimitiveArray<i64>;
pub type I32Array = PrimitiveArray<i32>;
Expand Down Expand Up @@ -162,7 +163,9 @@ pub trait ArrayBuilder: Send + Sync + Sized + 'static {
/// In some cases, we will need to store owned data. For example, when aggregating min
/// and max, we need to store current maximum in the aggregator. In this case, we
/// could use `A::OwnedItem` in aggregator struct.
pub trait Array: std::fmt::Debug + Send + Sync + Sized + 'static + Into<ArrayImpl> {
pub trait Array:
std::fmt::Debug + Send + Sync + Sized + 'static + Into<ArrayImpl> + EstimateSize
{
/// A reference to item in array, as well as return type of `value_at`, which is
/// reciprocal to `Self::OwnedItem`.
type RefItem<'a>: ScalarRef<'a, ScalarType = Self::OwnedItem>
Expand Down Expand Up @@ -676,6 +679,20 @@ macro_rules! impl_array {

for_all_variants! { impl_array }

macro_rules! impl_array_estimate_size {
($({ $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
impl EstimateSize for ArrayImpl {
fn estimated_heap_size(&self) -> usize {
match self {
$( Self::$variant_name(inner) => inner.estimated_heap_size(), )*
}
}
}
}
}

for_all_variants! { impl_array_estimate_size }

impl ArrayImpl {
pub fn iter(&self) -> impl DoubleEndedIterator<Item = DatumRef<'_>> + ExactSizeIterator {
(0..self.len()).map(|i| self.value_at(i))
Expand Down
14 changes: 14 additions & 0 deletions src/common/src/array/num256_array.rs
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::io::{Cursor, Read};
use std::mem::size_of;

use ethnum::{I256, U256};
use risingwave_pb::common::buffer::CompressionType;
Expand All @@ -21,6 +22,7 @@ use risingwave_pb::data::PbArray;

use crate::array::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::types::num256::{Int256, Int256Ref, Uint256, Uint256Ref};
use crate::types::Scalar;

Expand Down Expand Up @@ -204,3 +206,15 @@ impl_array_for_num256!(
Int256Ref<'a>,
Int256
);

impl EstimateSize for Uint256Array {
fn estimated_heap_size(&self) -> usize {
self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::<U256>()
}
}

impl EstimateSize for Int256Array {
fn estimated_heap_size(&self) -> usize {
self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::<I256>()
}
}
7 changes: 7 additions & 0 deletions src/common/src/array/primitive_array.rs
Expand Up @@ -24,6 +24,7 @@ use super::{Array, ArrayBuilder, ArrayResult};
use crate::array::serial_array::Serial;
use crate::array::{ArrayBuilderImpl, ArrayImpl, ArrayMeta};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::for_all_native_types;
use crate::types::decimal::Decimal;
use crate::types::interval::Interval;
Expand Down Expand Up @@ -273,6 +274,12 @@ impl<T: PrimitiveArrayItemType> ArrayBuilder for PrimitiveArrayBuilder<T> {
}
}

impl<T: PrimitiveArrayItemType> EstimateSize for PrimitiveArray<T> {
fn estimated_heap_size(&self) -> usize {
self.bitmap.estimated_heap_size() + self.data.capacity() * size_of::<T>()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
72 changes: 49 additions & 23 deletions src/common/src/array/struct_array.rs
Expand Up @@ -25,6 +25,7 @@ use risingwave_pb::data::{PbArray, PbArrayType, StructArrayData};
use super::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayMeta, ArrayResult};
use crate::array::ArrayRef;
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::types::to_text::ToText;
use crate::types::{hash_datum, DataType, Datum, DatumRef, Scalar, ScalarRefImpl, ToDatumRef};
use crate::util::iter_util::ZipEqFast;
Expand Down Expand Up @@ -128,13 +129,13 @@ impl ArrayBuilder for StructArrayBuilder {
.into_iter()
.map(|b| Arc::new(b.finish()))
.collect::<Vec<ArrayRef>>();
StructArray {
bitmap: self.bitmap.finish(),
StructArray::new(
self.bitmap.finish(),
children,
children_type: self.children_type,
children_names: self.children_names,
len: self.len,
}
self.children_type,
self.children_names,
self.len,
)
}
}

Expand All @@ -145,6 +146,8 @@ pub struct StructArray {
children_type: Arc<[DataType]>,
children_names: Arc<[String]>,
len: usize,

heap_size: usize,
}

impl StructArrayBuilder {
Expand Down Expand Up @@ -219,6 +222,29 @@ impl Array for StructArray {
}

impl StructArray {
fn new(
bitmap: Bitmap,
children: Vec<ArrayRef>,
children_type: Arc<[DataType]>,
children_names: Arc<[String]>,
len: usize,
) -> Self {
let heap_size = bitmap.estimated_heap_size()
+ children
.iter()
.map(|c| c.estimated_heap_size())
.sum::<usize>();

Self {
bitmap,
children,
children_type,
children_names,
len,
heap_size,
}
}

pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
ensure!(
array.values.is_empty(),
Expand All @@ -238,13 +264,7 @@ impl StructArray {
.map(DataType::from)
.collect::<Vec<DataType>>()
.into();
let arr = StructArray {
bitmap,
children,
children_type,
children_names: vec![].into(),
len: cardinality,
};
let arr = Self::new(bitmap, children, children_type, vec![].into(), cardinality);
Ok(arr.into())
}

Expand Down Expand Up @@ -273,13 +293,13 @@ impl StructArray {
let cardinality = null_bitmap.len();
let bitmap = Bitmap::from_iter(null_bitmap.to_vec());
let children = children.into_iter().map(Arc::new).collect_vec();
StructArray {
Self::new(
bitmap,
children_type: children_type.into(),
children_names: vec![].into(),
len: cardinality,
children,
}
children_type.into(),
vec![].into(),
cardinality,
)
}

pub fn from_slices_with_field_names(
Expand All @@ -291,13 +311,13 @@ impl StructArray {
let cardinality = null_bitmap.len();
let bitmap = Bitmap::from_iter(null_bitmap.to_vec());
let children = children.into_iter().map(Arc::new).collect_vec();
StructArray {
Self::new(
bitmap,
children_type: children_type.into(),
children_names: children_name.into(),
len: cardinality,
children,
}
children_type.into(),
children_name.into(),
cardinality,
)
}

#[cfg(test)]
Expand All @@ -310,6 +330,12 @@ impl StructArray {
}
}

impl EstimateSize for StructArray {
fn estimated_heap_size(&self) -> usize {
self.heap_size
}
}

#[derive(Clone, Debug, PartialEq, Eq, Default, Hash)]
pub struct StructValue {
fields: Box<[Datum]>,
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/array/utf8_array.rs
Expand Up @@ -20,13 +20,20 @@ use super::bytes_array::{BytesWriter, PartialBytesWriter};
use super::{Array, ArrayBuilder, ArrayMeta, BytesArray, BytesArrayBuilder};
use crate::array::ArrayBuilderImpl;
use crate::buffer::Bitmap;
use crate::collection::estimate_size::EstimateSize;

/// `Utf8Array` is a collection of Rust Utf8 `str`s. It's a wrapper of `BytesArray`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Utf8Array {
bytes: BytesArray,
}

impl EstimateSize for Utf8Array {
fn estimated_heap_size(&self) -> usize {
self.bytes.estimated_heap_size()
}
}

impl Array for Utf8Array {
type Builder = Utf8ArrayBuilder;
type OwnedItem = Box<str>;
Expand Down

0 comments on commit 85980fd

Please sign in to comment.