From fc63b896b9b2ba7192b110158edbfc61f32bc7b6 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Sun, 30 Aug 2015 18:18:51 -0700 Subject: [PATCH 1/2] Cache Table is not working while subquery has alias in its project list --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 15 ++++++++++++--- .../apache/spark/sql/hive/CachedTableSuite.scala | 14 ++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 9bb466ac2d29c..8f8747e105932 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -135,16 +135,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { /** Args that have cleaned such that differences in expression id should not affect equality */ protected lazy val cleanArgs: Seq[Any] = { val input = children.flatMap(_.output) + def cleanExpression(e: Expression) = e match { + case a: Alias => + // As the root of the expression, Alias will always take an arbitrary exprId, we need + // to erase that for equality testing. + val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers) + BindReferences.bindReference(cleanedExprId, input, allowFailures = true) + case other => BindReferences.bindReference(other, input, allowFailures = true) + } + productIterator.map { // Children are checked using sameResult above. 
case tn: TreeNode[_] if containsChild(tn) => null - case e: Expression => BindReferences.bindReference(e, input, allowFailures = true) + case e: Expression => cleanExpression(e) case s: Option[_] => s.map { - case e: Expression => BindReferences.bindReference(e, input, allowFailures = true) + case e: Expression => cleanExpression(e) case other => other } case s: Seq[_] => s.map { - case e: Expression => BindReferences.bindReference(e, input, allowFailures = true) + case e: Expression => cleanExpression(e) case other => other } case other => other diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 39d315aaeab57..b6517fdb6e6d4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -203,4 +203,18 @@ class CachedTableSuite extends QueryTest { sql("DROP TABLE refreshTable") Utils.deleteRecursively(tempPath) } + + test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") { + import org.apache.spark.sql.hive.execution.HiveTableScan + sql("select key, value, key + 1 from src").registerTempTable("abc") + cacheTable("abc") + + val sparkPlan = sql( + """select a.key, b.key, c.key from + |abc a join abc b on a.key=b.key + |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan + + assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) + assert(sparkPlan.collect { case e: HiveTableScan => e }.size === 0) + } } From b6c0cb92c9e9c37325b5783d8b9e0f4e1c5169cc Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 31 Aug 2015 19:36:38 -0700 Subject: [PATCH 2/2] move the unit test from Hive package to sql/core --- .../org/apache/spark/sql/CachedTableSuite.scala | 16 ++++++++++++++++ .../apache/spark/sql/hive/CachedTableSuite.scala | 14 -------------- 2 files changed, 16 insertions(+), 14 
deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index af7590c3d3c17..e1d7fc6d872d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.execution.PhysicalRDD + import scala.concurrent.duration._ import scala.language.postfixOps @@ -336,4 +338,18 @@ class CachedTableSuite extends QueryTest with SharedSQLContext { assert((accsSize - 2) == Accumulators.originals.size) } } + + test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") { + ctx.sparkContext.parallelize((1, 1) :: (2, 2) :: Nil) + .toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc") + ctx.cacheTable("abc") + + val sparkPlan = sql( + """select a.key, b.key, c.key from + |abc a join abc b on a.key=b.key + |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan + + assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) + assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index b6517fdb6e6d4..39d315aaeab57 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -203,18 +203,4 @@ class CachedTableSuite extends QueryTest { sql("DROP TABLE refreshTable") Utils.deleteRecursively(tempPath) } - - test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") { - import org.apache.spark.sql.hive.execution.HiveTableScan - sql("select key, value, key + 1 from src").registerTempTable("abc") - cacheTable("abc") - - 
val sparkPlan = sql( - """select a.key, b.key, c.key from - |abc a join abc b on a.key=b.key - |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan - - assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) - assert(sparkPlan.collect { case e: HiveTableScan => e }.size === 0) - } }