diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index e8ff1ccf0670..b6a5e18da1f1 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -91,141 +91,163 @@ fn col_lt_col_filter(schema1: Arc, schema2: Arc) -> JoinFilter { #[tokio::test] async fn test_inner_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Inner, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Inner, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_inner_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Inner, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Inner, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Left, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Left, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Left, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Left, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Right, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Right, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Right, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Right, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_full_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Full, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Full, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_full_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::Full, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[NljHj, HjSmj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::Full, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[NljHj, HjSmj], false) + .await + } } #[tokio::test] async fn test_left_semi_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftSemi, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_semi_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftSemi, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftSemi, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_semi_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::RightSemi, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::RightSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_semi_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000, false), + make_staggered_batches_i32(1000, false), JoinType::RightSemi, Some(Box::new(col_lt_col_filter)), ) @@ -235,45 +257,51 @@ async fn test_right_semi_join_1k_filtered() { #[tokio::test] async fn test_left_anti_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftAnti, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftAnti, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_anti_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftAnti, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftAnti, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_anti_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::RightAnti, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::RightAnti, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_anti_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000, false), + make_staggered_batches_i32(1000, false), JoinType::RightAnti, Some(Box::new(col_lt_col_filter)), ) @@ -283,226 +311,262 @@ async fn test_right_anti_join_1k_filtered() { #[tokio::test] async fn test_left_mark_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftMark, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_mark_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::LeftMark, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::LeftMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } // todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support #[tokio::test] async fn test_right_mark_join_1k() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::RightMark, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::RightMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_mark_join_1k_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_i32(1000), - make_staggered_batches_i32(1000), - JoinType::RightMark, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_i32(1000, left_extra), + make_staggered_batches_i32(1000, right_extra), + JoinType::RightMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_inner_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Inner, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Inner, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_inner_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Inner, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Inner, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Left, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Left, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Left, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Left, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Right, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Right, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Right, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Right, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_full_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Full, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Full, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_full_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::Full, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[NljHj, HjSmj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::Full, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[NljHj, HjSmj], false) + .await + } } #[tokio::test] async fn test_left_semi_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftSemi, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_semi_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftSemi, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftSemi, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_semi_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::RightSemi, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::RightSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_semi_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::RightSemi, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::RightSemi, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_anti_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftAnti, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftAnti, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_anti_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftAnti, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftAnti, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_anti_join_1k_binary() { JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000, false), + make_staggered_batches_binary(1000, false), JoinType::RightAnti, None, ) @@ -513,8 +577,8 @@ async fn test_right_anti_join_1k_binary() { #[tokio::test] async fn test_right_anti_join_1k_binary_filtered() { JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000, false), + make_staggered_batches_binary(1000, false), JoinType::RightAnti, Some(Box::new(col_lt_col_filter)), ) @@ -524,51 +588,59 @@ async fn test_right_anti_join_1k_binary_filtered() { #[tokio::test] async fn test_left_mark_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftMark, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_left_mark_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::LeftMark, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::LeftMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } // todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support #[tokio::test] async fn test_right_mark_join_1k_binary() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::RightMark, - None, - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::RightMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } #[tokio::test] async fn test_right_mark_join_1k_binary_filtered() { - JoinFuzzTestCase::new( - make_staggered_batches_binary(1000), - make_staggered_batches_binary(1000), - JoinType::RightMark, - Some(Box::new(col_lt_col_filter)), - ) - .run_test(&[HjSmj, NljHj], false) - .await + for (left_extra, right_extra) in [(true, true), (false, true), (true, false)] { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000, left_extra), + make_staggered_batches_binary(1000, right_extra), + JoinType::RightMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await + } } type JoinFilterBuilder = Box, Arc) -> JoinFilter>; @@ -1031,7 +1103,7 @@ impl JoinFuzzTestCase { /// Return randomly sized record batches with: /// two sorted int32 columns 'a', 'b' ranged from 0..99 as join columns /// two random int32 columns 'x', 'y' as other columns -fn make_staggered_batches_i32(len: usize) -> Vec { +fn make_staggered_batches_i32(len: usize, with_extra_column: bool) -> Vec { let mut rng = rand::rng(); let mut input12: Vec<(i32, i32)> = vec![(0, 0); len]; let mut input3: Vec = vec![0; len]; @@ -1047,14 +1119,18 @@ fn make_staggered_batches_i32(len: usize) -> Vec { let input3 = Int32Array::from_iter_values(input3); let input4 = Int32Array::from_iter_values(input4); - // split into several record batches - let batch = RecordBatch::try_from_iter(vec![ + let mut columns = vec![ ("a", Arc::new(input1) as ArrayRef), ("b", Arc::new(input2) as ArrayRef), ("x", Arc::new(input3) as ArrayRef), - ("y", Arc::new(input4) as ArrayRef), - ]) - .unwrap(); + ]; + + if with_extra_column { + columns.push(("y", Arc::new(input4) as ArrayRef)); + } + + // split into several record batches + let batch = RecordBatch::try_from_iter(columns).unwrap(); // use a random number generator to pick a random sized output stagger_batch_with_seed(batch, 42) @@ -1070,7 +1146,10 @@ fn rand_bytes(rng: &mut R, min: usize, max: usize) -> Vec { /// Return randomly sized record batches with: /// two sorted binary columns 'a', 'b' (lexicographically) as join columns /// two random binary columns 'x', 'y' as other columns -fn make_staggered_batches_binary(len: usize) -> Vec { +fn make_staggered_batches_binary( + len: usize, + with_extra_column: bool, +) -> Vec { let mut rng = rand::rng(); // produce (a,b) pairs then sort lexicographically so SMJ has naturally sorted keys @@ -1088,13 +1167,17 @@ fn make_staggered_batches_binary(len: usize) -> Vec { let x = BinaryArray::from_iter_values(input3.iter()); let y = BinaryArray::from_iter_values(input4.iter()); - let batch = RecordBatch::try_from_iter(vec![ + let mut columns = vec![ ("a", Arc::new(a) as ArrayRef), ("b", Arc::new(b) as ArrayRef), ("x", Arc::new(x) as ArrayRef), - ("y", Arc::new(y) as ArrayRef), - ]) - .unwrap(); + ]; + + if with_extra_column { + columns.push(("y", Arc::new(y) as ArrayRef)); + } + + let batch = RecordBatch::try_from_iter(columns).unwrap(); // preserve your existing randomized partitioning stagger_batch_with_seed(batch, 42) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs index 28020450c427..0325e37d42e7 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs @@ -1559,26 +1559,32 @@ impl SortMergeJoinStream { null_joined_batch.num_rows(), ); - let columns = if !matches!(self.join_type, JoinType::Right) { - let mut left_columns = null_joined_batch - .columns() - .iter() - .take(right_columns_length) - .cloned() - .collect::>(); - - left_columns.extend(right_columns); - left_columns - } else { - let left_columns = null_joined_batch - .columns() - .iter() - .skip(left_columns_length) - .cloned() - .collect::>(); - - right_columns.extend(left_columns); - right_columns + let columns = match self.join_type { + JoinType::Right => { + // The first columns are the right columns. + let left_columns = null_joined_batch + .columns() + .iter() + .skip(right_columns_length) + .cloned() + .collect::>(); + + right_columns.extend(left_columns); + right_columns + } + JoinType::Left | JoinType::LeftMark | JoinType::RightMark => { + // The first columns are the left columns. + let mut left_columns = null_joined_batch + .columns() + .iter() + .take(left_columns_length) + .cloned() + .collect::>(); + + left_columns.extend(right_columns); + left_columns + } + _ => exec_err!("Did not expect join type {}", self.join_type)?, }; // Push the streamed/buffered batch joined nulls to the output diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs index 83a5c4041cc0..f91bffbed78f 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs @@ -644,6 +644,240 @@ async fn join_right_one() -> Result<()> { Ok(()) } +#[tokio::test] +async fn join_right_different_columns_count_with_filter() -> Result<()> { + // select * + // from t1 + // right join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2 + + let left = build_table( + ("a1", &vec![1, 21, 3]), // 21(t1.a1) > 20(t2.a2) + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + let right = build_table_two_cols( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the left + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a1", 0)), + Operator::Gt, + Arc::new(Column::new("a2", 1)), + )), + vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("a2", DataType::Int32, true), + ])), + ); + + let (_, batches) = join_collect_with_filter(left, right, on, filter, Right).await?; + + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | + +----+----+----+----+----+ + | | | | 10 | 4 | + | 21 | 5 | 8 | 20 | 5 | + | | | | 30 | 6 | + +----+----+----+----+----+ + "#); + Ok(()) +} + +#[tokio::test] +async fn join_left_different_columns_count_with_filter() -> Result<()> { + // select * + // from t2 + // left join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1 + + let left = build_table_two_cols( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the right + ); + + let right = build_table( + ("a1", &vec![1, 21, 3]), // 20(t2.a2) > 1(t1.a1) + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a2", 0)), + Operator::Gt, + Arc::new(Column::new("a1", 1)), + )), + vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("a2", DataType::Int32, true), + Field::new("a1", DataType::Int32, true), + ])), + ); + + let (_, batches) = join_collect_with_filter(left, right, on, filter, Left).await?; + + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+ + | a2 | b1 | a1 | b1 | c1 | + +----+----+----+----+----+ + | 10 | 4 | 1 | 4 | 7 | + | 20 | 5 | | | | + | 30 | 6 | | | | + +----+----+----+----+----+ + "#); + Ok(()) +} + +#[tokio::test] +async fn join_left_mark_different_columns_count_with_filter() -> Result<()> { + // select * + // from t2 + // left mark join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1 + + let left = build_table_two_cols( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the right + ); + + let right = build_table( + ("a1", &vec![1, 21, 3]), // 20(t2.a2) > 1(t1.a1) + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a2", 0)), + Operator::Gt, + Arc::new(Column::new("a1", 1)), + )), + vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("a2", DataType::Int32, true), + Field::new("a1", DataType::Int32, true), + ])), + ); + + let (_, batches) = + join_collect_with_filter(left, right, on, filter, LeftMark).await?; + + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-------+ + | a2 | b1 | mark | + +----+----+-------+ + | 10 | 4 | true | + | 20 | 5 | false | + | 30 | 6 | false | + +----+----+-------+ + "#); + Ok(()) +} + +#[tokio::test] +async fn join_right_mark_different_columns_count_with_filter() -> Result<()> { + // select * + // from t1 + // right mark join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2 + + let left = build_table( + ("a1", &vec![1, 21, 3]), // 21(t1.a1) > 20(t2.a2) + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + let right = build_table_two_cols( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the left + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a1", 0)), + Operator::Gt, + Arc::new(Column::new("a2", 1)), + )), + vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("a2", DataType::Int32, true), + ])), + ); + + let (_, batches) = + join_collect_with_filter(left, right, on, filter, RightMark).await?; + + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-------+ + | a2 | b1 | mark | + +----+----+-------+ + | 10 | 4 | false | + | 20 | 5 | true | + | 30 | 6 | false | + +----+----+-------+ + "#); + Ok(()) +} + #[tokio::test] async fn join_full_one() -> Result<()> { let left = build_table( diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index ed463333217a..aa87026c5cf3 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -891,3 +891,52 @@ drop table t2; # return sql params back to default values statement ok set datafusion.optimizer.prefer_hash_join = true; + +########## +## Tests for equijoins with different column counts +########## + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +statement ok +DROP TABLE IF EXISTS t1; + +statement ok +CREATE TABLE t1(a int, b int) AS VALUES (1, 100), (2, 200), (3, 300); + +statement ok +DROP TABLE IF EXISTS t2; + +statement ok +CREATE TABLE t2(a int, b int, c int) AS VALUES (4, 101, 1001), (3, 201, 2001), (2, 250, 3001); + +statement ok +DROP TABLE IF EXISTS t3; + +statement ok +CREATE TABLE t3(x int) AS VALUES (1); + +query IIIII +SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b +---- +NULL NULL NULL 1 100 +2 250 3001 2 200 +NULL NULL NULL 3 300 + +query IIIII +SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < t2.b +---- +1 100 NULL NULL NULL +2 200 2 250 3001 +3 300 NULL NULL NULL + +# Small table for LeftMark + +# LeftMark equijoin with different columns count +query III rowsort +SELECT t2.a, t2.b, t2.c +FROM t2 +WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150) +---- +4 101 1001 \ No newline at end of file