From 74d107dab2e700e9277c47f53c43c1b4ac7de4bb Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 23 Jun 2016 13:25:33 -0700 Subject: [PATCH 01/12] [SPARK-16174][SQL] Add RemoveLiteralRepetitionFromIn optimizer --- .../sql/catalyst/optimizer/Optimizer.scala | 15 +++++++++++++++ .../catalyst/optimizer/OptimizeInSuite.scala | 19 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9ee17350695aa..80fffb3496f47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -93,6 +93,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) // Constant folding and strength reduction NullPropagation, FoldablePropagation, + RemoveLiteralRepetitionFromIn, OptimizeIn(conf), ConstantFolding, ReorderAssociativeOperator, @@ -819,6 +820,20 @@ object ConstantFolding extends Rule[LogicalPlan] { } } +/** + * Removes literal repetitions from IN predicate + */ +object RemoveLiteralRepetitionFromIn extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case q: LogicalPlan => q transformExpressionsUp { + case i @ In(v, list) if list.exists(_.isInstanceOf[Literal]) => + val (literals, others) = list.partition(_.isInstanceOf[Literal]) + val newList = ExpressionSet(literals).toSeq ++ others + if (newList.length == list.length) i else i.copy(v, newList) + } + } +} + /** * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]] * which is much faster diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index f1a4ea8280ab7..3c4b3ab9ea0c7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -37,11 +37,30 @@ class OptimizeInSuite extends PlanTest { NullPropagation, ConstantFolding, BooleanSimplification, + RemoveLiteralRepetitionFromIn, OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + test("RemoveRepetitionFromIn test") { + val originalQuery = + testRelation + .where(In(UnresolvedAttribute("a"), + Seq(Literal(1), Literal(1), Literal(2), Literal(2), Literal(1), Literal(2)))) + .where(In(UnresolvedAttribute("b"), Seq(Literal(1), Literal(1), UnresolvedAttribute("b")))) + .analyze + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2)))) + .where(In(UnresolvedAttribute("b"), Seq(Literal(1), UnresolvedAttribute("b")))) + .analyze + + comparePlans(optimized, correctAnswer) + } + test("OptimizedIn test: In clause not optimized to InSet when less than 10 items") { val originalQuery = testRelation From 779ade837c4eb43ee15befc17e01b2a059bf960e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 23 Jun 2016 16:07:18 -0700 Subject: [PATCH 02/12] Merge into OptimizeIn optimizer and handle all deterministic cases. --- .../sql/catalyst/optimizer/Optimizer.scala | 35 ++++++++----------- .../catalyst/optimizer/OptimizeInSuite.scala | 9 +++-- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 80fffb3496f47..49d46a1d02b14 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -93,7 +93,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) // Constant folding and strength reduction NullPropagation, FoldablePropagation, - RemoveLiteralRepetitionFromIn, OptimizeIn(conf), ConstantFolding, ReorderAssociativeOperator, @@ -821,30 +820,24 @@ object ConstantFolding extends Rule[LogicalPlan] { } /** - * Removes literal repetitions from IN predicate - */ -object RemoveLiteralRepetitionFromIn extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case q: LogicalPlan => q transformExpressionsUp { - case i @ In(v, list) if list.exists(_.isInstanceOf[Literal]) => - val (literals, others) = list.partition(_.isInstanceOf[Literal]) - val newList = ExpressionSet(literals).toSeq ++ others - if (newList.length == list.length) i else i.copy(v, newList) - } - } -} - -/** - * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]] - * which is much faster + * Removes literal repetitions from IN predicate and replaces [[In (value, seq[Literal])]] + * with optimized version[[InSet (value, HashSet[Literal])]] which is much faster. */ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && - list.size > conf.optimizerInSetConversionThreshold => - val hSet = list.map(e => e.eval(EmptyRow)) - InSet(v, HashSet() ++ hSet) + case i @ In(v, l) => + val (deterministics, others) = l.partition(_.deterministic) + val newList = ExpressionSet(deterministics).toSeq ++ others + if (newList.forall(_.isInstanceOf[Literal]) && + newList.size > conf.optimizerInSetConversionThreshold) { + val hSet = newList.map(e => e.eval(EmptyRow)) + InSet(v, HashSet() ++ hSet) + } else if (newList.length < l.length) { + i.copy(v, newList) + } else { // netList.length == l.length + i + } } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index 3c4b3ab9ea0c7..b70b2f577047c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -37,7 +37,6 @@ class OptimizeInSuite extends PlanTest { NullPropagation, ConstantFolding, BooleanSimplification, - RemoveLiteralRepetitionFromIn, OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil } @@ -48,14 +47,18 @@ class OptimizeInSuite extends PlanTest { testRelation .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(1), Literal(2), Literal(2), Literal(1), Literal(2)))) - .where(In(UnresolvedAttribute("b"), Seq(Literal(1), Literal(1), UnresolvedAttribute("b")))) + .where(In(UnresolvedAttribute("b"), + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("a"), + Round(UnresolvedAttribute("a"), 0), Round(UnresolvedAttribute("a"), 0), + Rand(0), Rand(0)))) .analyze val optimized = Optimize.execute(originalQuery.analyze) val correctAnswer = testRelation .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2)))) - .where(In(UnresolvedAttribute("b"), Seq(Literal(1), UnresolvedAttribute("b")))) + .where(In(UnresolvedAttribute("b"), + Seq(UnresolvedAttribute("a"), Round(UnresolvedAttribute("a"), 0), Rand(0), Rand(0)))) .analyze comparePlans(optimized, correctAnswer) From e99fa4a2d25260a2d08c6e57cb6b68fa99e2731f Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 23 Jun 2016 16:11:31 -0700 Subject: [PATCH 03/12] Change testcase name and fix typo. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 49d46a1d02b14..ab6ca7950afd3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -835,7 +835,7 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { InSet(v, HashSet() ++ hSet) } else if (newList.length < l.length) { i.copy(v, newList) - } else { // netList.length == l.length + } else { // newList.length == l.length i } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index b70b2f577047c..92333c17edfd1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -42,7 +42,7 @@ class OptimizeInSuite extends PlanTest { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) - test("RemoveRepetitionFromIn test") { + test("OptimizedIn test: Remove deterministic repetitions") { val originalQuery = testRelation .where(In(UnresolvedAttribute("a"), From 4f54c646ab005d82708fc9ed9c01ecd10697ea82 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 23 Jun 2016 16:21:10 -0700 Subject: [PATCH 04/12] Update optimizer descriptions. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ab6ca7950afd3..964fbce4ee777 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -820,8 +820,10 @@ object ConstantFolding extends Rule[LogicalPlan] { } /** - * Removes literal repetitions from IN predicate and replaces [[In (value, seq[Literal])]] - * with optimized version[[InSet (value, HashSet[Literal])]] which is much faster. + * Optimize IN predicates: + * 1. Removes deterministic repetitions. + * 2. Replaces [[In (value, seq[Literal])]] with optimized version + * [[InSet (value, HashSet[Literal])]] which is much faster. */ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { From 6628c7bc514b0da04d5e6848cc1a7ceaad2908c3 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 25 Jun 2016 04:10:22 -0700 Subject: [PATCH 05/12] According to SPARK-16081, replace `l` with `list`. --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 964fbce4ee777..e5b543b0be9d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -828,16 +828,16 @@ object ConstantFolding extends Rule[LogicalPlan] { case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case i @ In(v, l) => - val (deterministics, others) = l.partition(_.deterministic) + case i @ In(v, list) => + val (deterministics, others) = list.partition(_.deterministic) val newList = ExpressionSet(deterministics).toSeq ++ others if (newList.forall(_.isInstanceOf[Literal]) && newList.size > conf.optimizerInSetConversionThreshold) { val hSet = newList.map(e => e.eval(EmptyRow)) InSet(v, HashSet() ++ hSet) - } else if (newList.length < l.length) { + } else if (newList.length < list.length) { i.copy(v, newList) - } else { // newList.length == l.length + } else { // newList.length == list.length i } } From 314bc74209d1810611053a56ff19b50d45d6fdf2 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 6 Jul 2016 11:11:14 -0700 Subject: [PATCH 06/12] Simplify the logic to handle only all-deterministic cases. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 ++--- .../spark/sql/catalyst/optimizer/OptimizeInSuite.scala | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e5b543b0be9d4..78f42db2c564b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -828,9 +828,8 @@ object ConstantFolding extends Rule[LogicalPlan] { case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case i @ In(v, list) => - val (deterministics, others) = list.partition(_.deterministic) - val newList = ExpressionSet(deterministics).toSeq ++ others + case i @ In(v, list) if list.forall(_.deterministic) => + val newList = ExpressionSet(list).toSeq if (newList.forall(_.isInstanceOf[Literal]) && newList.size > conf.optimizerInSetConversionThreshold) { val hSet = newList.map(e => e.eval(EmptyRow)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index 92333c17edfd1..0877207728b38 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -58,7 +58,9 @@ class OptimizeInSuite extends PlanTest { testRelation .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2)))) .where(In(UnresolvedAttribute("b"), - Seq(UnresolvedAttribute("a"), Round(UnresolvedAttribute("a"), 0), Rand(0), Rand(0)))) + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("a"), + Round(UnresolvedAttribute("a"), 0), Round(UnresolvedAttribute("a"), 0), + Rand(0), Rand(0)))) .analyze comparePlans(optimized, correctAnswer) From 23d6e307cc1ad7599b7504de4312ec31f06cc07e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 6 Jul 2016 14:51:34 -0700 Subject: [PATCH 07/12] Add lazy val. --- .../org/apache/spark/sql/catalyst/expressions/predicates.scala | 1 + .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index a3b098afe5728..b3c3280c1e0c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -132,6 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate } override def children: Seq[Expression] = value +: list + lazy val optimizable = children.forall(_.deterministic) override def nullable: Boolean = children.exists(_.nullable) override def foldable: Boolean = children.forall(_.foldable) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 78f42db2c564b..0acd50db84298 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -828,7 +828,7 @@ object ConstantFolding extends Rule[LogicalPlan] { case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case i @ In(v, list) if list.forall(_.deterministic) => + case i @ In(v, list) if i.optimizable => val newList = ExpressionSet(list).toSeq if (newList.forall(_.isInstanceOf[Literal]) && newList.size > conf.optimizerInSetConversionThreshold) { From 125036aae82a54b9504ab3d1ae315e3bea885739 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 6 Jul 2016 16:00:24 -0700 Subject: [PATCH 08/12] Rename variable. --- .../org/apache/spark/sql/catalyst/expressions/predicates.scala | 2 +- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index b3c3280c1e0c5..744c391cb33b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -132,7 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate } override def children: Seq[Expression] = value +: list - lazy val optimizable = children.forall(_.deterministic) + lazy val inSetConvertible = children.forall(_.deterministic) override def nullable: Boolean = children.exists(_.nullable) override def foldable: Boolean = children.forall(_.foldable) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0acd50db84298..0fa2976ae487a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -828,7 +828,7 @@ object ConstantFolding extends Rule[LogicalPlan] { case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case i @ In(v, list) if i.optimizable => + case i @ In(v, list) if i.inSetConvertible => val newList = ExpressionSet(list).toSeq if (newList.forall(_.isInstanceOf[Literal]) && newList.size > conf.optimizerInSetConversionThreshold) { From 63a4a7931a542742585c6d08be7ff65299c8ea47 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 6 Jul 2016 17:39:09 -0700 Subject: [PATCH 09/12] Optimize only literals. --- .../spark/sql/catalyst/expressions/predicates.scala | 3 ++- .../spark/sql/catalyst/optimizer/Optimizer.scala | 13 ++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 744c391cb33b6..b000d9aa83706 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -132,7 +132,8 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate } override def children: Seq[Expression] = value +: list - lazy val inSetConvertible = children.forall(_.deterministic) + lazy val inSetConvertible = + children.forall(_.deterministic) && list.forall(_.isInstanceOf[Literal]) override def nullable: Boolean = children.exists(_.nullable) override def foldable: Boolean = children.forall(_.foldable) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0fa2976ae487a..38f2111dde029 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -821,23 +821,22 @@ object ConstantFolding extends Rule[LogicalPlan] { /** * Optimize IN predicates: - * 1. Removes deterministic repetitions. + * 1. Removes literal repetitions. * 2. Replaces [[In (value, seq[Literal])]] with optimized version * [[InSet (value, HashSet[Literal])]] which is much faster. */ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case i @ In(v, list) if i.inSetConvertible => + case expr @ In(v, list) if expr.inSetConvertible => val newList = ExpressionSet(list).toSeq - if (newList.forall(_.isInstanceOf[Literal]) && - newList.size > conf.optimizerInSetConversionThreshold) { + if (newList.size > conf.optimizerInSetConversionThreshold) { val hSet = newList.map(e => e.eval(EmptyRow)) InSet(v, HashSet() ++ hSet) - } else if (newList.length < list.length) { - i.copy(v, newList) + } else if (newList.size < list.size) { + expr.copy(value = v, list = newList) } else { // newList.length == list.length - i + expr } } } From ccf972dc9c258651a5d9977a4e76d87805f4c1c6 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 6 Jul 2016 17:51:21 -0700 Subject: [PATCH 10/12] Fix regression. --- .../org/apache/spark/sql/catalyst/expressions/predicates.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index b000d9aa83706..734bacf727e36 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -132,8 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate } override def children: Seq[Expression] = value +: list - lazy val inSetConvertible = - children.forall(_.deterministic) && list.forall(_.isInstanceOf[Literal]) + lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal]) override def nullable: Boolean = children.exists(_.nullable) override def foldable: Boolean = children.forall(_.foldable) From f068e4b4216b37cf593f7f6820d56e60e0363982 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 6 Jul 2016 23:01:18 -0700 Subject: [PATCH 11/12] Replace `copy`. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 38f2111dde029..c8d2d5291c74e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -834,7 +834,7 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { val hSet = newList.map(e => e.eval(EmptyRow)) InSet(v, HashSet() ++ hSet) } else if (newList.size < list.size) { - expr.copy(value = v, list = newList) + In(v, newList) } else { // newList.length == list.length expr } From eead3dbabfea80b115829548140eb8fca9680aa4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 6 Jul 2016 23:11:31 -0700 Subject: [PATCH 12/12] Fix again. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index c8d2d5291c74e..03d15eabdd863 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -834,7 +834,7 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { val hSet = newList.map(e => e.eval(EmptyRow)) InSet(v, HashSet() ++ hSet) } else if (newList.size < list.size) { - In(v, newList) + expr.copy(list = newList) } else { // newList.length == list.length expr }