From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 Nov 2015 18:30:26 -0800
Subject: [PATCH 01/13] SPARK-11633

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f4670b55bdba..5a5b71e52dd79 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -425,7 +425,8 @@ class Analyzer(
              */
             j
           case Some((oldRelation, newRelation)) =>
-            val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+            val attributeRewrites =
+              AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
             val newRight = right transformUp {
               case r if r == oldRelation => newRelation
             } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3427152b2da02..5e00546a74c00 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,6 +51,8 @@ case class Order(
     state: String,
     month: Int)
 
+case class Individual(F1: Integer, F2: Integer)
+
 case class WindowData(
     month: Int,
     area: String,
@@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
+
+  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
+    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
+    val df = hiveContext.createDataFrame(rdd1)
+    df.registerTempTable("foo")
+    val df2 = sql("select f1, F2 as F2 from foo")
+    df2.registerTempTable("foo2")
+    df2.registerTempTable("foo3")
+
+    checkAnswer(sql(
+      """
+        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
+      """.stripMargin), Row(2) :: Row(1) :: Nil)
+  }
 }

From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:56:45 -0800
Subject: [PATCH 02/13] converge

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 +--
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 15 ---------------
 2 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7c9512fbd00aa..47962ebe6ef82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -417,8 +417,7 @@ class Analyzer(
              */
             j
           case Some((oldRelation, newRelation)) =>
-            val attributeRewrites =
-              AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
+            val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
             val newRight = right transformUp {
               case r if r == oldRelation => newRelation
             } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e00546a74c00..61d9dcd37572b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,8 +51,6 @@ case class Order(
     state: String,
     month: Int)
 
-case class Individual(F1: Integer, F2: Integer)
-
 case class WindowData(
     month: Int,
     area: String,
@@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-
-  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
-    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
-    val df = hiveContext.createDataFrame(rdd1)
-    df.registerTempTable("foo")
-    val df2 = sql("select f1, F2 as F2 from foo")
-    df2.registerTempTable("foo2")
-    df2.registerTempTable("foo3")
-
-    checkAnswer(sql(
-      """
-        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
-      """.stripMargin), Row(2) :: Row(1) :: Nil)
   }
 }

From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:58:37 -0800
Subject: [PATCH 03/13] converge

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 61d9dcd37572b..3427152b2da02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-  }
 }

From 21071d1bc9fe0d5869fa94f21f617ee5dbaae390 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 19 Dec 2015 10:33:59 -0800
Subject: [PATCH 04/13] correct missingInput.

---
 .../main/scala/org/apache/spark/sql/execution/Generate.scala  | 4 ++++
 .../org/apache/spark/sql/execution/SparkStrategies.scala      | 5 +++--
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 54b8cb58285c2..f789af761a478 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -43,6 +43,7 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
  *             it.
  * @param outer when true, each input row will be output at least once, even if the output of the
  *              given `generator` is empty. `outer` has no effect when `join` is false.
+ * @param generatorOutput the output schema of the Generator.
  * @param output the output attributes of this node, which constructed in analysis phase,
  *               and we can not change it, as the parent node bound with it already.
  */
@@ -51,9 +52,12 @@ case class Generate(
     join: Boolean,
     outer: Boolean,
     output: Seq[Attribute],
+    generatorOutput: Seq[Attribute],
     child: SparkPlan)
   extends UnaryNode {
 
+  override def missingInput: AttributeSet = super.missingInput -- generatorOutput
+
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 688555cf136e8..d88bc0815939e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -353,9 +353,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Except(planLater(left), planLater(right)) :: Nil
       case logical.Intersect(left, right) =>
         execution.Intersect(planLater(left), planLater(right)) :: Nil
-      case g @ logical.Generate(generator, join, outer, _, _, child) =>
-        execution.Generate(
-          generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
+      case g @ logical.Generate(generator, join, outer, _, generatorOutput, child) =>
+        execution.Generate(
+          generator, join = join, outer = outer, g.output, generatorOutput,
+          planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>

From 22d8ba5bf1d4ba9e25de02a4619ff3718f257c1b Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 19 Dec 2015 11:11:55 -0800
Subject: [PATCH 05/13] added test cases

---
 .../scala/org/apache/spark/sql/DataFrameSuite.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 4c3e12af7203d..63352a6338a4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -131,6 +131,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       df.explode('letters) {
         case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
       }
+    assert(!df2.queryExecution.toString.contains("!"))
 
     checkAnswer(
       df2
@@ -157,10 +158,11 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("explode alias and star") {
-    val df = Seq((Array("a"), 1)).toDF("a", "b")
+    val df = Seq((Array("a"), 1)).toDF("a", "b").select(explode($"a").as("a"), $"*")
+    assert(!df.queryExecution.toString.contains("!"))
 
     checkAnswer(
-      df.select(explode($"a").as("a"), $"*"),
+      df,
       Row("a", Seq("a"), 1) :: Nil)
   }
 
@@ -177,9 +179,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("selectExpr with udtf") {
-    val df = Seq((Map("1" -> 1), 1)).toDF("a", "b")
+    val df = Seq((Map("1" -> 1), 1)).toDF("a", "b").selectExpr("explode(a)")
+    assert(!df.queryExecution.toString.contains("!"))
     checkAnswer(
-      df.selectExpr("explode(a)"),
+      df,
       Row("1", 1) :: Nil)
   }

From b7edd97e823c57c3f53e0a5d8b2c9a953e69082b Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 19 Dec 2015 22:18:15 -0800
Subject: [PATCH 06/13] fixed missingInputs for the other four Dataset operators

---
 .../org/apache/spark/sql/execution/basicOperators.scala  | 4 ++++
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala   | 8 ++++++--
 2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index b3e4688557ba0..3aa559a30e2f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -307,6 +307,7 @@ case class MapPartitions[T, U](
     uEncoder: ExpressionEncoder[U],
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
+  override def missingInput: AttributeSet = AttributeSet.empty
 
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
@@ -325,6 +326,7 @@ case class AppendColumns[T, U](
     uEncoder: ExpressionEncoder[U],
     newColumns: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
+  override def missingInput: AttributeSet = super.missingInput -- newColumns
 
   // We are using an unsafe combiner.
   override def canProcessSafeRows: Boolean = false
@@ -357,6 +359,7 @@ case class MapGroups[K, T, U](
     groupingAttributes: Seq[Attribute],
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
+  override def missingInput: AttributeSet = AttributeSet.empty
 
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
@@ -396,6 +399,7 @@ case class CoGroup[Key, Left, Right, Result](
     rightGroup: Seq[Attribute],
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode {
+  override def missingInput: AttributeSet = AttributeSet.empty
 
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index de012a9a56454..2d877c0c713c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -38,12 +38,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
   test("toDS with RDD") {
     val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS()
+    val mapPartitionsDS = ds.mapPartitions(_ => Iterator(1))
+    assert(!mapPartitionsDS.queryExecution.toString.contains("!"))
+
     checkAnswer(
-      ds.mapPartitions(_ => Iterator(1)),
+      mapPartitionsDS,
       1, 1, 1)
   }
-
   test("SPARK-12404: Datatype Helper Serializablity") {
     val ds = sparkContext.parallelize((
       new Timestamp(0),
@@ -317,6 +319,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
     val grouped = ds.groupBy($"_1").keyAs[String]
     val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
+    assert(!agged.queryExecution.toString.contains("!"))
 
     checkAnswer(
       agged,
@@ -385,6 +388,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val cogrouped = ds1.groupBy(_._1).cogroup(ds2.groupBy(_._1)) { case (key, data1, data2) =>
       Iterator(key -> (data1.map(_._2).mkString + "#" + data2.map(_._2).mkString))
     }
+    assert(!cogrouped.queryExecution.toString.contains("!"))
 
     checkAnswer(
       cogrouped,

From 6b4ba7458398ecd74c394fba0b062b2d8bfa8752 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 20 Dec 2015 20:56:58 -0800
Subject: [PATCH 07/13] address comments.
---
 .../main/scala/org/apache/spark/sql/execution/Generate.scala  | 4 +---
 .../org/apache/spark/sql/execution/SparkStrategies.scala      | 5 ++---
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index f789af761a478..0c613e91b979f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -43,7 +43,6 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
  *             it.
  * @param outer when true, each input row will be output at least once, even if the output of the
  *              given `generator` is empty. `outer` has no effect when `join` is false.
- * @param generatorOutput the output schema of the Generator.
 * @param output the output attributes of this node, which constructed in analysis phase,
  *               and we can not change it, as the parent node bound with it already.
  */
@@ -52,11 +51,10 @@ case class Generate(
     join: Boolean,
     outer: Boolean,
     output: Seq[Attribute],
-    generatorOutput: Seq[Attribute],
     child: SparkPlan)
   extends UnaryNode {
 
-  override def missingInput: AttributeSet = super.missingInput -- generatorOutput
+  override def expressions: Seq[Expression] = generator :: Nil
 
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d88bc0815939e..688555cf136e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -353,10 +353,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Except(planLater(left), planLater(right)) :: Nil
       case logical.Intersect(left, right) =>
         execution.Intersect(planLater(left), planLater(right)) :: Nil
-      case g @ logical.Generate(generator, join, outer, _, generatorOutput, child) =>
-        execution.Generate(
-          generator, join = join, outer = outer, g.output, generatorOutput,
-          planLater(child)) :: Nil
+      case g @ logical.Generate(generator, join, outer, _, _, child) =>
+        execution.Generate(
+          generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>

From 63058e32ebe178616af54702852a9e83fa025df9 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 20 Dec 2015 22:09:36 -0800
Subject: [PATCH 08/13] address comments.
---
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala  | 6 +++---
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 63352a6338a4d..b3c8d3ecbba49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -131,7 +131,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       df.explode('letters) {
         case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
       }
-    assert(!df2.queryExecution.toString.contains("!"))
+    assert(df2.queryExecution.executedPlan.missingInput.isEmpty)
 
     checkAnswer(
       df2
@@ -159,7 +159,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("explode alias and star") {
     val df = Seq((Array("a"), 1)).toDF("a", "b").select(explode($"a").as("a"), $"*")
-    assert(!df.queryExecution.toString.contains("!"))
+    assert(df.queryExecution.executedPlan.missingInput.isEmpty)
 
     checkAnswer(
       df,
@@ -180,7 +180,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("selectExpr with udtf") {
     val df = Seq((Map("1" -> 1), 1)).toDF("a", "b").selectExpr("explode(a)")
-    assert(!df.queryExecution.toString.contains("!"))
+    assert(df.queryExecution.executedPlan.missingInput.isEmpty)
     checkAnswer(
       df,
       Row("1", 1) :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2d877c0c713c0..e89ed19b9dda6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -39,7 +39,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   test("toDS with RDD") {
     val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS()
     val mapPartitionsDS = ds.mapPartitions(_ => Iterator(1))
-    assert(!mapPartitionsDS.queryExecution.toString.contains("!"))
+    assert(mapPartitionsDS.queryExecution.executedPlan.missingInput.isEmpty)
 
     checkAnswer(
       mapPartitionsDS,
@@ -319,7 +319,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
     val grouped = ds.groupBy($"_1").keyAs[String]
     val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
-    assert(!agged.queryExecution.toString.contains("!"))
+    assert(agged.queryExecution.executedPlan.missingInput.isEmpty)
 
     checkAnswer(
       agged,
@@ -388,7 +388,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val cogrouped = ds1.groupBy(_._1).cogroup(ds2.groupBy(_._1)) { case (key, data1, data2) =>
       Iterator(key -> (data1.map(_._2).mkString + "#" + data2.map(_._2).mkString))
     }
-    assert(!cogrouped.queryExecution.toString.contains("!"))
+    assert(cogrouped.queryExecution.executedPlan.missingInput.isEmpty)
 
     checkAnswer(
       cogrouped,

From a3f4aec249f855584e14b8d38e425024474ca8bb Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 21 Dec 2015 16:20:48 -0800
Subject: [PATCH 09/13] added producedAttributes.
---
 .../spark/sql/catalyst/plans/QueryPlan.scala       | 11 ++++++-----
 .../catalyst/plans/logical/LocalRelation.scala     |  6 ++++--
 .../catalyst/plans/logical/basicOperators.scala    |  8 ++++----
 .../apache/spark/sql/execution/ExistingRDD.scala   | 10 +++++++---
 .../spark/sql/execution/LocalTableScan.scala       |  4 +++-
 .../execution/aggregate/TungstenAggregate.scala    |  5 +++++
 .../spark/sql/execution/basicOperators.scala       |  8 ++++----
 .../scala/org/apache/spark/sql/QueryTest.scala     | 15 +++++++++++++++
 8 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index b9db7838db08a..a6ab72e2fdb56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -43,16 +43,17 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
   def inputSet: AttributeSet =
     AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))
 
+  /**
+   * The set of all attributes that are produced by this node.
+   */
+  def producedAttributes: AttributeSet = AttributeSet.empty
+
   /**
    * Attributes that are referenced by expressions but not provided by this nodes children.
    * Subclasses should override this method if they produce attributes internally as it is used by
    * assertions designed to prevent the construction of invalid plans.
-   *
-   * Note that virtual columns should be excluded. Currently, we only support the grouping ID
-   * virtual column.
    */
-  def missingInput: AttributeSet =
-    (references -- inputSet).filter(_.name != VirtualColumn.groupingIdName)
+  def missingInput: AttributeSet = references -- inputSet -- producedAttributes
 
   /**
    * Runs [[transform]] with `rule` on all expressions present in this query operator.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index e3e7a11dba973..80d618ed93ce8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.{analysis, CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 object LocalRelation {
@@ -62,6 +62,8 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
     case _ => false
   }
 
+  override def producedAttributes: AttributeSet = outputSet
+
   override lazy val statistics =
     Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index ec42b763f18ee..f6b9f03f259c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -494,7 +494,7 @@ case class MapPartitions[T, U](
     uEncoder: ExpressionEncoder[U],
     output: Seq[Attribute],
     child: LogicalPlan) extends UnaryNode {
-  override def missingInput: AttributeSet = AttributeSet.empty
+  override def producedAttributes: AttributeSet = outputSet
 }
 
 /** Factory for constructing new `AppendColumn` nodes. */
@@ -520,7 +520,7 @@ case class AppendColumns[T, U](
     newColumns: Seq[Attribute],
     child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output ++ newColumns
-  override def missingInput: AttributeSet = super.missingInput -- newColumns
+  override def producedAttributes: AttributeSet = AttributeSet(newColumns)
 }
 
 /** Factory for constructing new `MapGroups` nodes. */
@@ -555,7 +555,7 @@ case class MapGroups[K, T, U](
     groupingAttributes: Seq[Attribute],
     output: Seq[Attribute],
     child: LogicalPlan) extends UnaryNode {
-  override def missingInput: AttributeSet = AttributeSet.empty
+  override def producedAttributes: AttributeSet = outputSet
 }
 
 /** Factory for constructing new `CoGroup` nodes. */
@@ -598,5 +598,5 @@ case class CoGroup[Key, Left, Right, Result](
     rightGroup: Seq[Attribute],
     left: LogicalPlan,
     right: LogicalPlan) extends BinaryNode {
-  override def missingInput: AttributeSet = AttributeSet.empty
+  override def producedAttributes: AttributeSet = outputSet
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b8a43025882e5..b6321005c4989 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,11 +18,11 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation}
+import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}
 
@@ -86,6 +86,8 @@ private[sql] case class LogicalRDD(
     case _ => false
   }
 
+  override def producedAttributes: AttributeSet = outputSet
+
   @transient override lazy val statistics: Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
@@ -108,6 +110,8 @@ private[sql] case class PhysicalRDD(
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
     s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
   }
+
+  override def producedAttributes: AttributeSet = outputSet
 }
 
 private[sql] object PhysicalRDD {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index ba7f6287ac6c3..04a0620a4f27f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
 
 
 /**
@@ -31,6 +31,8 @@ private[sql] case class LocalTableScan(
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
+  override def producedAttributes: AttributeSet = outputSet
+
   protected override def doExecute(): RDD[InternalRow] = rdd
 
   override def executeCollect(): Array[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index b8849c827048a..9d758eb3b7c32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -55,6 +55,11 @@ case class TungstenAggregate(
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
+  override def producedAttributes: AttributeSet =
+    AttributeSet(aggregateAttributes) ++
+    AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
+    AttributeSet(aggregateBufferAttributes)
+
   override def requiredChildDistribution: List[Distribution] = {
     requiredChildDistributionExpressions match {
       case Some(exprs) if exprs.length == 0 => AllTuples :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 3aa559a30e2f2..7554648bca9e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -307,7 +307,7 @@ case class MapPartitions[T, U](
     uEncoder: ExpressionEncoder[U],
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
-  override def missingInput: AttributeSet = AttributeSet.empty
+  override def producedAttributes: AttributeSet = outputSet
 
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
@@ -326,7 +326,7 @@ case class AppendColumns[T, U](
     uEncoder: ExpressionEncoder[U],
     newColumns: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
-  override def missingInput: AttributeSet = super.missingInput -- newColumns
+  override def producedAttributes: AttributeSet = AttributeSet(newColumns)
 
   // We are using an unsafe combiner.
   override def canProcessSafeRows: Boolean = false
@@ -359,7 +359,7 @@ case class MapGroups[K, T, U](
     groupingAttributes: Seq[Attribute],
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
-  override def missingInput: AttributeSet = AttributeSet.empty
+  override def producedAttributes: AttributeSet = outputSet
 
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
@@ -399,7 +399,7 @@ case class CoGroup[Key, Left, Right, Result](
     rightGroup: Seq[Attribute],
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode {
-  override def missingInput: AttributeSet = AttributeSet.empty
+  override def producedAttributes: AttributeSet = outputSet
 
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index bc22fb8b7bdb4..1ef2597e86e94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -123,6 +123,8 @@ abstract class QueryTest extends PlanTest {
           |""".stripMargin)
     }
 
+    assertEmptyMissingInput(df)
+
     QueryTest.checkAnswer(analyzedDF, expectedAnswer) match {
       case Some(errorMessage) => fail(errorMessage)
       case None =>
@@ -177,6 +179,19 @@ abstract class QueryTest extends PlanTest {
         s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
           planWithCaching)
   }
+
+  /**
+   * Asserts that a given [[Queryable]] does not have missing inputs in all the analyzed plans.
+   */
+  def assertEmptyMissingInput(query: Queryable): Unit = {
+    assert(query.queryExecution.analyzed.missingInput.isEmpty,
+      s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}")
+    assert(query.queryExecution.optimizedPlan.missingInput.isEmpty,
+      s"The optimized logical plan has missing inputs: ${query.queryExecution.optimizedPlan}")
+    assert(query.queryExecution.executedPlan.missingInput.isEmpty,
+      s"The physical plan has missing inputs: ${query.queryExecution.executedPlan}")
+  }
+
 }
 
 object QueryTest {

From 02ee2777ffe36e71b6ac32fbd744ea2d5bee3017 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 22 Dec 2015 14:02:14 -0800
Subject: [PATCH 10/13] resolved all the test case failure.
---
 .../spark/sql/catalyst/plans/logical/LocalRelation.scala | 2 --
 .../spark/sql/catalyst/plans/logical/LogicalPlan.scala   | 1 +
 .../org/apache/spark/sql/execution/ExistingRDD.scala     | 2 --
 .../org/apache/spark/sql/execution/LocalTableScan.scala  | 2 --
 .../scala/org/apache/spark/sql/execution/SparkPlan.scala | 1 +
 .../sql/execution/aggregate/SortBasedAggregate.scala     | 9 +++++++++
 .../execution/columnar/InMemoryColumnarTableScan.scala   | 2 ++
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 3 ---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala   | 8 +-------
 .../org/apache/spark/sql/ExtraStrategiesSuite.scala      | 7 +++----
 .../apache/spark/sql/hive/execution/HiveTableScan.scala  | 3 +++
 .../spark/sql/hive/execution/ScriptTransformation.scala  | 2 ++
 12 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index 80d618ed93ce8..572d7d2f0b537 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -62,8 +62,6 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
     case _ => false
   }
 
-  override def producedAttributes: AttributeSet = outputSet
-
   override lazy val statistics =
     Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8f8747e105932..6d859551f8c52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -295,6 +295,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
  */
 abstract class LeafNode extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Nil
+  override def producedAttributes: AttributeSet = outputSet
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b6321005c4989..99a9c3b6d0763 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -110,8 +110,6 @@ private[sql] case class PhysicalRDD(
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
     s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
   }
-
-  override def producedAttributes: AttributeSet = outputSet
 }
 
 private[sql] object PhysicalRDD {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 04a0620a4f27f..2b626cb4d6b1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -31,8 +31,6 @@ private[sql] case class LocalTableScan(
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
-  override def producedAttributes: AttributeSet = outputSet
-
   protected override def doExecute(): RDD[InternalRow] = rdd
 
   override def executeCollect(): Array[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ec98f81041343..fe9b2ad4a0bc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -279,6 +279,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
 private[sql] trait LeafNode extends SparkPlan {
   override def children: Seq[SparkPlan] = Nil
+  override def producedAttributes: AttributeSet = outputSet
 }
 
 private[sql] trait UnaryNode extends SparkPlan {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index c5470a6989de7..c4587ba677b2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -36,6 +36,15 @@ case class SortBasedAggregate(
     child: SparkPlan)
   extends UnaryNode {
 
+  private[this] val aggregateBufferAttributes = {
+    aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
+  }
+
+  override def producedAttributes: AttributeSet =
+    AttributeSet(aggregateAttributes) ++
+    AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
+    AttributeSet(aggregateBufferAttributes)
+
   override private[sql] lazy val metrics = Map(
     "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index 3c5a8cb2aa935..9027c831c7f7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -66,6 +66,8 @@ private[sql] case class InMemoryRelation(
     private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
+  override def producedAttributes: AttributeSet = outputSet
+
   private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
     if (_batchStats == null) {
       child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b3c8d3ecbba49..e59dc0dcfdf3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -131,7 +131,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       df.explode('letters) {
         case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
      }
-    assert(df2.queryExecution.executedPlan.missingInput.isEmpty)
 
     checkAnswer(
       df2
@@ -159,7 +158,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("explode alias and star") {
     val df = Seq((Array("a"), 1)).toDF("a", "b").select(explode($"a").as("a"), $"*")
-    assert(df.queryExecution.executedPlan.missingInput.isEmpty)
 
     checkAnswer(
       df,
@@ -180,7 +178,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("selectExpr with udtf") {
     val df = Seq((Map("1" -> 1), 1)).toDF("a", "b").selectExpr("explode(a)")
-    assert(df.queryExecution.executedPlan.missingInput.isEmpty)
     checkAnswer(
       df,
       Row("1", 1) :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index e89ed19b9dda6..ee485150dca15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -38,11 +38,8 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
   test("toDS with RDD") {
     val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS()
-    val mapPartitionsDS = ds.mapPartitions(_ => Iterator(1))
-    assert(mapPartitionsDS.queryExecution.executedPlan.missingInput.isEmpty)
-
     checkAnswer(
-      mapPartitionsDS,
+      ds.mapPartitions(_ => Iterator(1)),
       1, 1, 1)
   }
@@ -319,8 +316,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
     val grouped = ds.groupBy($"_1").keyAs[String]
     val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
-    assert(agged.queryExecution.executedPlan.missingInput.isEmpty)
-
     checkAnswer(
       agged,
       ("a", 30), ("b", 3), ("c", 1))
@@ -388,7 +383,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val cogrouped = ds1.groupBy(_._1).cogroup(ds2.groupBy(_._1)) { case (key, data1, data2) =>
       Iterator(key -> (data1.map(_._2).mkString + "#" + data2.map(_._2).mkString))
     }
-    assert(cogrouped.queryExecution.executedPlan.missingInput.isEmpty)
 
     checkAnswer(
       cogrouped,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala
index 78a98798eff64..359a1e7f8424a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala
@@ -15,16 +15,14 @@
 * limitations under the License.
 */
 
-package test.org.apache.spark.sql
+package org.apache.spark.sql
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Literal, GenericInternalRow, Attribute}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.{Row, Strategy, QueryTest}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.unsafe.types.UTF8String
 
 case class FastOperator(output: Seq[Attribute]) extends SparkPlan {
 
@@ -34,6 +32,7 @@ case class FastOperator(output: Seq[Attribute]) extends SparkPlan {
     sparkContext.parallelize(Seq(row))
   }
 
+  override def producedAttributes: AttributeSet = outputSet
   override def children: Seq[SparkPlan] = Nil
 }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 806d2b9b0b7d4..8141136de5311 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -51,6 +51,9 @@ case class HiveTableScan(
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
+  override def producedAttributes: AttributeSet = outputSet ++
+    AttributeSet(partitionPruningPred.flatMap(_.references))
+
   // Retrieve the original attributes based on expression ID so that capitalization matches.
   val attributes = requestedAttributes.map(relation.attributeMap)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index b30117f0de997..1de21e7d719ee 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -60,6 +60,8 @@ case class ScriptTransformation(
 
   override def otherCopyArgs: Seq[HiveContext] = sc :: Nil
 
+  override def producedAttributes: AttributeSet = outputSet -- inputSet
+
   private val serializedHiveConf = new SerializableConfiguration(sc.hiveconf)
 
   protected override def doExecute(): RDD[InternalRow] = {

From c324534ef0eded7efe9e4b9762c925a62a4c84b3 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 22 Dec 2015 14:04:33 -0800
Subject: [PATCH 11/13] reverted it back.
---
 .../scala/org/apache/spark/sql/execution/LocalTableScan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 2b626cb4d6b1d..ba7f6287ac6c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 
 
 /**

From 3dcd673358b41c7e84836385d0bd5b1de392a5e5 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 22 Dec 2015 14:06:18 -0800
Subject: [PATCH 12/13] reverted it back.

---
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index ee485150dca15..de012a9a56454 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -43,6 +43,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       1, 1, 1)
   }
+
   test("SPARK-12404: Datatype Helper Serializablity") {
     val ds = sparkContext.parallelize((
       new Timestamp(0),
@@ -316,6 +317,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
     val grouped = ds.groupBy($"_1").keyAs[String]
     val agged = grouped.mapGroups { case (g, iter) => (g, iter.map(_._2).sum) }
+
     checkAnswer(
       agged,
       ("a", 30), ("b", 3), ("c", 1))

From 5539cbe57b7f4a024f5a1413a2746e57c16cf34a Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 22 Dec 2015 14:07:23 -0800
Subject: [PATCH 13/13] reverted it back.

---
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index e59dc0dcfdf3a..4c3e12af7203d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -157,10 +157,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("explode alias and star") {
-    val df = Seq((Array("a"), 1)).toDF("a", "b").select(explode($"a").as("a"), $"*")
+    val df = Seq((Array("a"), 1)).toDF("a", "b")
 
     checkAnswer(
-      df,
+      df.select(explode($"a").as("a"), $"*"),
       Row("a", Seq("a"), 1) :: Nil)
   }
 
@@ -177,9 +177,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("selectExpr with udtf") {
-    val df = Seq((Map("1" -> 1), 1)).toDF("a", "b").selectExpr("explode(a)")
+    val df = Seq((Map("1" -> 1), 1)).toDF("a", "b")
     checkAnswer(
-      df,
+      df.selectExpr("explode(a)"),
       Row("1", 1) :: Nil)
   }