Skip to content

Commit

Permalink
Cherry pick Reduce memory usage of concat (large)utf8 to active_relea…
Browse files Browse the repository at this point in the history
…se (#411)

* Reduce memory usage of concat (large)utf8 (#348)

* reduce memory needed for concat

* reuse code for str allocation buffer

* make sure that only concat preallocates buffers (#382)

* MutableArrayData::with_capacities

* better pattern matching

* add binary capacities

* add list child data

* add struct capacities

* add panic for dictionary type

* change dictionary capacity enum variant

Co-authored-by: Ritchie Vink <ritchie46@gmail.com>
  • Loading branch information
alamb and ritchie46 committed Jun 9, 2021
1 parent 8179193 commit 5b80035
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 15 deletions.
2 changes: 1 addition & 1 deletion arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub type DurationMillisecondBuilder = PrimitiveBuilder<DurationMillisecondType>;
pub type DurationMicrosecondBuilder = PrimitiveBuilder<DurationMicrosecondType>;
pub type DurationNanosecondBuilder = PrimitiveBuilder<DurationNanosecondType>;

pub use self::transform::MutableArrayData;
pub use self::transform::{Capacities, MutableArrayData};

// --------------------- Array Iterator ---------------------

Expand Down
152 changes: 139 additions & 13 deletions arrow/src/array/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use crate::{
error::{ArrowError, Result},
util::bit_util,
};
use std::mem;

use super::{
data::{into_buffers, new_buffers},
ArrayData,
};
use crate::array::StringOffsetSizeTrait;

mod boolean;
mod fixed_binary;
Expand Down Expand Up @@ -324,14 +326,71 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
})
}

/// Builds the `[offsets, values]` buffer pair for a variable-length binary/string array.
///
/// * `capacity` - number of array slots; the offsets buffer reserves room for
///   `capacity + 1` entries of type `Offset`.
/// * `binary_size` - number of bytes to reserve for the values buffer.
///
/// The offsets buffer is returned already holding a single zero offset.
fn preallocate_offset_and_binary_buffer<Offset: StringOffsetSizeTrait>(
    capacity: usize,
    binary_size: usize,
) -> [MutableBuffer; 2] {
    let offsets_bytes = (1 + capacity) * mem::size_of::<Offset>();
    let values_bytes = binary_size * mem::size_of::<u8>();

    let mut offsets = MutableBuffer::new(offsets_bytes);
    // safety: `unsafe` code assumes that this buffer is initialized with one element
    // The width of the pushed zero must match the width of `Offset`.
    if !Offset::is_large() {
        offsets.push(0i32);
    } else {
        offsets.push(0i64);
    }

    let values = MutableBuffer::new(values_bytes);
    [offsets, values]
}

/// Define capacities of child data or data buffers.
///
/// Passed to [MutableArrayData::with_capacities] to control how much memory is
/// preallocated. The first field of every variant is the capacity of the array
/// itself (number of slots); the second, where present, is an optional hint for
/// the inner buffer or child data.
// NOTE(review): in the `with_capacities` code shown alongside this enum, the
// buffer-allocation `match` only accepts `Binary(_, Some(_))` (for Utf8, LargeUtf8,
// Binary and LargeBinary) and `Array(_)`; every other combination reaches the
// "not yet supported" panic. That appears to include `List`, `Struct`,
// `Dictionary` and `Binary(_, None)` -- confirm against the full function.
#[derive(Debug, Clone)]
pub enum Capacities {
/// Binary, Utf8 and LargeUtf8 data types
/// Define
/// * the capacity of the array offsets
/// * the capacity of the binary/ str buffer, in bytes
Binary(usize, Option<usize>),
/// List and LargeList data types
/// Define
/// * the capacity of the array offsets
/// * the capacity of the child data
List(usize, Option<Box<Capacities>>),
/// Struct type
/// Define
/// * the capacity of the array
/// * the capacities of the fields, one entry per field
Struct(usize, Option<Vec<Capacities>>),
/// Dictionary type
/// Define
/// * the capacity of the array/keys
/// * the capacity of the values
Dictionary(usize, Option<Box<Capacities>>),
/// Don't preallocate inner buffers and rely on array growth strategy
/// Define
/// * the capacity of the array
Array(usize),
}
impl<'a> MutableArrayData<'a> {
/// Returns a new [MutableArrayData] with capacity for `capacity` slots, specialized to create an
/// [ArrayData] from multiple `arrays`.
///
/// `use_nulls` is a flag used to optimize insertions. It should be `false` if the only source of nulls
/// are the arrays themselves and `true` if the user plans to call [MutableArrayData::extend_nulls].
/// In other words, if `use_nulls` is `false`, [MutableArrayData::extend_nulls] should not be called.
pub fn new(arrays: Vec<&'a ArrayData>, mut use_nulls: bool, capacity: usize) -> Self {
pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
// Thin wrapper: `Capacities::Array` carries only the array slot count and no
// hint for inner buffers or child data.
Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity))
}

/// Similar to [MutableArrayData::new], but lets users define the preallocated capacities of the array.
/// See also [MutableArrayData::new] for more information on the arguments.
///
/// # Panics
/// This function panics if the given `capacities` don't match the data type of `arrays`, or when
/// a [Capacities] variant is not yet supported.
pub fn with_capacities(
arrays: Vec<&'a ArrayData>,
mut use_nulls: bool,
capacities: Capacities,
) -> Self {
let data_type = arrays[0].data_type();
use crate::datatypes::*;

Expand All @@ -341,7 +400,25 @@ impl<'a> MutableArrayData<'a> {
use_nulls = true;
};

let [buffer1, buffer2] = new_buffers(data_type, capacity);
let mut array_capacity;

let [buffer1, buffer2] = match (data_type, &capacities) {
(DataType::LargeUtf8, Capacities::Binary(capacity, Some(value_cap)))
| (DataType::LargeBinary, Capacities::Binary(capacity, Some(value_cap))) => {
array_capacity = *capacity;
preallocate_offset_and_binary_buffer::<i64>(*capacity, *value_cap)
}
(DataType::Utf8, Capacities::Binary(capacity, Some(value_cap)))
| (DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => {
array_capacity = *capacity;
preallocate_offset_and_binary_buffer::<i32>(*capacity, *value_cap)
}
(_, Capacities::Array(capacity)) => {
array_capacity = *capacity;
new_buffers(data_type, *capacity)
}
_ => panic!("Capacities: {:?} not yet supported", capacities),
};

let child_data = match &data_type {
DataType::Null
Expand Down Expand Up @@ -373,20 +450,66 @@ impl<'a> MutableArrayData<'a> {
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
vec![MutableArrayData::new(childs, use_nulls, capacity)]

let capacities = if let Capacities::List(capacity, ref child_capacities) =
capacities
{
array_capacity = capacity;
child_capacities
.clone()
.map(|c| *c)
.unwrap_or(Capacities::Array(array_capacity))
} else {
Capacities::Array(array_capacity)
};

vec![MutableArrayData::with_capacities(
childs, use_nulls, capacities,
)]
}
// the dictionary type just appends keys and clones the values.
DataType::Dictionary(_, _) => vec![],
DataType::Float16 => unreachable!(),
DataType::Struct(fields) => (0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, capacity)
})
.collect::<Vec<_>>(),
DataType::Struct(fields) => match capacities {
Capacities::Struct(capacity, Some(ref child_capacities)) => {
array_capacity = capacity;
(0..fields.len())
.zip(child_capacities)
.map(|(i, child_cap)| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::with_capacities(
child_arrays,
use_nulls,
child_cap.clone(),
)
})
.collect::<Vec<_>>()
}
Capacities::Struct(capacity, None) => {
array_capacity = capacity;
(0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, capacity)
})
.collect::<Vec<_>>()
}
_ => (0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, array_capacity)
})
.collect::<Vec<_>>(),
},
_ => {
todo!("Take and filter operations still not supported for this datatype")
}
Expand All @@ -397,6 +520,9 @@ impl<'a> MutableArrayData<'a> {
0 => unreachable!(),
1 => Some(arrays[0].child_data()[0].clone()),
_ => {
if let Capacities::Dictionary(_, _) = capacities {
panic!("dictionary capacity not yet supported")
}
// Concat dictionaries together
let dictionaries: Vec<_> =
arrays.iter().map(|array| &array.child_data()[0]).collect();
Expand Down Expand Up @@ -426,7 +552,7 @@ impl<'a> MutableArrayData<'a> {
.map(|array| build_extend_null_bits(array, use_nulls))
.collect();

let null_bytes = bit_util::ceil(capacity, 8);
let null_bytes = bit_util::ceil(array_capacity, 8);
let null_buffer = MutableBuffer::from_len_zeroed(null_bytes);

let extend_values = match &data_type {
Expand Down
57 changes: 56 additions & 1 deletion arrow/src/compute/kernels/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,26 @@
//! ```

use crate::array::*;
use crate::datatypes::DataType;
use crate::error::{ArrowError, Result};

/// Returns the total number of value bytes referenced by `arrays`.
///
/// Every element of `arrays` is expected to be a variable-length string/binary
/// array whose offsets (buffer 0) are of type `Offset`. The result is used as
/// the capacity of the values buffer when concatenating.
///
/// Only the bytes that each array actually covers are counted: for an array of
/// length `len` this is `offsets[len] - offsets[0]`. Measuring against the end
/// of the values buffer instead would also count the bytes that follow a
/// sliced array, over-allocating when small slices of a large array are
/// concatenated.
///
/// # Panics
/// Panics if an offsets buffer holds fewer than `len + 1` entries, or if an
/// offset cannot be represented as `usize`.
fn compute_str_values_length<Offset: StringOffsetSizeTrait>(
    arrays: &[&ArrayData],
) -> usize {
    arrays
        .iter()
        .map(|&data| {
            // this returns a slice of offsets, starting from the offset of the array,
            // so index 0 is the first byte of this array and index `len` is one past
            // its last byte
            let offsets = data.buffer::<Offset>(0);
            let start = offsets[0].to_usize().unwrap();
            let end = offsets[data.len()].to_usize().unwrap();
            end - start
        })
        .sum()
}

/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
pub fn concat(arrays: &[&Array]) -> Result<ArrayRef> {
if arrays.is_empty() {
Expand All @@ -56,7 +74,25 @@ pub fn concat(arrays: &[&Array]) -> Result<ArrayRef> {

let arrays = arrays.iter().map(|a| a.data()).collect::<Vec<_>>();

let mut mutable = MutableArrayData::new(arrays, false, capacity);
let mut mutable = match arrays[0].data_type() {
DataType::Utf8 => {
let str_values_size = compute_str_values_length::<i32>(&arrays);
MutableArrayData::with_capacities(
arrays,
false,
Capacities::Binary(capacity, Some(str_values_size)),
)
}
DataType::LargeUtf8 => {
let str_values_size = compute_str_values_length::<i64>(&arrays);
MutableArrayData::with_capacities(
arrays,
false,
Capacities::Binary(capacity, Some(str_values_size)),
)
}
_ => MutableArrayData::new(arrays, false, capacity),
};

for (i, len) in lengths.iter().enumerate() {
mutable.extend(i, 0, *len)
Expand Down Expand Up @@ -452,4 +488,23 @@ mod tests {
let concat = concat_dictionary(input_1, input_2);
assert_eq!(concat, expected);
}

/// Checks that `concat` sizes the values buffer of a `LargeStringArray` from
/// the precomputed total string length rather than growing it on demand.
#[test]
fn test_concat_string_sizes() -> Result<()> {
    // 150 copies of a three byte string: 150 * 3 = 450 bytes each
    let repeated_foo =
        || -> LargeStringArray { ((0..150).map(|_| Some("foo"))).collect() };
    let first = repeated_foo();
    let second = repeated_foo();
    // three non-null three byte strings: 3 * 3 = 9 bytes
    let third = LargeStringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);

    // 450 + 450 + 9 = 909
    // closest 64 byte aligned cap = 960
    let expected_capacity = 960;

    let concatenated = concat(&[&first, &second, &third])?;
    let values_capacity = concatenated.data().buffers()[1].capacity();
    // this would have been 1280 if we did not precompute the value lengths.
    assert_eq!(values_capacity, expected_capacity);

    Ok(())
}
}

0 comments on commit 5b80035

Please sign in to comment.