From 59085d3862522265246d0e80992f4fb031a98c79 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 13 May 2016 17:25:35 +0900
Subject: [PATCH 1/3] Add a test to check if filter and then select works.

---
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 3b9feae4a31c..d07c895f50b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -205,6 +205,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ("b", 2))
   }
 
+  test("filter and then select") {
+    val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
+    checkDataset(
+      ds.filter(_._1 == "b").select(expr("_1").as[String]),
+      ("b"))
+  }
+
   test("foreach") {
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
     val acc = sparkContext.accumulator(0)

From 2620409239e808578dbbd9e0099d32f041879f8e Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 13 May 2016 17:39:26 +0900
Subject: [PATCH 2/3] Fix EmbedSerializerInFilter rule to keep exprIds of
 output of surrounded SerializeFromObject.

---
 .../spark/sql/catalyst/optimizer/Optimizer.scala         | 13 +++++++++++--
 .../optimizer/TypedFilterOptimizationSuite.scala         |  2 +-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 928ba213b593..da9bab557788 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -107,7 +107,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) ::
     Batch("Typed Filter Optimization", fixedPoint,
-      EmbedSerializerInFilter) ::
+      EmbedSerializerInFilter,
+      RemoveAliasOnlyProject) ::
     Batch("LocalRelation", fixedPoint,
       ConvertToLocalRelation) ::
     Batch("OptimizeCodegen", Once,
@@ -1560,7 +1561,15 @@ object EmbedSerializerInFilter extends Rule[LogicalPlan] {
         val newCondition = condition transform {
           case a: Attribute if a == d.output.head => d.deserializer
         }
-        Filter(newCondition, d.child)
+        val filter = Filter(newCondition, d.child)
+
+        // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
+        // We will remove it later in RemoveAliasOnlyProject rule.
+        val objAttrs = filter.output.zip(s.output).map {
+          case (fout, sout) =>
+            Alias(fout, fout.name)(exprId = sout.exprId)
+        }
+        Project(objAttrs, filter)
       }
   }
 }

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
index 1fae64e3bc6b..289c16aef47a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
@@ -67,7 +67,7 @@ class TypedFilterOptimizationSuite extends PlanTest {
 
     val deserializer = UnresolvedDeserializer(encoderFor[(Int, Int)].deserializer)
     val condition = callFunction(f, BooleanType, deserializer)
-    val expected = input.where(condition).analyze
+    val expected = input.where(condition).select('_1.as("_1"), '_2.as("_2")).analyze
 
     comparePlans(optimized, expected)
   }

From da35c7eb38ed10cff4c657d5d68bf2d78d4e2e1b Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 13 May 2016 18:12:01 +0900
Subject: [PATCH 3/3] Fix style.

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index da9bab557788..f129b51b667a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1565,9 +1565,8 @@ object EmbedSerializerInFilter extends Rule[LogicalPlan] {
 
         // Adds an extra Project here, to preserve the output expr id of `SerializeFromObject`.
         // We will remove it later in RemoveAliasOnlyProject rule.
-        val objAttrs = filter.output.zip(s.output).map {
-          case (fout, sout) =>
-            Alias(fout, fout.name)(exprId = sout.exprId)
+        val objAttrs = filter.output.zip(s.output).map { case (fout, sout) =>
+          Alias(fout, fout.name)(exprId = sout.exprId)
         }
         Project(objAttrs, filter)
       }
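
Note: below is a minimal, self-contained sketch of the query shape exercised by the new
DatasetSuite test, for trying the scenario outside the test harness. It is illustrative
only: the object name FilterThenSelectExample, the local[*] master, and the use of a
Spark 2.x SparkSession (the suite itself runs against SharedSQLContext) are assumptions,
not part of these patches.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.expr

    object FilterThenSelectExample {
      def main(args: Array[String]): Unit = {
        // Local session for the demo only; assumed setup, not taken from the patch.
        val spark = SparkSession.builder()
          .appName("FilterThenSelectExample")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()

        // A typed filter (evaluated on deserialized objects) followed by an untyped
        // select that refers back to the serialized column `_1`. Keeping the exprIds
        // of SerializeFromObject's output stable is what lets `_1` still resolve after
        // EmbedSerializerInFilter has rewritten the filter.
        val result = ds.filter(_._1 == "b").select(expr("_1").as[String])

        result.collect().foreach(println) // expected output: b

        spark.stop()
      }
    }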