feat + fix: IPC support for run encoded array. #3662

Merged
merged 9 commits into from
Feb 10, 2023
170 changes: 143 additions & 27 deletions arrow-array/src/array/run_array.rs
@@ -67,9 +67,9 @@ pub struct RunArray<R: RunEndIndexType> {
}

impl<R: RunEndIndexType> RunArray<R> {
// calculates the logical length of the array encoded
// by the given run_ends array.
fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
/// Calculates the logical length of the array encoded
/// by the given run_ends array.
pub fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
let len = run_ends.len();
if len == 0 {
return 0;
@@ -145,14 +145,15 @@ impl<R: RunEndIndexType> RunArray<R> {
}

/// Returns index to the physical array for the given index to the logical array.
/// The function does not adjust the input logical index based on `ArrayData::offset`.
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= self.len() {
pub fn get_zero_offset_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= Self::logical_len(&self.run_ends) {
return None;
}
let mut st: usize = 0;
let mut en: usize = self.run_ends().len();
let mut en: usize = self.run_ends.len();
while st + 1 < en {
let mid: usize = (st + en) / 2;
if logical_index
@@ -164,7 +165,7 @@ impl<R: RunEndIndexType> RunArray<R> {
// `en` starts with len. The condition `st + 1 < en` ensures
// `st` and `en` differ by at least two. So the value of `mid`
// will never be either `st` or `en`
self.run_ends().value_unchecked(mid - 1).as_usize()
self.run_ends.value_unchecked(mid - 1).as_usize()
}
{
en = mid
@@ -175,6 +176,17 @@ impl<R: RunEndIndexType> RunArray<R> {
Some(st)
}

/// Returns index to the physical array for the given index to the logical array.
/// This function adjusts the input logical index based on `ArrayData::offset`
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= self.len() {
return None;
}
self.get_zero_offset_physical_index(logical_index + self.offset())
}
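
A minimal sketch of the two lookups above, assuming a plain `Vec<usize>` of cumulative run ends instead of `PrimitiveArray<R>`. The `RunSlice` type and its method names are hypothetical stand-ins, and the standard library's `partition_point` replaces the hand-written binary search in the diff:

```rust
/// Hypothetical stand-in for a sliced run array. `run_ends` holds cumulative
/// logical end positions; `offset` and `len` describe the logical slice.
struct RunSlice {
    run_ends: Vec<usize>,
    offset: usize,
    len: usize,
}

impl RunSlice {
    /// Logical length of the unsliced array: the last run end, or 0 if empty.
    fn logical_len(&self) -> usize {
        self.run_ends.last().copied().unwrap_or(0)
    }

    /// Physical index for a logical index into the unsliced array.
    fn zero_offset_physical_index(&self, logical_index: usize) -> Option<usize> {
        if logical_index >= self.logical_len() {
            return None;
        }
        // Index of the first run whose end is strictly greater than the input.
        Some(self.run_ends.partition_point(|&end| end <= logical_index))
    }

    /// Physical index for a logical index relative to the start of the slice.
    fn physical_index(&self, logical_index: usize) -> Option<usize> {
        if logical_index >= self.len {
            return None;
        }
        self.zero_offset_physical_index(logical_index + self.offset)
    }
}

fn main() {
    // Runs cover logical positions [0, 3), [3, 5) and [5, 9).
    let full = RunSlice { run_ends: vec![3, 5, 9], offset: 0, len: 9 };
    assert_eq!(full.physical_index(2), Some(0));
    assert_eq!(full.physical_index(3), Some(1));
    assert_eq!(full.physical_index(9), None);

    // Slice of length 3 starting at logical position 4 (positions 4, 5, 6).
    let sliced = RunSlice { run_ends: vec![3, 5, 9], offset: 4, len: 3 };
    assert_eq!(sliced.physical_index(0), Some(1));
    assert_eq!(sliced.physical_index(1), Some(2));
    assert_eq!(sliced.physical_index(3), None);
}
```

The bounds check in `physical_index` uses the slice length, while the zero-offset variant checks against the last run end, which mirrors the split between `self.len()` and `Self::logical_len(&self.run_ends)` in the diff.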

/// Returns the physical indices of the input logical indices. Returns an error if any of the logical
/// indices cannot be converted to a physical index. The logical indices are sorted and iterated along
/// with run_ends array to find matching physical index. The approach used here was chosen over
@@ -192,6 +204,10 @@ impl<R: RunEndIndexType> RunArray<R> {
{
let indices_len = logical_indices.len();

if indices_len == 0 {
return Ok(vec![]);
}

// `ordered_indices` store index into `logical_indices` and can be used
// to iterate `logical_indices` in sorted order.
let mut ordered_indices: Vec<usize> = (0..indices_len).collect();
@@ -204,12 +220,30 @@ impl<R: RunEndIndexType> RunArray<R> {
.unwrap()
});

// Return early if any of the logical indices cannot be converted to a physical index.
let largest_logical_index =
logical_indices[*ordered_indices.last().unwrap()].as_usize();
if largest_logical_index >= self.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.",
)));
}

// Skip some physical indices based on offset.
let skip_value = if self.offset() > 0 {
self.get_zero_offset_physical_index(self.offset()).unwrap()
} else {
0
};
Comment on lines +233 to +237
Contributor:

Suggested change
let skip_value = if self.offset() > 0 {
self.get_zero_offset_physical_index(self.offset()).unwrap()
} else {
0
};
let skip_value = self.get_physical_index(0).unwrap_or_default();

Perhaps?

Contributor Author:

No. We should get a valid response back. Panic or error is the better option. I used to have error, but based on your earlier feedback changed it to panic.


let mut physical_indices = vec![0; indices_len];

let mut ordered_index = 0_usize;
for (physical_index, run_end) in self.run_ends.values().iter().enumerate() {
// Get the run end index of current physical index
let run_end_value = run_end.as_usize();
for (physical_index, run_end) in
self.run_ends.values().iter().enumerate().skip(skip_value)
{
// Get the run end index (relative to offset) of current physical index
let run_end_value = run_end.as_usize() - self.offset();

// All the `logical_indices` that are less than the current run end index
// belong to the current physical index.
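
The sorted merge in this hunk can be sketched on plain slices. This is an illustration under stated assumptions, not the crate's implementation: the free function `physical_indices`, its parameters and the `String` error type are hypothetical, and the run ends are assumed to be already validated (strictly increasing, last value at least `offset + len`):

```rust
/// Sketch: map slice-relative logical indices to physical indices by walking
/// the sorted indices alongside the run ends.
fn physical_indices(
    run_ends: &[usize],
    offset: usize,
    len: usize,
    logical_indices: &[usize],
) -> Result<Vec<usize>, String> {
    if logical_indices.is_empty() {
        return Ok(vec![]);
    }

    // Positions into `logical_indices`, ordered by the value they point at.
    let mut order: Vec<usize> = (0..logical_indices.len()).collect();
    order.sort_unstable_by_key(|&i| logical_indices[i]);

    // Reject the whole request if the largest index is outside the slice.
    let largest = logical_indices[*order.last().unwrap()];
    if largest >= len {
        return Err(format!("logical index {largest} is out of bounds for length {len}"));
    }

    // First run that overlaps the slice: first run end greater than `offset`.
    let skip = run_ends.partition_point(|&end| end <= offset);

    let mut out = vec![0; logical_indices.len()];
    let mut next = 0;
    for (physical, &end) in run_ends.iter().enumerate().skip(skip) {
        // Run end expressed relative to the start of the slice.
        let relative_end = end - offset;
        while next < order.len() && logical_indices[order[next]] < relative_end {
            out[order[next]] = physical;
            next += 1;
        }
        if next == order.len() {
            break;
        }
    }
    Ok(out)
}

fn main() {
    // Runs cover [0, 3), [3, 5), [5, 9); the slice covers positions 4, 5, 6.
    let result = physical_indices(&[3, 5, 9], 4, 3, &[2, 0, 1, 0]);
    assert_eq!(result, Ok(vec![2, 1, 2, 1]));

    // Index 3 is outside a slice of length 3.
    assert!(physical_indices(&[3, 5, 9], 4, 3, &[3]).is_err());
}
```

Because every run visited after the skip ends past `offset`, the subtraction `end - offset` cannot underflow, which is the same property the `skip_value` computation relies on in the diff.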
@@ -552,6 +586,34 @@ mod tests {
result
}

// Asserts that `logical_array[logical_indices[*]] == physical_array[physical_indices[*]]`
fn compare_logical_and_physical_indices(
logical_indices: &[u32],
logical_array: &[Option<i32>],
physical_indices: &[usize],
physical_array: &PrimitiveArray<Int32Type>,
) {
assert_eq!(logical_indices.len(), physical_indices.len());

// check value in logical index in the logical_array matches physical index in physical_array
logical_indices
.iter()
.map(|f| f.as_usize())
.zip(physical_indices.iter())
.for_each(|(logical_ix, physical_ix)| {
let expected = logical_array[logical_ix];
match expected {
Some(val) => {
assert!(physical_array.is_valid(*physical_ix));
let actual = physical_array.value(*physical_ix);
assert_eq!(val, actual);
}
None => {
assert!(physical_array.is_null(*physical_ix))
}
};
});
}
#[test]
fn test_run_array() {
// Construct a value array
@@ -824,23 +886,77 @@ mod tests {
assert_eq!(logical_indices.len(), physical_indices.len());

// check value in logical index in the input_array matches physical index in typed_run_array
logical_indices
.iter()
.map(|f| f.as_usize())
.zip(physical_indices.iter())
.for_each(|(logical_ix, physical_ix)| {
let expected = input_array[logical_ix];
match expected {
Some(val) => {
assert!(physical_values_array.is_valid(*physical_ix));
let actual = physical_values_array.value(*physical_ix);
assert_eq!(val, actual);
}
None => {
assert!(physical_values_array.is_null(*physical_ix))
}
};
});
compare_logical_and_physical_indices(
&logical_indices,
&input_array,
&physical_indices,
physical_values_array,
);
}
}

#[test]
fn test_get_physical_indices_sliced() {
let total_len = 80;
let input_array = build_input_array(total_len);

// Encode the input_array to run array
let mut builder =
PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
builder.extend(input_array.iter().copied());
let run_array = builder.finish();
let physical_values_array = as_primitive_array::<Int32Type>(run_array.values());

// test for all slice lengths.
for slice_len in 1..=total_len {
// create an array consisting of all the indices repeated twice and shuffled.
let mut logical_indices: Vec<u32> = (0_u32..(slice_len as u32)).collect();
// add same indices once more
logical_indices.append(&mut logical_indices.clone());
let mut rng = thread_rng();
logical_indices.shuffle(&mut rng);

// test for offset = 0 and slice length = slice_len
// slice the input array from which the run array was built.
let sliced_input_array = &input_array[0..slice_len];

// slice the run array
let sliced_run_array: RunArray<Int16Type> =
run_array.slice(0, slice_len).into_data().into();

// Get physical indices.
let physical_indices = sliced_run_array
.get_physical_indices(&logical_indices)
.unwrap();

compare_logical_and_physical_indices(
&logical_indices,
sliced_input_array,
&physical_indices,
physical_values_array,
);

// test for offset = total_len - slice_len and slice length = slice_len
// slice the input array from which the run array was built.
let sliced_input_array = &input_array[total_len - slice_len..total_len];

// slice the run array
let sliced_run_array: RunArray<Int16Type> = run_array
.slice(total_len - slice_len, slice_len)
.into_data()
.into();

// Get physical indices
let physical_indices = sliced_run_array
.get_physical_indices(&logical_indices)
.unwrap();

compare_logical_and_physical_indices(
&logical_indices,
sliced_input_array,
&physical_indices,
physical_values_array,
);
}
}
}
13 changes: 7 additions & 6 deletions arrow-data/src/data.rs
@@ -1326,9 +1326,9 @@ impl ArrayData {
DataType::RunEndEncoded(run_ends, _values) => {
let run_ends_data = self.child_data()[0].clone();
match run_ends.data_type() {
DataType::Int16 => run_ends_data.check_run_ends::<i16>(self.len()),
DataType::Int32 => run_ends_data.check_run_ends::<i32>(self.len()),
DataType::Int64 => run_ends_data.check_run_ends::<i64>(self.len()),
DataType::Int16 => run_ends_data.check_run_ends::<i16>(),
DataType::Int32 => run_ends_data.check_run_ends::<i32>(),
DataType::Int64 => run_ends_data.check_run_ends::<i64>(),
_ => unreachable!(),
}
}
@@ -1487,7 +1487,7 @@ impl ArrayData {
}

/// Validates that each value in run_ends array is positive and strictly increasing.
fn check_run_ends<T>(&self, array_len: usize) -> Result<(), ArrowError>
fn check_run_ends<T>(&self) -> Result<(), ArrowError>
where
T: ArrowNativeType + TryInto<i64> + num::Num + std::fmt::Display,
{
@@ -1514,9 +1514,10 @@ impl ArrayData {
Ok(())
})?;

if prev_value.as_usize() != array_len {
if prev_value.as_usize() < (self.offset + self.len) {
return Err(ArrowError::InvalidArgumentError(format!(
"The length of array does not match the last value in the run_ends array. The last value of run_ends array is {prev_value} and length of array is {array_len}."
"The offset + length of array should be less or equal to last value in the run_ends array. The last value of run_ends array is {prev_value} and offset + length of array is {}.",
self.offset + self.len
)));
}
Ok(())
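
The relaxed validation can be sketched on a plain `&[i64]`. This is a hedged illustration: the free function below and its `String` error are hypothetical and do not reproduce the generic `check_run_ends<T>` signature. It shows why the final check became `<` rather than `!=`: a sliced array keeps its full run ends child, so the last run end may exceed `offset + len`.

```rust
/// Sketch: run ends must be positive and strictly increasing, and the last
/// run end must cover the logical range `offset..offset + len`.
fn check_run_ends(run_ends: &[i64], offset: usize, len: usize) -> Result<(), String> {
    let mut prev = 0_i64;
    for (ix, &value) in run_ends.iter().enumerate() {
        if value <= 0 {
            return Err(format!("run end at position {ix} is {value}, expected a positive value"));
        }
        if ix > 0 && value <= prev {
            return Err(format!(
                "run end at position {ix} is {value}, not greater than the previous value {prev}"
            ));
        }
        prev = value;
    }

    // `prev` is non-negative here, so the cast to usize is lossless.
    let required = offset + len;
    if (prev as usize) < required {
        return Err(format!(
            "last run end is {prev} but offset + length of the array is {required}"
        ));
    }
    Ok(())
}

fn main() {
    // Unsliced array: the last run end equals the length.
    assert!(check_run_ends(&[3, 5, 9], 0, 9).is_ok());
    // Sliced array: offset + len = 7 is below the last run end, still valid.
    assert!(check_run_ends(&[3, 5, 9], 4, 3).is_ok());
    // Slice reaching past the last run end is rejected.
    assert!(check_run_ends(&[3, 5, 9], 4, 6).is_err());
    // Repeated run end is rejected.
    assert!(check_run_ends(&[3, 3, 9], 0, 9).is_err());
}
```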
5 changes: 4 additions & 1 deletion arrow-data/src/equal/mod.rs
@@ -31,6 +31,7 @@ mod fixed_list;
mod list;
mod null;
mod primitive;
mod run;
mod structure;
mod union;
mod utils;
@@ -50,6 +51,8 @@ use structure::struct_equal;
use union::union_equal;
use variable_size::variable_sized_equal;

use self::run::run_equal;

/// Compares the values of two [ArrayData] starting at `lhs_start` and `rhs_start` respectively
/// for `len` slots.
#[inline]
@@ -137,7 +140,7 @@ fn equal_values(
},
DataType::Float16 => primitive_equal::<f16>(lhs, rhs, lhs_start, rhs_start, len),
DataType::Map(_, _) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
DataType::RunEndEncoded(_, _) => todo!(),
DataType::RunEndEncoded(_, _) => run_equal(lhs, rhs, lhs_start, rhs_start, len),
}
}

84 changes: 84 additions & 0 deletions arrow-data/src/equal/run.rs
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::data::ArrayData;

use super::equal_range;

/// The current implementation of run array comparison supports only physical comparison.
/// Comparing run encoded arrays based on logical indices (`lhs_start`, `rhs_start`) would
/// be time consuming, as converting a logical index to a physical index cannot be done
/// in constant time. The current comparison compares the underlying physical arrays.
pub(super) fn run_equal(
lhs: &ArrayData,
rhs: &ArrayData,
lhs_start: usize,
rhs_start: usize,
len: usize,
) -> bool {
if lhs_start != 0
|| rhs_start != 0
|| (lhs.len() != len && rhs.len() != len)
|| lhs.offset() > 0
|| rhs.offset() > 0
{
unimplemented!("Logical comparison for run array not supported.")
}

if lhs.len() != rhs.len() {
return false;
}

let lhs_run_ends_array = lhs.child_data().get(0).unwrap();
let lhs_values_array = lhs.child_data().get(1).unwrap();

let rhs_run_ends_array = rhs.child_data().get(0).unwrap();
let rhs_values_array = rhs.child_data().get(1).unwrap();

if lhs_run_ends_array.len() != rhs_run_ends_array.len() {
return false;
}

if lhs_values_array.len() != rhs_values_array.len() {
return false;
}

// check run ends array are equal. The length of the physical array
// is used to validate the child arrays.
let run_ends_equal = equal_range(
lhs_run_ends_array,
rhs_run_ends_array,
lhs_start,
rhs_start,
lhs_run_ends_array.len(),
);

// if run ends array are not the same return early without validating
// values array.
if !run_ends_equal {
return false;
}

// check values array are equal
equal_range(
lhs_values_array,
rhs_values_array,
lhs_start,
rhs_start,
rhs_values_array.len(),
)
}
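
One consequence of comparing the physical child arrays, sketched on plain vectors. The `Runs` type and both helpers are hypothetical and only model the idea: two encodings that decode to the same logical values can still differ physically, so a physical comparison may report them as unequal.

```rust
/// Hypothetical model of a run encoded array: cumulative run ends plus one
/// value per run.
struct Runs {
    run_ends: Vec<usize>,
    values: Vec<i32>,
}

/// Physical comparison: both child arrays must match element by element.
fn physical_equal(lhs: &Runs, rhs: &Runs) -> bool {
    lhs.run_ends == rhs.run_ends && lhs.values == rhs.values
}

/// Expands the runs into the logical array they encode.
fn decode(runs: &Runs) -> Vec<i32> {
    let mut out = Vec::new();
    let mut start = 0;
    for (&end, &value) in runs.run_ends.iter().zip(&runs.values) {
        out.extend(std::iter::repeat(value).take(end - start));
        start = end;
    }
    out
}

fn main() {
    // Both encode the logical array [7, 7, 8, 8].
    let compact = Runs { run_ends: vec![2, 4], values: vec![7, 8] };
    let split = Runs { run_ends: vec![1, 2, 4], values: vec![7, 7, 8] };

    assert_eq!(decode(&compact), decode(&split));
    // The physical layouts differ, so a physical comparison says "not equal".
    assert!(!physical_equal(&compact, &split));
    assert!(physical_equal(&compact, &compact));
}
```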