Skip to content

Conversation

@korowa
Copy link
Contributor

@korowa korowa commented May 22, 2022

Which issue does this PR close?

Closes #2509 , closes #2496 .

Rationale for this change

Support non equality predicates / filters in JOIN ON SQL clause

What changes are included in this PR?

Logical plan

Join logical plan now requires Option<Expr> field - filter which should be applied to "equijoined" data. Join planning logic left almost untouched:

  • Inner still planned as Join -> Filter (it allows proper filter pushdown)
  • in case of Left / Right planner still pushes down predicates relates only to inner join input, and now it allows predicates based on outer input
  • Full allows predicates in ON clause

Physical plan

Now, after building left/right indices vectors as a result of equijoin part of ON clause, HashJoin applies filter expression (if any has been provided) to batch of rows with according indices and produce new vectors with indices of joined rows after filtering. Intermediate batch contains only required for filter expression columns.

HashJoinExec physical plan node requires new Option<JoinFilter> field - JoinFilter structure encapsulates all necessary data to create intermediate batch and apply filter:

  • physical expression - filter expression itself, built against intermediate batch schema, it thus requires following two fields to evaluate expression while execution
  • column indices - stores indices and join sides on columns included in intermediate batch
  • schema - intermediate batch schema

Are there any user-facing changes?

Plan builder and DF join methods now require optional expression as an argument.

Does this PR break compatibility with Ballista?

New fields added to both logical and physical plan join nodes.
Related PR - apache/datafusion-ballista#36

@github-actions github-actions bot added datafusion development-process Related to development process of DataFusion labels May 22, 2022
@korowa
Copy link
Contributor Author

korowa commented May 23, 2022

Though this PR is functionally fine, I suppose it not to be most efficient in terms of runtime and resource usage (rebuild stage in case of RIGHT/OUTER join is a bit confusing) but it seems to fits in with columnar style data processing.

Anyway, suggestions/comments/questions are welcome - it wold be great to validate if it's an appropriate (at least for now) join filter implementation or there are better ways of doing this.

@andygrove
Copy link
Member

Thank you @korowa for being the first to test out the new process around building against Ballista with changes there as well. Please let me know if you have any feedback on the process or suggestions to make it easier.

@yjshen
Copy link
Member

yjshen commented May 23, 2022

@korowa Great to see this happening! How do you think to support filter in SortMergeJoinExec as well?

Cc @richox, you might be interested in this as well.

Copy link
Member

@Ted-Jiang Ted-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@korowa Great work! 👍 i have left some comments.

let mask = as_boolean_array(&filter_result);

let left_filtered = PrimitiveArray::<UInt64Type>::from(
compute::filter(&left_indices, mask)?.data().clone(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here, we apply filter after build_batch_from_indices means : apply filter after join operator. Is there a chance filter before join.
if wrong plz correct me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, at physical level filter always applied after actual join operation.

And you're right - there are cases when it's fine to filter inputs before join step - but here it's responsibility of logical planner / optimizer - they both are able to check if predicate (or its part) could be pushed before join - closer to scan.

So, when it comes to physical join, logical plan supposed to contain only t1.field < t2.field-like predicates which could not be applied before join

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @korowa that moving filters out of the On clause is better left to the planner and optimizer.

This is an important point though -- perhaps we could add a comment to the docstring of the HashJoin operator explaining that filter should ideally only contain expressions that can not be pushed to the inputs of the join, one way or the other

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@korowa @alamb ❤️ Thanks your explanations! I think after merge this Pr, we should add a check in filter_push_down.rs to improve this situation.

@alamb
Copy link
Contributor

alamb commented May 23, 2022

I have this on my queue to review carefully tomorrow

korowa and others added 2 commits May 23, 2022 22:09
@korowa
Copy link
Contributor Author

korowa commented May 23, 2022

Thank you @korowa for being the first to test out the new process around building against Ballista with changes there as well. Please let me know if you have any feedback on the process or suggestions to make it easier.

The guide in PR template is pretty clear, thank you!

Talking about suggestions - the only thing that comes into my mind is Github Action triggered by "PR merged" event which could

  1. reset dev/build-arrow-ballista.sh
  2. search for related open PR in ballista, and merge it

sounds complicated but unfortunately this is the only way I can imagine to keep two repos synced between each other 😞

@korowa
Copy link
Contributor Author

korowa commented May 23, 2022

@korowa Great to see this happening! How do you think to support filter in SortMergeJoinExec as well?

Cc @richox, you might be interested in this as well.

I'll check it out - there definitely should be point where joined records from both sides are collected as batch, which could be used for filter evaluation.

@alamb alamb changed the title Optional filter in JOIN ON clause Add support for non equality predicates in ON clause of LEFT, RIGHT, and FULL joins May 24, 2022
@alamb alamb changed the title Add support for non equality predicates in ON clause of LEFT, RIGHT, and FULL joins Support for non equality predicates in ON clause of LEFT, RIGHT, and FULL joins May 24, 2022
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @korowa -- this is very very cool. 😍

I think this PR needs a few more tests but otherwise looks very nice. At a minimum, I think we need:

  1. basic sql_integration tests for RIGHT and FULL join with non equijoin ON predicates
  2. tests for the logic that does single table predicate pushdown to join inputs

I had some code structure comments / suggestions, but I don't think any of them is required. I just had some ideas which may help to improve the code readability.

It would also be neat to add some more sophisticated tests that use multiple record batches, as join inputs, but perhaps that is beyond the scope of this PR

cc @Dandandan

&right,
JoinType::Inner,
(vec![Column::from_name("a")], vec![Column::from_name("a")]),
None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend tests:

  1. A filter on an inner join and ensuring that the filter pushdown pushes it correctly
  2. A filter on an outer (left and full) showing that it is not pushed down (or that it is only pushed down on the non-preserved relation)

(if the extra filter is not pushed, I think it would be fine to file a follow on ticket to add that functionality -- I bet some others in the community might want to pick it up)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushdown logic for ON condition is not supported in this PR, I've added tests (currently ignored) for this, and added an issue #2619


assert!(res.is_err());
assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t1_id >= Utf8(\"44\")]");
"SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= 44 ORDER BY t1_id";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t1_id >= Utf8(\"44\")]");
"SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= 44 ORDER BY t1_id";
let expected = vec![
"+-------+---------+---------+",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This answer looks good to me

})
.collect::<Result<join_utils::JoinOn>>()?;

let join_filter = match filter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intermediate schema is very much tied to how the Join is implemented, right? In other words, if we changed the order that the hash Join internally stored its columns I wonder if some / all of this code would need to change (and be hard to find / know to change). Thus I wonder if it might be best put into the join node itself

Copy link
Contributor Author

@korowa korowa May 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it, but decided that it would be kind of mess to add a bunch of logical plan related imports and entities to hash_join.
And I guess, if join internal storage is going to change, that will lead us to changing ColumnIndices somehow, and planner still is going to be affected (but I may be wrong).

let mask = as_boolean_array(&filter_result);

let left_filtered = PrimitiveArray::<UInt64Type>::from(
compute::filter(&left_indices, mask)?.data().clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @korowa that moving filters out of the On clause is better left to the planner and optimizer.

This is an important point though -- perhaps we could add a comment to the docstring of the HashJoin operator explaining that filter should ideally only contain expressions that can not be pushed to the inputs of the join, one way or the other


match join_type {
JoinType::Inner | JoinType::Left => {
// For both INNER and LEFT joins, input arrays contains only indices for matched data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something bothers me about this code treating Left and Right joins differently -- I would expect they look like mirrors of each other and Full to be treated differently

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my initial plan, but it turned out there are no nulls appended in case of left join while building matched indices. All null values are handled later, using visited left side indices

// In case of RIGHT and FULL join, left_indices could contain null values - these rows,
// where no match has been found, should retain in result arrays (thus join condition is satified)
//
// So, filter should be applied only to matched rows, and in case right (outer, batch) index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic I think should also apply left joins.

As for FULL joins, I think it means if there is no match for either left or right inputs, it should a row should still be produced.

I think the best way to resolve this comment is to add some tests showing correct answers for RIGHT and FULL joins in the sql_integration suite

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Integration tests for more join types added

Ok(())
}

#[tokio::test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these tests look good 👍

korowa and others added 2 commits May 25, 2022 23:39
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@xudong963 xudong963 added the enhancement New feature or request label May 26, 2022
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 thanks again @korowa !

@alamb alamb merged commit b6fb0dd into apache:master May 26, 2022
@korowa
Copy link
Contributor Author

korowa commented May 26, 2022

🚀 thanks again @korowa !

Thank you for review!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

development-process Related to development process of DataFusion enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support optional filter in Join Improve SQL planner & logical plan support for JOIN conditions

6 participants