
[SPARK-18582][SQL] Whitelist LogicalPlan operators allowed in correlated subqueries #16046

Closed
nsyca wants to merge 34 commits into apache:master from nsyca:spark18455.0

Conversation


@nsyca nsyca commented Nov 28, 2016

What changes were proposed in this pull request?

This fix puts an explicit list of operators that Spark supports for correlated subqueries.

How was this patch tested?

Run sql/test, catalyst/test and add a new test case on Generate.
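To make the shape of the change concrete, here is a minimal, self-contained sketch of the whitelisting idea using a toy operator hierarchy in place of Catalyst's actual LogicalPlan classes. All names here (Op, checkCorrelationSupport, SomeOtherOp) are hypothetical, and the category assignments are simplified relative to the PR's real rules:

```scala
// Toy stand-ins for logical operators; not Catalyst's LogicalPlan.
sealed trait Op
case object LeafNode extends Op
case class Filter(hostsOuterRef: Boolean) extends Op
case class Aggregate(hostsOuterRef: Boolean) extends Op
case class Generate(join: Boolean) extends Op
case object SomeOtherOp extends Op

// Returns Right(()) if the operator is whitelisted, Left(reason) otherwise.
def checkCorrelationSupport(op: Op): Either[String, Unit] = op match {
  // Category 1: allowed anywhere in a correlated subquery.
  case LeafNode => Right(())
  // Category 2: allowed, and may host outer references.
  case Filter(_) => Right(())
  // Category 3: allowed only when not hosting outer references
  // (simplified; the real rule for Aggregate also depends on the
  // kind of correlated predicates on the correlation path).
  case Aggregate(hostsOuterRef) =>
    if (hostsOuterRef) Left("Aggregate must not host outer references")
    else Right(())
  case Generate(join) =>
    if (join) Right(()) else Left("Generate with join=false is not supported")
  // Category 4: anything not whitelisted is rejected.
  case other => Left(s"operator $other is not allowed in a correlated subquery")
}
```

The point of an explicit whitelist is that any operator the rule has not reasoned about falls through to the rejecting default case, rather than silently producing incorrect results.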

nsyca added 29 commits July 29, 2016 17:43
…rrect results

## What changes were proposed in this pull request?

This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase.

## How was this patch tested?
./dev/run-tests
a new unit test on the problematic pattern.

nsyca commented Nov 28, 2016

We are reviewing existing test cases on subqueries, especially correlated subqueries, and will open a new JIRA, as part of the umbrella JIRA SPARK-18455, to add new test cases to extend the coverage in this area.


SparkQA commented Nov 29, 2016

Test build #69267 has finished for PR 16046 at commit ca9e1a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// up to the operator producing the correlated values.

// Category 1:
// Leaf node can be anywhere in a correlated subquery.
Contributor


Nit: LeafNode can only be at the leaves of a tree. So ... can be anywhere... is stretching it :)

Contributor Author


With the addition of SubqueryAlias, Distinct, Repartition and BroadcastHint in group 1 (see the next comment), I will rephrase the comment here.


@hvanhovell hvanhovell left a comment


This looks pretty good overall. I left a few minor comments.

// These operators can be anywhere in a correlated subquery.
// so long as they do not host outer references in the operators.
// SubqueryAlias can be anywhere in a correlated subquery.
case p: SubqueryAlias =>
Contributor


You don't need to check failOnOuterReference for SubqueryAlias, Distinct, Repartition or BroadcastHint. These operators do not contain expressions.


@nsyca nsyca Nov 29, 2016


I will put the code for these 4 operators in group 1, the same group as LeafNode.

Note: Distinct could, in the future, operate on a subset of the columns of its child's output and might then contain expressions. For example, if (C1, C2) is the output of Distinct's child and some analysis tells us the relationship C1 -> C2 is N:1, we can optimize the Distinct operation to hash/sort only on C1.
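The N:1 idea above can be sketched with plain Scala collections (a hypothetical illustration, not Catalyst code): when C1 -> C2 is functional, deduplicating on C1 alone yields the same rows as deduplicating on both columns.

```scala
// Rows with columns (C1, C2); here C1 -> C2 is functional:
// each C1 value pairs with exactly one C2 value.
val rows = Seq((1, "a"), (1, "a"), (2, "b"), (2, "b"), (3, "c"))

// Full distinct over (C1, C2).
val distinctFull = rows.distinct

// Hypothetical optimized form: deduplicate on C1 only, keeping one row per key.
val distinctOnC1 = rows.groupBy(_._1).values.map(_.head).toSeq

// Because C1 -> C2 is N:1, both produce the same set of rows.
assert(distinctFull.toSet == distinctOnC1.toSet)
```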

// Aggregate cannot host any correlated expressions
// It can be on a correlation path if the correlation has
// only equality correlated predicates.
// It cannot be on a correlation path if the correlation has
Contributor


Nit: has -> contains?

Contributor Author


Will change.

// Inner join, like Filter, can be anywhere.
// LeftSemi is a special case of Inner join which returns
// only the first matched row to the right table.
case _: InnerLike | LeftSemi =>
Contributor


We cannot pull out a predicate that is defined in the right side of a LeftSemi join. A LeftSemi join should be treated like a LeftOuter join.

Contributor Author


You were doing too many code reviews yesterday/today. ;-) In your PR for SPARK-18597, you placed LeftSemi in the same category as InnerJoin.

The Left in LeftSemi does not mean it is a left outer join. It means a "semi" match from the "left" table of an inner join, returning only the first match from the right table (and ignoring subsequent matches). This is why I do not like the terminology; it is confusing. I prefer to call it a (left) early-out (inner) join, or simply an early-out join.
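The early-out behaviour described above can be sketched with plain Scala collections (a toy illustration, not Spark code; all names are hypothetical):

```scala
// leftSemiJoin: keep each left row that has at least one match on the right;
// `exists` stops scanning the right side at the first match (the "early out").
def leftSemiJoin[A, B](left: Seq[A], right: Seq[B])(matches: (A, B) => Boolean): Seq[A] =
  left.filter(l => right.exists(r => matches(l, r)))

// orders(orderId, custId) joined against customer ids; note the duplicate "c2".
val orders    = Seq(("o1", "c1"), ("o2", "c2"), ("o3", "c9"))
val customers = Seq("c1", "c2", "c2")

val result = leftSemiJoin(orders, customers)((o, c) => o._2 == c)
// "o2" appears once even though it matches two right rows; "o3" is dropped.
```

Only left-side columns survive the join, which is exactly the practical obstacle discussed below: a correlated predicate pulled out of the right side has nothing to bind to in the output.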

Contributor


I have to admit that I have been reviewing a lot of PRs. However, I am quite sure that you cannot define a correlated predicate in the plan on the right hand side of a LEFT SEMI/early-out join, because we only output the columns of the plan on the left hand side. For example:

select *
from   tbl_a
where  exists (select 1
               from   tbl_b
               left semi join (select id
                               from   tbl_c
                               where  tbl_c.id = tbl_a.id) c
               on c.id = tbl_b.id)

In this example we could not move the correlated predicate tbl_c.id = tbl_a.id, because the Left Semi join does not output c.id. BTW: in this case it would actually be OK to convert the Left Semi join into an Inner join.

Contributor Author


The reason I write this lengthy response here is to convince you that we should leave LeftSemi in the same group as InnerJoin. Please bear with me.

The example you gave here demonstrates a limitation of the subquery support in Spark today. We should plan to handle this kind of deep correlation in the future:

select  *
from    t
where   exists (select 1
                from   t2
                where  t2.c1=t1.c1
                and    exists (select 1
                               from   t3
                               where  t3.c2=t1.c1))

And if we do, then we will need to allow LeftSemi to output the columns from the right table.

One way to imagine a use case for LeftSemi is a lookup join where the join predicate forms an N:1 relationship, just like the one between a foreign key and its primary key. The join is effectively a LeftSemi in which we are guaranteed to need only the first matching row before moving on to the next row of the left table (just like a hash join that probes the first match and stops early instead of seeking the next match in the hash chain). From a run-time viewpoint, a LeftSemi is (almost; more on this later) better than a regular InnerJoin in that it does not need to probe for further matching rows, regardless of the chosen join method: nested-loop join, sort-merge join, or hash join.

A LeftSemi, however, dictates which tables can be the left and the right. As the name implies, the left table of a LeftSemi needs to be the N-side of the N:1 join. So in the case where the N-side is the smaller table, it could be better to do a regular inner join with the 1-side as the left table and then perform a compensation on top of the join to remove the duplicate matched rows. Having said that, we could also implement a RightSemi join in the runtime layer so that we can pick any of LeftSemi, RightSemi, or InnerJoin with a compensation, based on cost.

Once we wade into the CBO, the whole planning business will be an interesting area, won't it?

Contributor


It makes sense to extract predicates from the right hand side of a LeftSemi join. My problem with this is far more practical. A Join with the LeftSemi join type does not output any right hand side attributes (see: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L273), so the plan breaks as soon as you extract a correlated predicate from the right hand side of the join and try to rewrite the tree using that predicate. That is all.

Fortunately, it would be quite simple to support this: just rewrite every LeftSemi join with underlying predicates into an Inner join. I am not entirely sure whether we should support this; technically the query is incorrect. Let's defer this to a follow-up PR.

The example you give is different. I do think we should support that. Please note that the example you give will not have any left semi joins during analysis; the left semi joins are introduced during optimization, which makes it relatively straightforward to detect such a nested case.

Contributor Author


Got your point now. As it stands today, pulling up the correlated predicate from the right operand of a LeftSemi will break the plan.

// but must not host any outer references.
// Note:
// Generator with join=false is treated as Category 4.
case p @ Generate(generator, join, _, _, _, _) if (join) =>
Contributor


Pattern match directly on join=true, in this case: case p @ Generate(generator, true, _, _, _, _) =>

Contributor


Is generate part of group 2 or group 3? I would say group 2?

Contributor Author


Right, the if (join) is too verbose. I still need to sharpen my Scala coding skills and write more concisely.

Contributor Author


Generate with join=true is group 2. The reason I placed it in group 3 is that the code handles two different cases, join=true and join=false.

By the way, how do you construct a SQL statement with join=false? I could not find an example in the existing test cases.

Contributor Author


I interpret Generate with join=true just like an inner join with the child as the left table and the generator as the right table, which may host outer references from the child of Generate. This is sometimes called sideways information passing. Currently generator is an expression, but it could really be modelled as a (sub)plan, which can contain deep correlation, where the correlated point is at a descendant operator of the root operator of the subplan. In fact, Generate with join=true is just syntactic sugar for a Join.

Contributor


If you only use the columns generated by generator, the optimizer will set join=false. There is no way to do this in SQL.
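A toy way to picture the two settings of the join flag, using plain Scala collections in place of Generate (illustrative only; not Spark code):

```scala
// A child row with an id column and a collection column to be exploded.
case class Row(id: Int, tags: Seq[String])

val child = Seq(Row(1, Seq("a", "b")), Row(2, Seq("c")))

// join = true: the child's columns are kept next to each generated value,
// like an inner join between the child row and its generated rows.
val joined = child.flatMap(r => r.tags.map(t => (r.id, t)))

// join = false: only the generated column survives; the child's columns
// are discarded because nothing downstream uses them.
val generatedOnly = child.flatMap(_.tags)
```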

Contributor


Generate is much like a join. You are right to envisage a scenario in which we would allow a correlated predicate inside a Generate expression, and I would be happy to add such cases to the whitelist when that time comes :)... but for now I think we should not allow them just yet.

// Note:
// Generator with join=false is treated as Category 4.
case p @ Generate(generator, join, _, _, _, _) if (join) =>
if (containsOuter(generator)) {
Contributor


I am not sure what is going on here. Why check all the expressions in the operator only when the generator contains an outer reference? Generate has only one expression, the generator, so I think you can safely call failOnOuterReference(p) directly.


@nsyca nsyca Nov 29, 2016


Right. I was thinking of

if (containsOuter(generator)) {
  failAnalysis( ... )
}

but the code clearly does not reflect my thought. I will make the change.

/** Make sure a plans' subtree does not contain a tagged predicate. */
def failOnOuterReferenceInSubTree(p: LogicalPlan, msg: String): Unit = {
// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (p.collect(predicateMap).nonEmpty) {
Contributor


Let's change this line to p.collectFirst(predicateMap).nonEmpty; that is a little more efficient.
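The difference can be sketched on a toy tree (hypothetical names; not Catalyst's TreeNode API): both traversals answer the same existence question, but a collectFirst-style traversal stops at the first hit instead of materializing every match.

```scala
// Toy tree with collect-all vs collect-first traversals.
sealed trait Node { def children: Seq[Node] }
case class Branch(children: Node*) extends Node
case class Leaf(tagged: Boolean) extends Node { def children: Seq[Node] = Nil }

// Visits every node and builds the full list of matches (like collect).
def collectAll[A](n: Node)(pf: PartialFunction[Node, A]): Seq[A] =
  pf.lift(n).toSeq ++ n.children.flatMap(c => collectAll(c)(pf))

// Stops as soon as one match is found (like collectFirst).
def findFirst[A](n: Node)(pf: PartialFunction[Node, A]): Option[A] =
  pf.lift(n) match {
    case some @ Some(_) => some
    case None =>
      var result: Option[A] = None
      val it = n.children.iterator
      while (result.isEmpty && it.hasNext) result = findFirst(it.next())(pf)
      result
  }

val tree = Branch(Leaf(tagged = true), Leaf(tagged = false), Leaf(tagged = true))
val taggedLeaf: PartialFunction[Node, Boolean] = { case Leaf(true) => true }

// Same existence answer; findFirst just short-circuits.
assert(collectAll(tree)(taggedLeaf).nonEmpty == findFirst(tree)(taggedLeaf).nonEmpty)
```

For a pure existence check like failOnOuterReferenceInSubTree, the full list built by collect is thrown away immediately, which is why the short-circuiting form is preferable.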

Contributor Author


Agreed. I will make the change in the next PR.


nsyca commented Nov 30, 2016

This latest push 1d32958 addresses all the review comments except the placement of LeftSemi, and includes the removal of an extra space in Optimizer.scala.


SparkQA commented Nov 30, 2016

Test build #69369 has finished for PR 16046 at commit 1d32958.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


nsyca commented Dec 1, 2016

@hvanhovell On the placement of LeftSemi, have you had a chance to read my response? Please share your thoughts. How are we going to implement support for deep correlation like the examples you and I raised?

@hvanhovell
Contributor

LGTM - pending Jenkins.


SparkQA commented Dec 1, 2016

Test build #69500 has finished for PR 16046 at commit 0c9d0b5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

retest this please


nsyca commented Dec 1, 2016

Thank you. I was reading the test log to see if my code might have caused the failure. It does not look like it did.


SparkQA commented Dec 2, 2016

Test build #69515 has finished for PR 16046 at commit 0c9d0b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


hvanhovell commented Dec 3, 2016

Merging to master/2.1. Thanks!

asfgit pushed a commit that referenced this pull request Dec 3, 2016
…ted subqueries

## What changes were proposed in this pull request?

This fix puts an explicit list of operators that Spark supports for correlated subqueries.

## How was this patch tested?

Run sql/test, catalyst/test and add a new test case on Generate.

Author: Nattavut Sutyanyong <nsy.can@gmail.com>

Closes #16046 from nsyca/spark18455.0.

(cherry picked from commit 4a3c096)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
@asfgit asfgit closed this in 4a3c096 Dec 3, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
@nsyca nsyca deleted the spark18455.0 branch March 14, 2017 21:09