Merged
5 changes: 4 additions & 1 deletion datafusion/physical-plan/src/aggregates/topk/hash_table.rs
@@ -367,7 +367,10 @@ has_integer!(u8, u16, u32, u64);
has_integer!(IntervalDayTime, IntervalMonthDayNano);
hash_float!(f16, f32, f64);

-pub fn new_hash_table(limit: usize, kt: DataType) -> Result<Box<dyn ArrowHashTable>> {
+pub fn new_hash_table(
+    limit: usize,
+    kt: DataType,
+) -> Result<Box<dyn ArrowHashTable + Send>> {
macro_rules! downcast_helper {
($kt:ty, $d:ident) => {
return Ok(Box::new(PrimitiveHashTable::<$kt>::new(limit)))
6 changes: 5 additions & 1 deletion datafusion/physical-plan/src/aggregates/topk/heap.rs
@@ -459,7 +459,11 @@ compare_integer!(u8, u16, u32, u64);
compare_integer!(IntervalDayTime, IntervalMonthDayNano);
compare_float!(f16, f32, f64);

-pub fn new_heap(limit: usize, desc: bool, vt: DataType) -> Result<Box<dyn ArrowHeap>> {
+pub fn new_heap(
+    limit: usize,
+    desc: bool,
+    vt: DataType,
+) -> Result<Box<dyn ArrowHeap + Send>> {
macro_rules! downcast_helper {
($vt:ty, $d:ident) => {
return Ok(Box::new(PrimitiveHeap::<$vt>::new(limit, desc, vt)))
9 changes: 2 additions & 7 deletions datafusion/physical-plan/src/aggregates/topk/priority_map.rs
@@ -25,17 +25,12 @@ use datafusion_common::Result;

/// A `Map<K, V>` / `PriorityQueue` combo that evicts the worst values after reaching `capacity`
pub struct PriorityMap {
-    map: Box<dyn ArrowHashTable>,
-    heap: Box<dyn ArrowHeap>,
+    map: Box<dyn ArrowHashTable + Send>,
+    heap: Box<dyn ArrowHeap + Send>,
capacity: usize,
mapper: Vec<(usize, usize)>,
}

-// JUSTIFICATION
Contributor

@avantgardnerio -- can you please remind me how we tested this / how we can double check that this doesn't cause a performance regression?

Member Author

I didn't run benchmarks, but static analysis of the code shows that `PriorityMap` is required to be `Send`: if you remove only the marker trait implementation, the code won't compile.
With this PR, `PriorityMap` is still `Send`. The only difference is that this is now inferred by the compiler, which is safer.
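
To illustrate the inference described above, here is a minimal sketch. The names `Table`, `VecTable`, `new_table`, and `Wrapper` are hypothetical stand-ins, not DataFusion types. It assumes only standard Rust auto-trait rules: a struct whose fields are all `Send` is itself `Send`, and `Box<dyn Trait + Send>` is `Send` while a plain `Box<dyn Trait>` is not.

```rust
use std::thread;

// Hypothetical stand-in for a trait such as `ArrowHashTable`.
trait Table {
    fn len(&self) -> usize;
}

// Hypothetical concrete implementation; `Vec<u64>` is Send, so `VecTable` is too.
struct VecTable(Vec<u64>);

impl Table for VecTable {
    fn len(&self) -> usize {
        self.0.len()
    }
}

// Returning `Box<dyn Table + Send>` tells callers the boxed value is Send.
// The compiler checks that `VecTable` really is Send at this coercion.
fn new_table(items: Vec<u64>) -> Box<dyn Table + Send> {
    Box::new(VecTable(items))
}

// No `unsafe impl Send` here: every field is Send, so the compiler
// infers `Wrapper: Send` on its own.
struct Wrapper {
    table: Box<dyn Table + Send>,
    capacity: usize,
}

// `thread::spawn` requires everything the closure captures to be Send,
// so this function compiles only because `Wrapper`'s fields are Send.
fn table_len_on_other_thread(w: Wrapper) -> usize {
    thread::spawn(move || w.table.len().min(w.capacity))
        .join()
        .expect("worker thread panicked")
}

fn main() {
    let w = Wrapper {
        table: new_table(vec![1, 2, 3]),
        capacity: 10,
    };
    println!("{}", table_len_on_other_thread(w));
}
```

If the `+ Send` bound were dropped from the field type in this sketch, `thread::spawn` would be rejected with an E0277 error of the same kind as the one quoted later in this thread.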

Contributor

I believe I ran the benchmark, and it was slower than without the optimization. So I made this change, and it got faster by however much is listed in the comment.

It sounds from @findepi, though, like the question is now moot?

Member Author

Removing the `unsafe impl Send for PriorityMap {}` line alone gives a compilation error, because rustc does not infer `PriorityMap` to be `Send`:

 main *$ cargo build
   Compiling datafusion-physical-plan v41.0.0 (/Users/findepi/repos/datafusion/datafusion/physical-plan)
error[E0277]: `(dyn ArrowHashTable + 'static)` cannot be sent between threads safely
   --> datafusion/physical-plan/src/aggregates/mod.rs:249:9
    |
249 | /         match stream {
250 | |             StreamType::AggregateStream(stream) => Box::pin(stream),
251 | |             StreamType::GroupedHash(stream) => Box::pin(stream),
252 | |             StreamType::GroupedPriorityQueue(stream) => Box::pin(stream),
253 | |         }
    | |_________^ `(dyn ArrowHashTable + 'static)` cannot be sent between threads safely
    |
    = help: the trait `std::marker::Send` is not implemented for `(dyn ArrowHashTable + 'static)`, which is required by `GroupedTopKAggregateStream: std::marker::Send`
    = note: required for `std::ptr::Unique<(dyn ArrowHashTable + 'static)>` to implement `std::marker::Send`
note: required because it appears within the type `Box<(dyn ArrowHashTable + 'static)>`
   --> /Users/findepi/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:237:12
    |
237 | pub struct Box<
    |            ^^^
note: required because it appears within the type `PriorityMap`
   --> datafusion/physical-plan/src/aggregates/topk/priority_map.rs:27:12
    |
27  | pub struct PriorityMap {
    |            ^^^^^^^^^^^
note: required because it appears within the type `GroupedTopKAggregateStream`
   --> datafusion/physical-plan/src/aggregates/topk_stream.rs:39:12
    |
39  | pub struct GroupedTopKAggregateStream {
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
    = note: required for the cast from `Pin<Box<GroupedTopKAggregateStream>>` to `Pin<Box<dyn RecordBatchStream + std::marker::Send>>`

error[E0277]: `(dyn ArrowHeap + 'static)` cannot be sent between threads safely
   --> datafusion/physical-plan/src/aggregates/mod.rs:249:9
    |
249 | /         match stream {
250 | |             StreamType::AggregateStream(stream) => Box::pin(stream),
251 | |             StreamType::GroupedHash(stream) => Box::pin(stream),
252 | |             StreamType::GroupedPriorityQueue(stream) => Box::pin(stream),
253 | |         }
    | |_________^ `(dyn ArrowHeap + 'static)` cannot be sent between threads safely
    |
    = help: the trait `std::marker::Send` is not implemented for `(dyn ArrowHeap + 'static)`, which is required by `GroupedTopKAggregateStream: std::marker::Send`
    = note: required for `std::ptr::Unique<(dyn ArrowHeap + 'static)>` to implement `std::marker::Send`
note: required because it appears within the type `Box<(dyn ArrowHeap + 'static)>`
   --> /Users/findepi/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:237:12
    |
237 | pub struct Box<
    |            ^^^
note: required because it appears within the type `PriorityMap`
   --> datafusion/physical-plan/src/aggregates/topk/priority_map.rs:27:12
    |
27  | pub struct PriorityMap {
    |            ^^^^^^^^^^^
note: required because it appears within the type `GroupedTopKAggregateStream`
   --> datafusion/physical-plan/src/aggregates/topk_stream.rs:39:12
    |
39  | pub struct GroupedTopKAggregateStream {
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
    = note: required for the cast from `Pin<Box<GroupedTopKAggregateStream>>` to `Pin<Box<dyn RecordBatchStream + std::marker::Send>>`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `datafusion-physical-plan` (lib) due to 2 previous errors

However, removing `unsafe impl Send for PriorityMap {}` together with the other changes in this PR keeps `PriorityMap` `Send`, so the code works exactly as it does on current main.
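
One way to double-check a claim like this without benchmarks is a compile-time assertion. The following is a sketch only. `Heap`, `MaxHeap`, `Queue`, and `assert_send` are hypothetical names for illustration and are not part of DataFusion. The pattern relies on a generic function with a `T: Send` bound, which fails to compile if the type stops being `Send`.

```rust
// Hypothetical stand-in for a trait such as `ArrowHeap`.
trait Heap {
    fn peek(&self) -> Option<i64>;
}

// Hypothetical implementation that reports the largest stored value.
struct MaxHeap(Vec<i64>);

impl Heap for MaxHeap {
    fn peek(&self) -> Option<i64> {
        self.0.iter().copied().max()
    }
}

// Send is inferred from the field type; there is no `unsafe impl`.
struct Queue {
    heap: Box<dyn Heap + Send>,
}

// Instantiating this function compiles only when `T: Send` holds.
fn assert_send<T: Send>() {}

// If `Queue` ever lost its Send-ness (for example, by dropping `+ Send`
// from the field), this function would fail to compile.
fn queue_is_send() -> bool {
    assert_send::<Queue>();
    true
}

fn top(q: &Queue) -> Option<i64> {
    q.heap.peek()
}

fn main() {
    let q = Queue {
        heap: Box::new(MaxHeap(vec![4, 9, 2])),
    };
    println!("send: {}, top: {:?}", queue_is_send(), top(&q));
}
```

The check costs nothing at run time: `assert_send` has an empty body, and all the work is done by the type checker.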

-// Benefit: ~15% speedup + required to index into RawTable from binary heap
-// Soundness: it is only accessed by one thread at a time, and indexes are kept up to date
-unsafe impl Send for PriorityMap {}

impl PriorityMap {
pub fn new(
key_type: DataType,