Skip to content

Commit

Permalink
[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrde…
Browse files Browse the repository at this point in the history
…ring correctly during physical planning

## What changes were proposed in this pull request?

Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we ended up not getting the correct outputOrdering during physical planning stage before Sort nodes are added to the children.

For example, J = {A join B on key1 = key2}
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"

So to fix this I changed the  behavior of <code>getKeyOrdering(keys, childOutputOrdering)</code> to:
1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering
2. Otherwise => required child ordering

In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.

## How was this patch tested?

Added new test cases.
Passed all integration tests.

Author: maryannxue <maryann.xue@gmail.com>

Closes #19281 from maryannxue/spark-21998.
  • Loading branch information
maryannxue authored and gatorsmile committed Sep 22, 2017
1 parent 5ac9685 commit 5960686
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 21 deletions.
Expand Up @@ -96,6 +96,29 @@ object SortOrder {
sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
}

/**
* Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
*
* SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
* or of A's prefix. Here are examples of ordering A satisfying ordering B:
* <ul>
* <li>ordering A is [x, y] and ordering B is [x]</li>
* <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is [x1]</li>
* <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is [x1]</li>
* </ul>
*/
def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
if (ordering2.isEmpty) {
true
} else if (ordering2.length > ordering1.length) {
false
} else {
ordering2.zip(ordering1).forall {
case (o2, o1) => o1.satisfies(o2)
}
}
}
}

/**
Expand Down
Expand Up @@ -234,24 +234,11 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {

// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
if (requiredOrdering.nonEmpty) {
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
val orderingMatched = if (requiredOrdering.length > child.outputOrdering.length) {
false
} else {
requiredOrdering.zip(child.outputOrdering).forall {
case (requiredOrder, childOutputOrder) =>
childOutputOrder.satisfies(requiredOrder)
}
}

if (!orderingMatched) {
SortExec(requiredOrdering, global = false, child = child)
} else {
child
}
} else {
// If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
child
} else {
SortExec(requiredOrdering, global = false, child = child)
}
}

Expand Down
Expand Up @@ -102,13 +102,22 @@ case class SortMergeJoinExec(
}

/**
* For SMJ, child's output must have been sorted on key or expressions with the same order as
* key, so we can get ordering for key from child's output ordering.
* The utility method to get output ordering for left or right side of the join.
*
* Returns the required ordering for left or right child if childOutputOrdering does not
* satisfy the required ordering; otherwise, which means the child does not need to be sorted
* again, returns the required ordering for this child with extra "sameOrderExpressions" from
* the child's outputOrdering.
*/
private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
: Seq[SortOrder] = {
keys.zip(childOutputOrdering).map { case (key, childOrder) =>
SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
val requiredOrdering = requiredOrders(keys)
if (SortOrder.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
keys.zip(childOutputOrdering).map { case (key, childOrder) =>
SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
}
} else {
requiredOrdering
}
}

Expand Down
62 changes: 62 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Expand Up @@ -24,6 +24,8 @@ import scala.language.existentials
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
Expand Down Expand Up @@ -787,4 +789,64 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
}
}

test("test SortMergeJoin output ordering") {
val joinQueries = Seq(
"SELECT * FROM testData JOIN testData2 ON key = a",
"SELECT * FROM testData t1 JOIN " +
"testData2 t2 ON t1.key = t2.a JOIN testData3 t3 ON t2.a = t3.a",
"SELECT * FROM testData t1 JOIN " +
"testData2 t2 ON t1.key = t2.a JOIN " +
"testData3 t3 ON t2.a = t3.a JOIN " +
"testData t4 ON t1.key = t4.key")

def assertJoinOrdering(sqlString: String): Unit = {
val df = sql(sqlString)
val physical = df.queryExecution.sparkPlan
val physicalJoins = physical.collect {
case j: SortMergeJoinExec => j
}
val executed = df.queryExecution.executedPlan
val executedJoins = executed.collect {
case j: SortMergeJoinExec => j
}
// This only applies to the above tested queries, in which a child SortMergeJoin always
// contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
// appear as parent of SortMergeJoin.
executed.foreach {
case s: SortExec => s.foreach {
case j: SortMergeJoinExec => fail(
s"No extra sort should be added since $j already satisfies the required ordering"
)
case _ =>
}
case _ =>
}
val joinPairs = physicalJoins.zip(executedJoins)
val numOfJoins = sqlString.split(" ").count(_.toUpperCase == "JOIN")
assert(joinPairs.size == numOfJoins)

joinPairs.foreach {
case(join1, join2) =>
val leftKeys = join1.leftKeys
val rightKeys = join1.rightKeys
val outputOrderingPhysical = join1.outputOrdering
val outputOrderingExecuted = join2.outputOrdering

// outputOrdering should always contain join keys
assert(
SortOrder.orderingSatisfies(
outputOrderingPhysical, leftKeys.map(SortOrder(_, Ascending))))
assert(
SortOrder.orderingSatisfies(
outputOrderingPhysical, rightKeys.map(SortOrder(_, Ascending))))
// outputOrdering should be consistent between physical plan and executed plan
assert(outputOrderingPhysical == outputOrderingExecuted,
s"Operator $join1 did not have the same output ordering in the physical plan as in " +
s"the executed plan.")
}
}

joinQueries.foreach(assertJoinOrdering)
}
}

0 comments on commit 5960686

Please sign in to comment.