From 9eb4c2537a7453ecce8c6e15153b1d94f000d477 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 13 Jun 2016 15:05:21 +0900 Subject: [PATCH 1/5] Add a test to check if DataFrame with plan overriding sameResult but not using canonicalized plan to compare can cacheTable. --- .../scala/org/apache/spark/sql/CachedTableSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 44bafa55bc330..684a5e37c8de4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -552,4 +552,15 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext selectStar, Seq(Row(1, "1"))) } + + test("SPARK-15915 CacheManager should use canonicalized plan for planToCache") { + val localRelation = Seq(1, 2, 3).toDF() + localRelation.createOrReplaceTempView("localRelation") + + spark.catalog.cacheTable("localRelation") + assert( + localRelation.queryExecution.withCachedData.collect { + case i: InMemoryRelation => i + }.size == 1) + } } From 52641f4612bd2a461ef678778f34b464a97d953d Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 13 Jun 2016 15:15:25 +0900 Subject: [PATCH 2/5] Modify CacheManager to use canonicalized plan for planToCache. --- .../apache/spark/sql/execution/CacheManager.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 4e95754e9bef7..32ed7b8c95aa5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -87,7 +87,7 @@ private[sql] class CacheManager extends Logging { query: Dataset[_], tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { - val planToCache = query.queryExecution.analyzed + val planToCache = query.queryExecution.analyzed.canonicalized if (lookupCachedData(planToCache).nonEmpty) { logWarning("Asked to cache already cached data.") } else { @@ -106,7 +106,7 @@ private[sql] class CacheManager extends Logging { /** Removes the data for the given [[Dataset]] from the cache */ private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock { - val planToCache = query.queryExecution.analyzed + val planToCache = query.queryExecution.analyzed.canonicalized val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) require(dataIndex >= 0, s"Table $query is not cached.") cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) @@ -120,7 +120,7 @@ private[sql] class CacheManager extends Logging { private[sql] def tryUncacheQuery( query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { - val planToCache = query.queryExecution.analyzed + val planToCache = query.queryExecution.analyzed.canonicalized val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) val found = dataIndex >= 0 if (found) { @@ -137,7 +137,8 @@ private[sql] class CacheManager extends Logging { /** Optionally returns cached data for the given [[LogicalPlan]]. */ private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock { - cachedData.find(cd => plan.sameResult(cd.plan)) + val canonicalized = plan.canonicalized + cachedData.find(cd => canonicalized.sameResult(cd.plan)) } /** Replaces segments of the given logical plan with cached versions where possible. */ @@ -155,8 +156,9 @@ private[sql] class CacheManager extends Logging { * function will over invalidate. */ private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock { + val canonicalized = plan.canonicalized cachedData.foreach { - case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty => + case data if data.plan.collect { case p if p.sameResult(canonicalized) => p }.nonEmpty => data.cachedRepresentation.recache() case _ => } From 11dc433975719288a3694746c60a571d3f349b22 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 14 Jun 2016 08:24:29 +0900 Subject: [PATCH 3/5] Use canonicalized plan to compare. --- .../sql/catalyst/plans/logical/LocalRelation.scala | 10 ++++++---- .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 +++++--- .../sql/execution/datasources/LogicalRelation.scala | 8 +++++--- .../org/apache/spark/sql/hive/MetastoreRelation.scala | 2 +- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index 87b8647655f4a..9d64f35efcc6a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -65,10 +65,12 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) } } - override def sameResult(plan: LogicalPlan): Boolean = plan match { - case LocalRelation(otherOutput, otherData) => - otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data - case _ => false + override def sameResult(plan: LogicalPlan): Boolean = { + plan.canonicalized match { + case LocalRelation(otherOutput, otherData) => + otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data + case _ => false + } } override lazy val statistics = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index ee72a70cced1a..e2c23a4ba8670 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -87,9 +87,11 @@ private[sql] case class LogicalRDD( override def newInstance(): LogicalRDD.this.type = LogicalRDD(output.map(_.newInstance()), rdd)(session).asInstanceOf[this.type] - override def sameResult(plan: LogicalPlan): Boolean = plan match { - case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id - case _ => false + override def sameResult(plan: LogicalPlan): Boolean = { + plan.canonicalized match { + case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id + case _ => false + } } override protected def stringArgs: Iterator[Any] = Iterator(output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index a418d02983eb9..39c8606fd14b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -60,9 +60,11 @@ case class LogicalRelation( com.google.common.base.Objects.hashCode(relation, output) } - override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match { - case LogicalRelation(otherRelation, _, _) => relation == otherRelation - case _ => false + override def sameResult(otherPlan: LogicalPlan): Boolean = { + otherPlan.canonicalized match { + case LogicalRelation(otherRelation, _, _) => relation == otherRelation + case _ => false + } } // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 9c820144aee12..55accb8c06544 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -185,7 +185,7 @@ private[hive] case class MetastoreRelation( /** Only compare database and tablename, not alias. */ override def sameResult(plan: LogicalPlan): Boolean = { - plan match { + plan.canonicalized match { case mr: MetastoreRelation => mr.databaseName == databaseName && mr.tableName == tableName case _ => false From 3d27607982a3794c438221153b8e078389da146b Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 14 Jun 2016 10:05:22 +0900 Subject: [PATCH 4/5] Revert "Modify CacheManager to use canonicalized plan for planToCache." This reverts commit 52641f4612bd2a461ef678778f34b464a97d953d. --- .../apache/spark/sql/execution/CacheManager.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 32ed7b8c95aa5..4e95754e9bef7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -87,7 +87,7 @@ private[sql] class CacheManager extends Logging { query: Dataset[_], tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { - val planToCache = query.queryExecution.analyzed.canonicalized + val planToCache = query.queryExecution.analyzed if (lookupCachedData(planToCache).nonEmpty) { logWarning("Asked to cache already cached data.") } else { @@ -106,7 +106,7 @@ private[sql] class CacheManager extends Logging { /** Removes the data for the given [[Dataset]] from the cache */ private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock { - val planToCache = query.queryExecution.analyzed.canonicalized + val planToCache = query.queryExecution.analyzed val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) require(dataIndex >= 0, s"Table $query is not cached.") cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) @@ -120,7 +120,7 @@ private[sql] class CacheManager extends Logging { private[sql] def tryUncacheQuery( query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { - val planToCache = query.queryExecution.analyzed.canonicalized + val planToCache = query.queryExecution.analyzed val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) val found = dataIndex >= 0 if (found) { @@ -137,8 +137,7 @@ private[sql] class CacheManager extends Logging { /** Optionally returns cached data for the given [[LogicalPlan]]. */ private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock { - val canonicalized = plan.canonicalized - cachedData.find(cd => canonicalized.sameResult(cd.plan)) + cachedData.find(cd => plan.sameResult(cd.plan)) } /** Replaces segments of the given logical plan with cached versions where possible. */ @@ -156,9 +155,8 @@ private[sql] class CacheManager extends Logging { * function will over invalidate. */ private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock { - val canonicalized = plan.canonicalized cachedData.foreach { - case data if data.plan.collect { case p if p.sameResult(canonicalized) => p }.nonEmpty => + case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty => data.cachedRepresentation.recache() case _ => } From 0c9ec86f7e2646466b91d75b26dc29de31b98d31 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 14 Jun 2016 10:10:19 +0900 Subject: [PATCH 5/5] Update a test name. --- .../src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 684a5e37c8de4..3306ac42a3650 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -553,7 +553,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext Seq(Row(1, "1"))) } - test("SPARK-15915 CacheManager should use canonicalized plan for planToCache") { + test("SPARK-15915 Logical plans should use canonicalized plan when override sameResult") { val localRelation = Seq(1, 2, 3).toDF() localRelation.createOrReplaceTempView("localRelation")