Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions datafusion/sqllogictest/test_files/join_is_not_distinct_from.slt
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,36 @@ JOIN t4 ON (t3.val1 IS NOT DISTINCT FROM t4.val1) AND (t3.val2 IS NOT DISTINCT F
2 2 NULL NULL 200 200
3 3 30 30 NULL NULL

# Test mixed: 1 Eq key + multiple IS NOT DISTINCT FROM keys.
# The optimizer unconditionally favours Eq keys (see extract_equijoin_predicate.rs,
# "Only convert when there are NO equijoin predicates, to be conservative").
# All IS NOT DISTINCT FROM predicates should be demoted to filter, even when they outnumber the Eq key.
query TT
EXPLAIN SELECT t3.id AS t3_id, t4.id AS t4_id, t3.val1, t4.val1, t3.val2, t4.val2
FROM t3
JOIN t4 ON (t3.id = t4.id) AND (t3.val1 IS NOT DISTINCT FROM t4.val1) AND (t3.val2 IS NOT DISTINCT FROM t4.val2)
----
logical_plan
01)Projection: t3.id AS t3_id, t4.id AS t4_id, t3.val1, t4.val1, t3.val2, t4.val2
02)--Inner Join: t3.id = t4.id Filter: t3.val1 IS NOT DISTINCT FROM t4.val1 AND t3.val2 IS NOT DISTINCT FROM t4.val2
03)----TableScan: t3 projection=[id, val1, val2]
04)----TableScan: t4 projection=[id, val1, val2]
physical_plan
01)ProjectionExec: expr=[id@0 as t3_id, id@3 as t4_id, val1@1 as val1, val1@4 as val1, val2@2 as val2, val2@5 as val2]
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], filter=val1@0 IS NOT DISTINCT FROM val1@2 AND val2@1 IS NOT DISTINCT FROM val2@3
03)----DataSourceExec: partitions=1, partition_sizes=[1]
04)----DataSourceExec: partitions=1, partition_sizes=[1]

# Verify correct results: all 3 rows should match (including NULL=NULL via IS NOT DISTINCT FROM in filter)
query IIIIII rowsort
SELECT t3.id AS t3_id, t4.id AS t4_id, t3.val1, t4.val1, t3.val2, t4.val2
FROM t3
JOIN t4 ON (t3.id = t4.id) AND (t3.val1 IS NOT DISTINCT FROM t4.val1) AND (t3.val2 IS NOT DISTINCT FROM t4.val2)
----
1 1 10 10 100 100
2 2 NULL NULL 200 200
3 3 30 30 NULL NULL

statement ok
drop table t0;

Expand Down
186 changes: 146 additions & 40 deletions datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::logical_plan::consumer::SubstraitConsumer;
use datafusion::common::{Column, JoinType, NullEquality, not_impl_err, plan_err};
use datafusion::logical_expr::requalify_sides_if_needed;
use datafusion::logical_expr::utils::split_conjunction;
use datafusion::logical_expr::utils::split_conjunction_owned;
use datafusion::logical_expr::{
BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
};
Expand Down Expand Up @@ -56,15 +56,10 @@ pub async fn from_join_rel(
// So we extract each part as follows:
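// - If an Eq or IsNotDistinctFrom op between two columns is encountered, add the (left column, right column) pair to the `join_ons` vector; when both kinds occur, only the Eq pairs stay join keys and the IsNotDistinctFrom pairs are moved to join_filter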
// - Otherwise we add the expression to join_filter (use conjunction if filter already exists)
let (join_ons, nulls_equal_nulls, join_filter) =
split_eq_and_noneq_join_predicate_with_nulls_equality(&on);
let (join_ons, null_equality, join_filter) =
split_eq_and_noneq_join_predicate_with_nulls_equality(on);
let (left_cols, right_cols): (Vec<_>, Vec<_>) =
itertools::multiunzip(join_ons);
let null_equality = if nulls_equal_nulls {
NullEquality::NullEqualsNull
} else {
NullEquality::NullEqualsNothing
};
left.join_detailed(
right.build()?,
join_type,
Expand All @@ -89,49 +84,61 @@ pub async fn from_join_rel(
}

fn split_eq_and_noneq_join_predicate_with_nulls_equality(
filter: &Expr,
) -> (Vec<(Column, Column)>, bool, Option<Expr>) {
let exprs = split_conjunction(filter);
filter: Expr,
) -> (Vec<(Column, Column)>, NullEquality, Option<Expr>) {
let exprs = split_conjunction_owned(filter);

let mut accum_join_keys: Vec<(Column, Column)> = vec![];
let mut eq_keys: Vec<(Column, Column)> = vec![];
let mut indistinct_keys: Vec<(Column, Column)> = vec![];
let mut accum_filters: Vec<Expr> = vec![];
let mut nulls_equal_nulls = false;

for expr in exprs {
#[expect(clippy::collapsible_match)]
match expr {
Expr::BinaryExpr(binary_expr) => match binary_expr {
x @ (BinaryExpr {
left,
op: Operator::Eq,
right,
Expr::BinaryExpr(BinaryExpr {
left,
op: op @ (Operator::Eq | Operator::IsNotDistinctFrom),
right,
}) => match (*left, *right) {
(Expr::Column(l), Expr::Column(r)) => match op {
Operator::Eq => eq_keys.push((l, r)),
Operator::IsNotDistinctFrom => indistinct_keys.push((l, r)),
_ => unreachable!(),
},
(left, right) => {
accum_filters.push(Expr::BinaryExpr(BinaryExpr {
left: Box::new(left),
op,
right: Box::new(right),
}));
}
| BinaryExpr {
left,
op: Operator::IsNotDistinctFrom,
right,
}) => {
nulls_equal_nulls = match x.op {
Operator::Eq => false,
Operator::IsNotDistinctFrom => true,
_ => unreachable!(),
};

match (left.as_ref(), right.as_ref()) {
(Expr::Column(l), Expr::Column(r)) => {
accum_join_keys.push((l.clone(), r.clone()));
}
_ => accum_filters.push(expr.clone()),
}
}
_ => accum_filters.push(expr.clone()),
},
_ => accum_filters.push(expr.clone()),
_ => accum_filters.push(expr),
}
}

let (join_keys, null_equality) =
match (eq_keys.is_empty(), indistinct_keys.is_empty()) {
// Mixed: use eq_keys as equijoin keys, demote indistinct keys to filter
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We unconditionally favour Eq keys here, but if we have a case where there exists multiple (say 4) IS NOT DISTINCT FROM column pairs and 1 Eq column pair, this demotes all 4 to filter and keeps just the 1 eq key, right?

But, in this case, would the inverse (demote the single eq to filter) not allow more columns to participate in the hash partitioning/pruning and therefore be a bit more performant?
A more selective hash key means fewer candidate pairs survive, so fewer row-by-row filter evaluations are needed, if I understand correctly?

Copy link
Contributor Author

@hareshkh hareshkh Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optimiser currently already has this behaviour of favouring Eq predicates over Indistinct predicates. Added a SLT to confirm that behaviour - https://github.com/apache/datafusion/pull/21121/changes#diff-63fc43cf735eb03abd4d114cfbbf24982939425938a74b354fb7db6da7d499d7R305, and replicating that behaviour in this change.

I also think that selectivity is a function of data i.e. having a hash join on 3 indistinct keys could produce more data than 1 eq key.

(false, false) => {
for (l, r) in indistinct_keys {
accum_filters.push(Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(l)),
op: Operator::IsNotDistinctFrom,
right: Box::new(Expr::Column(r)),
}));
}
(eq_keys, NullEquality::NullEqualsNothing)
}
// Only eq keys
(false, true) => (eq_keys, NullEquality::NullEqualsNothing),
// Only indistinct keys
(true, false) => (indistinct_keys, NullEquality::NullEqualsNull),
// No keys at all
(true, true) => (vec![], NullEquality::NullEqualsNothing),
};

let join_filter = accum_filters.into_iter().reduce(Expr::and);
(accum_join_keys, nulls_equal_nulls, join_filter)
(join_keys, null_equality, join_filter)
}

fn from_substrait_jointype(join_type: i32) -> datafusion::common::Result<JoinType> {
Expand All @@ -153,3 +160,102 @@ fn from_substrait_jointype(join_type: i32) -> datafusion::common::Result<JoinTyp
plan_err!("invalid join type variant {join_type}")
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::common::ScalarValue;

    /// Builds an unqualified column reference expression.
    fn col(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// Builds a non-null `Utf8` literal expression.
    fn utf8_lit(value: &str) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(value.into())), None)
    }

    /// Builds `left IS NOT DISTINCT FROM right`.
    fn indistinct(left: Expr, right: Expr) -> Expr {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op: Operator::IsNotDistinctFrom,
            right: Box::new(right),
        })
    }

    /// Renders join key pairs as `"l = r, l2 = r2"`.
    ///
    /// The `=` is only a separator between the two columns of a pair; the
    /// null semantics of the keys are asserted separately through the
    /// returned `NullEquality`.
    fn fmt_keys(keys: &[(Column, Column)]) -> String {
        keys.iter()
            .map(|(l, r)| format!("{l} = {r}"))
            .collect::<Vec<_>>()
            .join(", ")
    }

    /// A lone `a = b` becomes a join key with SQL null semantics and no filter.
    #[test]
    fn split_only_eq_keys() {
        let expr = col("a").eq(col("b"));
        let (keys, null_eq, filter) =
            split_eq_and_noneq_join_predicate_with_nulls_equality(expr);

        assert_eq!(fmt_keys(&keys), "a = b");
        assert_eq!(null_eq, NullEquality::NullEqualsNothing);
        assert!(filter.is_none());
    }

    /// A lone `a IS NOT DISTINCT FROM b` becomes a join key where NULL
    /// matches NULL, with no filter.
    #[test]
    fn split_only_indistinct_keys() {
        let expr = indistinct(col("a"), col("b"));
        let (keys, null_eq, filter) =
            split_eq_and_noneq_join_predicate_with_nulls_equality(expr);

        assert_eq!(fmt_keys(&keys), "a = b");
        assert_eq!(null_eq, NullEquality::NullEqualsNull);
        assert!(filter.is_none());
    }

    /// Regression: mixed `equal` + `is_not_distinct_from` must demote
    /// the indistinct key to the join filter so the single NullEquality
    /// flag stays consistent (NullEqualsNothing for the eq keys).
    #[test]
    fn split_mixed_eq_and_indistinct_demotes_indistinct_to_filter() {
        let expr =
            indistinct(col("val_l"), col("val_r")).and(col("id_l").eq(col("id_r")));

        let (keys, null_eq, filter) =
            split_eq_and_noneq_join_predicate_with_nulls_equality(expr);

        assert_eq!(fmt_keys(&keys), "id_l = id_r");
        assert_eq!(null_eq, NullEquality::NullEqualsNothing);
        assert_eq!(
            filter.expect("demoted predicate must form a filter").to_string(),
            "val_l IS NOT DISTINCT FROM val_r"
        );
    }

    /// Multiple IS NOT DISTINCT FROM keys with a single Eq key should demote
    /// all indistinct keys to the filter.
    #[test]
    fn split_mixed_multiple_indistinct_demoted() {
        let expr = indistinct(col("a_l"), col("a_r"))
            .and(indistinct(col("b_l"), col("b_r")))
            .and(col("id_l").eq(col("id_r")));

        let (keys, null_eq, filter) =
            split_eq_and_noneq_join_predicate_with_nulls_equality(expr);

        assert_eq!(fmt_keys(&keys), "id_l = id_r");
        assert_eq!(null_eq, NullEquality::NullEqualsNothing);
        assert_eq!(
            filter.expect("demoted predicates must form a filter").to_string(),
            "a_l IS NOT DISTINCT FROM a_r AND b_l IS NOT DISTINCT FROM b_r"
        );
    }

    /// An `=` whose operands are not both columns is not a join key: it goes
    /// to the filter and, with no keys left, the null equality is the
    /// default `NullEqualsNothing`.
    #[test]
    fn split_non_column_eq_goes_to_filter() {
        let expr = utf8_lit("x").eq(col("b"));

        let (keys, null_eq, filter) =
            split_eq_and_noneq_join_predicate_with_nulls_equality(expr);

        assert!(keys.is_empty());
        assert_eq!(null_eq, NullEquality::NullEqualsNothing);
        assert_eq!(
            filter.expect("predicate must be kept as a filter").to_string(),
            "Utf8(\"x\") = b"
        );
    }

    /// Regression: a non-column `IS NOT DISTINCT FROM` is rebuilt from its
    /// parts when moved to the filter, so the operator must survive intact
    /// and must not switch the join to `NullEqualsNull`.
    #[test]
    fn split_non_column_indistinct_goes_to_filter() {
        let expr = indistinct(utf8_lit("x"), col("b"));

        let (keys, null_eq, filter) =
            split_eq_and_noneq_join_predicate_with_nulls_equality(expr);

        assert!(keys.is_empty());
        assert_eq!(null_eq, NullEquality::NullEqualsNothing);
        assert_eq!(
            filter.expect("predicate must be kept as a filter").to_string(),
            "Utf8(\"x\") IS NOT DISTINCT FROM b"
        );
    }
}
106 changes: 106 additions & 0 deletions datafusion/substrait/tests/cases/consumer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#[cfg(test)]
mod tests {
use crate::utils::test::add_plan_schemas_to_ctx;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::common::Result;
use datafusion::prelude::SessionContext;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
Expand All @@ -33,6 +35,34 @@ mod tests {
use std::io::BufReader;
use substrait::proto::Plan;

/// Loads the Substrait JSON plan at `tests/testdata/test_plans/{name}`,
/// converts it to a DataFusion logical plan in a fresh `SessionContext`,
/// executes it and returns the collected record batches.
///
/// # Panics
///
/// Panics if the file cannot be opened or does not parse as a Substrait `Plan`.
async fn execute_plan(name: &str) -> Result<Vec<RecordBatch>> {
    let plan_path = format!("tests/testdata/test_plans/{name}");
    let plan_file = File::open(plan_path).expect("file not found");
    let substrait_plan: Plan =
        serde_json::from_reader(BufReader::new(plan_file)).expect("failed to parse json");

    let ctx = SessionContext::new();
    let logical_plan = from_substrait_plan(&ctx.state(), &substrait_plan).await?;
    let dataframe = ctx.execute_logical_plan(logical_plan).await?;
    dataframe.collect().await
}

/// Pretty-print batches as a table with header on top and data rows sorted.
///
/// The first three lines of the rendering (top border, column names,
/// separator) and the last line (bottom border) keep their position; only the
/// lines in between are sorted lexicographically, so a snapshot of the result
/// does not depend on the order in which rows were produced.
///
/// A rendering that has no data rows is returned unchanged rather than
/// panicking on an out-of-range slice.
///
/// # Panics
///
/// Panics if the batches cannot be formatted.
fn pretty_sorted(batches: &[RecordBatch]) -> String {
    // Lines that precede the first data row: top border, column names, separator.
    const HEADER_LINES: usize = 3;

    let pretty = pretty_format_batches(batches).unwrap().to_string();
    let mut lines: Vec<&str> = pretty.trim().lines().collect();

    // Data rows exist only when something sits between the header and the
    // closing border; otherwise there is nothing to sort.
    if lines.len() > HEADER_LINES + 1 {
        let footer_idx = lines.len() - 1;
        lines[HEADER_LINES..footer_idx].sort_unstable();
    }

    lines.join("\n")
}

async fn tpch_plan_to_string(query_id: i32) -> Result<String> {
let path =
format!("tests/testdata/tpch_substrait_plans/query_{query_id:02}_plan.json");
Expand Down Expand Up @@ -762,4 +792,80 @@ mod tests {

Ok(())
}

/// Inner join whose Substrait condition combines `equal` with
/// `is_not_distinct_from`: the consumer must keep the `equal` pair as the
/// equijoin key and demote `IS NOT DISTINCT FROM` to the join filter.
///
/// Checks the logical plan snapshot first, then executes the plan and checks
/// the resulting rows.
#[tokio::test]
async fn test_mixed_join_equal_and_indistinct_inner_join() -> Result<()> {
let plan_str =
test_plan_to_string("mixed_join_equal_and_indistinct.json").await?;
// Eq becomes the equijoin key; IS NOT DISTINCT FROM is demoted to filter.
assert_snapshot!(
plan_str,
@r#"
Projection: left.id, left.val, left.comment, right.id AS id0, right.val AS val0, right.comment AS comment0
Inner Join: left.id = right.id Filter: left.val IS NOT DISTINCT FROM right.val
SubqueryAlias: left
Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"), Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"), Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
SubqueryAlias: right
Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"), Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"), Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
"#
);

// Execute and verify actual rows, including NULL=NULL matches (ids 3,4).
// Rows are sorted by `pretty_sorted` so the snapshot is independent of
// the order in which the join emits them.
let results = execute_plan("mixed_join_equal_and_indistinct.json").await?;
assert_snapshot!(pretty_sorted(&results),
@r"
+----+-----+---------+-----+------+----------+
| id | val | comment | id0 | val0 | comment0 |
+----+-----+---------+-----+------+----------+
| 1 | a | c1 | 1 | a | c1 |
| 2 | b | c2 | 2 | b | c2 |
| 3 | | c3 | 3 | | c3 |
| 4 | | c4 | 4 | | c4 |
| 5 | e | c5 | 5 | e | c5 |
| 6 | f | c6 | 6 | f | c6 |
+----+-----+---------+-----+------+----------+
"
);

Ok(())
}

/// Left-join variant of the mixed-predicate check: a Substrait left join
/// with both `equal` and `is_not_distinct_from` must keep the `equal` pair
/// as the equijoin key and demote `IS NOT DISTINCT FROM` to the join filter.
///
/// Checks the logical plan snapshot first, then executes the plan and checks
/// the resulting rows.
#[tokio::test]
async fn test_mixed_join_equal_and_indistinct_left_join() -> Result<()> {
let plan_str =
test_plan_to_string("mixed_join_equal_and_indistinct_left.json").await?;
// Same key/filter split as for the inner join, but with `Left Join`.
assert_snapshot!(
plan_str,
@r#"
Projection: left.id, left.val, left.comment, right.id AS id0, right.val AS val0, right.comment AS comment0
Left Join: left.id = right.id Filter: left.val IS NOT DISTINCT FROM right.val
SubqueryAlias: left
Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"), Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"), Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
SubqueryAlias: right
Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"), Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"), Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
"#
);

// In the expected output every left row has a matching right row, so no
// row is padded with NULLs on the right side.
let results = execute_plan("mixed_join_equal_and_indistinct_left.json").await?;
assert_snapshot!(pretty_sorted(&results),
@r"
+----+-----+---------+-----+------+----------+
| id | val | comment | id0 | val0 | comment0 |
+----+-----+---------+-----+------+----------+
| 1 | a | c1 | 1 | a | c1 |
| 2 | b | c2 | 2 | b | c2 |
| 3 | | c3 | 3 | | c3 |
| 4 | | c4 | 4 | | c4 |
| 5 | e | c5 | 5 | e | c5 |
| 6 | f | c6 | 6 | f | c6 |
+----+-----+---------+-----+------+----------+
"
);

Ok(())
}
}
Loading
Loading