[SPARK-2205] [SQL] Avoid unnecessary exchange operators in multi-way joins #7773

Closed


JoshRosen
Contributor

This PR adds PartitioningCollection, which represents the outputPartitioning of SparkPlans with multiple children (e.g. ShuffledHashJoin), so that a SparkPlan can carry multiple descriptions of its partitioning scheme. For example, ShuffledHashJoin has two descriptions: left.outputPartitioning and right.outputPartitioning. As a result, a query like select * from t1 join t2 on (t1.x = t2.x) join t3 on (t2.x = t3.x) will have only three Exchange operators (when shuffled joins are needed) instead of four.

The code in this PR was authored by @yhuai; I'm opening this PR to factor out this change from #7685, a larger pull request which contains two other optimizations.
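The effect described above can be sketched with a toy planner. This is a simplified model written for illustration, not Spark's code: the names Scan, Join, ensure, reportBothSides and clusteredBy are assumptions, and the classes only mimic the idea of a join reporting both children's partitionings.

```scala
// Simplified model for illustration only; these are NOT Spark's classes.
sealed trait Partitioning {
  // True if data laid out this way is already clustered by `required`.
  def clusteredBy(required: Seq[String]): Boolean
}

case class HashPartitioning(keys: Seq[String]) extends Partitioning {
  def clusteredBy(required: Seq[String]): Boolean = keys == required
}

case object UnknownPartitioning extends Partitioning {
  def clusteredBy(required: Seq[String]): Boolean = false
}

// Several descriptions of the same output; any one of them may satisfy a requirement.
case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning {
  def clusteredBy(required: Seq[String]): Boolean =
    partitionings.exists(_.clusteredBy(required))
}

sealed trait Plan { def outputPartitioning: Partitioning }

case class Scan(table: String) extends Plan {
  val outputPartitioning: Partitioning = UnknownPartitioning
}

case class Exchange(keys: Seq[String], child: Plan) extends Plan {
  val outputPartitioning: Partitioning = HashPartitioning(keys)
}

// `reportBothSides` (hypothetical flag) switches between the old and new behaviour.
case class Join(leftKeys: Seq[String], rightKeys: Seq[String],
                left: Plan, right: Plan, reportBothSides: Boolean) extends Plan {
  def outputPartitioning: Partitioning =
    if (reportBothSides)
      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
    else left.outputPartitioning
}

object MultiWayJoinSketch {
  // Add an Exchange only when the child is not already clustered by the join keys.
  private def ensure(keys: Seq[String], child: Plan): Plan =
    if (child.outputPartitioning.clusteredBy(keys)) child else Exchange(keys, child)

  def join(leftKeys: Seq[String], rightKeys: Seq[String],
           left: Plan, right: Plan, reportBothSides: Boolean): Plan =
    Join(leftKeys, rightKeys, ensure(leftKeys, left), ensure(rightKeys, right), reportBothSides)

  def countExchanges(plan: Plan): Int = plan match {
    case Scan(_)             => 0
    case Exchange(_, child)  => 1 + countExchanges(child)
    case Join(_, _, l, r, _) => countExchanges(l) + countExchanges(r)
  }

  // t1 join t2 on (t1.x = t2.x) join t3 on (t2.x = t3.x)
  def plan(reportBothSides: Boolean): Plan = {
    val t1t2 = join(Seq("t1.x"), Seq("t2.x"), Scan("t1"), Scan("t2"), reportBothSides)
    join(Seq("t2.x"), Seq("t3.x"), t1t2, Scan("t3"), reportBothSides)
  }

  def main(args: Array[String]): Unit = {
    println(countExchanges(plan(reportBothSides = false))) // prints 4
    println(countExchanges(plan(reportBothSides = true)))  // prints 3
  }
}
```

In this model the first join's output is known to be hash partitioned by t2.x as well as t1.x, so the second join can reuse it and only t3 needs a shuffle.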


@SparkQA

SparkQA commented Jul 30, 2015

Test build #38962 has finished for PR 7773 at commit 801b807.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen JoshRosen force-pushed the multi-way-join-planning-improvements branch from 801b807 to 4a99204 Compare July 30, 2015 03:26
    case x =>
      throw new IllegalArgumentException(s"HashOuterJoin should not take $x as the JoinType")
  }

Contributor

I will remove this change for now. Once we have the nullSafe concept, we can better describe how the result of this join operator is partitioned. For example, right now, it is not safe to say that the output of this operator is partitioned by the rightKeys when we have a left outer join (because rows with null keys are not clustered).
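The concern about null keys can be illustrated with a tiny simulation. This is a sketch under stated assumptions, not Spark code: partitionOf is a made-up modulo partitioner, and the left and right key sets are invented sample data.

```scala
object OuterJoinNullKeys {
  val numPartitions = 2

  // Toy hash partitioner for small non-negative Int keys (an assumption, not Spark's).
  def partitionOf(key: Int): Int = key % numPartitions

  // Invented sample data: four left rows, and a right side that only contains key 1.
  val leftKeys: Seq[Int] = Seq(1, 2, 3, 4)
  val rightKeys: Set[Int] = Set(1)

  // Left outer join output as (partition, leftKey, rightKey). Each row stays in
  // the partition chosen by its LEFT key; unmatched rows get a null (None) right key.
  val output: Seq[(Int, Int, Option[Int])] =
    leftKeys.map { k =>
      (partitionOf(k), k, if (rightKeys.contains(k)) Some(k) else None)
    }

  // Partitions that hold at least one row whose right key is null.
  val partitionsWithNullRightKey: Set[Int] =
    output.collect { case (p, _, None) => p }.toSet

  def main(args: Array[String]): Unit =
    println(partitionsWithNullRightKey.size) // prints 2
}
```

Rows sharing the same right key value (null) end up in more than one partition, so in this model the output cannot be described as clustered by the right keys, even though it is still clustered by the left keys.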

@SparkQA

SparkQA commented Jul 30, 2015

Test build #38966 has finished for PR 7773 at commit 4a99204.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@@ -57,6 +57,8 @@ case class BroadcastHashOuterJoin(
  override def requiredChildDistribution: Seq[Distribution] =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
Contributor

This is a bug fix.

@@ -94,8 +95,12 @@ sealed trait Partitioning {
*/
def compatibleWith(other: Partitioning): Boolean

/** Returns the expressions that are used to key the partitioning. */
def keyExpressions: Seq[Expression]
Contributor

Seems keyExpressions is not used at all and I do not remember when we added it. So, I am removing it.

@SparkQA

SparkQA commented Jul 30, 2015

Test build #38980 has finished for PR 7773 at commit 73913f7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@SparkQA

SparkQA commented Jul 30, 2015

Test build #38989 has finished for PR 7773 at commit 2963857.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@JoshRosen JoshRosen changed the title from "[SPARK-2205] [SQL] [WIP] Avoid unnecessary exchange operators in multi-way joins" to "[SPARK-2205] [SQL] Avoid unnecessary exchange operators in multi-way joins" Jul 30, 2015

test("PartitioningCollection") {
  // First, we disable broadcast join.
  val origThreshold = conf.autoBroadcastJoinThreshold
Contributor Author

This test could use the new SQLTestUtils withConf and withTempTable helper functions, I think.

Contributor Author

I'll make this change now.
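The save-and-restore pattern that such a helper replaces can be sketched as a loan-style function. This is a minimal stand-in, not the SQLTestUtils implementation: the conf map and this withConf signature are assumptions made for illustration.

```scala
import scala.collection.mutable

object ConfSketch {
  // Stand-in for a session configuration (NOT Spark's SQLConf); the value is arbitrary.
  val conf: mutable.Map[String, String] =
    mutable.Map("spark.sql.autoBroadcastJoinThreshold" -> "1000")

  // Hypothetical helper: apply the overrides, run `body`, then restore the
  // previous values even if `body` throws.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None)      => conf.remove(k)
    }
  }
}
```

A test written this way no longer needs to hold an origThreshold variable or remember to reset it at the end, which is the cleanup the comment above is suggesting.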

@JoshRosen
Contributor Author

Jenkins, retest this please.

test("PartitioningCollection") {
  // First, we disable broadcast join.
  val origThreshold = conf.autoBroadcastJoinThreshold
  setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 0)
Contributor Author

I think that you have to set AUTO_BROADCASTJOIN_THRESHOLD to -1 to disable broadcast, not 0.

Contributor Author

Actually, it looks like the implementation might be out of sync w.r.t. the docs for AUTO_BROADCASTJOIN_THRESHOLD...

@SparkQA

SparkQA commented Jul 30, 2015

Test build #39080 has finished for PR 7773 at commit 2963857.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@SparkQA

SparkQA commented Jul 30, 2015

Test build #39089 has finished for PR 7773 at commit 8104ea8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@@ -122,7 +127,10 @@ case object SinglePartition extends Partitioning {
case _ => false
}

override def keyExpressions: Seq[Expression] = Nil
override def guarantees(other: Partitioning): Boolean = other match {
Contributor Author

Shouldn't SinglePartition technically guarantee any partitioning which produces a single partition, such as HashPartitioning with a single partition? I guess that hash partitioning with one partition shouldn't ever occur, though.

@SparkQA

SparkQA commented Jul 30, 2015

Test build #39092 has finished for PR 7773 at commit 8acac75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@JoshRosen
Contributor Author

Let's wait and see if we can merge #7807 to remove compatibleWith, since that would avoid the potential for confusion between compatibleWith and guarantees.

asfgit pushed a commit that referenced this pull request Jul 31, 2015
… from Exchange

While reviewing yhuai's patch for SPARK-2205 (#7773), I noticed that Exchange's `compatible` check may be incorrectly returning `false` in many cases.  As far as I know, this is not actually a problem because the `compatible`, `meetsRequirements`, and `needsAnySort` checks are serving only as short-circuit performance optimizations that are not necessary for correctness.

In order to reduce code complexity, I think that we should remove these checks and unconditionally rewrite the operator's children.  This should be safe because we rewrite the tree in a single bottom-up pass.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7807 from JoshRosen/SPARK-9489 and squashes the following commits:

9d76ce9 [Josh Rosen] [SPARK-9489] Remove compatibleWith, meetsRequirements, and needsAnySort checks from Exchange
@JoshRosen JoshRosen force-pushed the multi-way-join-planning-improvements branch 2 times, most recently from 58b27eb to 0c18da3 Compare July 31, 2015 00:53
@JoshRosen JoshRosen force-pushed the multi-way-join-planning-improvements branch from 0c18da3 to 5c45924 Compare July 31, 2015 00:53
@SparkQA

SparkQA commented Jul 31, 2015

Test build #39131 has finished for PR 7773 at commit 5c45924.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

* guarantees the same partitioning scheme described by `other`.
*/
// TODO: Add an example once we have the `nullSafe` concept.
def guarantees(other: Partitioning): Boolean
Contributor

Do you think semanticEqual is a better name? I think this method is basically doing an equality check. For example, if other is a HashPartitioning('a :: Nil, 10) and this is a SinglePartition, we probably do not want to return true, because the parent of this operator can be a join and the sibling of this operator can be hash partitioned.

Contributor Author

I think that we should only consider a name like semanticEqual or semanticEquiv if a.guarantees(b) implies b.guarantees(a) and vice-versa.

Contributor

Yeah, makes sense. Then semanticEqual is not a good name, because once we have the concept of nullSafe this method will not have the commutative property: nullSafe hash partitioning can be treated as nullUnsafe hash partitioning, but not the other way around.

Contributor Author

Yep. Let's leave this for now.
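The asymmetry discussed in this thread can be sketched as follows. This is purely hypothetical: nullSafe did not exist at the time of this PR, and HashPart with its nullSafe field is an invented stand-in rather than Spark's HashPartitioning.

```scala
// Hypothetical model; `nullSafe` is an assumed future concept, not a Spark field.
case class HashPart(keys: Seq[String], numPartitions: Int, nullSafe: Boolean) {
  // `this` guarantees `other` if data laid out as `this` can be treated as
  // laid out as `other`. A null-safe layout can stand in for a null-unsafe
  // one, but not the reverse.
  def guarantees(other: HashPart): Boolean =
    keys == other.keys &&
      numPartitions == other.numPartitions &&
      (nullSafe || !other.nullSafe)
}

object GuaranteesSketch {
  val safe: HashPart   = HashPart(Seq("a"), 10, nullSafe = true)
  val unsafe: HashPart = HashPart(Seq("a"), 10, nullSafe = false)

  def main(args: Array[String]): Unit = {
    println(safe.guarantees(unsafe)) // prints true
    println(unsafe.guarantees(safe)) // prints false
  }
}
```

Because a.guarantees(b) does not imply b.guarantees(a) in this model, a name suggesting equality would be misleading, which matches the conclusion reached above.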

@SparkQA

SparkQA commented Aug 1, 2015

Test build #39370 has finished for PR 7773 at commit 5c45924.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 2, 2015

Test build #39444 has finished for PR 7773 at commit 5c45924.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@yhuai
Contributor

yhuai commented Aug 2, 2015

The failed test (HiveCompatibilitySuite's semijoin) is tracked by https://issues.apache.org/jira/browse/SPARK-9482.

@yhuai
Contributor

yhuai commented Aug 2, 2015

test this please

@yhuai
Contributor

yhuai commented Aug 3, 2015

@JoshRosen If you think changes in this PR are good, how about we merge it once it passes tests?

@SparkQA

SparkQA commented Aug 3, 2015

Test build #39474 has finished for PR 7773 at commit 5c45924.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PartitioningCollection(partitionings: Seq[Partitioning])

@yhuai
Contributor

yhuai commented Aug 3, 2015

OK. I am merging it to master.

@asfgit asfgit closed this in 114ff92 Aug 3, 2015
@JoshRosen
Contributor Author

Yeah, sorry for timing out here. Consider this a post-hoc LGTM.

@JoshRosen JoshRosen deleted the multi-way-join-planning-improvements branch August 3, 2015 03:59