Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix join column handling logic for On and Using constraints #605

Merged
merged 6 commits into from
Jul 7, 2021

Conversation

houqp
Copy link
Member

@houqp houqp commented Jun 23, 2021

Which issue does this PR close?

Follow up for #55 (review).

Closes #601.
Closes #671.

Also fixed a bug where index_of_column won't error out on duplicated fields.

Rationale for this change

In MySQL and Postgres, Join with On constraints produces output with join column from both relations preserved. For example:

SELECT * FROM test t1 JOIN test t2 ON t1.id = t2.id;

produces:

id id
1 1
2 2

While join with Using constraints deduplicates the join column:

SELECT * FROM test t1 JOIN test t2 USING (id);

produces:

id
1
2

However, in our current implementation, join column dedup is applied in all cases. This PR changes the behavior so it's consistent with MySQL and Postgres.

Here comes the annoying part.

Note that for join with Using constraint, users can still project join columns using relations from both sides. For example SELECT t1.id, t2.id FROM test t1 JOIN test t2 USING (id) produces the same output as SELECT * FROM test t1 JOIN test t2 ON t1.id = t2.id.

This means for Using joins, we need to model a join column as a single shared column between both relations. Current DFField struct only allows a field/column to have a single qualifier, so I ended adding a new shared_qualifiers field to DFField struct to handle this edge-case. Our logical plan builder will be responsible for setting this field when building join queries with using constraints. During query optimization and planning, the shared_qualifiers field is used to look up column by name and qualifier.

Other alternatives include changing the qualifer field of DFField to an option of enum to account for single qualifier and shared qualifiers. None of these approaches felt elegant to me. I am curious if anyone has ideas or suggestions on how to better implement this behavior.

What changes are included in this PR?

  • Expose JoinConstraints to ballista.proto
  • Unify JoinType enum between logical and physical planes
  • Added context execution tests to enforce semantics for On and Using joins
  • Refactored dfschema module to use index_of_column_by_name in both index_of_column and field_with_qualified_name methods.

Are there any user-facing changes?

Join with On constraints will now output joined columns from both relations without deduplication. Dedup is only applied to join with Using constraints.

use crate::physical_plan::expressions::Column;

/// All valid types of joins.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JoinType {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reuse the same enum from logical plane.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @houqp -- I think this PR is an improvement.

Also, I did some more testing and there are still cases that don't appear to be working (which I think would be fine to do as a follow on PR) -- they no longer panic :)

echo "1" > /tmp/foo.csv
cargo run -p datafusion-cli 
> 
CREATE EXTERNAL TABLE foo(bar int)
STORED AS CSV
LOCATION '/tmp/foo.csv';
0 rows in set. Query took 0.000 seconds.
> select * from foo as f1 JOIN foo as f2 ON f1.bar = f2.bar;
Plan("Schema contains duplicate unqualified field name 'bar'")

> select f1.bar, f2.bar from foo as f1 JOIN foo as f2 ON f1.bar = f2.bar;
Plan("Schema contains duplicate unqualified field name 'bar'")

> select f1.bar, f2.bar from foo as f1 JOIN foo as f2 USING(bar);
NotImplemented("Unsupported compound identifier '[\"f1\", \"bar\"]'")

> select * from foo as f1 JOIN foo as f2 USING(bar);
+-----+
| bar |
+-----+
| 1   |
+-----+

@@ -1259,6 +1259,96 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn left_join_using() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 definitely an improvement

datafusion/src/logical_plan/dfschema.rs Outdated Show resolved Hide resolved
@alamb
Copy link
Contributor

alamb commented Jun 23, 2021

@houqp

The approach we seem to be taking is to try and keep the information about where a column came from in a join -- e.g. that a output DF field could be referred to as either f1.foo or f2.foo for example, which is getting complicated

It seems like a core challenge is in the semantics of the * expansion in select * from ... type queries which varies depending on type of join. Have you considered perhaps focusing in that area rather than trying to track the optional qualifiers through the plans?

So for example, in a f1 JOIN f1 have the output schema contain both f1.foo and f2.foo but then change the expansion of * to have somethign like f1.foo as foo?

@houqp
Copy link
Member Author

houqp commented Jun 23, 2021

Good catch on the regressions, I will get them fixed tonight with unit tests.

It seems like a core challenge is in the semantics of the * expansion in select * from ... type queries which varies depending on type of join.

Yeah, that's exactly the problem.

Initially, I tried the approach of keeping the join columns from both relations (i.e. f1.foo and f2.foo) in the logical schema. Then I ran into the problem of it breaking the physical planning invariants. Because for Using join, we are only producing a single join column in the output, the physical plan schema can only has a single field for the join column. The schema from the logical plan will need to match that as well in order to honor that invariant.

@alamb
Copy link
Contributor

alamb commented Jun 23, 2021

Because for Using join, we are only producing a single join column in the output

is this something we can change (aka update the using join to produce columns and then rely on projection pushdown to prune the uneeded ones out?)

@houqp
Copy link
Member Author

houqp commented Jun 23, 2021

is this something we can change (aka update the using join to produce columns and then rely on projection pushdown to prune the uneeded ones out?)

That's a good point, the single column output semantic doesn't need to be enforced at the join node level. let me give this a try as well 👍 This could simplify our join handling logic. I will also double check to see if this would result in nontrivial runtime overhead for us.

@houqp
Copy link
Member Author

houqp commented Jun 24, 2021

@alamb, I wasn't able to reproduce the errors you showed in #605 (review)

Here is what I got:

> CREATE EXTERNAL TABLE foo(bar int)
STORED AS CSV
LOCATION '/tmp/foo.csv';
0 rows in set. Query took 0.001 seconds.
> select f1.bar, f2.bar from foo as f1 JOIN foo as f2 ON f1.bar = f2.bar;
+-----+-----+
| bar | bar |
+-----+-----+
| 3   | 3   |
| 1   | 1   |
| 4   | 4   |
| 2   | 2   |
+-----+-----+
4 rows in set. Query took 0.022 seconds.
>  select * from foo as f1 JOIN foo as f2 ON f1.bar = f2.bar;
+-----+-----+
| bar | bar |
+-----+-----+
| 3   | 3   |
| 1   | 1   |
| 4   | 4   |
| 2   | 2   |
+-----+-----+
4 rows in set. Query took 0.022 seconds.
> select f1.bar, f2.bar from foo as f1 JOIN foo as f2 USING(bar);
+-----+-----+
| bar | bar |
+-----+-----+
| 2   | 2   |
| 3   | 3   |
| 1   | 1   |
| 4   | 4   |
+-----+-----+
4 rows in set. Query took 0.020 seconds.
> 

Perhaps you rain those tests with a different build?

@alamb
Copy link
Contributor

alamb commented Jun 24, 2021

Perhaps you rain those tests with a different build?

I am not sure what I tested with, but I re-ran the tests at b8da356 and everything is working -- sorry for the noise.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this PR is better than master (aka it doesn't panic) I would be fine merging it in as is and iterating on the design subsequently. Alternately, if you plan to make more changes as part of this PR @houqp I can wait to merge it in

@alamb alamb added the datafusion Changes in the datafusion crate label Jun 24, 2021
@houqp
Copy link
Member Author

houqp commented Jun 24, 2021

let's wait for my alternative solutions and review them together before the merge unless there is urgent issue in master that this PR addresses. I would like to avoid merging in a premature abstraction to reduce noise :)

@alamb
Copy link
Contributor

alamb commented Jun 27, 2021

Marking as draft so it is clearer from the list of PRs that this one is not quite ready to go (and thus I don't merge it accidentally)

@alamb alamb marked this pull request as draft June 27, 2021 10:54
get rid of shared field and move column expansion logic into plan
builder and optimizer.
@houqp houqp marked this pull request as ready for review July 4, 2021 04:35
@houqp houqp requested a review from alamb July 4, 2021 04:35
protobuf::JoinType::Full => JoinType::Full,
protobuf::JoinType::Semi => JoinType::Semi,
protobuf::JoinType::Anti => JoinType::Anti,
};
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conversion handled by shared method in serde/mod.rs

}
};

JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Join schemas are now consistent across all join types and constraints. We implement the join column "merge" semantic externally in plan builder and optimizer.

if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
return Ok(i);
}
fn index_of_column_by_name(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored this code into its own helper method to reduce duplicated code between field_with_qualified_name and index_of_column. This should also make index_of_column more robust.

@@ -1118,36 +1133,56 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
}
}

/// Recursively replace all Column expressions in a given expression tree with Column expressions
/// provided by the hash map argument.
pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<Expr> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is used in predicate push down optimizer to push predicates to both sides of the join clause.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use pub(crate)?

pub fn normalize_col(e: Expr, schemas: &[&DFSchemaRef]) -> Result<Expr> {
struct ColumnNormalizer<'a, 'b> {
schemas: &'a [&'b DFSchemaRef],
pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change from schema to logical plan so we can extract join columns for join clauses with using constraints.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note I wrote some tests that will need to be adjusted in #689 but that is no big deal

on: JoinOnRef,
join_type: &JoinType,
) -> Schema {
pub fn build_join_schema(left: &Schema, right: &Schema, join_type: &JoinType) -> Schema {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same schema build simplification as the one we introduced in logical plan builder.

@@ -560,7 +560,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// SELECT c1 AS m FROM t HAVING c1 > 10;
// SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING MAX(c2) > 10;
//
resolve_aliases_to_exprs(&having_expr, &alias_map)
let having_expr = resolve_aliases_to_exprs(&having_expr, &alias_map)?;
normalize_col(having_expr, &projected_plan)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed if having expression referenced using join columns with unqualified column name.

@houqp
Copy link
Member Author

houqp commented Jul 4, 2021

@alamb reimplemented the logic based on your suggestion in #605 (comment).

Turns out there are many more edge-cases that need to be handled for using join other than wildcard expansion:

  • predicate push down on join columns
  • normalize unqualified column expressions that reference join columns with qualifiers

I have implemented support for all these edge-cases, but decided to leave out the wildcard expansion change as a follow up PR to keep the diff easier to review.

UPDATE: filed #678 as follow up.

Copy link
Member

@jimexist jimexist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@alamb
Copy link
Contributor

alamb commented Jul 5, 2021

Thanks @houqp -- I'll try and review this later today but I may run out of time in which case I'll get it done tomorrow

// field to lookup is qualified but current field is unqualified.
(Some(_), None) => false,
// field to lookup is unqualified, no need to compare qualifier
_ => field.name() == name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer to write out cases, (None, None) and (None, Some(_) and union them. this makes it clearer?

for schema in &schemas {
let fields = schema.fields_with_unqualified_name(&self.name);
match fields.len() {
0 => continue,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how would this be possible?

Copy link
Member Author

@houqp houqp Jul 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are iterating through schemas from all plan nodes in the provided plan tree, each plan node could have different schemas, so when we do the fields_with_unqualified_name look up, some of these plan nodes will no contain a field that matches self.name. We just pick the first plan node that contains schema field matches the unqualified name.

on.iter()
.map(|entry| {
std::iter::once(entry.0.clone())
.chain(std::iter::once(entry.1.clone()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just flatmap with a vec![entry.0.clone(), entry.1.clone()] - it's cleaner

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wanted to avoid an extra memory allocation incurred by Vec::new, but i will double check to see if once chain is actually generating the optimal code without memory allocations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jimexist sent #704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.

Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.

.map(|f| {
std::iter::once(f.qualified_column())
// we need to push down filter using unqualified column as well
.chain(std::iter::once(f.unqualified_column()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.collect::<HashSet<_>>();
let right_columns = &right
.fields()
.iter()
.map(|f| f.qualified_column())
.map(|f| {
std::iter::once(f.qualified_column())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks good @houqp -- I went through the test changes and code carefully and I think this is good to go.

@Dandandan I am not sure if you want to take a look at this given it changes how the Join relations are encoded.

})
.map(|(idx, _)| idx)
.collect();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it probably doesn't matter but you could avoid the Vec allocation by something like:

        let matches = self....; 
        match matches.next() {
        let name = matches.next() {
          None => // error about no field
          Some(name) { 
            if matches.next().is_some() => // error about ambiguous reference
            else name
          }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, fixed in #703.

pub fn normalize_col(e: Expr, schemas: &[&DFSchemaRef]) -> Result<Expr> {
struct ColumnNormalizer<'a, 'b> {
schemas: &'a [&'b DFSchemaRef],
pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note I wrote some tests that will need to be adjusted in #689 but that is no big deal

@@ -354,6 +356,43 @@ impl LogicalPlan {
| LogicalPlan::CreateExternalTable { .. } => vec![],
}
}

/// returns all `Using` join columns in a logical plan
pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW this function feels like it might better belong in some sort of utils rather than a method on LogicalPlan -- perhaps https://github.com/houqp/arrow-datafusion/blob/qp_join/datafusion/src/optimizer/utils.rs#L50

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb this is used outside of the optimizers as well, for example, the logical plan builder. With that context, do you still think it should live in optimizer utils module?

@@ -232,6 +241,38 @@ fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
}
}

fn optimize_join(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@@ -901,20 +979,61 @@ mod tests {
format!("{:?}", plan),
"\
Filter: #test.a LtEq Int64(1)\
\n Join: #test.a = #test.a\
\n Join: #test.a = #test2.a\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"| 1 | 7 | 10 | 4 | 70 |",
"| 2 | 8 | 20 | 5 | 80 |",
"+----+----+----+----+----+",
"+----+----+----+----+----+----+",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these changes make sense to me

@alamb
Copy link
Contributor

alamb commented Jul 6, 2021

Not sure if this will conflict with #660

@alamb
Copy link
Contributor

alamb commented Jul 7, 2021

I am going to merge this PR in (as it conflicts with #689) -- I think we can make additional improvements as follow on PRs.

@alamb
Copy link
Contributor

alamb commented Jul 7, 2021

Thanks again @houqp

@alamb
Copy link
Contributor

alamb commented Jul 7, 2021

I also merge apache/master into this branch locally on my machine and re-ran the tests to verify there were no conflicts

@alamb alamb merged commit 18c581c into apache:master Jul 7, 2021
@alamb
Copy link
Contributor

alamb commented Jul 7, 2021

Aaand it looks like I forgot to fetch apache/master on my machine prior to testing -- fixed in #694

@houqp
Copy link
Member Author

houqp commented Jul 10, 2021

Apologize for being too busy last week and haven't had the time to update my PR, I am going to send a new PR to address all the feedbacks shortly.

@houqp houqp deleted the qp_join branch July 10, 2021 20:04
@houqp houqp added the api change Changes the API exposed to users of the crate label Jul 30, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate datafusion Changes in the datafusion crate
Projects
None yet
3 participants