From d75f432ae1df3e910ba557ac20137245b27079c5 Mon Sep 17 00:00:00 2001 From: Mike Seddon Date: Sat, 31 Jul 2021 12:30:04 +1000 Subject: [PATCH 1/3] better join order resolution logic --- datafusion/src/logical_plan/builder.rs | 127 +++++++++++++++++++++++-- datafusion/tests/sql.rs | 11 +++ 2 files changed, 129 insertions(+), 9 deletions(-) diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index a742f346207a..df6a82b766c5 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -287,16 +287,125 @@ impl LogicalPlanBuilder { .into_iter() .zip(join_keys.1.into_iter()) .map(|(l, r)| { - let mut swap = false; let l = l.into(); - let left_key = l.clone().normalize(&self.plan).or_else(|_| { - swap = true; - l.normalize(right) - }); - if swap { - (r.into().normalize(&self.plan), left_key) - } else { - (left_key, r.into().normalize(right)) + let r = r.into(); + let lr = l.relation.clone(); + let rr = r.relation.clone(); + + match (lr, rr) { + (Some(lr), Some(rr)) => { + let l_is_left = + self.plan.all_schemas().iter().any(|schema| { + schema.fields().iter().any(|field| { + field.qualifier().unwrap() == &lr + && field.name() == &l.name + }) + }); + let l_is_right = right.all_schemas().iter().any(|schema| { + schema.fields().iter().any(|field| { + field.qualifier().unwrap() == &lr + && field.name() == &l.name + }) + }); + let r_is_left = + self.plan.all_schemas().iter().any(|schema| { + schema.fields().iter().any(|field| { + field.qualifier().unwrap() == &rr + && field.name() == &r.name + }) + }); + let r_is_right = right.all_schemas().iter().any(|schema| { + schema.fields().iter().any(|field| { + field.qualifier().unwrap() == &rr + && field.name() == &r.name + }) + }); + match (l_is_left, l_is_right, r_is_left, r_is_right) { + (true, _, _, true) => (Ok(l), Ok(r)), + (_, true, true, _) => (Ok(r), Ok(l)), + (_, _, _, _) => ( + Err(DataFusionError::Plan(format!( + "Column {} not found in provided schemas", + l + ))), + Err(DataFusionError::Plan(format!( + "Column {} not found in provided schemas", + r + ))), + ), + } + } + (Some(lr), None) => { + let l_is_left = + self.plan.all_schemas().iter().any(|schema| { + schema.fields().iter().any(|field| { + field.qualifier().unwrap() == &lr + && field.name() == &l.name + }) + }); + let l_is_right = right.all_schemas().iter().any(|schema| { + schema.fields().iter().any(|field| { + field.qualifier().unwrap() == &lr + && field.name() == &l.name + }) + }); + match (l_is_left, l_is_right) { + (true, _) => (Ok(l), r.normalize(right)), + (_, true) => (r.normalize(&self.plan), Ok(l)), + (_, _) => ( + Err(DataFusionError::Plan(format!( + "Column {} not found in provided schemas", + l + ))), + Err(DataFusionError::Plan(format!( + "Column {} not found in provided schemas", + r + ))), + ), + } + } + (None, Some(rr)) => { + let r_is_left = + self.plan.all_schemas().iter().any(|schema| { + schema.fields().iter().any(|field| { + field.qualifier().unwrap() == &rr + && field.name() == &r.name + }) + }); + let r_is_right = right.all_schemas().iter().any(|schema| { + schema.fields().iter().any(|field| { + field.qualifier().unwrap() == &rr + && field.name() == &r.name + }) + }); + match (r_is_left, r_is_right) { + (true, _) => (Ok(r), l.normalize(right)), + (_, true) => (l.normalize(&self.plan), Ok(r)), + (_, _) => ( + Err(DataFusionError::Plan(format!( + "Column {} not found in provided schemas", + l + ))), + Err(DataFusionError::Plan(format!( + "Column {} not found in provided schemas", + r + ))), + ), + } + } + (None, None) => { + let mut swap = false; + let left_key = + l.clone().normalize(&self.plan).or_else(|_| { + swap = true; + l.normalize(right) + }); + if swap { + (r.normalize(&self.plan), left_key) + } else { + (left_key, r.normalize(right)) + } + } } }) .unzip(); diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index bfe2f2fc4913..68c4c4ebe694 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1730,6 +1730,17 @@ async fn equijoin() -> Result<()> { let actual = execute(&mut ctx, sql).await; assert_eq!(expected, actual); } + + let mut ctx = create_join_context_qualified()?; + let equivalent_sql = [ + "SELECT t1.a, t2.b FROM t1 INNER JOIN t2 ON t1.a = t2.a ORDER BY t1.a", + "SELECT t1.a, t2.b FROM t1 INNER JOIN t2 ON t2.a = t1.a ORDER BY t1.a", + ]; + let expected = vec![vec!["1", "100"], vec!["2", "200"], vec!["4", "400"]]; + for sql in equivalent_sql.iter() { + let actual = execute(&mut ctx, sql).await; + assert_eq!(expected, actual); + } Ok(()) } From 8fa689cbd99aefac09e407efb297b59a61bae005 Mon Sep 17 00:00:00 2001 From: Mike Seddon Date: Wed, 4 Aug 2021 08:52:20 +1000 Subject: [PATCH 2/3] simplify by using existing methods --- datafusion/src/logical_plan/builder.rs | 127 ++++++++----------------- 1 file changed, 39 insertions(+), 88 deletions(-) diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index df6a82b766c5..1a5e82c80ce1 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -289,108 +289,59 @@ impl LogicalPlanBuilder { .map(|(l, r)| { let l = l.into(); let r = r.into(); + let lr = l.relation.clone(); let rr = r.relation.clone(); match (lr, rr) { (Some(lr), Some(rr)) => { - let l_is_left = - self.plan.all_schemas().iter().any(|schema| { - schema.fields().iter().any(|field| { - field.qualifier().unwrap() == &lr - && field.name() == &l.name - }) - }); - let l_is_right = right.all_schemas().iter().any(|schema| { - schema.fields().iter().any(|field| { - field.qualifier().unwrap() == &lr - && field.name() == &l.name - }) - }); - let r_is_left = - self.plan.all_schemas().iter().any(|schema| { - schema.fields().iter().any(|field| { - field.qualifier().unwrap() == &rr - && field.name() == &r.name - }) - }); - let r_is_right = right.all_schemas().iter().any(|schema| { - schema.fields().iter().any(|field| { - field.qualifier().unwrap() == &rr - && field.name() == &r.name - }) - }); + let l_is_left = self + .plan + .schema() + .field_with_qualified_name(&lr, &l.name); + let l_is_right = + right.schema().field_with_qualified_name(&lr, &l.name); + let r_is_left = self + .plan + .schema() + .field_with_qualified_name(&rr, &r.name); + let r_is_right = + right.schema().field_with_qualified_name(&rr, &r.name); + match (l_is_left, l_is_right, r_is_left, r_is_right) { - (true, _, _, true) => (Ok(l), Ok(r)), - (_, true, true, _) => (Ok(r), Ok(l)), - (_, _, _, _) => ( - Err(DataFusionError::Plan(format!( - "Column {} not found in provided schemas", - l - ))), - Err(DataFusionError::Plan(format!( - "Column {} not found in provided schemas", - r - ))), - ), + (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)), + (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)), + (_, _, _, _) => { + (l.normalize(&self.plan), r.normalize(right)) + } } } (Some(lr), None) => { - let l_is_left = - self.plan.all_schemas().iter().any(|schema| { - schema.fields().iter().any(|field| { - field.qualifier().unwrap() == &lr - && field.name() == &l.name - }) - }); - let l_is_right = right.all_schemas().iter().any(|schema| { - schema.fields().iter().any(|field| { - field.qualifier().unwrap() == &lr - && field.name() == &l.name - }) - }); + let l_is_left = self + .plan + .schema() + .field_with_qualified_name(&lr, &l.name); + let l_is_right = + right.schema().field_with_qualified_name(&lr, &l.name); + match (l_is_left, l_is_right) { - (true, _) => (Ok(l), r.normalize(right)), - (_, true) => (r.normalize(&self.plan), Ok(l)), - (_, _) => ( - Err(DataFusionError::Plan(format!( - "Column {} not found in provided schemas", - l - ))), - Err(DataFusionError::Plan(format!( - "Column {} not found in provided schemas", - r - ))), - ), + (Ok(_), _) => (Ok(l), r.normalize(right)), + (_, Ok(_)) => (r.normalize(&self.plan), Ok(l)), + _ => (l.normalize(&self.plan), r.normalize(right)), } } (None, Some(rr)) => { - let r_is_left = - self.plan.all_schemas().iter().any(|schema| { - schema.fields().iter().any(|field| { - field.qualifier().unwrap() == &rr - && field.name() == &r.name - }) - }); - let r_is_right = right.all_schemas().iter().any(|schema| { - schema.fields().iter().any(|field| { - field.qualifier().unwrap() == &rr - && field.name() == &r.name - }) - }); + let r_is_left = self + .plan + .schema() + .field_with_qualified_name(&rr, &r.name); + let r_is_right = + right.schema().field_with_qualified_name(&rr, &r.name); + match (r_is_left, r_is_right) { - (true, _) => (Ok(r), l.normalize(right)), - (_, true) => (l.normalize(&self.plan), Ok(r)), - (_, _) => ( - Err(DataFusionError::Plan(format!( - "Column {} not found in provided schemas", - l - ))), - Err(DataFusionError::Plan(format!( - "Column {} not found in provided schemas", - r - ))), - ), + (Ok(_), _) => (Ok(r), l.normalize(right)), + (_, Ok(_)) => (l.normalize(&self.plan), Ok(r)), + _ => (l.normalize(&self.plan), r.normalize(right)), } } (None, None) => { From f6ae14773d22db84fc2e386cb6b590a6e4513489 Mon Sep 17 00:00:00 2001 From: Mike Seddon Date: Thu, 5 Aug 2021 07:25:34 +1000 Subject: [PATCH 3/3] remove unneeded clone and minor formatting --- datafusion/src/logical_plan/builder.rs | 41 +++++++++----------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 1a5e82c80ce1..0dfc1e7aa048 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -290,39 +290,28 @@ impl LogicalPlanBuilder { let l = l.into(); let r = r.into(); - let lr = l.relation.clone(); - let rr = r.relation.clone(); - - match (lr, rr) { + match (&l.relation, &r.relation) { (Some(lr), Some(rr)) => { - let l_is_left = self - .plan - .schema() - .field_with_qualified_name(&lr, &l.name); + let l_is_left = + self.plan.schema().field_with_qualified_name(lr, &l.name); let l_is_right = - right.schema().field_with_qualified_name(&lr, &l.name); - let r_is_left = self - .plan - .schema() - .field_with_qualified_name(&rr, &r.name); + right.schema().field_with_qualified_name(lr, &l.name); + let r_is_left = + self.plan.schema().field_with_qualified_name(rr, &r.name); let r_is_right = - right.schema().field_with_qualified_name(&rr, &r.name); + right.schema().field_with_qualified_name(rr, &r.name); match (l_is_left, l_is_right, r_is_left, r_is_right) { (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)), (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)), - (_, _, _, _) => { - (l.normalize(&self.plan), r.normalize(right)) - } + _ => (l.normalize(&self.plan), r.normalize(right)), } } (Some(lr), None) => { - let l_is_left = self - .plan - .schema() - .field_with_qualified_name(&lr, &l.name); + let l_is_left = + self.plan.schema().field_with_qualified_name(lr, &l.name); let l_is_right = - right.schema().field_with_qualified_name(&lr, &l.name); + right.schema().field_with_qualified_name(lr, &l.name); match (l_is_left, l_is_right) { (Ok(_), _) => (Ok(l), r.normalize(right)), @@ -331,12 +320,10 @@ impl LogicalPlanBuilder { } } (None, Some(rr)) => { - let r_is_left = self - .plan - .schema() - .field_with_qualified_name(&rr, &r.name); + let r_is_left = + self.plan.schema().field_with_qualified_name(rr, &r.name); let r_is_right = - right.schema().field_with_qualified_name(&rr, &r.name); + right.schema().field_with_qualified_name(rr, &r.name); match (r_is_left, r_is_right) { (Ok(_), _) => (Ok(r), l.normalize(right)),