From e07d386ad9203bdf91f1ab646c9dafd644c17ccc Mon Sep 17 00:00:00 2001 From: Wang Date: Fri, 25 Nov 2022 19:55:23 +0800 Subject: [PATCH 1/2] HashJoin should return Err when the right side input stream produce Err, add more join UTs to cover different join types --- .../core/src/physical_plan/joins/hash_join.rs | 66 +++- datafusion/core/tests/sql/joins.rs | 338 ++++++++++++++++++ datafusion/core/tests/sql/mod.rs | 127 +++++++ datafusion/sql/src/planner.rs | 3 +- 4 files changed, 531 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 24b0f90d7c65..89583c03c187 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -1554,7 +1554,8 @@ impl HashJoinStream { } Some(result.map(|x| x.0)) } - other => { + Some(err) => Some(err), + None => { let timer = self.join_metrics.join_time.timer(); // For the left join, produce rows for unmatched rows match self.join_type { @@ -1593,7 +1594,7 @@ impl HashJoinStream { | JoinType::Right => {} } - other + None } }) } @@ -1618,9 +1619,11 @@ mod tests { physical_plan::{ common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec, }, + test::exec::MockExec, test::{build_table_i32, columns}, }; use arrow::datatypes::Field; + use arrow::error::ArrowError; use datafusion_expr::Operator; use super::*; @@ -3095,4 +3098,63 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn join_with_error_right() { + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), + ("c1", &vec![7, 8, 9]), + ); + + // right input stream returns one good batch and then one error. + // The error should be returned. + let err = Err(ArrowError::ComputeError("bad data error".to_string())); + let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + + let on = vec![( + Column::new_with_schema("b1", &left.schema()).unwrap(), + Column::new_with_schema("b1", &right.schema()).unwrap(), + )]; + let schema = right.schema(); + let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![])); + let right_input = Arc::new(MockExec::new(vec![Ok(right), err], schema)); + + let join_types = vec![ + JoinType::Inner, + JoinType::Left, + JoinType::Right, + JoinType::Full, + JoinType::LeftSemi, + JoinType::LeftAnti, + JoinType::RightSemi, + JoinType::RightAnti, + ]; + + for join_type in join_types { + let join = join( + left.clone(), + right_input.clone(), + on.clone(), + &join_type, + false, + ) + .unwrap(); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + let stream = join.execute(0, task_ctx).unwrap(); + + // Expect that an error is returned + let result_string = crate::physical_plan::common::collect(stream) + .await + .unwrap_err() + .to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {}", + result_string + ); + } + } } diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 7dca0e91ca0a..112f6f68af27 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -138,6 +138,58 @@ async fn equijoin_left_and_condition_from_right() -> Result<()> { Ok(()) } +#[tokio::test] +async fn equijoin_left_and_not_null_condition_from_right() -> Result<()> { + let test_repartition_joins = vec![true, false]; + for repartition_joins in test_repartition_joins { + let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; + let sql = + "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name is not null ORDER BY t1_id"; + let res = ctx.create_logical_plan(sql); + assert!(res.is_ok()); + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+---------+---------+", + "| t1_id | t1_name | t2_name |", + "+-------+---------+---------+", + "| 11 | a | z |", + "| 22 | b | y |", + "| 33 | c | |", + "| 44 | d | x |", + "+-------+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} + +#[tokio::test] +async fn equijoin_left_with_error_right() -> Result<()> { + let test_repartition_joins = vec![true, false]; + for repartition_joins in test_repartition_joins { + let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; + let sql = + "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name is not null ORDER BY t1_id"; + let res = ctx.create_logical_plan(sql); + assert!(res.is_ok()); + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+---------+---------+", + "| t1_id | t1_name | t2_name |", + "+-------+---------+---------+", + "| 11 | a | z |", + "| 22 | b | y |", + "| 33 | c | |", + "| 44 | d | x |", + "+-------+---------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} + #[tokio::test] async fn full_join_sub_query() -> Result<()> { let test_repartition_joins = vec![true, false]; @@ -1992,3 +2044,289 @@ async fn sort_merge_join_on_decimal() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn left_semi_join() -> Result<()> { + let test_repartition_joins = vec![true, false]; + for repartition_joins in test_repartition_joins { + let ctx = create_left_semi_anti_join_context_with_null_ids( + "t1_id", + "t2_id", + repartition_joins, + ) + .unwrap(); + + let sql = "SELECT t1_id, t1_name FROM t1 WHERE t1_id IN (SELECT t2_id FROM t2) ORDER BY t1_id"; + let msg = format!("Creating logical plan for '{}'", sql); + let plan = ctx.create_logical_plan(sql).expect(&msg); + let state = ctx.state(); + let logical_plan = state.optimize(&plan)?; + let physical_plan = state.create_physical_plan(&logical_plan).await?; + let expected = if repartition_joins { + vec![ + "SortExec: [t1_id@0 ASC NULLS LAST]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]", + " CoalesceBatchesExec: target_batch_size=4096", + " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " MemoryExec: partitions=1, partition_sizes=[1]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)", + " ProjectionExec: expr=[t2_id@0 as t2_id]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " MemoryExec: partitions=1, partition_sizes=[1]", + ] + } else { + vec![ + "SortExec: [t1_id@0 ASC NULLS LAST]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]", + " CoalesceBatchesExec: target_batch_size=4096", + " HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", + " MemoryExec: partitions=1, partition_sizes=[1]", + " ProjectionExec: expr=[t2_id@0 as t2_id]", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " MemoryExec: partitions=1, partition_sizes=[1]", + ] + }; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+---------+", + "| t1_id | t1_name |", + "+-------+---------+", + "| 11 | a |", + "| 11 | a |", + "| 22 | b |", + "| 44 | d |", + "+-------+---------+", + ]; + assert_batches_eq!(expected, &actual); + + let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t1_id = t2_id) ORDER BY t1_id"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+---------+", + "| t1_id | t1_name |", + "+-------+---------+", + "| 11 | a |", + "| 11 | a |", + "| 22 | b |", + "| 44 | d |", + "+-------+---------+", + ]; + assert_batches_eq!(expected, &actual); + + let sql = "SELECT t1_id FROM t1 INTERSECT SELECT t2_id FROM t2 ORDER BY t1_id"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+", + "| t1_id |", + "+-------+", + "| 11 |", + "| 22 |", + "| 44 |", + "| |", + "+-------+", + ]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} + +#[tokio::test] +async fn left_anti_join() -> Result<()> { + let test_repartition_joins = vec![true, false]; + for repartition_joins in test_repartition_joins { + let ctx = create_left_semi_anti_join_context_with_null_ids( + "t1_id", + "t2_id", + repartition_joins, + ) + .unwrap(); + + let sql = "SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT 1 FROM t2 WHERE t1_id = t2_id) ORDER BY t1_id"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+---------+", + "| t1_id | t1_name |", + "+-------+---------+", + "| 33 | c |", + "| | e |", + "+-------+---------+", + ]; + assert_batches_eq!(expected, &actual); + + let sql = "SELECT t1_id FROM t1 EXCEPT SELECT t2_id FROM t2 ORDER BY t1_id"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+", + "| t1_id |", + "+-------+", + "| 33 |", + "+-------+", + ]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} + +#[tokio::test] +#[ignore = "Test ignored, will be enabled after fixing the anti join plan bug"] +// https://github.com/apache/arrow-datafusion/issues/4366 +async fn error_left_anti_join() -> Result<()> { + let test_repartition_joins = vec![true, false]; + for repartition_joins in test_repartition_joins { + let ctx = create_left_semi_anti_join_context_with_null_ids( + "t1_id", + "t2_id", + repartition_joins, + ) + .unwrap(); + + let sql = "SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT 1 FROM t2 WHERE t1_id = t2_id and t1_id > 11) ORDER BY t1_id"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+---------+", + "| t1_id | t1_name |", + "+-------+---------+", + "| 11 | a |", + "| 11 | a |", + "| 33 | c |", + "| | e |", + "+-------+---------+", + ]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} + +#[tokio::test] +#[ignore = "Test ignored, will be enabled after fixing the NAAJ bug"] +// https://github.com/apache/arrow-datafusion/issues/4211 +async fn null_aware_left_anti_join() -> Result<()> { + let test_repartition_joins = vec![true, false]; + for repartition_joins in test_repartition_joins { + let ctx = create_left_semi_anti_join_context_with_null_ids( + "t1_id", + "t2_id", + repartition_joins, + ) + .unwrap(); + + let sql = "SELECT t1_id, t1_name FROM t1 WHERE t1_id NOT IN (SELECT t2_id FROM t2) ORDER BY t1_id"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec!["++", "++"]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} + +#[tokio::test] +#[ignore = "Test ignored, will be enabled after fixing right semi join bug"] +// https://github.com/apache/arrow-datafusion/issues/4247 +async fn right_semi_join() -> Result<()> { + let test_repartition_joins = vec![true, false]; + for repartition_joins in test_repartition_joins { + let ctx = create_right_semi_anti_join_context_with_null_ids( + "t1_id", + "t2_id", + repartition_joins, + ) + .unwrap(); + + let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS (SELECT * FROM t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id"; + let msg = format!("Creating logical plan for '{}'", sql); + let plan = ctx.create_logical_plan(sql).expect(&msg); + let state = ctx.state(); + let logical_plan = state.optimize(&plan)?; + let physical_plan = state.create_physical_plan(&logical_plan).await?; + let expected = if repartition_joins { + vec![ "SortExec: [t1_id@0 ASC NULLS LAST]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]", + " CoalesceBatchesExec: target_batch_size=4096", + " HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " MemoryExec: partitions=1, partition_sizes=[1]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " MemoryExec: partitions=1, partition_sizes=[1]", + ] + } else { + vec![ + "SortExec: [t1_id@0 ASC NULLS LAST]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=RoundRobinBatch(2)", + " HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }", + " MemoryExec: partitions=1, partition_sizes=[1]", + " MemoryExec: partitions=1, partition_sizes=[1]", + ] + }; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+---------+--------+", + "| t1_id | t1_name | t1_int |", + "+-------+---------+--------+", + "| 11 | a | 1 |", + "+-------+---------+--------+", + ]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} + +#[tokio::test] +#[ignore = "Test ignored, will be enabled after fixing cross join bug"] +// https://github.com/apache/arrow-datafusion/issues/4363 +async fn error_cross_join() -> Result<()> { + let test_repartition_joins = vec![true, false]; + for repartition_joins in test_repartition_joins { + let ctx = create_join_context("t1_id", "t2_id", repartition_joins).unwrap(); + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON (t1_id != t2_id and t2_id >= 100) ORDER BY t1_id"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+---------+---------+", + "| t1_id | t1_name | t2_name |", + "+-------+---------+---------+", + "| 11 | a | |", + "| 22 | b | |", + "| 33 | c | |", + "| 44 | d | |", + "+-------+---------+---------+", + ]; + + assert_batches_eq!(expected, &actual); + } + + Ok(()) +} diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 81a48bd9921b..1e1307672394 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -231,6 +231,133 @@ fn create_join_context( Ok(ctx) } +fn create_left_semi_anti_join_context_with_null_ids( + column_left: &str, + column_right: &str, + repartition_joins: bool, +) -> Result { + let ctx = SessionContext::with_config( + SessionConfig::new() + .with_repartition_joins(repartition_joins) + .with_target_partitions(2), + ); + + let t1_schema = Arc::new(Schema::new(vec![ + Field::new(column_left, DataType::UInt32, true), + Field::new("t1_name", DataType::Utf8, true), + Field::new("t1_int", DataType::UInt32, true), + ])); + let t1_data = RecordBatch::try_new( + t1_schema, + vec![ + Arc::new(UInt32Array::from(vec![ + Some(11), + Some(11), + Some(22), + Some(33), + Some(44), + None, + ])), + Arc::new(StringArray::from(vec![ + Some("a"), + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ])), + Arc::new(UInt32Array::from_slice([1, 1, 2, 3, 4, 0])), + ], + )?; + ctx.register_batch("t1", t1_data)?; + + let t2_schema = Arc::new(Schema::new(vec![ + Field::new(column_right, DataType::UInt32, true), + Field::new("t2_name", DataType::Utf8, true), + Field::new("t2_int", DataType::UInt32, true), + ])); + let t2_data = RecordBatch::try_new( + t2_schema, + vec![ + Arc::new(UInt32Array::from(vec![ + Some(11), + Some(11), + Some(22), + Some(44), + Some(55), + None, + ])), + Arc::new(StringArray::from(vec![ + Some("z"), + Some("z"), + Some("y"), + Some("x"), + Some("w"), + Some("v"), + ])), + Arc::new(UInt32Array::from_slice([3, 3, 1, 3, 3, 0])), + ], + )?; + ctx.register_batch("t2", t2_data)?; + + Ok(ctx) +} + +fn create_right_semi_anti_join_context_with_null_ids( + column_left: &str, + column_right: &str, + repartition_joins: bool, +) -> Result { + let ctx = SessionContext::with_config( + SessionConfig::new() + .with_repartition_joins(repartition_joins) + .with_target_partitions(2), + ); + + let t1_schema = Arc::new(Schema::new(vec![ + Field::new(column_left, DataType::UInt32, true), + Field::new("t1_name", DataType::Utf8, true), + Field::new("t1_int", DataType::UInt32, true), + ])); + let t1_data = RecordBatch::try_new( + t1_schema, + vec![ + Arc::new(UInt32Array::from(vec![ + Some(11), + Some(22), + Some(33), + Some(44), + None, + ])), + Arc::new(StringArray::from(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ])), + Arc::new(UInt32Array::from_slice([1, 2, 3, 4, 0])), + ], + )?; + ctx.register_batch("t1", t1_data)?; + + let t2_schema = Arc::new(Schema::new(vec![ + Field::new(column_right, DataType::UInt32, true), + Field::new("t2_name", DataType::Utf8, true), + ])); + // t2 data size is smaller than t1 + let t2_data = RecordBatch::try_new( + t2_schema, + vec![ + Arc::new(UInt32Array::from(vec![Some(11), Some(11), None])), + Arc::new(StringArray::from(vec![Some("a"), Some("x"), None])), + ], + )?; + ctx.register_batch("t2", t2_data)?; + + Ok(ctx) +} + fn create_join_context_qualified( left_name: &str, right_name: &str, diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 4e4ca02f6a8a..3ae821379188 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -767,7 +767,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let join_filter = filter.into_iter().reduce(Expr::and); if left_keys.is_empty() { - // When we don't have join keys, use cross join + // TODO should not use cross join when the join_filter exists + // https://github.com/apache/arrow-datafusion/issues/4363 let join = LogicalPlanBuilder::from(left).cross_join(&right)?; join_filter .map(|filter| join.filter(filter)) From d296b9622601bb4c629f06fa48dafd2d8ad243d2 Mon Sep 17 00:00:00 2001 From: Wang Date: Fri, 25 Nov 2022 20:02:49 +0800 Subject: [PATCH 2/2] remove wrong ut --- datafusion/core/tests/sql/joins.rs | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 112f6f68af27..4831cbe4af91 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -164,32 +164,6 @@ async fn equijoin_left_and_not_null_condition_from_right() -> Result<()> { Ok(()) } -#[tokio::test] -async fn equijoin_left_with_error_right() -> Result<()> { - let test_repartition_joins = vec![true, false]; - for repartition_joins in test_repartition_joins { - let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; - let sql = - "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name is not null ORDER BY t1_id"; - let res = ctx.create_logical_plan(sql); - assert!(res.is_ok()); - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------+---------+---------+", - "| t1_id | t1_name | t2_name |", - "+-------+---------+---------+", - "| 11 | a | z |", - "| 22 | b | y |", - "| 33 | c | |", - "| 44 | d | x |", - "+-------+---------+---------+", - ]; - assert_batches_eq!(expected, &actual); - } - - Ok(()) -} - #[tokio::test] async fn full_join_sub_query() -> Result<()> { let test_repartition_joins = vec![true, false];