Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat + fix: IPC support for run encoded array. #3662

Merged
merged 9 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
283 changes: 258 additions & 25 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,16 @@ impl<R: RunEndIndexType> RunArray<R> {
})
}

/// Returns index to the physical array for the given index to the logical array.
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= self.len() {
fn get_physical_index_from_run_ends_array(
run_ends: &PrimitiveArray<R>,
askoa marked this conversation as resolved.
Show resolved Hide resolved
logical_index: usize,
) -> Option<usize> {
if logical_index >= Self::logical_len(run_ends) {
return None;
}
let mut st: usize = 0;
let mut en: usize = self.run_ends().len();
let mut en: usize = run_ends.len();
while st + 1 < en {
let mid: usize = (st + en) / 2;
if logical_index
Expand All @@ -164,7 +165,7 @@ impl<R: RunEndIndexType> RunArray<R> {
// `en` starts with len. The condition `st + 1 < en` ensures
// `st` and `en` differs atleast by two. So the value of `mid`
// will never be either `st` or `en`
self.run_ends().value_unchecked(mid - 1).as_usize()
run_ends.value_unchecked(mid - 1).as_usize()
}
{
en = mid
Expand All @@ -175,6 +176,19 @@ impl<R: RunEndIndexType> RunArray<R> {
Some(st)
}

/// Returns index to the physical array for the given index to the logical array.
askoa marked this conversation as resolved.
Show resolved Hide resolved
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= self.len() {
return None;
}
Self::get_physical_index_from_run_ends_array(
&self.run_ends,
logical_index + self.offset(),
)
}

/// Returns the physical indices of the input logical indices. Returns error if any of the logical
/// index cannot be converted to physical index. The logical indices are sorted and iterated along
/// with run_ends array to find matching physical index. The approach used here was chosen over
Expand All @@ -192,6 +206,10 @@ impl<R: RunEndIndexType> RunArray<R> {
{
let indices_len = logical_indices.len();

if indices_len == 0 {
return Ok(vec![]);
}

// `ordered_indices` store index into `logical_indices` and can be used
// to iterate `logical_indices` in sorted order.
let mut ordered_indices: Vec<usize> = (0..indices_len).collect();
Expand All @@ -204,12 +222,36 @@ impl<R: RunEndIndexType> RunArray<R> {
.unwrap()
});

// Return early if all the logical indices cannot be converted to physical indices.
let largest_logical_index =
askoa marked this conversation as resolved.
Show resolved Hide resolved
logical_indices[*ordered_indices.last().unwrap()].as_usize();
if largest_logical_index >= self.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.",
)));
}

// Skip some physical indices based on offset.
let skip_value = if self.offset() > 0 {
Self::get_physical_index_from_run_ends_array(self.run_ends(), self.offset())
askoa marked this conversation as resolved.
Show resolved Hide resolved
.ok_or_else(|| {
ArrowError::InvalidArgumentError(format!(
"Cannot convert offset {} to physical index.",
self.offset()
))
})?
} else {
0
};

let mut physical_indices = vec![0; indices_len];

let mut ordered_index = 0_usize;
for (physical_index, run_end) in self.run_ends.values().iter().enumerate() {
// Get the run end index of current physical index
let run_end_value = run_end.as_usize();
for (physical_index, run_end) in
self.run_ends.values().iter().enumerate().skip(skip_value)
{
// Get the run end index (relative to offset) of current physical index
let run_end_value = run_end.as_usize() - self.offset();

// All the `logical_indices` that are less than current run end index
// belongs to current physical index.
Expand All @@ -233,6 +275,63 @@ impl<R: RunEndIndexType> RunArray<R> {
}
Ok(physical_indices)
}

/// Returns a `RunArray` with zero offset and length matching the last value
/// in run_ends array.
pub fn into_non_sliced_array(self) -> Result<Self, ArrowError> {
askoa marked this conversation as resolved.
Show resolved Hide resolved
askoa marked this conversation as resolved.
Show resolved Hide resolved
if self.data.offset() == 0 && self.data.len() == Self::logical_len(&self.run_ends)
{
return Ok(self);
}
// The physical index of original run_ends array from which the `ArrayData`is sliced.
let start_physical_index = Self::get_physical_index_from_run_ends_array(
&self.run_ends,
self.data.offset(),
)
.ok_or_else(|| {
askoa marked this conversation as resolved.
Show resolved Hide resolved
ArrowError::InvalidArgumentError(format!(
"Cannot convert the offset {} to physical index",
self.data.offset()
))
})?;

// The logical length of original run_ends array until which the `ArrayData` is sliced.
let end_logical_index = self.data.offset() + self.data.len() - 1;
// The physical index of original run_ends array until which the `ArrayData`is sliced.
let end_physical_index =
Self::get_physical_index_from_run_ends_array(&self.run_ends, end_logical_index).ok_or_else(|| {
ArrowError::InvalidArgumentError(format!(
"Cannot convert the `offset + len - 1` {end_logical_index} to physical index"
))
})?;

let physical_length = end_physical_index - start_physical_index + 1;

// build new run_ends array by subtrating offset from run ends.
let new_run_ends: PrimitiveArray<R> = self
.run_ends
.values()
.iter()
.skip(start_physical_index)
.take(physical_length)
.map(|f| f.as_usize() - self.data.offset())
.map(|f| f.min(self.len()))
askoa marked this conversation as resolved.
Show resolved Hide resolved
.map(R::Native::from_usize)
askoa marked this conversation as resolved.
Show resolved Hide resolved
.collect();
askoa marked this conversation as resolved.
Show resolved Hide resolved

// build new values by slicing physical indices.
let new_values = self
.values
.slice(start_physical_index, physical_length)
.into_data();

let builder = ArrayDataBuilder::new(self.data_type().clone())
.len(self.len())
.add_child_data(new_run_ends.into_data())
.add_child_data(new_values);
let array_data = builder.build()?;
askoa marked this conversation as resolved.
Show resolved Hide resolved
Ok(array_data.into())
}
}

impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
Expand Down Expand Up @@ -552,6 +651,33 @@ mod tests {
result
}

fn compare_logical_and_physical_indices(
askoa marked this conversation as resolved.
Show resolved Hide resolved
askoa marked this conversation as resolved.
Show resolved Hide resolved
logical_indices: &[u32],
logical_array: &[Option<i32>],
physical_indices: &[usize],
physical_array: &PrimitiveArray<Int32Type>,
) {
assert_eq!(logical_indices.len(), physical_indices.len());

// check value in logical index in the input_array matches physical index in typed_run_array
logical_indices
.iter()
.map(|f| f.as_usize())
.zip(physical_indices.iter())
.for_each(|(logical_ix, physical_ix)| {
let expected = logical_array[logical_ix];
match expected {
Some(val) => {
assert!(physical_array.is_valid(*physical_ix));
let actual = physical_array.value(*physical_ix);
assert_eq!(val, actual);
}
None => {
assert!(physical_array.is_null(*physical_ix))
}
};
});
}
#[test]
fn test_run_array() {
// Construct a value array
Expand Down Expand Up @@ -797,6 +923,54 @@ mod tests {
}
}

#[test]
fn test_run_array_unslice() {
let total_len = 80;
let input_array = build_input_array(total_len);

// Encode the input_array to run array
let mut builder =
PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
builder.extend(input_array.iter().copied());
let run_array = builder.finish();

// test for all slice lengths.
for slice_len in 1..=total_len {
// test for offset = 0, slice length = slice_len
let sliced_run_array: RunArray<Int16Type> =
run_array.slice(0, slice_len).into_data().into();

// Create unsliced run array.
let unsliced_run_array = sliced_run_array.into_non_sliced_array().unwrap();
let typed = unsliced_run_array
.downcast::<PrimitiveArray<Int32Type>>()
.unwrap();
let expected: Vec<Option<i32>> =
input_array.iter().take(slice_len).copied().collect();
let actual: Vec<Option<i32>> = typed.into_iter().collect();
assert_eq!(expected, actual);

// test for offset = total_len - slice_len, length = slice_len
let sliced_run_array: RunArray<Int16Type> = run_array
.slice(total_len - slice_len, slice_len)
.into_data()
.into();

// Create unsliced run array.
let unsliced_run_array = sliced_run_array.into_non_sliced_array().unwrap();
let typed = unsliced_run_array
.downcast::<PrimitiveArray<Int32Type>>()
.unwrap();
let expected: Vec<Option<i32>> = input_array
.iter()
.skip(total_len - slice_len)
.copied()
.collect();
let actual: Vec<Option<i32>> = typed.into_iter().collect();
assert_eq!(expected, actual);
}
}

#[test]
fn test_get_physical_indices() {
// Test for logical lengths starting from 10 to 250 increasing by 10
Expand Down Expand Up @@ -824,23 +998,82 @@ mod tests {
assert_eq!(logical_indices.len(), physical_indices.len());

// check value in logical index in the input_array matches physical index in typed_run_array
logical_indices
compare_logical_and_physical_indices(
&logical_indices,
&input_array,
&physical_indices,
physical_values_array,
);
}
}

#[test]
fn test_get_physical_indices_sliced() {
let total_len = 80;
let input_array = build_input_array(total_len);

// Encode the input_array to run array
let mut builder =
PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
builder.extend(input_array.iter().copied());
let run_array = builder.finish();
let physical_values_array = as_primitive_array::<Int32Type>(run_array.values());

// test for all slice lengths.
for slice_len in 1..=total_len {
// create an array consisting of all the indices repeated twice and shuffled.
let mut logical_indices: Vec<u32> = (0_u32..(slice_len as u32)).collect();
// add same indices once more
logical_indices.append(&mut logical_indices.clone());
let mut rng = thread_rng();
logical_indices.shuffle(&mut rng);

// test for offset = 0 and slice length = slice_len
// slice the input array using which the run array was built.
let sliced_input_array: Vec<Option<i32>> =
input_array.iter().take(slice_len).copied().collect();

// slice the run array
let sliced_run_array: RunArray<Int16Type> =
run_array.slice(0, slice_len).into_data().into();

// Get physical indices.
let physical_indices = sliced_run_array
.get_physical_indices(&logical_indices)
.unwrap();

compare_logical_and_physical_indices(
&logical_indices,
&sliced_input_array,
&physical_indices,
physical_values_array,
);

// test for offset = total_len - slice_len and slice length = slice_len
// slice the input array using which the run array was built.
let sliced_input_array: Vec<Option<i32>> = input_array
askoa marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.map(|f| f.as_usize())
.zip(physical_indices.iter())
.for_each(|(logical_ix, physical_ix)| {
let expected = input_array[logical_ix];
match expected {
Some(val) => {
assert!(physical_values_array.is_valid(*physical_ix));
let actual = physical_values_array.value(*physical_ix);
assert_eq!(val, actual);
}
None => {
assert!(physical_values_array.is_null(*physical_ix))
}
};
});
.skip(total_len - slice_len)
.copied()
.collect();

// slice the run array
let sliced_run_array: RunArray<Int16Type> = run_array
.slice(total_len - slice_len, slice_len)
.into_data()
.into();

// Get physical indices
let physical_indices = sliced_run_array
.get_physical_indices(&logical_indices)
.unwrap();

compare_logical_and_physical_indices(
&logical_indices,
&sliced_input_array,
&physical_indices,
physical_values_array,
);
}
}
}
13 changes: 7 additions & 6 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1326,9 +1326,9 @@ impl ArrayData {
DataType::RunEndEncoded(run_ends, _values) => {
let run_ends_data = self.child_data()[0].clone();
match run_ends.data_type() {
DataType::Int16 => run_ends_data.check_run_ends::<i16>(self.len()),
DataType::Int32 => run_ends_data.check_run_ends::<i32>(self.len()),
DataType::Int64 => run_ends_data.check_run_ends::<i64>(self.len()),
DataType::Int16 => run_ends_data.check_run_ends::<i16>(),
DataType::Int32 => run_ends_data.check_run_ends::<i32>(),
DataType::Int64 => run_ends_data.check_run_ends::<i64>(),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -1487,7 +1487,7 @@ impl ArrayData {
}

/// Validates that each value in run_ends array is positive and strictly increasing.
fn check_run_ends<T>(&self, array_len: usize) -> Result<(), ArrowError>
fn check_run_ends<T>(&self) -> Result<(), ArrowError>
where
T: ArrowNativeType + TryInto<i64> + num::Num + std::fmt::Display,
{
Expand All @@ -1514,9 +1514,10 @@ impl ArrayData {
Ok(())
})?;

if prev_value.as_usize() != array_len {
if prev_value.as_usize() < (self.offset + self.len) {
askoa marked this conversation as resolved.
Show resolved Hide resolved
return Err(ArrowError::InvalidArgumentError(format!(
"The length of array does not match the last value in the run_ends array. The last value of run_ends array is {prev_value} and length of array is {array_len}."
"The offset + length of array should be lte last value in the run_ends array. The last value of run_ends array is {prev_value} and offset + length of array is {}.",
askoa marked this conversation as resolved.
Show resolved Hide resolved
self.offset + self.len
)));
}
Ok(())
Expand Down