Skip to content

Commit

Permalink
Avoid allocating vector of indices in lexicographical_partition_ranges (
Browse files Browse the repository at this point in the history
#998)

* Avoid allocating vector of indices in lexicographical_partition_ranges

* Adjust comments

* Improve comments and remove one unneeded parameter
  • Loading branch information
jhorstmann committed Dec 15, 2021
1 parent f21fa54 commit fc343e7
Showing 1 changed file with 108 additions and 25 deletions.
133 changes: 108 additions & 25 deletions arrow/src/compute/kernels/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ struct LexicographicalPartitionIterator<'a> {
num_rows: usize,
previous_partition_point: usize,
partition_point: usize,
value_indices: Vec<usize>,
}

impl<'a> LexicographicalPartitionIterator<'a> {
Expand All @@ -62,39 +61,78 @@ impl<'a> LexicographicalPartitionIterator<'a> {
};

let comparator = LexicographicalComparator::try_new(columns)?;
let value_indices = (0..num_rows).collect::<Vec<usize>>();
Ok(LexicographicalPartitionIterator {
comparator,
num_rows,
previous_partition_point: 0,
partition_point: 0,
value_indices,
})
}
}

/// Exponential search is to remedy for the case when array size and cardinality are both large
/// Returns the next partition point of the range `start..end` according to the given comparator.
/// The return value is the index of the first element of the second partition,
/// and is guaranteed to be between `start..=end` (inclusive).
///
/// The values corresponding to those indices are assumed to be partitioned according to the given comparator.
///
/// Exponential search is to remedy for the case when array size and cardinality are both large.
/// In these cases the partition point would be near the beginning of the range and
/// plain binary search would be doing some unnecessary iterations on each call.
///
/// see <https://en.wikipedia.org/wiki/Exponential_search>
#[inline]
fn exponential_search(
indices: &[usize],
target: &usize,
fn exponential_search_next_partition_point(
start: usize,
end: usize,
comparator: &LexicographicalComparator<'_>,
) -> usize {
let target = start;
let mut bound = 1;
while bound < indices.len()
&& comparator.compare(&indices[bound], target) != Ordering::Greater
while bound + start < end
&& comparator.compare(&(bound + start), &target) != Ordering::Greater
{
bound *= 2;
}

// invariant after while loop:
// indices[bound / 2] <= target < indices[min(indices.len(), bound + 1)]
// (start + bound / 2) <= target < min(end, start + bound + 1)
// where <= and < are defined by the comparator;
// note here we have right = min(indices.len(), bound + 1) because indices[bound] might
// note here we have right = min(end, start + bound + 1) because (start + bound) might
// actually be considered and must be included.
(bound / 2)
+ indices[(bound / 2)..indices.len().min(bound + 1)]
.partition_point(|idx| comparator.compare(idx, target) != Ordering::Greater)
partition_point(start + bound / 2, end.min(start + bound + 1), |idx| {
comparator.compare(&idx, &target) != Ordering::Greater
})
}

/// Returns the partition point of the range `start..end` according to the given predicate.
/// The return value is the index of the first element of the second partition,
/// and is guaranteed to be between `start..=end` (inclusive).
///
/// The algorithm is similar to a binary search.
///
/// The values corresponding to those indices are assumed to be partitioned according to the given predicate.
///
/// See [`std::slice::partition_point`]
#[inline]
fn partition_point<P: Fn(usize) -> bool>(start: usize, end: usize, pred: P) -> usize {
let mut left = start;
let mut right = end;
let mut size = right - left;
while left < right {
let mid = left + size / 2;

let less = pred(mid);

if less {
left = mid + 1;
} else {
right = mid;
}

size = right - left;
}
left
}

impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
Expand All @@ -103,17 +141,12 @@ impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
fn next(&mut self) -> Option<Self::Item> {
if self.partition_point < self.num_rows {
// invariant:
// value_indices[0..previous_partition_point] all are values <= value_indices[previous_partition_point]
// so in order to save time we can do binary search on the value_indices[previous_partition_point..]
// and find when any value is greater than value_indices[previous_partition_point]; because we are using
// new indices, the new offset is _added_ to the previous_partition_point.
//
// be careful that idx is of type &usize which points to the actual value within value_indices, which itself
// contains usize (0..row_count), providing access to lexicographical_comparator as pointers into the
// original columnar data.
self.partition_point += exponential_search(
&self.value_indices[self.partition_point..],
&self.partition_point,
// in the range [0..previous_partition_point] all values are <= the value at [previous_partition_point]
// so in order to save time we can do binary search on the range [previous_partition_point..num_rows]
// and find the index where any value is greater than the value at [previous_partition_point]
self.partition_point = exponential_search_next_partition_point(
self.partition_point,
self.num_rows,
&self.comparator,
);
let start = self.previous_partition_point;
Expand All @@ -134,6 +167,56 @@ mod tests {
use crate::datatypes::DataType;
use std::sync::Arc;

#[test]
fn test_partition_point() {
let input = &[1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 4];
{
let median = input[input.len() / 2];
assert_eq!(
9,
partition_point(
0,
input.len(),
&(|i: usize| input[i].cmp(&median) != Ordering::Greater)
)
);
}
{
let search = input[9];
assert_eq!(
12,
partition_point(
9,
input.len(),
&(|i: usize| input[i].cmp(&search) != Ordering::Greater)
)
);
}
{
let search = input[0];
assert_eq!(
3,
partition_point(
0,
9,
&(|i: usize| input[i].cmp(&search) != Ordering::Greater)
)
);
}
let input = &[1, 2, 2, 2, 2, 2, 2, 2, 9];
{
let search = input[5];
assert_eq!(
8,
partition_point(
5,
9,
&(|i: usize| input[i].cmp(&search) != Ordering::Greater)
)
);
}
}

#[test]
fn test_lexicographical_partition_ranges_empty() {
let input = vec![];
Expand Down

0 comments on commit fc343e7

Please sign in to comment.