Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-16174][SQL] Improve OptimizeIn optimizer to remove literal repetitions #13876

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate
}

override def children: Seq[Expression] = value +: list
lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal])

override def nullable: Boolean = children.exists(_.nullable)
override def foldable: Boolean = children.forall(_.foldable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,16 +820,24 @@ object ConstantFolding extends Rule[LogicalPlan] {
}

/**
* Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]]
* which is much faster
* Optimize IN predicates:
* 1. Removes literal repetitions.
* 2. Replaces [[In (value, seq[Literal])]] with optimized version
* [[InSet (value, HashSet[Literal])]] which is much faster.
*/
case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, here is regression. Originally, v could be non-deterministic.

case q: LogicalPlan => q transformExpressionsDown {
case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
list.size > conf.optimizerInSetConversionThreshold =>
val hSet = list.map(e => e.eval(EmptyRow))
InSet(v, HashSet() ++ hSet)
case expr @ In(v, list) if expr.inSetConvertible =>
val newList = ExpressionSet(list).toSeq
if (newList.size > conf.optimizerInSetConversionThreshold) {
val hSet = newList.map(e => e.eval(EmptyRow))
InSet(v, HashSet() ++ hSet)
} else if (newList.size < list.size) {
expr.copy(list = newList)
} else { // newList.length == list.length
expr
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,30 @@ class OptimizeInSuite extends PlanTest {

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

test("OptimizedIn test: Remove deterministic repetitions") {
val originalQuery =
testRelation
.where(In(UnresolvedAttribute("a"),
Seq(Literal(1), Literal(1), Literal(2), Literal(2), Literal(1), Literal(2))))
.where(In(UnresolvedAttribute("b"),
Seq(UnresolvedAttribute("a"), UnresolvedAttribute("a"),
Round(UnresolvedAttribute("a"), 0), Round(UnresolvedAttribute("a"), 0),
Rand(0), Rand(0))))
.analyze

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2))))
.where(In(UnresolvedAttribute("b"),
Seq(UnresolvedAttribute("a"), UnresolvedAttribute("a"),
Round(UnresolvedAttribute("a"), 0), Round(UnresolvedAttribute("a"), 0),
Rand(0), Rand(0))))
.analyze

comparePlans(optimized, correctAnswer)
}

test("OptimizedIn test: In clause not optimized to InSet when less than 10 items") {
val originalQuery =
testRelation
Expand Down