Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ArrayEqual for UnionArray #1469

Merged
merged 8 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion arrow/src/array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2117,7 +2117,10 @@ impl UnionBuilder {
let mut field_data = match self.fields.remove(&type_name) {
Some(data) => data,
None => match self.value_offset_builder {
Some(_) => FieldData::new(self.fields.len() as i8, T::DATA_TYPE, None),
Some(_) => {
// For Dense Union, we don't build bitmap in individual field
FieldData::new(self.fields.len() as i8, T::DATA_TYPE, None)
}
None => {
let mut fd = FieldData::new(
self.fields.len() as i8,
Expand Down
104 changes: 101 additions & 3 deletions arrow/src/array/equal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod list;
mod null;
mod primitive;
mod structure;
mod union;
mod utils;
mod variable_size;

Expand All @@ -55,6 +56,7 @@ use list::list_equal;
use null::null_equal;
use primitive::primitive_equal;
use structure::struct_equal;
use union::union_equal;
use variable_size::variable_sized_equal;

impl PartialEq for dyn Array {
Expand Down Expand Up @@ -232,7 +234,9 @@ fn equal_values(
DataType::Struct(_) => {
struct_equal(lhs, rhs, lhs_nulls, rhs_nulls, lhs_start, rhs_start, len)
}
DataType::Union(_, _) => unimplemented!("See ARROW-8576"),
DataType::Union(_, _) => {
union_equal(lhs, rhs, lhs_nulls, rhs_nulls, lhs_start, rhs_start, len)
}
DataType::Dictionary(data_type, _) => match data_type.as_ref() {
DataType::Int8 => dictionary_equal::<i8>(
lhs, rhs, lhs_nulls, rhs_nulls, lhs_start, rhs_start, len,
Expand Down Expand Up @@ -313,11 +317,11 @@ mod tests {
array::Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryOffsetSizeTrait,
BooleanArray, FixedSizeBinaryBuilder, FixedSizeListBuilder, GenericBinaryArray,
Int32Builder, ListBuilder, NullArray, PrimitiveBuilder, StringArray,
StringDictionaryBuilder, StringOffsetSizeTrait, StructArray,
StringDictionaryBuilder, StringOffsetSizeTrait, StructArray, UnionBuilder,
};
use crate::array::{GenericStringArray, Int32Array};
use crate::buffer::Buffer;
use crate::datatypes::{Field, Int16Type, ToByteSlice};
use crate::datatypes::{Field, Int16Type, Int32Type, ToByteSlice};

use super::*;

Expand Down Expand Up @@ -1347,4 +1351,98 @@ mod tests {
// nulls in it, string1 and string2 are not equal
test_equal(string1, &string2, false);
}

#[test]
fn test_union_equal_dense() {
let mut builder = UnionBuilder::new_dense(7);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append::<Int32Type>("b", 2).unwrap();
builder.append::<Int32Type>("c", 3).unwrap();
builder.append::<Int32Type>("a", 4).unwrap();
builder.append_null().unwrap();
builder.append::<Int32Type>("a", 6).unwrap();
builder.append::<Int32Type>("b", 7).unwrap();
let union1 = builder.build().unwrap();

builder = UnionBuilder::new_dense(7);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append::<Int32Type>("b", 2).unwrap();
builder.append::<Int32Type>("c", 3).unwrap();
builder.append::<Int32Type>("a", 4).unwrap();
builder.append_null().unwrap();
builder.append::<Int32Type>("a", 6).unwrap();
builder.append::<Int32Type>("b", 7).unwrap();
let union2 = builder.build().unwrap();

builder = UnionBuilder::new_dense(7);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append::<Int32Type>("b", 2).unwrap();
builder.append::<Int32Type>("c", 3).unwrap();
builder.append::<Int32Type>("a", 5).unwrap();
builder.append::<Int32Type>("c", 4).unwrap();
builder.append::<Int32Type>("a", 6).unwrap();
builder.append::<Int32Type>("b", 7).unwrap();
let union3 = builder.build().unwrap();

builder = UnionBuilder::new_dense(7);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append::<Int32Type>("b", 2).unwrap();
builder.append::<Int32Type>("c", 3).unwrap();
builder.append::<Int32Type>("a", 4).unwrap();
builder.append_null().unwrap();
builder.append_null().unwrap();
builder.append::<Int32Type>("b", 7).unwrap();
let union4 = builder.build().unwrap();

test_equal(union1.data(), union2.data(), true);
test_equal(union1.data(), union3.data(), false);
test_equal(union1.data(), union4.data(), false);
}

#[test]
fn test_union_equal_sparse() {
let mut builder = UnionBuilder::new_sparse(7);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append::<Int32Type>("b", 2).unwrap();
builder.append::<Int32Type>("c", 3).unwrap();
builder.append::<Int32Type>("a", 4).unwrap();
builder.append_null().unwrap();
builder.append::<Int32Type>("a", 6).unwrap();
builder.append::<Int32Type>("b", 7).unwrap();
let union1 = builder.build().unwrap();

builder = UnionBuilder::new_sparse(7);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append::<Int32Type>("b", 2).unwrap();
builder.append::<Int32Type>("c", 3).unwrap();
builder.append::<Int32Type>("a", 4).unwrap();
builder.append_null().unwrap();
builder.append::<Int32Type>("a", 6).unwrap();
builder.append::<Int32Type>("b", 7).unwrap();
let union2 = builder.build().unwrap();

builder = UnionBuilder::new_sparse(7);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append::<Int32Type>("b", 2).unwrap();
builder.append::<Int32Type>("c", 3).unwrap();
builder.append::<Int32Type>("a", 5).unwrap();
builder.append::<Int32Type>("c", 4).unwrap();
builder.append::<Int32Type>("a", 6).unwrap();
builder.append::<Int32Type>("b", 7).unwrap();
let union3 = builder.build().unwrap();

builder = UnionBuilder::new_sparse(7);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append::<Int32Type>("b", 2).unwrap();
builder.append::<Int32Type>("c", 3).unwrap();
builder.append::<Int32Type>("a", 4).unwrap();
builder.append_null().unwrap();
builder.append_null().unwrap();
builder.append::<Int32Type>("b", 7).unwrap();
let union4 = builder.build().unwrap();

test_equal(union1.data(), union2.data(), true);
test_equal(union1.data(), union3.data(), false);
test_equal(union1.data(), union4.data(), false);
}
}
171 changes: 171 additions & 0 deletions arrow/src/array/equal/union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::datatypes::Field;
use crate::{
array::ArrayData, buffer::Buffer, datatypes::DataType, datatypes::UnionMode,
};

use super::{
equal_range, equal_values, utils::child_logical_null_buffer, utils::equal_nulls,
};

// Checks if corresponding slots in two UnionArrays are same data types
fn equal_types(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to be equal the two arrays must have the same schema and therefore lhs_fields == rhs_fields I think therefore this could simply just compare the type_id arrays for equality directly?

Copy link
Member Author

@viirya viirya Mar 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, even type_ids are different, isn't it still possible that the union arrays are equal?

For example, the fields are int, int, float, string. If lhs type_ids is 1, but rhs type_id is 0, they are both int. We don't know if the actual element at child arrays are equal or not.

So here I compare if the type_ids are the same type or not, not its values.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And two union arrays could be equal even if two fields/type_ids are different. For example, we can change the field position and type_id, but the resulting union array is still the same.

Copy link
Contributor

@tustvold tustvold Mar 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of two fields with the same type, I believe these must still have distinct field names and therefore there is a logical difference between values stored in one vs the other. I'm not sure about this though, possibly worth an upstream clarification 🤔

On the case of different field ordering the arrays can never compare equal as they have different data types.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

equal_types was removed now.

lhs_fields: &[Field],
rhs_fields: &[Field],
lhs_type_ids: &[i8],
rhs_type_ids: &[i8],
) -> bool {
let lhs_slots_types = lhs_type_ids
.iter()
.map(|type_id| lhs_fields.get(*type_id as usize).unwrap().data_type());

let rhs_slots_types = rhs_type_ids
.iter()
.map(|type_id| rhs_fields.get(*type_id as usize).unwrap().data_type());

lhs_slots_types.zip(rhs_slots_types).all(|(l, r)| l == r)
}

fn equal_dense(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much like equal_sparse below, by the time we are calling this method we have already established equality of the type_ids, I therefore wonder if we can simply compute equality of the offsets arrays, followed by computing equality of the underlying child arrays.

This appears to be similar to what is done for list_equal 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even type_ids equality is established, lhs and rhs start positions might be different. So we cannot simply compare offsets arrays and child arrays for dense case.

lhs: &ArrayData,
rhs: &ArrayData,
lhs_type_ids: &[i8],
rhs_type_ids: &[i8],
lhs_offsets: &[i32],
rhs_offsets: &[i32],
) -> bool {
let offsets = lhs_offsets.iter().zip(rhs_offsets.iter());

lhs_type_ids
.iter()
.zip(rhs_type_ids.iter())
.zip(offsets)
.all(|((l_type_id, r_type_id), (l_offset, r_offset))| {
let lhs_values = &lhs.child_data()[*l_type_id as usize];
let rhs_values = &rhs.child_data()[*r_type_id as usize];

equal_values(
lhs_values,
rhs_values,
None,
None,
*l_offset as usize,
*r_offset as usize,
1,
)
})
}

#[allow(clippy::too_many_arguments)]
fn equal_sparse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the time we are calling this method we have established that

  • the types are equal
  • the null buffers are equal
  • the type ids arrays are equal

As such I'm surprised this can't just do a standard equality comparison of the child data, which will take into null buffers of the children

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to above question. As field and type_id can be changed in position, but the resulting union array is still the same, we cannot directly compare them, but can only compare child array corresponding to type_id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it really does come down to "what does it mean for two UnionArrays to be equal"?

Can it ever be true that UnionArray(Int, Utf8) be equal to UnionArray(Utf8, Int)?

Given my reading of the cpp ArrayEquals code in https://github.com/apache/arrow/blob/master/cpp/src/arrow/compare.cc it seems to me that two arrays are only ever "equal" if their datatypes are (exactly) equal.

Maybe we could follow the cpp implementation -- both for consistency as well as for simplicity?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, if it is the case, we can simplify the comparison here. What I have in mind and implemented here can be thought as logically equal. That means corresponding slots of two union arrays are equal, but their physical layout may be different -- type_ids and fields can be different separately.

If we only want to compare the physical layout, then fields and type_ids are required to be exactly equal.

Thanks for pointing out the cpp implementation. We should follow it, I agree.

I will revise this change for that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

equal_sparse was simplified to compare corresponding child_data at given positions.

lhs: &ArrayData,
rhs: &ArrayData,
lhs_nulls: Option<&Buffer>,
rhs_nulls: Option<&Buffer>,
lhs_type_ids: &[i8],
rhs_type_ids: &[i8],
lhs_start: usize,
rhs_start: usize,
) -> bool {
lhs_type_ids
.iter()
.zip(rhs_type_ids.iter())
.enumerate()
.all(|(index, (l_type_id, r_type_id))| {
let lhs_values = &lhs.child_data()[*l_type_id as usize];
let rhs_values = &rhs.child_data()[*r_type_id as usize];

// merge the null data
let lhs_merged_nulls = child_logical_null_buffer(lhs, lhs_nulls, lhs_values);
let rhs_merged_nulls = child_logical_null_buffer(rhs, rhs_nulls, rhs_values);

equal_range(
lhs_values,
rhs_values,
lhs_merged_nulls.as_ref(),
rhs_merged_nulls.as_ref(),
lhs_start + index,
rhs_start + index,
1,
)
})
}

pub(super) fn union_equal(
lhs: &ArrayData,
rhs: &ArrayData,
lhs_nulls: Option<&Buffer>,
rhs_nulls: Option<&Buffer>,
lhs_start: usize,
rhs_start: usize,
len: usize,
) -> bool {
let lhs_type_ids = lhs.buffer::<i8>(0);
let rhs_type_ids = rhs.buffer::<i8>(0);

match (lhs.data_type(), rhs.data_type()) {
(
DataType::Union(lhs_fields, UnionMode::Dense),
DataType::Union(rhs_fields, UnionMode::Dense),
) => {
let lhs_offsets = lhs.buffer::<i32>(1);
let rhs_offsets = rhs.buffer::<i32>(1);

let lhs_type_id_range = &lhs_type_ids[lhs_start..lhs_start + len];
let rhs_type_id_range = &rhs_type_ids[rhs_start..rhs_start + len];

let lhs_offsets_range = &lhs_offsets[lhs_start..lhs_start + len];
let rhs_offsets_range = &rhs_offsets[rhs_start..rhs_start + len];

// nullness is kept in the parent UnionArray, so we compare its nulls here
equal_types(lhs_fields, rhs_fields, lhs_type_ids, rhs_type_ids)
&& equal_nulls(lhs, rhs, lhs_nulls, rhs_nulls, lhs_start, rhs_start, len)
&& equal_dense(
lhs,
rhs,
lhs_type_id_range,
rhs_type_id_range,
lhs_offsets_range,
rhs_offsets_range,
)
}
(
DataType::Union(lhs_fields, UnionMode::Sparse),
DataType::Union(rhs_fields, UnionMode::Sparse),
) => {
let lhs_type_id_range = &lhs_type_ids[lhs_start..lhs_start + len];
let rhs_type_id_range = &rhs_type_ids[rhs_start..rhs_start + len];

equal_types(lhs_fields, rhs_fields, lhs_type_ids, rhs_type_ids)
&& equal_sparse(
lhs,
rhs,
lhs_nulls,
rhs_nulls,
lhs_type_id_range,
rhs_type_id_range,
lhs_start,
rhs_start,
)
}
_ => unimplemented!(
"Logical equality not yet implemented between dense and sparse union arrays"
),
}
}
42 changes: 38 additions & 4 deletions arrow/src/array/equal/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::array::{data::count_nulls, ArrayData, OffsetSizeTrait};
use crate::bitmap::Bitmap;
use crate::buffer::{Buffer, MutableBuffer};
use crate::datatypes::DataType;
use crate::datatypes::{DataType, UnionMode};
use crate::util::bit_util;

// whether bits along the positions are equal
Expand Down Expand Up @@ -66,7 +66,14 @@ pub(super) fn equal_nulls(

#[inline]
pub(super) fn base_equal(lhs: &ArrayData, rhs: &ArrayData) -> bool {
lhs.data_type() == rhs.data_type() && lhs.len() == rhs.len()
let equal_type = match (lhs.data_type(), rhs.data_type()) {
(DataType::Union(l_fields, l_mode), DataType::Union(r_fields, r_mode)) => {
// Defer field datatype check to `union_equal`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because even the lhs fields and rhs fields are different, the two union arrays are still possible equal. We cannot use fields to decide if two union arrays are equal or not. Fields and type_ids are co-related.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec describes a union as an ordered sequence of types with names. My interpretation of this is that the fields are order-sensitive and must be identical for the types to be the same...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revised to compare lhs and rhs fields here.

l_fields.len() == r_fields.len() && l_mode == r_mode
}
(l_data_type, r_data_type) => l_data_type == r_data_type,
};
equal_type && lhs.len() == rhs.len()
}

// whether the two memory regions are equal
Expand Down Expand Up @@ -161,8 +168,35 @@ pub(super) fn child_logical_null_buffer(
});
Some(buffer.into())
}
DataType::Union(_, _) => {
unimplemented!("Logical equality not yet implemented for union arrays")
DataType::Union(_, mode) => {
match mode {
UnionMode::Sparse => {
// See the logic of `DataType::Struct` in `child_logical_null_buffer`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this could be broken out into a function? I also wonder if it might be possible to implement more efficiently by truncating the longer buffer 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it as a function now.

let result = &parent_bitmap & &self_null_bitmap;
if let Ok(bitmap) = result {
return Some(bitmap.bits);
}

// slow path
let array_offset = parent_data.offset();
let mut buffer = MutableBuffer::new_null(parent_len);
let null_slice = buffer.as_slice_mut();
(0..parent_len).for_each(|index| {
if parent_bitmap.is_set(index + array_offset)
&& self_null_bitmap.is_set(index + array_offset)
{
bit_util::set_bit(null_slice, index);
}
});
Some(buffer.into())
}
UnionMode::Dense => {
// We don't keep bitmap in child data of Dense UnionArray
unimplemented!(
"Logical equality not yet implemented for dense union arrays"
)
}
}
}
DataType::Dictionary(_, _) => {
unimplemented!("Logical equality not yet implemented for nested dictionaries")
Expand Down