[SPARK-26065][SQL] Change query hint from a LogicalPlan to a field #23036

Closed
wants to merge 8 commits into from

Conversation

Contributor

@maryannxue maryannxue commented Nov 14, 2018

What changes were proposed in this pull request?

The existing query hint implementation relies on a logical plan node ResolvedHint to store query hints in logical plans, and on Statistics in physical plans. Since ResolvedHint is not really a logical operator and can break the pattern matching for existing and future optimization rules, it is an issue for the Optimizer, much as the old AnalysisBarrier was for the Analyzer.

Given the fact that all our query hints are either 1) a join hint, i.e., broadcast hint; or 2) a re-partition hint, which is indeed an operator, we only need to add a hint field on the Join plan and that will be a good enough solution for the current hint usage.

This PR lets the Join node carry one hint for its left sub-tree and another for its right sub-tree; each hint is the merged result of all the effective hints specified in the corresponding sub-tree. The "effectiveness" of a hint, i.e., whether that hint should be propagated to the Join node, is currently consistent with the hint propagation rules originally implemented in the Statistics approach. Note that the ResolvedHint node still has to live through the analysis stage because of the Dataset interface, but it will be removed and its hint information moved to the Join node in the "pre-optimization" stage.
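As a rough illustration of the idea, the sketch below uses a toy plan model to show hints being lifted out of hint nodes and merged onto the enclosing join. Every type here (`Plan`, `Relation`, `Filter`, the two-field `Join`, the single-flag `HintInfo`) is a simplified stand-in invented for this example and does not mirror Spark's actual classes or the rule added in this PR.

```scala
// Toy model only: simplified stand-ins, not Spark's actual classes.
object HintFieldSketch {
  final case class HintInfo(broadcast: Boolean) {
    // Merging keeps a broadcast request if either side asked for it.
    def merge(other: HintInfo): HintInfo = HintInfo(broadcast || other.broadcast)
  }

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(predicate: String, child: Plan) extends Plan
  final case class ResolvedHint(child: Plan, hints: HintInfo) extends Plan
  final case class Join(
      left: Plan,
      right: Plan,
      leftHint: Option[HintInfo],
      rightHint: Option[HintInfo]) extends Plan

  // Collect hints from the top of a join child, looking through unary nodes
  // and stopping at joins and leaf relations.
  private def collectHints(plan: Plan): Seq[HintInfo] = plan match {
    case ResolvedHint(child, hints) => collectHints(child) :+ hints
    case Filter(_, child) => collectHints(child)
    case _ => Nil
  }

  // Merge a join's existing hint with whatever its child subtree contributes.
  private def mergedHint(existing: Option[HintInfo], child: Plan): Option[HintInfo] =
    (existing.toSeq ++ collectHints(child)).reduceOption(_ merge _)

  // Remove every ResolvedHint node, recording its information on the enclosing Join.
  // A hint that has no enclosing Join is simply dropped.
  def eliminateHints(plan: Plan): Plan = plan match {
    case Join(l, r, lh, rh) =>
      Join(eliminateHints(l), eliminateHints(r), mergedHint(lh, l), mergedHint(rh, r))
    case ResolvedHint(child, _) => eliminateHints(child)
    case Filter(p, child) => Filter(p, eliminateHints(child))
    case r: Relation => r
  }
}
```

In this toy version, once `eliminateHints` has run, no rule downstream needs a pattern-matching case for a hint node, which is the property the description is after.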

This PR also introduces a change in how hints work with join reordering. Before this PR, hints would stop join reordering. For example, in "a.join(b).join(c).hint("broadcast").join(d)", the broadcast hint would stop d from participating in the cost-based join reordering while still allowing reordering from under the hint node. After this PR, though, the broadcast hint will not interfere with join reordering at all; after reordering, if a relation associated with a hint stays unchanged or equivalent to the original relation, the hint will be retained, otherwise it will be discarded. For example, if the original plan is "a.join(b).hint("broadcast").join(c).hint("broadcast").join(d)", the join order is "a JOIN b JOIN c JOIN d". If after reordering the join order becomes "a JOIN b JOIN (c JOIN d)", the plan will be like "a.join(b).hint("broadcast").join(c.join(d))"; but if the join order becomes "a JOIN c JOIN b JOIN d", the plan will be like "a.join(c).join(b).hint("broadcast").join(d)".

How was this patch tested?

Added new tests.

@maryannxue
Contributor Author

maryannxue commented Nov 14, 2018

@SparkQA

SparkQA commented Nov 14, 2018

Test build #98836 has finished for PR 23036 at commit 785a423.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@gatorsmile gatorsmile left a comment

Good job! The major issue is the test case coverage.

// returned by cache lookup should not have hint info. If we lookup the cache with a
// semantically same plan with a different hint info, `CacheManager.useCachedData` will take
// care of it and retain the hint info in the lookup input plan.
statsOfPlanToCache.copy(hints = HintInfo())
Member

+1

val df2 = testRelation.join(testRelation)
val df1Optimized = Optimize.execute(df1.analyze)
val df2Optimized = Optimize.execute(df2.analyze)
assertSameResult(df1Optimized, df2Optimized)
Member

This should be a new test suite for EliminateResolvedHint. We only need to compare the plans and verify the result.

Contributor Author

No. This is to test Join.doCanonicalize().

@@ -453,6 +454,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case Some(serde) => table.identifier :: serde :: Nil
case _ => table.identifier :: Nil
}
case hint: JoinHint if hint.leftHint.isEmpty && hint.rightHint.isEmpty => Nil
Member

Can we avoid adding this? How about adding override def simpleString in case class Join? Would that help?

Contributor

or we can override stringArgs in Join.

case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural =>
commonNaturalJoinProcessing(left, right, joinType, usingCols, None, hint)
case j @ Join(left, right, NaturalJoin(joinType), condition, hint)
if j.resolvedExceptNatural =>
Member

Nit: two more spaces

@@ -40,23 +40,33 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
if (!conf.cboEnabled || !conf.joinReorderEnabled) {
plan
} else {
// Use a map to track the hints on the join items. If a join relation turns out unchanged
// at the end of the join reorder, we can apply the original hint back to it if any.
Member

This needs a few test cases to ensure this works as expected.

@@ -164,25 +167,35 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
* was involved in an explicit cross join. Also returns the entire list of join conditions for
* the left-deep tree.
*/
def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner)
def flattenJoin(
Member

For the changes in this function, we need a few test cases in the rule ReorderJoin

Contributor Author

The tests are in JoinHintsSuite: "hint preserved after join reorder".

// except and intersect are semi/anti-joins which won't return more data than
// their left argument, so the broadcast hint should be propagated here
case i: Intersect => collectHints(i.left)
case e: Except => collectHints(e.left)
Member

Test cases for Intersect and Except are needed.

@@ -40,23 +40,33 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
if (!conf.cboEnabled || !conf.joinReorderEnabled) {
plan
} else {
// Use a map to track the hints on the join items. If a join relation turns out unchanged
Contributor

How do we define "unchanged"? If (a join b) join c becomes (b join a) join c, is there any hint we want to retain?

val (leftPlans, leftConditions) = extractInnerJoins(left)
val (rightPlans, rightConditions) = extractInnerJoins(right)
case Join(left, right, _: InnerLike, Some(cond), hint) =>
hint.leftHint.map(hintMap.put(left, _))
Contributor

For a purely side-effecting function, use foreach instead of map.
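A minimal sketch of the suggestion, using plain strings in place of plans and hints (the `record` helper and its parameters are hypothetical and exist only for this example):

```scala
import scala.collection.mutable

object ForeachSketch {
  // Hypothetical helper: strings stand in for LogicalPlan and HintInfo.
  def record(hintMap: mutable.Map[String, String], plan: String, hint: Option[String]): Unit = {
    // foreach returns Unit, which makes it clear that only the side effect matters.
    // map would build and then discard an Option holding the result of put.
    hint.foreach(h => hintMap.put(plan, h))
  }
}
```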

* Hint that is associated with a [[Join]] node, with [[HintInfo]] on its left child and on its
* right child respectively.
*/
case class JoinHint(
Contributor

The indentation is wrong here

Contributor Author

You mean it should be 2 spaces instead of 4 before leftHint and rightHint?

@SparkQA

SparkQA commented Jan 2, 2019

Test build #100648 has finished for PR 23036 at commit 93f33d9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 2, 2019

Test build #100652 has finished for PR 23036 at commit ee0c844.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 3, 2019

Test build #100653 has finished for PR 23036 at commit 470d682.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

input: Seq[(LogicalPlan, InnerLike)],
conditions: Seq[Expression],
leftPlans: Seq[LogicalPlan],
hintMap: Map[Seq[LogicalPlan], HintInfo]): LogicalPlan = {
Contributor

why the map key is Seq[LogicalPlan]?

Contributor Author

@maryannxue maryannxue Jan 4, 2019

After ReorderJoin, new conditions might be pushed into a join relation. For example, in https://github.com/apache/spark/pull/23036/files#diff-fb10f33381c6d7cc8bfbde63d7f2c557R109, the join order has remained the same, but the first join between "a" and "b" now has a new condition, "a.a1 = b.b1". I'd still want to treat it as the same join, thus retaining its hint. If we were to compare the join LogicalPlan nodes, they would not match. Since ReorderJoin only deals with left-deep trees, as long as the sequence of join child relations is fixed, the join order is fixed too. So we can compare the sequence of join child relations instead, in order to accommodate this "new conditions pushed down" situation.
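To make the reasoning concrete, here is a small sketch with a toy plan model (the `Plan`, `Relation` and three-field `Join` types and the `relations` helper are illustrative assumptions, not Spark code). Two joins that differ only in their condition are unequal as nodes, but flatten to the same sequence of child relations, so a map keyed on that sequence still finds the hint.

```scala
object ReorderKeySketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan, condition: Option[String]) extends Plan

  // Flatten a left-deep join tree into its child relations, in join order.
  def relations(plan: Plan): Seq[Plan] = plan match {
    case Join(left, right, _) => relations(left) :+ right
    case other => Seq(other)
  }
}
```

For instance, `relations(Join(Relation("a"), Relation("b"), None))` and the same join with `Some("a.a1 = b.b1")` as its condition both yield the relations `a` and `b` in that order, even though the two `Join` values themselves compare unequal.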

def collectHints(plan: LogicalPlan): Seq[HintInfo] = {
plan match {
case h: ResolvedHint => collectHints(h.child) :+ h.hints
case u: UnaryNode => collectHints(u.child)
Contributor

I'm not sure if it's safe to collect hint through other operators. e.g. Generate is a unary node which produces more data than its child, and we may add more hints in the future which can't be propagated through operators.

I think a safer way is to only collect hints from the ResolvedHint operator if it's a child of Join.
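The difference between the two strategies can be sketched with a toy model (all types and both function names below are made up for illustration; `Generate` here is just a unary node that may emit more rows than its child):

```scala
object CollectHintsSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Generate(child: Plan) extends Plan // may emit more rows than its child
  final case class ResolvedHint(child: Plan, hint: String) extends Plan

  // Behavior under discussion: keep looking through any unary node.
  def collectThroughUnary(plan: Plan): Seq[String] = plan match {
    case ResolvedHint(child, hint) => collectThroughUnary(child) :+ hint
    case Generate(child) => collectThroughUnary(child)
    case _: Relation => Nil
  }

  // Stricter alternative: only hints stacked directly on top of the join child.
  def collectDirect(plan: Plan): Seq[String] = plan match {
    case ResolvedHint(child, hint) => collectDirect(child) :+ hint
    case _ => Nil
  }
}
```

With a join child of the form `Generate(ResolvedHint(Relation("a"), "broadcast"))`, the first function still reports the broadcast hint while the second reports none, which is the case the comment is worried about.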

Contributor Author

@maryannxue maryannxue Jan 4, 2019

This is following the original behavior (which is the original hint bottom-up propagation logic in stats visitor) although I'm more inclined to make it work as you suggested here.

Contributor Author

I'll start a follow-up PR if any hint behavior needs to be revisited

Member

Create an umbrella JIRA that includes all these follow-up JIRAs. For example, add a new conf for enabling/disabling silently ignoring inapplicable hints.

@SparkQA

SparkQA commented Jan 5, 2019

Test build #100761 has finished for PR 23036 at commit f51e31d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class JoinHint(leftHint: Option[HintInfo], rightHint: Option[HintInfo])

@@ -115,6 +115,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
// However, because we also use the analyzer to canonicalize queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateResolvedHint,
Member

Also add it to nonExcludableRules

@gatorsmile
Member

LGTM except a few minor comments.

@@ -288,7 +288,8 @@ case class Join(
left: LogicalPlan,
right: LogicalPlan,
joinType: JoinType,
condition: Option[Expression])
condition: Option[Expression],
hint: JoinHint)
Contributor

What about Option[JoinHint] with a default of None?

Contributor Author

@maryannxue maryannxue Jan 6, 2019

This is to make sure that whenever the constructor is called, the caller is clearly aware of this hint parameter and will set it right. This happens mostly in the optimizer where the rules transform a join node into a new one, and not with a copy constructor.
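The trade-off can be seen in a toy example (both case classes and both rewrite functions are hypothetical, with strings standing in for plans, conditions and hints). A rewrite that rebuilds the node through the constructor compiles either way, but with a default value the hint is silently lost.

```scala
object HintParamSketch {
  final case class JoinWithDefault(
      left: String,
      right: String,
      condition: Option[String],
      hint: Option[String] = None)

  final case class JoinRequired(
      left: String,
      right: String,
      condition: Option[String],
      hint: Option[String])

  // Rebuilds the node via the constructor; the omitted hint falls back to None.
  def addConditionWithDefault(j: JoinWithDefault, cond: String): JoinWithDefault =
    JoinWithDefault(j.left, j.right, Some(cond))

  // Leaving out the last argument here would be a compile error, so the
  // author of the rewrite has to decide what happens to the hint.
  def addConditionRequired(j: JoinRequired, cond: String): JoinRequired =
    JoinRequired(j.left, j.right, Some(cond), j.hint)
}
```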

Contributor

We can skip setting the default value then. On the other hand, the default value helps make the diff of this patch smaller, and I think it makes sense in general, since most of the time we are not concerned about the hint. Anyway, I am also fine without the default value.

super.doCanonicalize().asInstanceOf[Join].copy(hint = JoinHint.NONE)

// Do not include an empty join hint in string description
protected override def stringArgs: Iterator[Any] = super.stringArgs.filter { e =>
Contributor

If we move to an Option[JoinHint], is this still needed?

Contributor

I'm fine with Option[JoinHint] without the default value, if it can help us get rid of this hack.

@SparkQA

SparkQA commented Jan 6, 2019

Test build #100844 has finished for PR 23036 at commit 17b7cce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Replaces [[ResolvedHint]] operators from the plan. Move the [[HintInfo]] to associated [[Join]]
* operators, otherwise remove it if no [[Join]] operator is matched.
*/
object EliminateResolvedHint extends Rule[LogicalPlan] {
Contributor

Do we have to run it at the beginning of the optimizer? Can we run it at the end of the analyzer?

Contributor Author

It's because of the Dataset interface. The ResolvedHint of the join children nodes would have been gone by the time we construct a join node.


// Ignore hint for canonicalization
protected override def doCanonicalize(): LogicalPlan =
super.doCanonicalize().asInstanceOf[Join].copy(hint = JoinHint.NONE)
Contributor

@cloud-fan cloud-fan Jan 7, 2019

How about copy(hint = JoinHint.NONE).doCanonicalize()?

Contributor Author

It'd cause a stack overflow.
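A toy reconstruction of why the order matters (the `Node` base class and this three-field `Join` are assumptions made for the example; only the method names `doCanonicalize` and `canonicalized` are borrowed from the discussion):

```scala
object CanonicalizeSketch {
  abstract class Node {
    // Default canonical form: the node itself.
    protected def doCanonicalize(): Node = this
    final def canonicalized: Node = doCanonicalize()
    final def sameResult(other: Node): Boolean = canonicalized == other.canonicalized
  }

  case class Join(left: String, right: String, hint: Option[String]) extends Node {
    // Run the parent's canonicalization first, then clear the hint on the result.
    // Writing copy(hint = None).doCanonicalize() instead would re-enter this same
    // override on the copy, then on the copy's copy, and never terminate.
    override protected def doCanonicalize(): Node =
      super.doCanonicalize().asInstanceOf[Join].copy(hint = None)
  }
}
```

In this sketch, two joins that differ only in their hint compare as different nodes but report `sameResult`, which is the property the canonicalization override is meant to provide.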

super.doCanonicalize().asInstanceOf[Join].copy(hint = JoinHint.NONE)

// Do not include an empty join hint in string description
protected override def stringArgs: Iterator[Any] = super.stringArgs.filter { e =>
Contributor

how about

val hintArg = if (hint.leftHint.isEmpty && hint.rightHint.isEmpty) Nil else Seq(hint)
Seq(left, right, joinType, condition) ++ hintArg

Contributor Author

I was trying to do it in a way that would be "extendable", say, it would work with any future change of the constructor (although we don't expect the constructor of logical operators to change much).
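A sketch of the "extendable" variant on a toy case class (the types below and the public `stringArgs`/`simpleString` methods are simplified assumptions, not the real `TreeNode` API): filtering the product iterator keeps working if constructor fields are added later, whereas listing the arguments by hand would need updating.

```scala
object StringArgsSketch {
  final case class JoinHint(leftHint: Option[String], rightHint: Option[String])

  final case class Join(left: String, right: String, joinType: String, hint: JoinHint) {
    // Keep every constructor argument except an empty join hint.
    def stringArgs: Iterator[Any] = productIterator.filter {
      case JoinHint(None, None) => false
      case _ => true
    }

    def simpleString: String = stringArgs.mkString("Join(", ", ", ")")
  }
}
```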

@cloud-fan
Contributor

Thanks for the nice cleanup! LGTM except some minor comments.

@SparkQA

SparkQA commented Jan 7, 2019

Test build #100899 has finished for PR 23036 at commit 97377dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merged to master.

@asfgit asfgit closed this in 98be895 Jan 7, 2019
asfgit pushed a commit that referenced this pull request Jan 13, 2019
## What changes were proposed in this pull request?

This is to fix a bug in #23036 that would cause a join hint to be applied to a node it is not supposed to be applied to after join reordering. For example,
```
  val join = df.join(df, "id")
  val broadcasted = join.hint("broadcast")
  val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
```
There should only be 2 broadcast hints on `join2`, but after join reordering there would be 4. This is because the hint application in join reordering compares the attribute set when testing relation equivalency.
Moreover, it could still be problematic even if the child relations were used in testing relation equivalency, due to the potential exprId conflict in nested self-joins.

As a result, this PR simply reverts the join reorder hint behavior change introduced in #23036, which means if a join hint is present, the join node itself will not participate in the join reordering, while the sub-joins within its children still can.

## How was this patch tested?

Added new tests

Closes #23524 from maryannxue/query-hint-followup-2.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…tive Hints

## What changes were proposed in this pull request?

This is to fix a bug in apache#23036, which would lead to an exception in case of two consecutive hints.

## How was this patch tested?

Added a new test.

Closes apache#23501 from maryannxue/query-hint-followup.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>