From f1e8df1eeafc168f272949e1f47562fd7e04b6cb Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 15 May 2015 02:07:04 +0000 Subject: [PATCH 1/2] [SPARK-6743][SQL] Fix empty projections of cached data --- project/SparkBuild.scala | 1 + .../main/scala/org/apache/spark/sql/Row.scala | 3 +++ .../scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../columnar/InMemoryColumnarTableScan.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 17 +++++++++++++++-- 5 files changed, 21 insertions(+), 4 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 1b87e4e98bd83..b9515a12bc573 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -324,6 +324,7 @@ object Hive { |import org.apache.spark.sql.functions._ |import org.apache.spark.sql.hive._ |import org.apache.spark.sql.hive.test.TestHive._ + |import org.apache.spark.sql.hive.test.TestHive.implicits._ |import org.apache.spark.sql.types._""".stripMargin, cleanupCommands in console := "sparkContext.stop()", // Some of our log4j jars make it impossible to submit jobs from this JVM to Hive Map/Reduce diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index 4190b7ffe1c8f..0d460b634d9b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -55,6 +55,9 @@ object Row { // TODO: Improve the performance of this if used in performance critical part. new GenericRow(rows.flatMap(_.toSeq).toArray) } + + /** Returns an empty row. */ + val empty = apply() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 4fd5105c27443..5f8a9e99a4428 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1474,7 +1474,7 @@ class DataFrame private[sql]( /** * :: Experimental :: - * Creates a table at the given path from the the contents of this DataFrame + * (Scala-specific) Creates a table at the given path from the the contents of this DataFrame * based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of * partition columns. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 0ded1cce68391..05e1bf5ef042f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -314,7 +314,7 @@ private[sql] case class InMemoryColumnarTableScan( columnAccessors(i).extractTo(nextRow, i) i += 1 } - nextRow + if(attributes.isEmpty) Row.empty else nextRow } override def hasNext: Boolean = columnAccessors(0).hasNext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8cdbe076cbd85..1e5ac1875c273 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -39,6 +39,19 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { import org.apache.spark.sql.test.TestSQLContext.implicits._ val sqlCtx = TestSQLContext + test("SPARK-6743: no columns from cache") { + Seq( + (83,0,38), + (26,0,79), + (43,81,24) + ).toDF("a", "b", "c").registerTempTable("cachedData") + + cacheTable("cachedData") + checkAnswer( + sql("SELECT t1.b FROM cachedData, cachedData t1 GROUP BY t1.b"), + Row(0) :: Row(81) :: Nil) + } + test("self join with aliases") { Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str").registerTempTable("df") @@ -141,7 +154,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { sql("SELECT ABS(2.5)"), Row(2.5)) } - + test("aggregation with codegen") { val originalValue = conf.codegenEnabled setConf(SQLConf.CODEGEN_ENABLED, "true") @@ -193,7 +206,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { "SELECT value, sum(key) FROM testData3x GROUP BY value", (1 to 100).map(i => Row(i.toString, 3 * i))) testCodeGen( - "SELECT sum(key), SUM(CAST(key as Double)) FROM testData3x", + "SELECT sum(key), SUM(CAST(key as Double)) FROM testData3x", Row(5050 * 3, 5050 * 3.0) :: Nil) // AVERAGE testCodeGen( From aad7eabc142a914a8d2222ce1cd1d94848e74709 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 15 May 2015 02:11:59 +0000 Subject: [PATCH 2/2] rxins comments --- .../spark/sql/columnar/InMemoryColumnarTableScan.scala | 2 +- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 05e1bf5ef042f..a59d42cdd6028 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -314,7 +314,7 @@ private[sql] case class InMemoryColumnarTableScan( columnAccessors(i).extractTo(nextRow, i) i += 1 } - if(attributes.isEmpty) Row.empty else nextRow + if (attributes.isEmpty) Row.empty else nextRow } override def hasNext: Boolean = columnAccessors(0).hasNext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1e5ac1875c273..8dd4e8889063f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -41,9 +41,9 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { test("SPARK-6743: no columns from cache") { Seq( - (83,0,38), - (26,0,79), - (43,81,24) + (83, 0, 38), + (26, 0, 79), + (43, 81, 24) ).toDF("a", "b", "c").registerTempTable("cachedData") cacheTable("cachedData")