Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-13383][SQL] Keep broadcast hint after column pruning #11260

Closed
wants to merge 16 commits into from

Conversation

viirya
Copy link
Member

@viirya viirya commented Feb 19, 2016

JIRA: https://issues.apache.org/jira/browse/SPARK-13383

What changes were proposed in this pull request?

When we do column pruning in Optimizer, we put additional Project on top of a logical plan. However, when we already wrap a BroadcastHint on a logical plan, the added Project will hide BroadcastHint after later execution.

We should take care of BroadcastHint when we do column pruning.

How was the this patch tested?

Unit test is added.

@@ -432,7 +432,8 @@ class Analyzer(
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
case a: Attribute =>
attributeRewrites.get(a).getOrElse(a).withQualifiers(a.qualifiers)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is proposed to fix in #11261. I included this change here is because without this the following unit test can not pass.

@SparkQA
Copy link

SparkQA commented Feb 19, 2016

Test build #51527 has finished for PR 11260 at commit 9639223.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Feb 20, 2016

ping @marmbrus @liancheng @rxin @davies

Project(allReferences.filter(c.outputSet.contains).toSeq, c)
c match {
case BroadcastHint(p) =>
BroadcastHint(Project(allReferences.filter(c.outputSet.contains).toSeq, p))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other operator will be inserted between Join and BroadcastHint?

Maybe we could have a rule to pull up BroadcastHint until Join.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way to do it is to define the hint in the join operator, and then it is safe (can never be pushed anywhere else)

@SparkQA
Copy link

SparkQA commented Feb 21, 2016

Test build #51625 has finished for PR 11260 at commit 37c3523.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -260,6 +260,20 @@ case class Join(
condition: Option[Expression])
extends BinaryNode with PredicateHelper {

private def isBrocastHint(plan: LogicalPlan): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what i meant was making the broadcast hint just a property on the logical plan, so it can never be pushed away from it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I see. But as we need to add two broadcast hint for left and right plans. Considering Join is broadly used in many places as pattern matching usage, I am afraid that we need to change too many places for this change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. each case Join(left, right, joinType, condition) => would need to change to case Join(left, right, joinType, condition, leftBroadcastHint, rightBroadcastHint).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok for you? If so, I will make the change as that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well you don't need to make the hint a parameter of the case class, but just some field you can set ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I've made it as variables to set & copy from other Join operator. Please take a look if this update is good. Thanks.

@viirya
Copy link
Member Author

viirya commented Feb 22, 2016

retest this please.

@SparkQA
Copy link

SparkQA commented Feb 22, 2016

Test build #51659 has finished for PR 11260 at commit 03dac45.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Feb 22, 2016

retest this please.

@viirya
Copy link
Member Author

viirya commented Feb 22, 2016

Why jenkins can't retest?

@gatorsmile
Copy link
Member

It could be caused by the build failure. The shade JAR is too large now. After merging the latest build, I hit an issue when using mvn.

@marmbrus
Copy link
Contributor

This feels kind of hacky to me. @rxin why doesn't the hint just change the statistics again?

@SparkQA
Copy link

SparkQA commented Feb 22, 2016

Test build #51672 has finished for PR 11260 at commit ad63367.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Copy link
Contributor

rxin commented Feb 22, 2016

I still don't like this approach and find it too hacky.

I talked to @marmbrus more offline, and maybe it's easiest to just rely on stats overriding, e.g. when broadcast hint is declared, we set the size of the relation (just in the hint operator) to the smallest possible number (1?). Then it should be robust to pushdowns.

@viirya
Copy link
Member Author

viirya commented Feb 23, 2016

@rxin @marmbrus that is good idea. Let me update this.

@viirya
Copy link
Member Author

viirya commented Feb 23, 2016

retest this please.

@rxin
Copy link
Contributor

rxin commented Feb 23, 2016

We can remove the old broadcast hint matching in strategy now, can't we?

@viirya
Copy link
Member Author

viirya commented Feb 23, 2016

Indeed. We can now. Will update it.

@@ -82,7 +82,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
object CanBroadcast {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably turn this patter matching into just an if/else statement now

@SparkQA
Copy link

SparkQA commented Feb 23, 2016

Test build #51731 has finished for PR 11260 at commit a3cadbb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -137,6 +139,22 @@ class JoinSuite extends QueryTest with SharedSQLContext {
assert(planned.size === 1)
}

test("broadcasthint sets relation statistics to smallest value") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry would be great to put this in the optimizer suites, rather than this file which is an end-to-end suite.

I just took a look at the suites available. I'd rename JoinOrderSuite to JoinOptimizationSuite, and then put this case there.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I just can't find appropriate suite for it. I'd use JoinOptimizationSuite. Thanks.

@SparkQA
Copy link

SparkQA commented Feb 23, 2016

Test build #51738 has finished for PR 11260 at commit 3360007.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 23, 2016

Test build #51743 has finished for PR 11260 at commit 1e35aa3.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Feb 23, 2016

retest this please.

@SparkQA
Copy link

SparkQA commented Feb 23, 2016

Test build #51763 has finished for PR 11260 at commit 01dfad4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Feb 23, 2016

@rxin I've addressed your comments. Please see if this is appropriate. Thanks.

comparePlans(optimized, expected)

assert(optimized.collect {
case b @ BroadcastHint(_) if b.statistics.sizeInBytes == 1 => 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a nit, but I think what you really want to test here is something like:

val broadcastChildren = optimized.collect {
  case Join(_, CanBroadcast(r), _, _) => r
}

assert(broadcastChildren == 1)

With the current test something could break in Project (for example) that would prevent the broadcast from actually happening.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it seems we can't import CanBroadcast into this test, I do update according to your comment with a little change (just check its statistics.sizeInBytes). Please see if it is appropriate now. Thanks.

@marmbrus
Copy link
Contributor

Implementation LGTM overall, minor comment on tests.

@marmbrus
Copy link
Contributor

test this please

@SparkQA
Copy link

SparkQA commented Feb 24, 2016

Test build #51842 has finished for PR 11260 at commit b58710f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Feb 24, 2016

I think the failed test is caused by updated column pruning rule.

@SparkQA
Copy link

SparkQA commented Feb 24, 2016

Test build #51858 has finished for PR 11260 at commit 88535ee.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Feb 24, 2016

retest this please.

@viirya
Copy link
Member Author

viirya commented Feb 24, 2016

Failure (at test_mllib.R#133): kmeans ...

@SparkQA
Copy link

SparkQA commented Feb 24, 2016

Test build #51868 has finished for PR 11260 at commit 88535ee.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Feb 24, 2016

It is weird. Some other PRs like #11344 (just document change) also failed at this SparkR unit tests.

@viirya
Copy link
Member Author

viirya commented Feb 24, 2016

retest this please.

@SparkQA
Copy link

SparkQA commented Feb 24, 2016

Test build #51882 has finished for PR 11260 at commit 88535ee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Copy link
Contributor

Looks good, merging to master.

@asfgit asfgit closed this in f373986 Feb 24, 2016
@viirya viirya deleted the keep-broadcasthint branch December 27, 2023 18:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
6 participants