From 0f571dbf2c97c3e8c6abe254e3af199d57eee11b Mon Sep 17 00:00:00 2001 From: Tal Glanzman Date: Mon, 17 Nov 2025 16:25:46 +0200 Subject: [PATCH 1/4] Pick the correct columns for right join type with join filter --- .../src/joins/sort_merge_join/stream.rs | 3 +- .../src/joins/sort_merge_join/tests.rs | 61 +++++++++++++++++++ .../test_files/sort_merge_join.slt | 20 ++++++ 3 files changed, 83 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs index 28020450c427..0e8c1256cd2e 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs @@ -1570,10 +1570,11 @@ impl SortMergeJoinStream { left_columns.extend(right_columns); left_columns } else { + // For right joins, the first columns are the right columns. let left_columns = null_joined_batch .columns() .iter() - .skip(left_columns_length) + .skip(right_columns_length) .cloned() .collect::>(); diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs index 83a5c4041cc0..5930f6578952 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs @@ -644,6 +644,67 @@ async fn join_right_one() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn join_right_different_columns_count_with_filter() -> Result<()> { + + // select * + // from t1 + // right join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2 + + let left = build_table( + ("a1", &vec![1, 21, 3]), // 21(t1.a1) > 20(t2.a2) + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + let right = build_table_two_cols( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the left + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a1", 0)), + Operator::Gt, + Arc::new(Column::new("a2", 1)), + )), + vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("a2", DataType::Int32, true), + ])), + ); + + let (_, batches) = join_collect_with_filter(left, right, on, filter, Right).await?; + + // The output order is important as SMJ preserves sortedness + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | + +----+----+----+----+----+ + | | | | 10 | 4 | + | 21 | 5 | 8 | 20 | 5 | + | | | | 30 | 6 | + +----+----+----+----+----+ + "#); + Ok(()) +} + #[tokio::test] async fn join_full_one() -> Result<()> { let left = build_table( diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index ed463333217a..c822c4e9ed34 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -891,3 +891,23 @@ drop table t2; # return sql params back to default values statement ok set datafusion.optimizer.prefer_hash_join = true; + +########## +## Tests for equijoins with different column counts +########## + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +statement ok +CREATE TABLE t1(a int, b int) AS VALUES (1, 100), (2, 200), (3, 300); + +statement ok +CREATE TABLE t2(a int, b int, c int) AS VALUES (4, 101, 1001), (3, 201, 2001), (2, 250, 3001); + +query IIIII +SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b +---- +NULL NULL NULL 1 100 +2 250 3001 2 200 +NULL NULL NULL 3 300 \ No newline at end of file From cfaeee5bccace3f2fd6a3075bcb68d6073b318ea Mon Sep 17 00:00:00 2001 From: Tal Glanzman Date: Tue, 18 Nov 2025 13:28:27 +0200 Subject: [PATCH 2/4] Pick the correct columns for left join type with join filter --- .../src/joins/sort_merge_join/stream.rs | 4 +- .../src/joins/sort_merge_join/tests.rs | 60 +++++++++++++++++++ .../test_files/sort_merge_join.slt | 9 ++- 3 files changed, 71 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs index 0e8c1256cd2e..793b57041c6d 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs @@ -1560,10 +1560,12 @@ impl SortMergeJoinStream { ); let columns = if !matches!(self.join_type, JoinType::Right) { + // For left joins, the first columns are the left columns. + // Critical: There is a bug here still because the match directions. let mut left_columns = null_joined_batch .columns() .iter() - .take(right_columns_length) + .take(left_columns_length) .cloned() .collect::>(); diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs index 5930f6578952..3f55eaed3c44 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs @@ -705,6 +705,66 @@ async fn join_right_different_columns_count_with_filter() -> Result<()> { Ok(()) } +#[tokio::test] +async fn join_left_different_columns_count_with_filter() -> Result<()> { + + // select * + // from t2 + // left join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1 + + let left = build_table_two_cols( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the right + ); + + let right = build_table( + ("a1", &vec![1, 21, 3]), // 20(t2.a2) > 1(t1.a1) + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a2", 0)), + Operator::Gt, + Arc::new(Column::new("a1", 1)), + )), + vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("a2", DataType::Int32, true), + Field::new("a1", DataType::Int32, true), + ])), + ); + + let (_, batches) = join_collect_with_filter(left, right, on, filter, Left).await?; + + // The output order is important as SMJ preserves sortedness + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+ + | a2 | b1 | a1 | b1 | c1 | + +----+----+----+----+----+ + | 10 | 4 | 1 | 4 | 7 | + | 20 | 5 | | | | + | 30 | 6 | | | | + +----+----+----+----+----+ + "#); + Ok(()) +} + #[tokio::test] async fn join_full_one() -> Result<()> { let left = build_table( diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index c822c4e9ed34..8d7d81568816 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -910,4 +910,11 @@ SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b ---- NULL NULL NULL 1 100 2 250 3001 2 200 -NULL NULL NULL 3 300 \ No newline at end of file +NULL NULL NULL 3 300 + +query IIIII +SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < t2.b +---- +1 100 NULL NULL NULL +2 200 2 250 3001 +3 300 NULL NULL NULL \ No newline at end of file From e1de055405f3d0fd99e91c44b1b37a3740d4faf1 Mon Sep 17 00:00:00 2001 From: Tal Glanzman Date: Tue, 18 Nov 2025 13:44:07 +0200 Subject: [PATCH 3/4] Fix column picks for LeftMark and RightMark --- .../src/joins/sort_merge_join/stream.rs | 49 +++---- .../src/joins/sort_merge_join/tests.rs | 125 +++++++++++++++++- .../test_files/sort_merge_join.slt | 24 +++- 3 files changed, 171 insertions(+), 27 deletions(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs index 793b57041c6d..0325e37d42e7 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs @@ -1559,29 +1559,32 @@ impl SortMergeJoinStream { null_joined_batch.num_rows(), ); - let columns = if !matches!(self.join_type, JoinType::Right) { - // For left joins, the first columns are the left columns. - // Critical: There is a bug here still because the match directions. - let mut left_columns = null_joined_batch - .columns() - .iter() - .take(left_columns_length) - .cloned() - .collect::>(); - - left_columns.extend(right_columns); - left_columns - } else { - // For right joins, the first columns are the right columns. - let left_columns = null_joined_batch - .columns() - .iter() - .skip(right_columns_length) - .cloned() - .collect::>(); - - right_columns.extend(left_columns); - right_columns + let columns = match self.join_type { + JoinType::Right => { + // The first columns are the right columns. + let left_columns = null_joined_batch + .columns() + .iter() + .skip(right_columns_length) + .cloned() + .collect::>(); + + right_columns.extend(left_columns); + right_columns + } + JoinType::Left | JoinType::LeftMark | JoinType::RightMark => { + // The first columns are the left columns. + let mut left_columns = null_joined_batch + .columns() + .iter() + .take(left_columns_length) + .cloned() + .collect::>(); + + left_columns.extend(right_columns); + left_columns + } + _ => exec_err!("Did not expect join type {}", self.join_type)?, }; // Push the streamed/buffered batch joined nulls to the output diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs index 3f55eaed3c44..2c9a639f2808 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs @@ -644,10 +644,8 @@ async fn join_right_one() -> Result<()> { Ok(()) } - #[tokio::test] async fn join_right_different_columns_count_with_filter() -> Result<()> { - // select * // from t1 // right join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2 @@ -707,7 +705,6 @@ async fn join_right_different_columns_count_with_filter() -> Result<()> { #[tokio::test] async fn join_left_different_columns_count_with_filter() -> Result<()> { - // select * // from t2 // left join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1 @@ -765,6 +762,128 @@ async fn join_left_different_columns_count_with_filter() -> Result<()> { Ok(()) } +#[tokio::test] +async fn join_left_mark_different_columns_count_with_filter() -> Result<()> { + // select * + // from t2 + // left mark join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1 + + let left = build_table_two_cols( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the right + ); + + let right = build_table( + ("a1", &vec![1, 21, 3]), // 20(t2.a2) > 1(t1.a1) + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a2", 0)), + Operator::Gt, + Arc::new(Column::new("a1", 1)), + )), + vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("a2", DataType::Int32, true), + Field::new("a1", DataType::Int32, true), + ])), + ); + + let (_, batches) = + join_collect_with_filter(left, right, on, filter, LeftMark).await?; + + // The output order is important as SMJ preserves sortedness + // LeftMark returns all left rows with a boolean mark column + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-------+ + | a2 | b1 | mark | + +----+----+-------+ + | 10 | 4 | true | + | 20 | 5 | false | + | 30 | 6 | false | + +----+----+-------+ + "#); + Ok(()) +} + +#[tokio::test] +async fn join_right_mark_different_columns_count_with_filter() -> Result<()> { + // select * + // from t1 + // right mark join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2 + + let left = build_table( + ("a1", &vec![1, 21, 3]), // 21(t1.a1) > 20(t2.a2) + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + let right = build_table_two_cols( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the left + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a1", 0)), + Operator::Gt, + Arc::new(Column::new("a2", 1)), + )), + vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("a2", DataType::Int32, true), + ])), + ); + + let (_, batches) = + join_collect_with_filter(left, right, on, filter, RightMark).await?; + + // The output order is important as SMJ preserves sortedness + // RightMark returns all right rows with a boolean mark column + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-------+ + | a2 | b1 | mark | + +----+----+-------+ + | 10 | 4 | false | + | 20 | 5 | true | + | 30 | 6 | false | + +----+----+-------+ + "#); + Ok(()) +} + #[tokio::test] async fn join_full_one() -> Result<()> { let left = build_table( diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index 8d7d81568816..aa87026c5cf3 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -899,12 +899,24 @@ set datafusion.optimizer.prefer_hash_join = true; statement ok set datafusion.optimizer.prefer_hash_join = false; +statement ok +DROP TABLE IF EXISTS t1; + statement ok CREATE TABLE t1(a int, b int) AS VALUES (1, 100), (2, 200), (3, 300); +statement ok +DROP TABLE IF EXISTS t2; + statement ok CREATE TABLE t2(a int, b int, c int) AS VALUES (4, 101, 1001), (3, 201, 2001), (2, 250, 3001); +statement ok +DROP TABLE IF EXISTS t3; + +statement ok +CREATE TABLE t3(x int) AS VALUES (1); + query IIIII SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b ---- @@ -917,4 +929,14 @@ SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < t2.b ---- 1 100 NULL NULL NULL 2 200 2 250 3001 -3 300 NULL NULL NULL \ No newline at end of file +3 300 NULL NULL NULL + +# Small table for LeftMark + +# LeftMark equijoin with different columns count +query III rowsort +SELECT t2.a, t2.b, t2.c +FROM t2 +WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150) +---- +4 101 1001 \ No newline at end of file From 0c457ba90d2508636efb59c07e2a0b45f936eec9 Mon Sep 17 00:00:00 2001 From: Tal Glanzman Date: Tue, 18 Nov 2025 18:43:09 +0200 Subject: [PATCH 4/4] Enhance fuzz tests to fuzzify the columns count --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 697 ++++++++++-------- 1 file changed, 390 insertions(+), 307 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index e8ff1ccf0670..b6a5e18da1f1 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -91,141 +91,163 @@ fn col_lt_col_filter(schema1: Arc, schema2: Arc) -> JoinFilter { #[tokio::test] async fn test_inner_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Inner, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Inner, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_inner_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Inner, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Inner, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Left, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Left, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Left, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Left, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Right, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Right, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Right, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Right, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_full_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Full, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Full, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_full_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Full, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[NljHj, HjSmj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Full, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[NljHj, HjSmj], false) + .await + } } #[tokio::test] async fn test_left_semi_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftSemi, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_semi_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftSemi, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftSemi, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_semi_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::RightSemi, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::RightSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_semi_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000, false), + make_staggered_batches_i32(1000, false), JoinType::RightSemi, Some(Box::new(col_lt_col_filter)), ) @@ -235,45 +257,51 @@ async fn test_right_semi_join_1k_filtered() { #[tokio::test] async fn test_left_anti_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftAnti, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftAnti, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_anti_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftAnti, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftAnti, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_anti_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::RightAnti, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::RightAnti, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_anti_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000, false), + make_staggered_batches_i32(1000, false), JoinType::RightAnti, Some(Box::new(col_lt_col_filter)), ) @@ -283,226 +311,262 @@ async fn test_right_anti_join_1k_filtered() { #[tokio::test] async fn test_left_mark_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftMark, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_mark_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftMark, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } // todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support #[tokio::test] async fn test_right_mark_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::RightMark, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::RightMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_mark_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::RightMark, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::RightMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_inner_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Inner, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Inner, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_inner_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Inner, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Inner, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Left, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Left, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Left, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Left, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Right, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Right, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Right, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Right, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_full_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Full, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Full, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_full_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Full, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[NljHj, HjSmj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Full, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[NljHj, HjSmj], false) + .await + } } #[tokio::test] async fn test_left_semi_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftSemi, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_semi_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftSemi, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftSemi, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_semi_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::RightSemi, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::RightSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_semi_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::RightSemi, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::RightSemi, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_anti_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftAnti, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftAnti, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_anti_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftAnti, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftAnti, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_anti_join_1k_binary() { JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000, false), + make_staggered_batches_binary(1000, false), JoinType::RightAnti, None, ) @@ -513,8 +577,8 @@ async fn test_right_anti_join_1k_binary() { #[tokio::test] async fn test_right_anti_join_1k_binary_filtered() { JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000, false), + make_staggered_batches_binary(1000, false), JoinType::RightAnti, Some(Box::new(col_lt_col_filter)), ) @@ -524,51 +588,59 @@ async fn test_right_anti_join_1k_binary_filtered() { #[tokio::test] async fn test_left_mark_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftMark, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_mark_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftMark, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } // todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support #[tokio::test] async fn test_right_mark_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::RightMark, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::RightMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_mark_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::RightMark, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::RightMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } type JoinFilterBuilder = Box, Arc) -> JoinFilter>; @@ -1031,7 +1103,7 @@ impl JoinFuzzTestCase { /// Return randomly sized record batches with: /// two sorted int32 columns 'a', 'b' ranged from 0..99 as join columns /// two random int32 columns 'x', 'y' as other columns -fn make_staggered_batches_i32(len: usize) -> Vec { +fn make_staggered_batches_i32(len: usize, with_extra_column: bool) -> Vec { let mut rng = rand::rng(); let mut input12: Vec<(i32, i32)> = vec![(0, 0); len]; let mut input3: Vec = vec![0; len]; @@ -1047,14 +1119,18 @@ fn make_staggered_batches_i32(len: usize) -> Vec { let input3 = Int32Array::from_iter_values(input3); let input4 = Int32Array::from_iter_values(input4); - // split into several record batches - let batch = RecordBatch::try_from_iter(vec![ + let mut columns = vec![ ("a", Arc::new(input1) as ArrayRef), ("b", Arc::new(input2) as ArrayRef), ("x", Arc::new(input3) as ArrayRef), - ("y", Arc::new(input4) as ArrayRef), - ]) - .unwrap(); + ]; + + if with_extra_column { + columns.push(("y", Arc::new(input4) as ArrayRef)); + } + + // split into several record batches + let batch = RecordBatch::try_from_iter(columns).unwrap(); // use a random number generator to pick a random sized output stagger_batch_with_seed(batch, 42) @@ -1070,7 +1146,10 @@ fn rand_bytes(rng: &mut R, min: usize, max: usize) -> Vec { /// Return randomly sized record batches with: /// two sorted binary columns 'a', 'b' (lexicographically) as join columns /// two random binary columns 'x', 'y' as other columns -fn make_staggered_batches_binary(len: usize) -> Vec { +fn make_staggered_batches_binary( + len: usize, + with_extra_column: bool, +) -> Vec { let mut rng = rand::rng(); // produce (a,b) pairs then sort lexicographically so SMJ has naturally sorted keys @@ -1088,13 +1167,17 @@ fn make_staggered_batches_binary(len: usize) -> Vec { let x = BinaryArray::from_iter_values(input3.iter()); let y = BinaryArray::from_iter_values(input4.iter()); - let batch = RecordBatch::try_from_iter(vec![ + let mut columns = vec![ ("a", Arc::new(a) as ArrayRef), ("b", Arc::new(b) as ArrayRef), ("x", Arc::new(x) as ArrayRef), - ("y", Arc::new(y) as ArrayRef), - ]) - .unwrap(); + ]; + + if with_extra_column { + columns.push(("y", Arc::new(y) as ArrayRef)); + } + + let batch = RecordBatch::try_from_iter(columns).unwrap(); // preserve your existing randomized partitioning stagger_batch_with_seed(batch, 42)