From 22fbcd0499c0cd9ecdab6768dd5bbf3eb9d559ab Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 1 Aug 2022 09:03:43 -0600 Subject: [PATCH 1/5] repro unit test --- datafusion/core/tests/sql/intersection.rs | 4 ++-- datafusion/core/tests/sql/joins.rs | 26 ++++++++++++++++++++--- datafusion/core/tests/sql/mod.rs | 6 +++--- datafusion/core/tests/sql/predicates.rs | 4 ++-- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/datafusion/core/tests/sql/intersection.rs b/datafusion/core/tests/sql/intersection.rs index fadeefee4136..607048477bea 100644 --- a/datafusion/core/tests/sql/intersection.rs +++ b/datafusion/core/tests/sql/intersection.rs @@ -23,7 +23,7 @@ async fn intersect_with_null_not_equal() { INTERSECT SELECT * FROM (SELECT null AS id1, 2 AS id2) t2"; let expected = vec!["++", "++"]; - let ctx = create_join_context_qualified().unwrap(); + let ctx = create_join_context_qualified("t1", "t2").unwrap(); let actual = execute_to_batches(&ctx, sql).await; assert_batches_eq!(expected, &actual); } @@ -41,7 +41,7 @@ async fn intersect_with_null_equal() { "+-----+-----+", ]; - let ctx = create_join_context_qualified().unwrap(); + let ctx = create_join_context_qualified("t1", "t2").unwrap(); let actual = execute_to_batches(&ctx, sql).await; assert_batches_eq!(expected, &actual); diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 525a7c9e0848..c6fa1ae997a3 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -39,7 +39,7 @@ async fn equijoin() -> Result<()> { assert_batches_eq!(expected, &actual); } - let ctx = create_join_context_qualified()?; + let ctx = create_join_context_qualified("t1", "t2")?; let equivalent_sql = [ "SELECT t1.a, t2.b FROM t1 INNER JOIN t2 ON t1.a = t2.a ORDER BY t1.a", "SELECT t1.a, t2.b FROM t1 INNER JOIN t2 ON t2.a = t1.a ORDER BY t1.a", @@ -890,13 +890,33 @@ async fn inner_join_qualified_names() -> Result<()> { ]; for sql in equivalent_sql.iter() { - let ctx = create_join_context_qualified()?; + let ctx = create_join_context_qualified("t1", "t2")?; let actual = execute_to_batches(&ctx, sql).await; assert_batches_eq!(expected, &actual); } Ok(()) } +#[tokio::test] +async fn inner_join_invalid_relation_in_projection() -> Result<()> { + let sql = "select a.a, b.b from a join b on a.a = b.b"; + + let expected = vec![ + "+---+----+----+---+-----+-----+", + "| a | b | c | a | b | c |", + "+---+----+----+---+-----+-----+", + "| 1 | 10 | 50 | 1 | 100 | 500 |", + "| 2 | 20 | 60 | 2 | 200 | 600 |", + "| 4 | 40 | 80 | 4 | 400 | 800 |", + "+---+----+----+---+-----+-----+", + ]; + + let ctx = create_join_context_qualified("a", "b")?; + let actual = execute_to_batches(&ctx, sql).await; + assert_batches_eq!(expected, &actual); + Ok(()) +} + #[tokio::test] async fn inner_join_nulls() { let sql = "SELECT * FROM (SELECT null AS id1) t1 @@ -908,7 +928,7 @@ async fn inner_join_nulls() { "++", ]; - let ctx = create_join_context_qualified().unwrap(); + let ctx = create_join_context_qualified("t1", "t2").unwrap(); let actual = execute_to_batches(&ctx, sql).await; // left and right shouldn't match anything diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index f4153757ffb8..39b7a254ad19 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -227,7 +227,7 @@ fn create_join_context(column_left: &str, column_right: &str) -> Result Result { +fncreate_join_context_qualified(left_name: &str, right_name: &str) -> Result { let ctx = SessionContext::new(); let t1_schema = Arc::new(Schema::new(vec![ @@ -244,7 +244,7 @@ fn create_join_context_qualified() -> Result { ], )?; let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?; - ctx.register_table("t1", Arc::new(t1_table))?; + ctx.register_table(left_name, Arc::new(t1_table))?; let t2_schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::UInt32, true), @@ -260,7 +260,7 @@ fn create_join_context_qualified() -> Result { ], )?; let t2_table = MemTable::try_new(t2_schema, vec![vec![t2_data]])?; - ctx.register_table("t2", Arc::new(t2_table))?; + ctx.register_table(right_name, Arc::new(t2_table))?; Ok(ctx) } diff --git a/datafusion/core/tests/sql/predicates.rs b/datafusion/core/tests/sql/predicates.rs index e6cb77d9a7c7..93aa8c3fba83 100644 --- a/datafusion/core/tests/sql/predicates.rs +++ b/datafusion/core/tests/sql/predicates.rs @@ -313,7 +313,7 @@ async fn except_with_null_not_equal() { "+-----+-----+", ]; - let ctx = create_join_context_qualified().unwrap(); + let ctx = create_join_context_qualified("t1", "t2").unwrap(); let actual = execute_to_batches(&ctx, sql).await; assert_batches_eq!(expected, &actual); @@ -325,7 +325,7 @@ async fn except_with_null_equal() { EXCEPT SELECT * FROM (SELECT null AS id1, 1 AS id2) t2"; let expected = vec!["++", "++"]; - let ctx = create_join_context_qualified().unwrap(); + let ctx = create_join_context_qualified("t1", "t2").unwrap(); let actual = execute_to_batches(&ctx, sql).await; assert_batches_eq!(expected, &actual); From 5d5e307b32ef67d204ed4cb5fc196a95735421bd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 1 Aug 2022 09:11:05 -0600 Subject: [PATCH 2/5] improve test --- datafusion/core/tests/sql/joins.rs | 15 +++------------ datafusion/core/tests/sql/mod.rs | 5 ++++- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index c6fa1ae997a3..d1632277a30b 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -898,19 +898,10 @@ async fn inner_join_qualified_names() -> Result<()> { } #[tokio::test] -async fn inner_join_invalid_relation_in_projection() -> Result<()> { +async fn issue_3002() -> Result<()> { + // repro case for https://github.com/apache/arrow-datafusion/issues/3002 let sql = "select a.a, b.b from a join b on a.a = b.b"; - - let expected = vec![ - "+---+----+----+---+-----+-----+", - "| a | b | c | a | b | c |", - "+---+----+----+---+-----+-----+", - "| 1 | 10 | 50 | 1 | 100 | 500 |", - "| 2 | 20 | 60 | 2 | 200 | 600 |", - "| 4 | 40 | 80 | 4 | 400 | 800 |", - "+---+----+----+---+-----+-----+", - ]; - + let expected = vec!["++", "++"]; let ctx = create_join_context_qualified("a", "b")?; let actual = execute_to_batches(&ctx, sql).await; assert_batches_eq!(expected, &actual); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 39b7a254ad19..a61f954893fe 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -227,7 +227,10 @@ fn create_join_context(column_left: &str, column_right: &str) -> Result Result { +fn create_join_context_qualified( + left_name: &str, + right_name: &str, +) -> Result { let ctx = SessionContext::new(); let t1_schema = Arc::new(Schema::new(vec![ From c66ae962051deea760b6b7d22193e4f995d7f57c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 1 Aug 2022 09:38:12 -0600 Subject: [PATCH 3/5] fix --- datafusion/sql/src/planner.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 593140120966..a9614d80c260 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -1718,18 +1718,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } else { match (var_names.pop(), var_names.pop()) { (Some(name), Some(relation)) if var_names.is_empty() => { - if let Some(field) = schema.fields().iter().find(|f| f.name().eq(&relation)) { - // Access to a field of a column which is a structure, example: SELECT my_struct.key - Ok(Expr::GetIndexedField { - expr: Box::new(Expr::Column(field.qualified_column())), - key: ScalarValue::Utf8(Some(name)), - }) - } else { - // table.column identifier - Ok(Expr::Column(Column { - relation: Some(relation), - name, - })) + match schema.field_with_qualified_name(&relation, &name) { + Ok(_) => { + // table.column identifier + Ok(Expr::Column(Column { + relation: Some(relation), + name, + })) + }, + Err(_) => if let Some(field) = schema.fields().iter().find(|f| f.name().eq(&relation)) { + // Access to a field of a column which is a structure, example: SELECT my_struct.key + Ok(Expr::GetIndexedField { + expr: Box::new(Expr::Column(field.qualified_column())), + key: ScalarValue::Utf8(Some(name)), + }) + } else { + // table.column identifier + Ok(Expr::Column(Column { + relation: Some(relation), + name, + })) + } } } _ => Err(DataFusionError::NotImplemented(format!( From 2f8474602eeb71275b489fed6fcd499f47922575 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 1 Aug 2022 10:01:53 -0600 Subject: [PATCH 4/5] make logic more robust --- datafusion/sql/src/planner.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index a9614d80c260..423f8bce02f2 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -1720,24 +1720,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { (Some(name), Some(relation)) if var_names.is_empty() => { match schema.field_with_qualified_name(&relation, &name) { Ok(_) => { - // table.column identifier + // found an exact match on a qualified name so this is a table.column identifier Ok(Expr::Column(Column { relation: Some(relation), name, })) }, - Err(_) => if let Some(field) = schema.fields().iter().find(|f| f.name().eq(&relation)) { - // Access to a field of a column which is a structure, example: SELECT my_struct.key - Ok(Expr::GetIndexedField { - expr: Box::new(Expr::Column(field.qualified_column())), - key: ScalarValue::Utf8(Some(name)), - }) - } else { - // table.column identifier - Ok(Expr::Column(Column { - relation: Some(relation), - name, - })) + Err(e) => { + let search_term = format!(".{}.{}", relation, name); + if schema.field_names().iter().any(|name| name.as_str().ends_with(&search_term)) { + // this could probably be improved but here we handle the case + // where the qualifier is only a partial qualifier such as when + // referencing "t1.foo" when the available field is "public.t1.foo" + Ok(Expr::Column(Column { + relation: Some(relation), + name, + })) + } else if let Some(field) = schema.fields().iter().find(|f| f.name().eq(&relation)) { + // Access to a field of a column which is a structure, example: SELECT my_struct.key + Ok(Expr::GetIndexedField { + expr: Box::new(Expr::Column(field.qualified_column())), + key: ScalarValue::Utf8(Some(name)), + }) + } else { + Err(e) + } } } } From 3093654eeea8ee38b8a047906c9fc18ce5ba8d3f Mon Sep 17 00:00:00 2001 From: Wei-Ting Kuo Date: Tue, 2 Aug 2022 01:39:29 +0800 Subject: [PATCH 5/5] add a simple test case --- datafusion/core/tests/sql/projection.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index 363e96c364c6..71ebde5785a4 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -335,3 +335,16 @@ fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) { .collect(); assert_eq!(actual, expected); } + +#[tokio::test] +async fn paralleproject_column_with_same_name_as_relationl() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "select a.a from (select 1 as a) as a;"; + let actual = execute_to_batches(&ctx, sql).await; + + let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "+---+"]; + assert_batches_sorted_eq!(expected, &actual); + + Ok(()) +}