Skip to content

Commit

Permalink
Shouldn't inferFilters if it contains SubqueryExpression
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyum committed Nov 23, 2017
1 parent 21a7bfd commit c2f6a49
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
Expand Up @@ -87,12 +87,12 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
PushProjectionThroughUnion,
ReorderJoin,
EliminateOuterJoin,
InferFiltersFromConstraints,
BooleanSimplification,
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown,
ColumnPruning,
InferFiltersFromConstraints,
// Operator combine
CollapseRepartition,
CollapseProject,
Expand Down Expand Up @@ -633,6 +633,26 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
}
}

private def canInferJoinFilters(constraints: Set[Expression]): Boolean = {
val subqueryExps = constraints.filter { exp =>
exp match {
case i: In =>
i.list.head match {
case s: SubqueryExpression => true
case _ => false
}
case e: Exists =>
e.children.head match {
case s: SubqueryExpression => true
case _ => false
}
case _ =>
false
}
}
subqueryExps.isEmpty
}

private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
val newFilters = filter.constraints --
Expand All @@ -658,7 +678,12 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
case None =>
additionalConstraints.reduceOption(And)
}
if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join

if (newConditionOpt.isDefined && canInferJoinFilters(constraints)) {
Join(left, right, joinType, newConditionOpt)
} else {
join
}
}
}

Expand Down
30 changes: 30 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Expand Up @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
Expand Down Expand Up @@ -2757,4 +2758,33 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-22573 Shouldn't inferFilters if it contains SubqueryExpression") {
withTempView("lineitem", "orders", "customer") {
Seq((1, 1.0)).toDF("L_ORDERKEY", "L_QUANTITY").createTempView("lineitem")
Seq((1, 1, 12.3, "s1")).toDF("O_ORDERKEY", "O_CUSTKEY", "O_TOTALPRICE", "O_ORDERDATE").
createTempView("orders")
Seq((1, "n1")).toDF("C_CUSTKEY", "C_NAME").createTempView("customer")

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
val executedPlan = spark.sql(
"""
|SELECT C_NAME, C_CUSTKEY, O_ORDERKEY, O_ORDERDATE, O_TOTALPRICE,
|SUM(L_QUANTITY) AS SUM_QUANTITY
|FROM CUSTOMER, ORDERS, LINEITEM
|WHERE O_ORDERKEY IN (SELECT L_ORDERKEY
| FROM LINEITEM
| GROUP BY L_ORDERKEY
| HAVING SUM(L_QUANTITY) > 300)
| AND C_CUSTKEY = O_CUSTKEY
| AND O_ORDERKEY = L_ORDERKEY
|GROUP BY C_NAME, C_CUSTKEY, O_ORDERKEY, O_ORDERDATE, O_TOTALPRICE
|ORDER BY O_TOTALPRICE DESC, O_ORDERDATE
""".stripMargin).queryExecution.executedPlan
val isSubqueryExists = executedPlan.treeString
.contains("SortMergeJoin [_1#223], [L_ORDERKEY#226#278], LeftSemi")
assert(!isSubqueryExists)
}
}
}
}

0 comments on commit c2f6a49

Please sign in to comment.