Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ unicode_expressions = ["datafusion-physical-expr/unicode_expressions", "datafusi
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "xz", "zstd", "futures-io", "tokio"], optional = true }
async-trait = "0.1.41"
bytes = "1.4"
Expand Down
232 changes: 232 additions & 0 deletions datafusion/core/src/physical_plan/sorts/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use crate::physical_plan::sorts::sort::SortOptions;
use arrow::buffer::ScalarBuffer;
use arrow::datatypes::ArrowNativeTypeOp;
use arrow::row::{Row, Rows};
use std::cmp::Ordering;

Expand Down Expand Up @@ -93,3 +96,232 @@ impl Cursor for RowCursor {
t
}
}

/// A cursor over sorted, nullable [`ArrowNativeTypeOp`]
///
/// The cursor holds no validity mask. The constructor assumes that all null slots
/// sit together at one end of `values` (at the front when `nulls_first`, at the
/// back otherwise), so the single boundary index `null_threshold` is enough to
/// decide whether the current position is null.
///
/// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering
#[derive(Debug)]
pub struct PrimitiveCursor<T: ArrowNativeTypeOp> {
// Backing buffer; entries located in null slots are placeholders that are never compared
values: ScalarBuffer<T>,
// Index of the element the cursor currently points at
offset: usize,
// If nulls first, the first non-null index
// Otherwise, the first null index
null_threshold: usize,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm quite pleased with this formulation of nulls, it avoids needing to consult the null mask at all 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason it is called null_threshold rather than null_index given it is a null index 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought null_index might be confused for the index of the first null, which is not always the case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense.

// Sort direction and null placement used when comparing this cursor
options: SortOptions,
}

impl<T: ArrowNativeTypeOp> PrimitiveCursor<T> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The astute will notice this is parameterised on the underlying Native type, helping to reduce codegen by not forcing SortPreservingMergeStream to be generic over ArrowPrimitiveType

/// Build a [`PrimitiveCursor`] positioned at the first element of `values`
///
/// `values` is expected to be sorted according to `options`, with its
/// `null_count` null slots grouped at the front when `options.nulls_first`
/// is set and at the back otherwise.
///
/// # Panics
///
/// Panics if `null_count` is greater than `values.len()`.
pub fn new(options: SortOptions, values: ScalarBuffer<T>, null_count: usize) -> Self {
    assert!(null_count <= values.len());

    // Nulls first: the boundary is the first non-null index.
    // Nulls last: the boundary is the first null index.
    let null_threshold = if options.nulls_first {
        null_count
    } else {
        values.len() - null_count
    };

    Self {
        options,
        null_threshold,
        offset: 0,
        values,
    }
}

/// Whether the element at the current `offset` is null
///
/// With nulls first, positions before `null_threshold` are null; with nulls
/// last, positions at or after `null_threshold` are null.
fn is_null(&self) -> bool {
    let before_threshold = self.offset < self.null_threshold;
    if self.options.nulls_first {
        before_threshold
    } else {
        !before_threshold
    }
}

/// Copy out the value stored at the current `offset`
///
/// Indexing panics if `offset` is past the end of `values`.
fn value(&self) -> T {
    let idx = self.offset;
    self.values[idx]
}
}

impl<T: ArrowNativeTypeOp> PartialEq for PrimitiveCursor<T> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other).is_eq()
}
}

// Marker impl: `eq` is derived from the total order defined by `Ord::cmp`
impl<T: ArrowNativeTypeOp> Eq for PrimitiveCursor<T> {}
/// Partial ordering simply forwards to the total order from [`Ord::cmp`], so it
/// never returns `None`
impl<T: ArrowNativeTypeOp> PartialOrd for PrimitiveCursor<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

// Orders two cursors by the element each one currently points at.
//
// Only the sort options of `self` are consulted; `other` is assumed to have been
// built with the same options.
impl<T: ArrowNativeTypeOp> Ord for PrimitiveCursor<T> {
fn cmp(&self, other: &Self) -> Ordering {
match (self.is_null(), other.is_null()) {
// Two nulls always tie
(true, true) => Ordering::Equal,
// Null against a value: the null sorts first only when nulls_first is set
(true, false) => match self.options.nulls_first {
true => Ordering::Less,
false => Ordering::Greater,
},
// Value against a null: mirror image of the arm above
(false, true) => match self.options.nulls_first {
true => Ordering::Greater,
false => Ordering::Less,
},
// Two values: compare them, swapping the operands for a descending sort
(false, false) => {
let s_v = self.value();
let o_v = other.value();

match self.options.descending {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be worth specializing on consts NULLS_FIRST and ASC/DESC as well to avoid what looks like runtime overhead in the hot path.

Maybe it doesn't make any practical difference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's definitely something we could experiment, my expectation is that it won't make a huge difference given how branch heavy merging inherently is

true => o_v.compare(s_v),
false => s_v.compare(o_v),
}
}
}
}
}

impl<T: ArrowNativeTypeOp> Cursor for PrimitiveCursor<T> {
    /// The cursor is finished once `offset` has reached the length of `values`
    fn is_finished(&self) -> bool {
        self.values.len() == self.offset
    }

    /// Step to the next element, returning the index the cursor pointed at
    /// before the step
    fn advance(&mut self) -> usize {
        let previous = self.offset;
        self.offset = previous + 1;
        previous
    }
}

#[cfg(test)]
mod tests {
use super::*;

// Walks pairs of cursors through all four combinations of
// ascending/descending and nulls first/last, checking `cmp` at each step.
//
// In the buffers below, the entries that fall inside the null region (as
// implied by `null_count` and `nulls_first`) are arbitrary placeholders such
// as `i32::MAX` / `i32::MIN`.
//
// Step comments state the element under each cursor; the asserted ordering
// is always that of `a` relative to `b` in sort order.
#[test]
fn test_primitive_nulls_first() {
// Case 1: ascending, nulls first
let options = SortOptions {
descending: false,
nulls_first: true,
};

// a: [NULL, 1, 2, 3]
let buffer = ScalarBuffer::from(vec![i32::MAX, 1, 2, 3]);
let mut a = PrimitiveCursor::new(options, buffer, 1);
// b: [NULL, NULL, -2, -1, 1, 9]
let buffer = ScalarBuffer::from(vec![1, 2, -2, -1, 1, 9]);
let mut b = PrimitiveCursor::new(options, buffer, 2);

// a = NULL, b = NULL: NULL == NULL
assert_eq!(a.cmp(&b), Ordering::Equal);
assert_eq!(a, b);

// a = NULL, b = NULL (second null of b): NULL == NULL
b.advance();
assert_eq!(a.cmp(&b), Ordering::Equal);
assert_eq!(a, b);

// a = NULL, b = -2: NULL < -2
b.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// a = 1, b = -2: 1 > -2
a.advance();
assert_eq!(a.cmp(&b), Ordering::Greater);

// a = 1, b = -1: 1 > -1
b.advance();
assert_eq!(a.cmp(&b), Ordering::Greater);

// a = 1, b = 1: 1 == 1
b.advance();
assert_eq!(a.cmp(&b), Ordering::Equal);
assert_eq!(a, b);

// a = 1, b = 9: 1 < 9
b.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// a = 2, b = 9: 2 < 9
a.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// Case 2: ascending, nulls last
let options = SortOptions {
descending: false,
nulls_first: false,
};

// a: [0, 1, NULL, NULL]
let buffer = ScalarBuffer::from(vec![0, 1, i32::MIN, i32::MAX]);
let mut a = PrimitiveCursor::new(options, buffer, 2);
// b: [-1, NULL, NULL]
let buffer = ScalarBuffer::from(vec![-1, i32::MAX, i32::MIN]);
let mut b = PrimitiveCursor::new(options, buffer, 2);

// a = 0, b = -1: 0 > -1
assert_eq!(a.cmp(&b), Ordering::Greater);

// a = 0, b = NULL: 0 < NULL
b.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// a = 1, b = NULL: 1 < NULL
a.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// a = NULL, b = NULL: NULL == NULL
a.advance();
assert_eq!(a.cmp(&b), Ordering::Equal);
assert_eq!(a, b);

// Case 3: descending, nulls last
let options = SortOptions {
descending: true,
nulls_first: false,
};

// a: [6, NULL, NULL, NULL]
let buffer = ScalarBuffer::from(vec![6, 1, i32::MIN, i32::MAX]);
let mut a = PrimitiveCursor::new(options, buffer, 3);
// b: [67, -3, NULL, NULL]
let buffer = ScalarBuffer::from(vec![67, -3, i32::MAX, i32::MIN]);
let mut b = PrimitiveCursor::new(options, buffer, 2);

// a = 6, b = 67: descending, so 6 sorts after 67
assert_eq!(a.cmp(&b), Ordering::Greater);

// a = 6, b = -3: descending, so 6 sorts before -3
b.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// a = 6, b = NULL: 6 < NULL
b.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// a = 6, b = NULL (second null of b): 6 < NULL
b.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// a = NULL, b = NULL: NULL == NULL
a.advance();
assert_eq!(a.cmp(&b), Ordering::Equal);
assert_eq!(a, b);

// Case 4: descending, nulls first
let options = SortOptions {
descending: true,
nulls_first: true,
};

// a: [NULL, NULL, 6, 3]
let buffer = ScalarBuffer::from(vec![i32::MIN, i32::MAX, 6, 3]);
let mut a = PrimitiveCursor::new(options, buffer, 2);
// b: [NULL, 4546, -3]
let buffer = ScalarBuffer::from(vec![i32::MAX, 4546, -3]);
let mut b = PrimitiveCursor::new(options, buffer, 1);

// a = NULL, b = NULL: NULL == NULL
assert_eq!(a.cmp(&b), Ordering::Equal);
assert_eq!(a, b);

// a = NULL (second null of a), b = NULL: NULL == NULL
a.advance();
assert_eq!(a.cmp(&b), Ordering::Equal);
assert_eq!(a, b);

// a = NULL, b = 4546: NULL < 4546
b.advance();
assert_eq!(a.cmp(&b), Ordering::Less);

// a = 6, b = 4546: descending, so 6 sorts after 4546
a.advance();
assert_eq!(a.cmp(&b), Ordering::Greater);

// a = 6, b = -3: descending, so 6 sorts before -3
b.advance();
assert_eq!(a.cmp(&b), Ordering::Less);
}
}
28 changes: 26 additions & 2 deletions datafusion/core/src/physical_plan/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,31 @@ use crate::common::Result;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::sorts::builder::BatchBuilder;
use crate::physical_plan::sorts::cursor::Cursor;
use crate::physical_plan::sorts::stream::{PartitionedStream, RowCursorStream};
use crate::physical_plan::sorts::stream::{
PartitionedStream, PrimitiveCursorStream, RowCursorStream,
};
use crate::physical_plan::{
PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_array::downcast_primitive;
use futures::Stream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

// Builds a merge stream backed by `PrimitiveCursorStream::<$t>` and returns it
// from the ENCLOSING function.
//
// Arguments:
// - `$t`: type parameter forwarded to `PrimitiveCursorStream`
//   (presumably the Arrow primitive type chosen by `downcast_primitive!` - confirm
//   against the arrow-array documentation)
// - `$sort`, `$streams`: passed to `PrimitiveCursorStream::new`
// - `$schema`, `$tracking_metrics`, `$batch_size`: passed to
//   `SortPreservingMergeStream::new`
//
// NOTE: the expansion contains a `return`, so it can only be used inside a
// function whose return type accepts `Ok(Box::pin(SortPreservingMergeStream))`,
// and any code after the invocation is skipped when this arm is taken.
macro_rules! primitive_merge_helper {
($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident) => {{
let streams = PrimitiveCursorStream::<$t>::new($sort, $streams);
// Early return out of the calling function with the specialized stream
return Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
$schema,
$tracking_metrics,
$batch_size,
)));
}};
}

/// Perform a streaming merge of [`SendableRecordBatchStream`]
pub(crate) fn streaming_merge(
streams: Vec<SendableRecordBatchStream>,
Expand All @@ -37,8 +52,17 @@ pub(crate) fn streaming_merge(
tracking_metrics: MemTrackingMetrics,
batch_size: usize,
) -> Result<SendableRecordBatchStream> {
let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
// Special case single column comparisons with optimized cursor implementations
if expressions.len() == 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worth a comment here explaining the rationale for this for future readers. Something like

Suggested change
if expressions.len() == 1 {
// special case single column primitives to avoid overhead of runtime comparators
if expressions.len() == 1 {

let sort = expressions[0].clone();
let data_type = sort.expr.data_type(schema.as_ref())?;
downcast_primitive! {
data_type => (primitive_merge_helper, sort, streams, schema, tracking_metrics, batch_size),
_ => {}
}
}

let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
schema,
Expand Down
Loading