Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ itertools = { workspace = true, features = ["use_std"] }
parking_lot = { workspace = true }
paste = "^1.0"
petgraph = "0.8.3"
tokio = { workspace = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
Expand All @@ -79,3 +80,6 @@ name = "is_null"
[[bench]]
harness = false
name = "binary_op"

[package.metadata.cargo-machete]
ignored = ["half"]
102 changes: 100 additions & 2 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use parking_lot::RwLock;
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
use tokio::sync::watch;

use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
Expand All @@ -27,6 +28,24 @@ use datafusion_common::{
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};

/// Lifecycle state of a dynamic filter.
///
/// Each variant records the generation that was current when the state was
/// produced, so a state value answers two questions at once: "has anything
/// changed?" (compare generations) and "is it finished?" (check the variant).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterState {
    /// The filter may still receive further updates.
    InProgress { generation: u64 },
    /// The filter is finished; no further updates are expected.
    Complete { generation: u64 },
}

impl FilterState {
    /// Returns the generation stored in this state, regardless of variant.
    fn generation(&self) -> u64 {
        // Both variants carry the same field, so a single irrefutable
        // or-pattern extracts it without a `match`.
        let (Self::InProgress { generation } | Self::Complete { generation }) = *self;
        generation
    }
}

/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
///
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
Expand All @@ -44,6 +63,8 @@ pub struct DynamicFilterPhysicalExpr {
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
/// The source of dynamic filters.
inner: Arc<RwLock<Inner>>,
/// Broadcasts filter state (updates and completion) to all waiters.
state_watch: watch::Sender<FilterState>,
/// For testing purposes track the data type and nullability to make sure they don't change.
/// If they do, there's a bug in the implementation.
/// But this can have overhead in production, so it's only included in our tests.
Expand All @@ -57,6 +78,10 @@ struct Inner {
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
generation: u64,
expr: Arc<dyn PhysicalExpr>,
/// Flag for quick synchronous check if filter is complete.
/// This is redundant with the watch channel state, but allows us to return immediately
/// from `wait_complete()` without subscribing if already complete.
is_complete: bool,
}

impl Inner {
Expand All @@ -66,6 +91,7 @@ impl Inner {
// This is not currently used anywhere but it seems useful to have this simple distinction.
generation: 1,
expr,
is_complete: false,
}
}

Expand Down Expand Up @@ -134,10 +160,12 @@ impl DynamicFilterPhysicalExpr {
children: Vec<Arc<dyn PhysicalExpr>>,
inner: Arc<dyn PhysicalExpr>,
) -> Self {
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
Self {
children,
remapped_children: None, // Initially no remapped children
inner: Arc::new(RwLock::new(Inner::new(inner))),
state_watch,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
Expand Down Expand Up @@ -185,7 +213,7 @@ impl DynamicFilterPhysicalExpr {
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
}

/// Update the current expression.
/// Update the current expression and notify all waiters.
/// Any children of this expression must be a subset of the original children
/// passed to the constructor.
/// This should be called e.g.:
Expand All @@ -204,13 +232,68 @@ impl DynamicFilterPhysicalExpr {

// Load the current inner, increment generation, and store the new one
let mut current = self.inner.write();
let new_generation = current.generation + 1;
*current = Inner {
generation: current.generation + 1,
generation: new_generation,
expr: new_expr,
is_complete: current.is_complete,
};
drop(current); // Release the lock before broadcasting

// Broadcast the new state to all waiters
let _ = self.state_watch.send(FilterState::InProgress {
generation: new_generation,
});
Ok(())
}

/// Flag this dynamic filter as complete and publish that state to waiters.
///
/// Calling this signals that every expected update has been received.
/// Tasks blocked in [`Self::wait_complete`] are notified.
pub fn mark_complete(&self) {
    // Set the flag under the write lock and capture the generation; the guard
    // is released at the end of this block, before anything is broadcast.
    let generation = {
        let mut guard = self.inner.write();
        let generation = guard.generation;
        guard.is_complete = true;
        generation
    };

    // The send result is deliberately ignored, as in the rest of this type.
    let _ = self.state_watch.send(FilterState::Complete { generation });
}

/// Asynchronously wait for the next update to this filter.
///
/// Resolves once the broadcast state carries a generation greater than the
/// one observed when this call subscribed, i.e. after [`Self::update`] runs.
/// Returning from this method does not imply that the filter is complete.
pub async fn wait_update(&self) {
    let mut receiver = self.state_watch.subscribe();

    // Snapshot the generation currently held by the channel and mark it as seen.
    let baseline = receiver.borrow_and_update().generation();

    // Resolve on the first state whose generation is past the snapshot.
    // An error from the channel is ignored and simply ends the wait.
    let _ = receiver
        .wait_for(|state| baseline < state.generation())
        .await;
}

/// Wait asynchronously until this dynamic filter is marked as complete.
///
/// This method returns immediately if the filter is already complete.
/// Otherwise, it waits until [`Self::mark_complete`] is called.
///
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
/// the filter is fully complete with no more updates expected.
pub async fn wait_complete(&self) {
    // Subscribe *before* consulting the flag. A `watch` sender does not store
    // a value sent while no receiver exists, so with the opposite order a
    // `mark_complete` call landing between the flag check and the
    // subscription would be lost and this future would never resolve.
    // With a receiver registered first, any later completion is retained by
    // the channel, and any earlier one is visible through the flag.
    let mut rx = self.state_watch.subscribe();

    // Fast path: completion already recorded. The read guard is a temporary
    // of the condition and is released before any `.await`.
    if self.inner.read().is_complete {
        return;
    }

    // NOTE(review): `update` appears to broadcast `InProgress` even after
    // completion; if it can be called after `mark_complete`, a pending waiter
    // could miss the `Complete` state — confirm producers never do that.
    let _ = rx
        .wait_for(|state| matches!(state, FilterState::Complete { .. }))
        .await;
}

fn render(
&self,
f: &mut std::fmt::Formatter<'_>,
Expand Down Expand Up @@ -253,6 +336,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
children: self.children.clone(),
remapped_children: Some(children),
inner: Arc::clone(&self.inner),
state_watch: self.state_watch.clone(),
data_type: Arc::clone(&self.data_type),
nullable: Arc::clone(&self.nullable),
}))
Expand Down Expand Up @@ -509,4 +593,18 @@ mod test {
"Expected err when evaluate is called after changing the expression."
);
}

#[tokio::test]
async fn test_wait_complete_already_complete() {
    // A child-less dynamic filter wrapping a literal expression.
    let literal = lit(42) as Arc<dyn PhysicalExpr>;
    let filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], literal));

    // Complete the filter before anyone waits on it.
    filter.mark_complete();

    // Waiting on an already-complete filter must resolve without blocking.
    filter.wait_complete().await;
}
}
99 changes: 99 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4486,4 +4486,103 @@ mod tests {
fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
}

/// Checks that running a `HashJoinExec` to completion leaves its dynamic filter marked as complete.
#[tokio::test]
async fn test_hash_join_marks_filter_complete() -> Result<()> {
    // Two three-row inputs that share the join column `b1`.
    let left = build_table(
        ("a1", &vec![1, 2, 3]),
        ("b1", &vec![4, 5, 6]),
        ("c1", &vec![7, 8, 9]),
    );
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );

    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];

    // Build the filter by hand and keep a second handle to observe it after
    // the first one has been moved into the join.
    let filter = HashJoinExec::create_dynamic_filter(&on);
    let observer = Arc::clone(&filter);

    // Construct the join, then attach the hand-made filter to it.
    let mut join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::Inner,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
    )?;
    join.dynamic_filter = Some(HashJoinExecDynamicFilter {
        filter,
        build_accumulator: OnceLock::new(),
    });

    // Drive partition 0 of the join to completion.
    let task_ctx = Arc::new(TaskContext::default());
    let _batches = common::collect(join.execute(0, task_ctx)?).await?;

    // The filter should now be complete, so this must resolve right away.
    observer.wait_complete().await;

    Ok(())
}

/// Checks that the dynamic filter is still marked as complete when the build side has no rows.
#[tokio::test]
async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
    // The left input is the build side and is deliberately empty.
    let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
    let right = build_table(
        ("a2", &vec![10, 20, 30]),
        ("b1", &vec![4, 5, 6]),
        ("c2", &vec![70, 80, 90]),
    );

    let on = vec![(
        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
    )];

    // Build the filter by hand and keep a second handle to observe it after
    // the first one has been moved into the join.
    let filter = HashJoinExec::create_dynamic_filter(&on);
    let observer = Arc::clone(&filter);

    // Construct the join, then attach the hand-made filter to it.
    let mut join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        &JoinType::Inner,
        None,
        PartitionMode::CollectLeft,
        NullEquality::NullEqualsNothing,
    )?;
    join.dynamic_filter = Some(HashJoinExecDynamicFilter {
        filter,
        build_accumulator: OnceLock::new(),
    });

    // Drive partition 0 of the join to completion.
    let task_ctx = Arc::new(TaskContext::default());
    let _batches = common::collect(join.execute(0, task_ctx)?).await?;

    // An empty build side must not prevent completion from being signalled,
    // so this must resolve right away.
    observer.wait_complete().await;

    Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ impl SharedBuildAccumulator {
self.dynamic_filter.update(case_expr)?;
}
}
self.dynamic_filter.mark_complete();
}

Ok(())
Expand Down
53 changes: 52 additions & 1 deletion datafusion/physical-plan/src/topk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,10 +591,13 @@ impl TopK {
common_sort_prefix_converter: _,
common_sort_prefix: _,
finished: _,
filter: _,
filter,
} = self;
let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop

// Mark the dynamic filter as complete now that TopK processing is finished.
filter.read().expr().mark_complete();

// break into record batches as needed
let mut batches = vec![];
if let Some(mut batch) = heap.emit()? {
Expand Down Expand Up @@ -1198,4 +1201,52 @@ mod tests {

Ok(())
}

/// This test verifies that the dynamic filter is marked as complete after TopK processing finishes.
#[tokio::test]
async fn test_topk_marks_filter_complete() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

let sort_expr = PhysicalSortExpr {
expr: col("a", schema.as_ref())?,
options: SortOptions::default(),
};

let full_expr = LexOrdering::from([sort_expr.clone()]);
let prefix = vec![sort_expr];

// Create a dummy runtime environment and metrics
let runtime = Arc::new(RuntimeEnv::default());
let metrics = ExecutionPlanMetricsSet::new();

// Create a dynamic filter that we'll check for completion
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true)));
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Create a TopK instance
let mut topk = TopK::try_new(
0,
Arc::clone(&schema),
prefix,
full_expr,
2,
10,
runtime,
&metrics,
Arc::new(RwLock::new(TopKDynamicFilters::new(dynamic_filter))),
)?;

let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1), Some(2)]));
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;
topk.insert_batch(batch)?;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add an assertion for the 'in progress' state here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the issue is that it's hard to set up a wait_update in the tests before the update happens without doing something like tokio::spawn (which I think we are not allowed to use in tests) or setting a timeout, which would introduce some nondeterminism into the test.

// Call emit to finish TopK processing
let _results: Vec<_> = topk.emit()?.try_collect().await?;

// After emit is called, the dynamic filter should be marked as complete
// wait_complete() should return immediately
dynamic_filter_clone.wait_complete().await;

Ok(())
}
}