From 03f1cfef01e1a1b6dec9aa1b16bbe4c74c2bed88 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Thu, 23 Oct 2014 13:02:42 -0700
Subject: [PATCH 1/4] Clean-up / add tests to SameResult suite.

---
 .../spark/sql/catalyst/plans/SameResultSuite.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
index e8a793d107451..11e6831b24768 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 
 /**
- * Provides helper methods for comparing plans.
+ * Tests for the sameResult function of [[LogicalPlan]].
  */
 class SameResultSuite extends FunSuite {
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -52,11 +52,15 @@ class SameResultSuite extends FunSuite {
     assertSameResult(testRelation.select('a, 'b), testRelation2.select('a, 'b))
     assertSameResult(testRelation.select('b, 'a), testRelation2.select('b, 'a))
 
-    assertSameResult(testRelation, testRelation2.select('a), false)
-    assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), false)
+    assertSameResult(testRelation, testRelation2.select('a), result = false)
+    assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), result = false)
   }
 
   test("filters") {
     assertSameResult(testRelation.where('a === 'b), testRelation2.where('a === 'b))
   }
+
+  test("sorts") {
+    assertSameResult(testRelation.orderBy('a.asc), testRelation2.orderBy('a.asc))
+  }
 }
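Note: `sameResult` is expected to ignore cosmetic differences between the two fixture relations (which carry distinct expression IDs) while still distinguishing plans that answer differently. A hypothetical companion case in the suite's own style, not part of this patch, makes the negative direction for sorts explicit; it reuses the `testRelation`/`testRelation2` fixtures and the `assertSameResult` helper shown above:

```scala
// Hypothetical extra case (not in this patch): differing sort orders must
// NOT compare as the same result, since row order changes the answer.
test("sorts with different orderings") {
  assertSameResult(
    testRelation.orderBy('a.asc),
    testRelation2.orderBy('a.desc),
    result = false)
}
```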
From 63a23e4903c6c17051e321b4ec36a7d199e31cb4 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Thu, 23 Oct 2014 13:03:25 -0700
Subject: [PATCH 2/4] Perform caching on analyzed instead of optimized plan.

---
 .../org/apache/spark/sql/CacheManager.scala    | 10 +++++-----
 .../org/apache/spark/sql/SQLContext.scala      |  6 +++---
 .../apache/spark/sql/CachedTableSuite.scala    | 20 ++++++++++++++++++-
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 5ab2b5316ab10..3ced11a5e6c11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -82,7 +82,7 @@ private[sql] trait CacheManager {
   private[sql] def cacheQuery(
       query: SchemaRDD,
       storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
-    val planToCache = query.queryExecution.optimizedPlan
+    val planToCache = query.queryExecution.analyzed
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
@@ -96,8 +96,8 @@ private[sql] trait CacheManager {
 
   /** Removes the data for the given SchemaRDD from the cache */
   private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock {
-    val planToCache = query.queryExecution.optimizedPlan
-    val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
+    val planToCache = query.queryExecution.analyzed
+    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     require(dataIndex >= 0, s"Table $query is not cached.")
     cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
     cachedData.remove(dataIndex)
@@ -106,12 +106,12 @@ private[sql] trait CacheManager {
 
   /** Optionally returns cached data for the given SchemaRDD */
   private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock {
-    lookupCachedData(query.queryExecution.optimizedPlan)
+    lookupCachedData(query.queryExecution.analyzed)
   }
 
   /** Optionally returns cached data for the given LogicalPlan. */
   private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
-    cachedData.find(_.plan.sameResult(plan))
+    cachedData.find(cd => plan.sameResult(cd.plan))
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0e4a9ca60b00d..590dbf3cb893d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -374,13 +374,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def logical: LogicalPlan
 
     lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
-    lazy val optimizedPlan = optimizer(analyzed)
-    lazy val withCachedData = useCachedData(optimizedPlan)
+    lazy val withCachedData = useCachedData(analyzed)
+    lazy val optimizedPlan = optimizer(withCachedData)
 
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = {
       SparkPlan.currentContext.set(self)
-      planner(withCachedData).next()
+      planner(optimizedPlan).next()
     }
     // executedPlan should not be used to initialize any SparkPlan. It should be
     // only used for execution.
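The SQLContext hunk is the crux of the change: cached-data substitution moves before optimization, so cache lookups key off the stable analyzed plan, and the optimizer then runs over whatever `InMemoryRelation`s were swapped in. A condensed sketch of the resulting stage ordering (stage names and calls are taken from the diff above; the enclosing class and all other members are elided, so this is illustrative rather than compilable on its own):

```scala
// Condensed sketch of QueryExecution after this patch (not the full class):
// the stages now run analyze -> substitute cached data -> optimize -> plan.
abstract class QueryExecution {
  def logical: LogicalPlan

  // 1. Resolve references and pull out Python UDFs.
  lazy val analyzed: LogicalPlan = ExtractPythonUdfs(analyzer(logical))
  // 2. Replace cached subplans BEFORE any optimizer rewrites can change the
  //    plan's shape and defeat sameResult matching.
  lazy val withCachedData: LogicalPlan = useCachedData(analyzed)
  // 3. Optimize the plan with cached scans already in place.
  lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
  // 4. Pick the first physical plan the strategies produce.
  lazy val sparkPlan: SparkPlan = planner(optimizedPlan).next()
}
```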
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 444bc95009c31..22e801b1c0696 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -53,17 +53,33 @@ class CachedTableSuite extends QueryTest {
     sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
   }
 
+  test("cache temp table") {
+    testData.select('key).registerTempTable("tempTable")
+    assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
+    cacheTable("tempTable")
+    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+    uncacheTable("tempTable")
+    //sql("DROP TABLE tempTable")
+  }
+
+  test("cache table as select") {
+    sql("CACHE TABLE tempTable AS SELECT key FROM testData")
+    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+    uncacheTable("tempTable")
+  }
+
   test("too big for memory") {
     val data = "*" * 10000
     sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData")
     table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
     assert(table("bigData").count() === 200000L)
-    table("bigData").unpersist()
+    table("bigData").unpersist(blocking = true)
   }
 
   test("calling .cache() should use in-memory columnar caching") {
     table("testData").cache()
     assertCached(table("testData"))
+    table("testData").unpersist(blocking = true)
   }
 
   test("calling .unpersist() should drop in-memory columnar cache") {
@@ -108,6 +124,8 @@ class CachedTableSuite extends QueryTest {
       case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
     }.size
   }
+
+    uncacheTable("testData")
   }
 
   test("read from cached table and uncache") {

From 5c72fb71eed3df8e58060b3847e13018c621b910 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Thu, 23 Oct 2014 13:08:02 -0700
Subject: [PATCH 3/4] Add a test case / question about uncaching semantics.

---
 .../org/apache/spark/sql/CachedTableSuite.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 22e801b1c0696..09827e67edca7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -68,6 +68,21 @@ class CachedTableSuite extends QueryTest {
     uncacheTable("tempTable")
   }
 
+  test("uncaching temp table") {
+    testData.select('key).registerTempTable("tempTable1")
+    testData.select('key).registerTempTable("tempTable2")
+    cacheTable("tempTable1")
+
+    assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
+    assertCached(sql("SELECT COUNT(*) FROM tempTable2"))
+
+    // Is this valid?
+    uncacheTable("tempTable2")
+
+    // Should this be cached?
+    assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
+  }
+
   test("too big for memory") {
     val data = "*" * 10000
     sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData")
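The questions raised in PATCH 3 fall directly out of how `CacheManager` matches entries: both temp tables analyze to `testData.select('key)`, so they share a single cache entry keyed by plan, and `uncacheQuery` removes whatever entry `sameResult` matches, regardless of which table name the request came through. A simplified, hypothetical model of that bookkeeping (`CachedEntry` and `UncacheModel` are stand-ins; the real `CachedData` also carries an `InMemoryRelation`, and only the `indexWhere`/`sameResult` matching is taken from the diff above):

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical stand-in for CacheManager's CachedData bookkeeping:
// entries are keyed by analyzed plan, never by table name.
case class CachedEntry(plan: LogicalPlan)

class UncacheModel {
  val cachedData = new mutable.ArrayBuffer[CachedEntry]

  // tempTable1 and tempTable2 both analyze to testData.select('key), so a
  // request through either name sameResult-matches the one shared entry.
  // That is why uncaching tempTable2 also "uncaches" tempTable1.
  def uncache(planToUncache: LogicalPlan): Unit = {
    val dataIndex = cachedData.indexWhere(e => planToUncache.sameResult(e.plan))
    require(dataIndex >= 0, "plan is not cached")
    cachedData.remove(dataIndex)
  }
}
```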
+ assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0) + } + test("too big for memory") { val data = "*" * 10000 sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData") From 9c822d43f4906747090d04ab04e5a4dcd0bb61e5 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 23 Oct 2014 13:30:35 -0700 Subject: [PATCH 4/4] remove commented out code --- .../src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 09827e67edca7..da5a358df3b1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -59,7 +59,6 @@ class CachedTableSuite extends QueryTest { cacheTable("tempTable") assertCached(sql("SELECT COUNT(*) FROM tempTable")) uncacheTable("tempTable") - //sql("DROP TABLE tempTable") } test("cache table as select") {