Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7174d72
feat: Add null-aware anti join support (Phase 1)
viirya Jan 4, 2026
c369f7e
feat: Add automatic null-aware anti join for NOT IN subqueries
viirya Jan 4, 2026
f4475e5
fix: Only use null-aware anti join for NOT IN, not NOT EXISTS
viirya Jan 4, 2026
62af907
fix: Add missing null_aware parameter to Join test functions
viirya Jan 4, 2026
af62405
fix format
viirya Jan 4, 2026
c911d67
fix: Add missing null_aware parameter to core test files
viirya Jan 4, 2026
2ea6df6
fix: Add missing null_aware parameter to all test HashJoinExec calls
viirya Jan 4, 2026
24a916a
fix: Add missing null_aware parameter to remaining test files
viirya Jan 4, 2026
70882c4
fix: Update joins.slt test expectation for correct null-aware NOT IN …
viirya Jan 4, 2026
2c9d780
fix: Address clippy warnings in Join::try_new
viirya Jan 4, 2026
be8932e
fix: Return NULL rows for NOT IN with empty subquery
viirya Jan 6, 2026
f77cb38
docs: Document known limitation with NOT IN and OR conditions
viirya Jan 7, 2026
2cb3778
fix format
viirya Jan 7, 2026
a14c641
fix: Only set probe_side_non_empty when batch has rows
viirya Jan 7, 2026
b871119
fix: Force CollectLeft partition mode for null-aware anti joins
viirya Jan 7, 2026
626909d
fix: Force CollectLeft in physical planner for null-aware joins
viirya Jan 7, 2026
17c6dca
Fix null-aware anti join with shared atomic state for partitioned exe…
viirya Jan 7, 2026
ac06033
fix format
viirya Jan 7, 2026
c27ad59
Add test case from GitHub issue #10583 for null-aware anti join
viirya Jan 7, 2026
f0d3e98
Document correlated NOT IN limitation from issue #10583
viirya Jan 7, 2026
47491d7
Add null_aware field to protobuf serialization and preserve in Join r…
viirya Jan 7, 2026
7b5dced
update tpch q16 query plan
viirya Jan 7, 2026
07de9a8
Optimize null-aware anti join to skip CollectLeft when keys are non-n…
viirya Jan 9, 2026
4179501
Fix null-aware anti join tests after rebase
viirya Jan 9, 2026
2549fbf
fix format
viirya Jan 9, 2026
18be10a
Fix clippy warnings: collapse nested if statements
viirya Jan 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ impl DefaultPhysicalPlanner {
filter,
join_type,
null_equality,
null_aware,
schema: join_schema,
..
}) => {
Expand Down Expand Up @@ -1487,6 +1488,8 @@ impl DefaultPhysicalPlanner {
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& prefer_hash_join
&& !*null_aware
// Null-aware joins must use CollectLeft, so they skip this
// `PartitionMode::Auto` branch and fall through to the branch below.
{
Arc::new(HashJoinExec::try_new(
physical_left,
Expand All @@ -1497,6 +1500,7 @@ impl DefaultPhysicalPlanner {
None,
PartitionMode::Auto,
*null_equality,
*null_aware,
)?)
} else {
Arc::new(HashJoinExec::try_new(
Expand All @@ -1508,6 +1512,7 @@ impl DefaultPhysicalPlanner {
None,
PartitionMode::CollectLeft,
*null_equality,
*null_aware,
)?)
};

Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/tests/execution/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ async fn join_yields(
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNull,
false,
)?);

query_yields(join, session_ctx.task_ctx()).await
Expand Down Expand Up @@ -655,6 +656,7 @@ async fn join_agg_yields(
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNull,
false,
)?);

// Project only one column (“value” from the left side) because we just want to sum that
Expand Down Expand Up @@ -720,6 +722,7 @@ async fn hash_join_yields(
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNull,
false,
)?);

query_yields(join, session_ctx.task_ctx()).await
Expand Down Expand Up @@ -751,9 +754,10 @@ async fn hash_join_without_repartition_and_no_agg(
/* filter */ None,
&JoinType::Inner,
/* output64 */ None,
// Using CollectLeft is fine—just avoid RepartitionExecs partitioned channels.
// Using CollectLeft is fine—just avoid RepartitionExec's partitioned channels.
PartitionMode::CollectLeft,
NullEquality::NullEqualsNull,
false,
)?);

query_yields(join, session_ctx.task_ctx()).await
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ impl JoinFuzzTestCase {
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
)
Expand Down
15 changes: 15 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -354,6 +355,7 @@ async fn test_static_filter_pushdown_through_hash_join() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -418,6 +420,7 @@ async fn test_static_filter_pushdown_through_hash_join() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -981,6 +984,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -1170,6 +1174,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -1363,6 +1368,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -1531,6 +1537,7 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand All @@ -1550,6 +1557,7 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -1665,6 +1673,7 @@ async fn test_hashjoin_parent_filter_pushdown() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -2771,6 +2780,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -2899,6 +2909,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -3049,6 +3060,7 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() {
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -3199,6 +3211,7 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() {
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -3333,6 +3346,7 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() {
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -3441,6 +3455,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/tests/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ async fn test_join_with_swap() {
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -284,6 +285,7 @@ async fn test_left_join_no_swap() {
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -333,6 +335,7 @@ async fn test_join_with_swap_semi() {
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
false,
)
.unwrap();

Expand Down Expand Up @@ -388,6 +391,7 @@ async fn test_join_with_swap_mark() {
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
false,
)
.unwrap();

Expand Down Expand Up @@ -461,6 +465,7 @@ async fn test_nested_join_swap() {
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)
.unwrap();
let child_schema = child_join.schema();
Expand All @@ -478,6 +483,7 @@ async fn test_nested_join_swap() {
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)
.unwrap();

Expand Down Expand Up @@ -518,6 +524,7 @@ async fn test_join_no_swap() {
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -745,6 +752,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
Some(projection),
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
false,
)?);

let swapped = join
Expand Down Expand Up @@ -906,6 +914,7 @@ fn check_join_partition_mode(
None,
PartitionMode::Auto,
NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);
Expand Down Expand Up @@ -1554,6 +1563,7 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
None,
t.initial_mode,
NullEquality::NullEqualsNothing,
false,
)?) as _;

let optimized_join_plan =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,7 @@ fn test_hash_join_after_projection() -> Result<()> {
None,
PartitionMode::Auto,
NullEquality::NullEqualsNothing,
false,
)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,7 @@ fn hash_join_exec(
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
)
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub fn hash_join_exec(
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
false,
)?))
}

Expand Down
23 changes: 23 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,25 @@ impl LogicalPlanBuilder {
join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
filter: Option<Expr>,
null_equality: NullEquality,
) -> Result<Self> {
self.join_detailed_with_options(
right,
join_type,
join_keys,
filter,
null_equality,
false,
)
}

pub fn join_detailed_with_options(
self,
right: LogicalPlan,
join_type: JoinType,
join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
filter: Option<Expr>,
null_equality: NullEquality,
null_aware: bool,
) -> Result<Self> {
if join_keys.0.len() != join_keys.1.len() {
return plan_err!("left_keys and right_keys were not the same length");
Expand Down Expand Up @@ -1128,6 +1147,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equality,
null_aware,
})))
}

Expand Down Expand Up @@ -1201,6 +1221,7 @@ impl LogicalPlanBuilder {
join_type,
JoinConstraint::Using,
NullEquality::NullEqualsNothing,
false, // null_aware
)?;

Ok(Self::new(LogicalPlan::Join(join)))
Expand All @@ -1217,6 +1238,7 @@ impl LogicalPlanBuilder {
JoinType::Inner,
JoinConstraint::On,
NullEquality::NullEqualsNothing,
false, // null_aware
)?;

Ok(Self::new(LogicalPlan::Join(join)))
Expand Down Expand Up @@ -1471,6 +1493,7 @@ impl LogicalPlanBuilder {
join_type,
JoinConstraint::On,
NullEquality::NullEqualsNothing,
false, // null_aware
)?;

Ok(Self::new(LogicalPlan::Join(join)))
Expand Down
Loading