Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning #19281

Closed
wants to merge 3 commits into from

Conversation

maryannxue
Copy link
Contributor

What changes were proposed in this pull request?

Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we ended up not getting the correct outputOrdering during physical planning stage before Sort nodes are added to the children.

For example, J = {A join B on key1 = key2}

  1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
  2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
  3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"

So to fix this I changed the behavior of getKeyOrdering(keys, childOutputOrdering) to:

  1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering
  2. Otherwise => required child ordering

In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.

How was this patch tested?

Added new test cases.
Passed all integration tests.

*
* Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
*/
def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about moving this to SortOrder object :

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I had struggled where to put this, in SortOrder or SparkPlan. It doesn't look like anywhere else is using Seq[SortOrder] so far, so I chose to leave this out of SortOrder. I think, though, if we see potential usage of Seq[SortOrder] elsewhere, it might be worth to wrap it as a class. Agree? Either way, I could put it into SortOrder for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkPlan is the node for physical operator in SQL so doesn't feel like a good place to have this. Since one would want to have all methods related to SortOrder in a single place, the object class feels better option. We can revisit if there are more such methods being added to that object and refac to a class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us first move it to SortOrder.

@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
}

def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comparing the counts does not ensure that the sorts are in right place. I wish there was an easier way to pass that here but I can't think of any

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW: since this is only used by one test case, we could put it inside the test case method and not make it class level

keys.zip(childOutputOrdering).map { case (key, childOrder) =>
SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
val requiredOrdering = requiredOrders(keys)
if (SparkPlan.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me. cc @wzhfy who last touched this code

@maropu
Copy link
Member

maropu commented Sep 20, 2017

@gatorsmile @cloud-fan could you trigger tests if it is worth fixing? Thanks.

}
} else {
true
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please simplify it to

    if (required.isEmpty) {
      true
    } else if (required.length > actual.length) {
      false
    } else {
      required.zip(actual).forall { case (requiredOrder, actualOrder) =>
        actualOrder.satisfies(requiredOrder)
      }
    }

*
* Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
*/
def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us first move it to SortOrder.

keys.zip(childOutputOrdering).map { case (key, childOrder) =>
SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
val requiredOrdering = requiredOrders(keys)
if (SparkPlan.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here to explain the reason.

@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
}

def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add function comments to explain what it does

@gatorsmile
Copy link
Member

If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering

This is not accurate. It depends on the length of required ordering and the length of child ordering.

@maryannxue
Copy link
Contributor Author

bq. This is not accurate. It depends on the length of required ordering and the length of child ordering.

You are right. I did it right in the code but made a mistake in the description here. Thanks for pointing out, @gatorsmile!

@gatorsmile
Copy link
Member

ok to test

@SparkQA
Copy link

SparkQA commented Sep 20, 2017

Test build #81966 has finished for PR 19281 at commit 7893935.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

So we ended up not getting the correct outputOrdering during physical planning stage before Sort nodes are added to the children.

What's the harm of this? I think only EnsureRequirements cares about outputOrdering.

@maryannxue
Copy link
Contributor Author

@cloud-fan Please refer to https://issues.apache.org/jira/browse/SPARK-18591. The plan is to apply the optimization during physical planning stage, and specifically, when creating Aggregate physical operator. So without the outputOrdering being accurate, such optimizations cannot be done at physical planning stage. And even in post-physical-planning stage, it has to be pushed after EnsureRequirements.

@maryannxue
Copy link
Contributor Author

Thank you very much for the feedback, @tejasapatil, @gatorsmile! All the suggestions/comments have been addressed by my latest check-in.

@SparkQA
Copy link

SparkQA commented Sep 20, 2017

Test build #81980 has finished for PR 19281 at commit a14c9be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
*
* SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
* or of A's prefix.
Copy link
Contributor

@wzhfy wzhfy Sep 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to add an example in comment: E.g. If ordering A is [x, y] and required ordering B is [x], then A satisfies B.

* SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
* or of A's prefix.
*/
def orderingSatisfies(seq1: Seq[SortOrder], seq2: Seq[SortOrder]): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seq1 => ordering, seq2 => requiredOrdering

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had originally named them "actualOrdering" and "requiredOrdering" in SparkPlan, but since I've moved the function here to SortOrder, thought the names should be more general.

* The utility method to get output ordering for left or right side of the join.
*
* Returns the required ordering for left or right child if childOutputOrdering does not
* satisfy the required ordering; otherwise, in which case the child will not be re-sorted,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: in case the child does not need to be sorted

// outputOrdering should be consistent between physical plan and executed plan
assert(outputOrderingPhysical == outputOrderingExecuted,
s"Physical operator $join1 did not have the same output ordering as " +
s"corresponding executed operator $join2")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: physical/executed operator => physical/executed plan?

@maryannxue
Copy link
Contributor Author

Thank you, @wzhfy, very much for the comments! I have made corresponding changes in my latest commit.

@SparkQA
Copy link

SparkQA commented Sep 22, 2017

Test build #82054 has finished for PR 19281 at commit 78f41e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

LGTM

@gatorsmile
Copy link
Member

Thanks! Merged to master.

@asfgit asfgit closed this in 5960686 Sep 22, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
7 participants