Re-encode dictionaries in selection kernels (take / concat_batches) #3558

Merged
24 commits merged on Sep 5, 2023

Changes from 1 commit
7 changes: 7 additions & 0 deletions arrow-schema/src/datatype.rs
@@ -346,6 +346,13 @@ impl DataType {
)
}

/// Returns true if this type is a variable length byte array type
#[inline]
pub fn is_byte_array(&self) -> bool {
use DataType::*;
matches!(self, Utf8 | LargeUtf8 | Binary | LargeBinary)
}

/// Returns true if this type is nested (List, FixedSizeList, LargeList, Struct, Union,
/// or Map), or a dictionary of a nested type
pub fn is_nested(&self) -> bool {
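A minimal usage sketch of the predicate added above, assuming the `arrow-schema` crate as modified by this diff; the `Int32` and `FixedSizeBinary` cases are illustrative choices, not part of the change:

```rust
use arrow_schema::DataType;

fn main() {
    // Variable-length byte array types report true
    assert!(DataType::Utf8.is_byte_array());
    assert!(DataType::LargeBinary.is_byte_array());

    // Fixed-width and non-byte-array types report false
    assert!(!DataType::Int32.is_byte_array());
    assert!(!DataType::FixedSizeBinary(16).is_byte_array());
}
```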
65 changes: 65 additions & 0 deletions arrow-select/src/concat.rs
@@ -30,11 +30,16 @@
//! assert_eq!(arr.len(), 3);
//! ```

use crate::dictionary::merge_dictionaries;
use arrow_array::builder::PrimitiveBuilder;
use arrow_array::cast::as_dictionary_array;
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::ArrowNativeType;
use arrow_data::transform::{Capacities, MutableArrayData};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::sync::Arc;

fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
let mut item_capacity = 0;
@@ -54,6 +59,62 @@ fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
Capacities::Binary(item_capacity, Some(bytes_capacity))
}

fn concat_dictionaries<K: ArrowDictionaryKeyType>(
arrays: &[&dyn Array],
) -> Result<DictionaryArray<K>, ArrowError> {
let first_values = &arrays[0].data().child_data()[0];
let single_dictionary = arrays
.iter()
.skip(1)
.all(|a| ArrayData::ptr_eq(&a.data().child_data()[0], first_values));

let (keys, values) = match single_dictionary {
true => {
// All arrays share same dictionary, just concatenate keys
let keys: Vec<_> = arrays
.iter()
.map(|a| as_dictionary_array::<K>(*a).keys() as _)
.collect();

(concat(&keys)?.data().clone(), first_values.clone())
}

false => {
let dictionaries: Vec<_> = arrays
.iter()
.map(|a| (as_dictionary_array::<K>(*a), None))
.collect();

let (mappings, values) = merge_dictionaries(&dictionaries)?;
let capacity = dictionaries.iter().map(|(d, _)| d.len()).sum();
let mut keys = PrimitiveBuilder::<K>::with_capacity(capacity);

for ((d, _), mapping) in dictionaries.iter().zip(mappings) {
for key in d.keys_iter() {
keys.append_option(key.map(|x| mapping[x]));
}
}
(keys.finish().into_data(), values.data().clone())
}
};

// Sanity check
assert_eq!(keys.data_type(), &K::DATA_TYPE);

let builder = keys
.into_builder()
.data_type(arrays[0].data_type().clone())
.child_data(vec![values]);

return Ok(DictionaryArray::from(unsafe { builder.build_unchecked() }));
}

macro_rules! dict_helper {
($t:ty, $arrays:expr) => {
return Ok(Arc::new(concat_dictionaries::<$t>($arrays)?) as _)
};
}

/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
if arrays.is_empty() {
@@ -78,6 +139,10 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
DataType::LargeUtf8 => binary_capacity::<LargeUtf8Type>(arrays),
DataType::Binary => binary_capacity::<BinaryType>(arrays),
DataType::LargeBinary => binary_capacity::<LargeBinaryType>(arrays),
DataType::Dictionary(k, v) if v.is_byte_array() => downcast_integer! {
k.as_ref() => (dict_helper, arrays),
_ => unreachable!("illegal dictionary key type {k}")
},
_ => Capacities::Array(arrays.iter().map(|a| a.len()).sum()),
};

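As a rough sketch of what the new dictionary branch enables, assuming the `arrow-array` and `arrow-select` APIs already used in this diff (the expected lengths below are inferred from the merge logic above rather than taken from this PR's tests): concatenating two dictionary arrays whose values arrays are not pointer-equal now re-encodes the keys against a merged dictionary.

```rust
use arrow_array::cast::{as_dictionary_array, as_string_array};
use arrow_array::types::Int32Type;
use arrow_array::{Array, DictionaryArray};
use arrow_select::concat::concat;

fn main() {
    // Two dictionary arrays whose value dictionaries are not pointer-equal
    let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "a"]);
    let b = DictionaryArray::<Int32Type>::from_iter(["b", "c"]);

    let arrays: Vec<&dyn Array> = vec![&a, &b];
    let combined = concat(&arrays).unwrap();

    // The result is re-encoded against a merged dictionary instead of
    // unconditionally concatenating the two values arrays
    let dict = as_dictionary_array::<Int32Type>(combined.as_ref());
    assert_eq!(dict.len(), 5);
    assert_eq!(as_string_array(dict.values()).len(), 3); // "a", "b", "c"
}
```

When all inputs share the same values array (the `single_dictionary` fast path), the keys are simply concatenated and the existing dictionary is reused unchanged.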
171 changes: 171 additions & 0 deletions arrow-select/src/dictionary.rs
@@ -0,0 +1,171 @@
use crate::interleave::interleave;
use arrow_array::builder::BooleanBufferBuilder;
use arrow_array::cast::{as_generic_binary_array, as_largestring_array, as_string_array};
use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType};
use arrow_array::{Array, ArrayRef, DictionaryArray, GenericByteArray};
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::bit_iterator::BitIndexIterator;
use arrow_schema::{ArrowError, DataType};
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Given an array of dictionaries and an optional row mask compute a values array
Review comment (Member): Suggested change: replace "an optional row mask" with "an optional key row mask".
/// containing all unique, referenced values, along with mappings from the [`DictionaryArray`]
/// keys to the new keys within this values array
pub fn merge_dictionaries<K: ArrowDictionaryKeyType>(
dictionaries: &[(&DictionaryArray<K>, Option<&[u8]>)],
Review comment (Contributor, author): I plan to use the masks for the interleave kernel.
) -> Result<(Vec<Vec<K::Native>>, ArrayRef), ArrowError> {
let mut num_values = 0;

let mut values = Vec::with_capacity(dictionaries.len());
let mut value_slices = Vec::with_capacity(dictionaries.len());

for (dictionary, key_mask) in dictionaries {
Review comment (Contributor): I saw in the should_merge dictionary_values function that dictionaries with the same pointers are considered the same (ArrayData::ptr_eq(values, first_values)); I don't know how frequent that scenario is, but is it worth keeping track of already-merged dictionary pointers and skipping the merge if they are seen again?
let values_mask = match key_mask {
Some(key_mask) => {
let iter = BitIndexIterator::new(key_mask, 0, dictionary.len());
compute_values_mask(dictionary, iter)
}
None => compute_values_mask(dictionary, 0..dictionary.len()),
};
let v = dictionary.values().as_ref();
num_values += v.len();
value_slices.push(get_masked_values(v, &values_mask));
values.push(v)
}

// Map from value to new index
let mut interner = HashMap::with_capacity(num_values);
// Interleave indices for new values array
let mut indices = Vec::with_capacity(num_values);

// Compute the mapping for each dictionary
let mappings = dictionaries
.iter()
.enumerate()
.zip(value_slices)
.map(|((dictionary_idx, (dictionary, _)), values)| {
let zero = K::Native::from_usize(0).unwrap();
let mut mapping = vec![zero; dictionary.values().len()];
for (value_idx, value) in values {
mapping[value_idx] = match interner.entry(value) {
Entry::Vacant(v) => {
let idx = K::Native::from_usize(indices.len())
.ok_or_else(|| ArrowError::DictionaryKeyOverflowError)?;
indices.push((dictionary_idx, value_idx));
v.insert(idx);
idx
}
Entry::Occupied(o) => *o.get(),
}
}
Ok(mapping)
})
.collect::<Result<Vec<_>, ArrowError>>()?;

let array = interleave(&values, &indices)?;
Ok((mappings, array))
}

/// Return a mask identifying the values that are referenced by keys in `dictionary`
/// at the positions indicated by `selection`
fn compute_values_mask<K, I>(dictionary: &DictionaryArray<K>, selection: I) -> Buffer
where
K: ArrowDictionaryKeyType,
I: IntoIterator<Item = usize>,
{
let len = dictionary.values().len();
let mut builder =
BooleanBufferBuilder::new_from_buffer(MutableBuffer::new_null(len), len);

let keys = dictionary.keys();

for i in selection {
if keys.is_valid(i) {
let key = keys.values()[i];
builder.set_bit(key.as_usize(), true)
}
}
builder.finish()
}

/// Return a Vec containing for each set index in `mask`, the index and byte value of that index
fn get_masked_values<'a>(array: &'a dyn Array, mask: &Buffer) -> Vec<(usize, &'a [u8])> {
match array.data_type() {
DataType::Utf8 => masked_bytes(as_string_array(array), mask),
DataType::LargeUtf8 => masked_bytes(as_largestring_array(array), mask),
DataType::Binary => masked_bytes(as_generic_binary_array::<i32>(array), mask),
DataType::LargeBinary => {
masked_bytes(as_generic_binary_array::<i64>(array), mask)
}
_ => unimplemented!(),
}
}

/// Compute [`get_masked_values`] for a [`GenericByteArray`]
///
/// Note: this does not check the null mask and will return values contained in null slots
fn masked_bytes<'a, T: ByteArrayType>(
array: &'a GenericByteArray<T>,
mask: &Buffer,
) -> Vec<(usize, &'a [u8])> {
let cap = mask.count_set_bits_offset(0, array.len());
let mut out = Vec::with_capacity(cap);
for idx in BitIndexIterator::new(mask.as_slice(), 0, array.len()) {
out.push((idx, array.value(idx).as_ref()))
}
out
}

#[cfg(test)]
mod tests {
use crate::dictionary::merge_dictionaries;
use arrow_array::cast::{as_dictionary_array, as_string_array};
use arrow_array::types::Int32Type;
use arrow_array::{Array, DictionaryArray};
use arrow_buffer::Buffer;

#[test]
fn test_merge_strings() {
let a =
DictionaryArray::<Int32Type>::from_iter(["a", "b", "a", "b", "d", "c", "e"]);
let b = DictionaryArray::<Int32Type>::from_iter(["c", "f", "c", "d", "a", "d"]);
let (mappings, combined) = merge_dictionaries(&[(&a, None), (&b, None)]).unwrap();

let values = as_string_array(combined.as_ref());
let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
assert_eq!(&actual, &["a", "b", "d", "c", "e", "f"]);

assert_eq!(mappings.len(), 2);
assert_eq!(&mappings[0], &[0, 1, 2, 3, 4]);
assert_eq!(&mappings[1], &[3, 5, 2, 0]);

let a_slice = a.slice(1, 4);
let (mappings, combined) = merge_dictionaries(&[
(as_dictionary_array::<Int32Type>(a_slice.as_ref()), None),
(&b, None),
])
.unwrap();

let values = as_string_array(combined.as_ref());
let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
assert_eq!(&actual, &["a", "b", "d", "c", "f"]);

assert_eq!(mappings.len(), 2);
assert_eq!(&mappings[0], &[0, 1, 2, 0, 0]);
assert_eq!(&mappings[1], &[3, 4, 2, 0]);

// Mask out only ["b", "b", "d"] from a
let mask = Buffer::from_iter([false, true, false, true, true, false, false]);
let (mappings, combined) =
merge_dictionaries(&[(&a, Some(&mask)), (&b, None)]).unwrap();

let values = as_string_array(combined.as_ref());
let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
assert_eq!(&actual, &["b", "d", "c", "f", "a"]);

assert_eq!(mappings.len(), 2);
assert_eq!(&mappings[0], &[0, 0, 1, 0, 0]);
assert_eq!(&mappings[1], &[2, 3, 1, 4]);
}
}
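One way to read the returned mappings: entry `i` of `mappings[d]` is the key into the merged values array corresponding to key `i` of input dictionary `d`. The sketch below is illustrative only; because the `dictionary` module is crate-private (see the `lib.rs` change below), it assumes placement inside `arrow-select`, for example alongside the test above.

```rust
use crate::dictionary::merge_dictionaries;
use arrow_array::cast::as_string_array;
use arrow_array::types::Int32Type;
use arrow_array::DictionaryArray;

#[test]
fn remapping_preserves_logical_values() {
    let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "a", "c"]);
    let b = DictionaryArray::<Int32Type>::from_iter(["c", "d"]);

    let (mappings, merged) = merge_dictionaries(&[(&a, None), (&b, None)]).unwrap();
    let merged = as_string_array(merged.as_ref());
    let original = as_string_array(a.values());

    // Decoding `a` through the merged values with mappings[0] must agree
    // with decoding it through its own dictionary
    for key in a.keys_iter() {
        let key = key.unwrap();
        let new_key = mappings[0][key] as usize;
        assert_eq!(merged.value(new_key), original.value(key));
    }
}
```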
1 change: 1 addition & 0 deletions arrow-select/src/lib.rs
@@ -18,6 +18,7 @@
//! Arrow selection kernels

pub mod concat;
mod dictionary;
pub mod filter;
pub mod interleave;
pub mod nullif;