Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 8, 2021
1 parent 02f89a4 commit c4166d2
Showing 1 changed file with 206 additions and 2 deletions.
208 changes: 206 additions & 2 deletions arrow/src/compute/kernels/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,36 @@ use crate::compute::kernels::sort::LexicographicalComparator;
use crate::compute::SortColumn;
use crate::error::{ArrowError, Result};
use std::cmp::Ordering;
use std::ops::Range;

/// Given a list of sort columns, find partition ranges that would partition lexicographically
/// equal values across columns.
///
/// The boundaries are obtained from [`lexicographical_partition_points`], which uses
/// LexicographicalComparator in conjunction with binary search, so the columns *MUST* be
/// pre-sorted already.
///
/// The returned vec would be of size k where k is cardinality of the sorted values. Consecutive
/// ranges are connected (`a..b` is followed by `b..c`); the first range starts at 0 and the
/// last range ends at n.
///
/// # Errors
///
/// Propagates any error returned by [`lexicographical_partition_points`].
pub fn lexicographical_partition_ranges(
    columns: &[SortColumn],
) -> Result<Vec<Range<usize>>> {
    let partition_points = lexicographical_partition_points(columns)?;
    // Every adjacent pair of partition points bounds exactly one partition. `windows(2)`
    // simply yields nothing when there are fewer than two points, so no slice index that
    // could panic is needed.
    Ok(partition_points
        .windows(2)
        .map(|bounds| bounds[0]..bounds[1])
        .collect())
}

/// Given a list of sort columns, find partition points that would partition lexicographically
/// equal values across columns; Here LexicographicalComparator is used in conjunction with binary
/// equal values across columns.
///
/// Here LexicographicalComparator is used in conjunction with binary
/// search so the columns *MUST* be pre-sorted already.
///
/// The returned vec would be of size k+1 where k is cardinality of the sorted values; the first and
/// last value would be 0 and n.
pub fn lexicographical_partition_points(columns: &[SortColumn]) -> Result<Vec<usize>> {
if columns.is_empty() {
return Err(ArrowError::InvalidArgumentError(
Expand All @@ -47,8 +73,18 @@ pub fn lexicographical_partition_points(columns: &[SortColumn]) -> Result<Vec<us
let value_indices = (0..row_count).collect::<Vec<usize>>();

let mut previous_partition_point = 0;
result.push(previous_partition_point);
while previous_partition_point < row_count {
previous_partition_point = value_indices[previous_partition_point..]
// invariant:
// value_indices[0..previous_partition_point] all are values <= value_indices[previous_partition_point]
// so in order to save time we can do binary search on the value_indices[previous_partition_point..]
// and find when any value is greater than value_indices[previous_partition_point]; because we are using
// new indices, the new offset is _added_ to the previous_partition_point.
//
// be careful that idx is of type &usize which points to the actual value within value_indices, which itself
// contains usize (0..row_count), providing access to lexicographical_comparator as pointers into the
// original columnar data.
previous_partition_point += value_indices[previous_partition_point..]
.partition_point(|idx| {
lexicographical_comparator.compare(idx, &previous_partition_point)
!= Ordering::Greater
Expand All @@ -58,3 +94,171 @@ pub fn lexicographical_partition_points(columns: &[SortColumn]) -> Result<Vec<us

Ok(result)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::*;
    use crate::compute::SortOptions;
    use std::sync::Arc;

    /// Builds a `SortColumn` over `values` with `nulls_first: true` and the given direction.
    fn sort_column(values: ArrayRef, descending: bool) -> SortColumn {
        SortColumn {
            values,
            options: Some(SortOptions {
                descending,
                nulls_first: true,
            }),
        }
    }

    /// An empty list of columns must be rejected.
    #[test]
    fn test_lexicographical_partition_points_empty() {
        let input: Vec<SortColumn> = vec![];
        assert!(
            lexicographical_partition_points(&input).is_err(),
            "lexicographical_partition_points should reject an empty list of columns"
        );
    }

    /// Columns whose row counts differ (2 vs. 1) must be rejected.
    #[test]
    fn test_lexicographical_partition_points_unaligned_rows() {
        let two_rows = SortColumn {
            values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as ArrayRef,
            options: None,
        };
        let one_row = SortColumn {
            values: Arc::new(StringArray::from(vec![Some("foo")])) as ArrayRef,
            options: None,
        };
        let input = vec![two_rows, one_row];
        assert!(
            lexicographical_partition_points(&input).is_err(),
            "lexicographical_partition_points should reject columns with different row counts"
        );
    }

    /// A single ascending column with one long run of equal values in the middle.
    #[test]
    fn test_lexicographical_partition_single_column() -> Result<()> {
        let input = vec![sort_column(
            Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9])),
            false,
        )];

        let points = lexicographical_partition_points(&input)?;
        assert_eq!(points, vec![0, 1, 8, 9]);

        let ranges = lexicographical_partition_ranges(&input)?;
        assert_eq!(ranges, vec![0_usize..1, 1..8, 8..9]);
        Ok(())
    }

    /// Two columns where every row is distinct: each row is its own partition.
    #[test]
    fn test_lexicographical_partition_unique_column_1() -> Result<()> {
        let input = vec![
            sort_column(Arc::new(Int64Array::from(vec![None, Some(-1)])), false),
            sort_column(
                Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])),
                true,
            ),
        ];

        let points = lexicographical_partition_points(&input)?;
        assert_eq!(points, vec![0, 1, 2]);

        let ranges = lexicographical_partition_ranges(&input)?;
        assert_eq!(ranges, vec![0_usize..1, 1..2]);
        Ok(())
    }

    /// The first column repeats a value, but the second column still tells the rows apart,
    /// so each row remains its own partition.
    #[test]
    fn test_lexicographical_partition_unique_column_2() -> Result<()> {
        let input = vec![
            sort_column(
                Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)])),
                false,
            ),
            sort_column(
                Arc::new(StringArray::from(vec![
                    Some("foo"),
                    Some("bar"),
                    Some("baz"),
                ])),
                true,
            ),
        ];

        let points = lexicographical_partition_points(&input)?;
        assert_eq!(points, vec![0, 1, 2, 3]);

        let ranges = lexicographical_partition_ranges(&input)?;
        assert_eq!(ranges, vec![0_usize..1, 1..2, 2..3]);
        Ok(())
    }

    /// Rows 1 and 2 are equal across both columns and must share one partition.
    #[test]
    fn test_lexicographical_partition_non_unique_column_1() -> Result<()> {
        let input = vec![
            sort_column(
                Arc::new(Int64Array::from(vec![
                    None,
                    Some(-1),
                    Some(-1),
                    Some(1),
                ])),
                false,
            ),
            sort_column(
                Arc::new(StringArray::from(vec![
                    Some("foo"),
                    Some("bar"),
                    Some("bar"),
                    Some("bar"),
                ])),
                true,
            ),
        ];

        let points = lexicographical_partition_points(&input)?;
        assert_eq!(points, vec![0, 1, 3, 4]);

        let ranges = lexicographical_partition_ranges(&input)?;
        assert_eq!(ranges, vec![0_usize..1, 1..3, 3..4]);
        Ok(())
    }
}

0 comments on commit c4166d2

Please sign in to comment.