Skip to content

Commit

Permalink
Yin's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sameeragarwal committed Mar 8, 2016
1 parent d3c4739 commit 21d8897
Showing 1 changed file with 53 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,87 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.joins.LeftSemiJoinHash
import org.apache.spark.sql.test.SharedSQLContext


/**
 * Checks how conjunctive predicates are ordered in the physical plan (`sparkPlan`) compared
 * with the optimized logical plan, for both filter and join conditions.
 *
 * For each query the suite extracts the condition from the optimized logical plan and from the
 * physical plan, splits both into conjuncts, and verifies that:
 *  - every `IsNotNull` conjunct precedes every other kind of conjunct, and
 *  - within each of those two groups the original relative order is kept.
 */
class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper {

  setupTestData()

  /** Returns true when `e` is an [[IsNotNull]] predicate. */
  private def isNotNullCheck(e: Expression): Boolean = e match {
    case _: IsNotNull => true
    case _ => false
  }

  /**
   * Verifies that (a) in the new condition the IsNotNull operators precede the rest of the
   * operators, and (b) the relative order of IsNotNull and non-IsNotNull operators is maintained.
   *
   * @param before condition whose conjunct order is the reference
   * @param after  condition whose conjunct order is being checked
   */
  private def verifyStableOrder(before: Expression, after: Expression): Unit = {
    val oldPredicates = splitConjunctivePredicates(before)
    val newPredicates = splitConjunctivePredicates(after)

    // Walk adjacent pairs. Zipping with the tail (rather than `sliding(2)`) yields no pairs for
    // a condition with a single conjunct, instead of a one-element window that a two-element
    // pattern cannot match.
    newPredicates.zip(newPredicates.drop(1)).foreach { case (x, y) =>
      // Verify IsNotNull operator ordering: a non-IsNotNull must never precede an IsNotNull.
      assert(isNotNullCheck(x) || !isNotNullCheck(y))

      // Verify stable sort order among neighbours of the same kind.
      // NOTE(review): `indexOf` returns -1 for a conjunct that is absent from `before`; this
      // check assumes every conjunct of `after` also appears in `before` -- confirm.
      if (isNotNullCheck(x) == isNotNullCheck(y)) {
        assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y))
      }
    }
  }

  test("null ordering in filter predicates") {
    val query = sql(
      """
        |SELECT * from testData
        |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5
      """.stripMargin)
      .queryExecution

    val logicalCondition = query.optimizedPlan.collect {
      case logical.Filter(condition, _) => condition
    }.headOption.getOrElse(fail("Optimized logical plan contains no Filter"))

    val physicalCondition = query.sparkPlan.collect {
      case execution.Filter(condition, _) => condition
    }.headOption.getOrElse(fail("Physical plan contains no Filter"))

    verifyStableOrder(logicalCondition, physicalCondition)
  }

  test("null ordering in join predicates") {
    sqlContext.cacheManager.clearCache()
    val query = sql(
      """
        |SELECT * FROM testData t1
        |LEFT SEMI JOIN testData t2
        |ON t1.key = t2.key
        |AND t1.key + t2.key != 5
        |AND CONCAT(t1.value, t2.value) IS NOT NULL
      """.stripMargin)
      .queryExecution

    // Match on `Some(...)` so that a join without a condition produces a descriptive test
    // failure instead of an exception from `Option.get`.
    val logicalCondition = query.optimizedPlan.collect {
      case Join(_, _, _, Some(condition)) => condition
    }.headOption.getOrElse(fail("Optimized logical plan contains no Join with a condition"))

    val physicalCondition = query.sparkPlan.collect {
      case LeftSemiJoinHash(_, _, _, _, Some(condition)) => condition
    }.headOption.getOrElse(fail("Physical plan contains no LeftSemiJoinHash with a condition"))

    verifyStableOrder(logicalCondition, physicalCondition)
  }
}

0 comments on commit 21d8897

Please sign in to comment.