Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ object SubqueryExpression {
case _ => false
}.isDefined
}

/**
* Returns true when an expression contains a subquery
*/
def hasSubquery(e: Expression): Boolean = {
e.find {
case _: SubqueryExpression => true
case _ => false
}.isDefined
}
}

object SubExprUtils extends PredicateHelper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
OptimizeSubqueries) ::
Batch("Replace Operators", fixedPoint,
ReplaceIntersectWithSemiJoin,
ReplaceExceptWithFilter,
ReplaceExceptWithAntiJoin,
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", fixedPoint,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule


/**
* If one or both of the datasets in the logical [[Except]] operator are purely transformed using
* [[Filter]], this rule will replace logical [[Except]] operator with a [[Filter]] operator by
* flipping the filter condition of the right child.
* {{{
* SELECT a1, a2 FROM Tab1 WHERE a2 = 12 EXCEPT SELECT a1, a2 FROM Tab1 WHERE a1 = 5
* ==> SELECT DISTINCT a1, a2 FROM Tab1 WHERE a2 = 12 AND (a1 is null OR a1 <> 5)
* }}}
*
* Note:
* Before flipping the filter condition of the right node, we should:
* 1. Combine all it's [[Filter]].
* 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition).
*/
object ReplaceExceptWithFilter extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.conf.replaceExceptWithFilter) {
return plan
}

plan.transform {
case Except(left, right) if isEligible(left, right) =>
Distinct(Filter(Not(transformCondition(left, skipProject(right))), left))
}
}

private def transformCondition(left: LogicalPlan, right: LogicalPlan): Expression = {
val filterCondition =
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition

val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap

filterCondition.transform { case a : AttributeReference => attributeNameMap(a.name) }
}

// TODO: This can be further extended in the future.
private def isEligible(left: LogicalPlan, right: LogicalPlan): Boolean = (left, right) match {
case (_, right @ (Project(_, _: Filter) | Filter(_, _))) => verifyConditions(left, right)
case _ => false
}

private def verifyConditions(left: LogicalPlan, right: LogicalPlan): Boolean = {
val leftProjectList = projectList(left)
val rightProjectList = projectList(right)

left.output.size == left.output.map(_.name).distinct.size &&
left.find(_.expressions.exists(SubqueryExpression.hasSubquery)).isEmpty &&
right.find(_.expressions.exists(SubqueryExpression.hasSubquery)).isEmpty &&
Project(leftProjectList, nonFilterChild(skipProject(left))).sameResult(
Project(rightProjectList, nonFilterChild(skipProject(right))))
}

private def projectList(node: LogicalPlan): Seq[NamedExpression] = node match {
case p: Project => p.projectList
case x => x.output
}

private def skipProject(node: LogicalPlan): LogicalPlan = node match {
case p: Project => p.child
case x => x
}

private def nonFilterChild(plan: LogicalPlan) = plan.find(!_.isInstanceOf[Filter]).getOrElse {
throw new IllegalStateException("Leaf node is expected")
}

private def combineFilters(plan: LogicalPlan): LogicalPlan = {
@tailrec
def iterate(plan: LogicalPlan, acc: LogicalPlan): LogicalPlan = {
if (acc.fastEquals(plan)) acc else iterate(acc, CombineFilters(acc))
}
iterate(plan, CombineFilters(plan))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,19 @@ object SQLConf {
.intConf
.createWithDefault(10000)

val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
.internal()
.doc("When true, the apply function of the rule verifies whether the right node of the" +
" except operation is of type Filter or Project followed by Filter. If yes, the rule" +
" further verifies 1) Excluding the filter operations from the right (as well as the" +
" left node, if any) on the top, whether both the nodes evaluates to a same result." +
" 2) The left and right nodes don't contain any SubqueryExpressions. 3) The output" +
" column names of the left node are distinct. If all the conditions are met, the" +
" rule will replace the except operation with a Filter by flipping the filter" +
" condition(s) of the right node.")
.booleanConf
.createWithDefault(true)

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
Expand Down Expand Up @@ -1200,6 +1213,8 @@ class SQLConf extends Serializable with Logging {

def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)

def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not being used, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this function is not used anywhere.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.{Alias, Not}
import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
Expand All @@ -31,6 +32,7 @@ class ReplaceOperatorSuite extends PlanTest {
val batches =
Batch("Replace Operators", FixedPoint(100),
ReplaceDistinctWithAggregate,
ReplaceExceptWithFilter,
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
ReplaceDeduplicateWithAggregate) :: Nil
Expand All @@ -50,6 +52,108 @@ class ReplaceOperatorSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("replace Except with Filter while both the nodes are of type Filter") {
val attributeA = 'a.int
val attributeB = 'b.int

val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
val table2 = Filter(attributeB === 2, Filter(attributeA === 1, table1))
val table3 = Filter(attributeB < 1, Filter(attributeA >= 2, table1))

val query = Except(table2, table3)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Aggregate(table1.output, table1.output,
Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
(attributeA >= 2 && attributeB < 1)),
Filter(attributeB === 2, Filter(attributeA === 1, table1)))).analyze

comparePlans(optimized, correctAnswer)
}

test("replace Except with Filter while only right node is of type Filter") {
val attributeA = 'a.int
val attributeB = 'b.int

val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
val table2 = Filter(attributeB < 1, Filter(attributeA >= 2, table1))

val query = Except(table1, table2)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Aggregate(table1.output, table1.output,
Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
(attributeA >= 2 && attributeB < 1)), table1)).analyze

comparePlans(optimized, correctAnswer)
}

test("replace Except with Filter while both the nodes are of type Project") {
val attributeA = 'a.int
val attributeB = 'b.int

val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
val table2 = Project(Seq(attributeA, attributeB), table1)
val table3 = Project(Seq(attributeA, attributeB),
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))

val query = Except(table2, table3)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Aggregate(table1.output, table1.output,
Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
(attributeA >= 2 && attributeB < 1)),
Project(Seq(attributeA, attributeB), table1))).analyze

comparePlans(optimized, correctAnswer)
}

test("replace Except with Filter while only right node is of type Project") {
val attributeA = 'a.int
val attributeB = 'b.int

val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
val table2 = Filter(attributeB === 2, Filter(attributeA === 1, table1))
val table3 = Project(Seq(attributeA, attributeB),
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))

val query = Except(table2, table3)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Aggregate(table1.output, table1.output,
Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
(attributeA >= 2 && attributeB < 1)),
Filter(attributeB === 2, Filter(attributeA === 1, table1)))).analyze

comparePlans(optimized, correctAnswer)
}

test("replace Except with Filter while left node is Project and right node is Filter") {
val attributeA = 'a.int
val attributeB = 'b.int

val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
val table2 = Project(Seq(attributeA, attributeB),
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))
val table3 = Filter(attributeB === 2, Filter(attributeA === 1, table1))

val query = Except(table2, table3)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Aggregate(table1.output, table1.output,
Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
(attributeA === 1 && attributeB === 2)),
Project(Seq(attributeA, attributeB),
Filter(attributeB < 1, Filter(attributeA >= 2, table1))))).analyze

comparePlans(optimized, correctAnswer)
}

test("replace Except with Left-anti Join") {
val table1 = LocalRelation('a.int, 'b.int)
val table2 = LocalRelation('c.int, 'd.int)
Expand Down
57 changes: 57 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/except.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
-- Tests different scenarios of except operation
create temporary view t1 as select * from values
("one", 1),
("two", 2),
("three", 3),
("one", NULL)
as t1(k, v);

create temporary view t2 as select * from values
("one", 1),
("two", 22),
("one", 5),
("one", NULL),
(NULL, 5)
as t2(k, v);


-- Except operation that will be replaced by left anti join
SELECT * FROM t1 EXCEPT SELECT * FROM t2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To see whether these SQL use the rule you added, you can add EXPLAIN EXTENDED before the actual query SELECT * FROM t1 EXCEPT SELECT * FROM t2

You will see the optimized plan and check whether the rule is applicable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, because our plan is not stable, I am not asking you for outputting the plan in the test cases. This is just to help you debug the codes. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, good to know. I have been manually testing it via spark shell.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All verified via spark shell using createOrReplaceTempView

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And please let me know when it is right time to squash the commits..



-- Except operation that will be replaced by Filter: SPARK-22181
SELECT * FROM t1 EXCEPT SELECT * FROM t1 where v <> 1 and v <> 2;


-- Except operation that will be replaced by Filter: SPARK-22181
SELECT * FROM t1 where v <> 1 and v <> 22 EXCEPT SELECT * FROM t1 where v <> 2 and v >= 3;


-- Except operation that will be replaced by Filter: SPARK-22181
SELECT t1.* FROM t1, t2 where t1.k = t2.k
EXCEPT
SELECT t1.* FROM t1, t2 where t1.k = t2.k and t1.k != 'one';


-- Except operation that will be replaced by left anti join
SELECT * FROM t2 where v >= 1 and v <> 22 EXCEPT SELECT * FROM t1;


-- Except operation that will be replaced by left anti join
SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 FROM t1
MINUS
SELECT (SELECT min(k) FROM t2) abs_min_t2 FROM t1 WHERE t1.k = 'one';


-- Except operation that will be replaced by left anti join
SELECT t1.k
FROM t1
WHERE t1.v <= (SELECT max(t2.v)
FROM t2
WHERE t2.k = t1.k)
MINUS
SELECT t1.k
FROM t1
WHERE t1.v >= (SELECT min(t2.v)
FROM t2
WHERE t2.k = t1.k);
Loading