From 6265cba00f6141575b4be825735d77d4cea500ab Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 21 Apr 2015 14:48:42 -0700 Subject: [PATCH] [SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used https://issues.apache.org/jira/browse/SPARK-6969 Author: Yin Huai Closes #5583 from yhuai/refreshTableRefreshDataCache and squashes the following commits: 1e5142b [Yin Huai] Add todo. 92b2498 [Yin Huai] Minor updates. 367df92 [Yin Huai] Recache data in the command of REFRESH TABLE. --- .../org/apache/spark/sql/sources/ddl.scala | 17 +++++++ .../spark/sql/hive/CachedTableSuite.scala | 50 ++++++++++++++++++- 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 2e861b84b7133..78d494184e759 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { + // Refresh the given table's metadata first. sqlContext.catalog.refreshTable(databaseName, tableName) + + // If this table is cached as a InMemoryColumnarRelation, drop the original + // cached version and make the new version cached lazily. + val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName)) + // Use lookupCachedData directly since RefreshTable also takes databaseName. + val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty + if (isCached) { + // Create a data frame to represent the table. + // TODO: Use uncacheTable once it supports database name. + val df = DataFrame(sqlContext, logicalPlan) + // Uncache the logicalPlan. + sqlContext.cacheManager.tryUncacheQuery(df, blocking = true) + // Cache it again. + sqlContext.cacheManager.cacheQuery(df, Some(tableName)) + } + Seq.empty[Row] } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index c188264072a84..fc6c3c35037b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.hive +import java.io.File + import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest} +import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest} import org.apache.spark.storage.RDDBlockId +import org.apache.spark.util.Utils class CachedTableSuite extends QueryTest { @@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest { assertCached(table("udfTest")) uncacheTable("udfTest") } + + test("REFRESH TABLE also needs to recache the data (data source tables)") { + val tempPath: File = Utils.createTempDir() + tempPath.delete() + table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite) + sql("DROP TABLE IF EXISTS refreshTable") + createExternalTable("refreshTable", tempPath.toString, "parquet") + checkAnswer( + table("refreshTable"), + table("src").collect()) + // Cache the table. + sql("CACHE TABLE refreshTable") + assertCached(table("refreshTable")) + // Append new data. + table("src").save(tempPath.toString, "parquet", SaveMode.Append) + // We are still using the old data. + assertCached(table("refreshTable")) + checkAnswer( + table("refreshTable"), + table("src").collect()) + // Refresh the table. + sql("REFRESH TABLE refreshTable") + // We are using the new data. + assertCached(table("refreshTable")) + checkAnswer( + table("refreshTable"), + table("src").unionAll(table("src")).collect()) + + // Drop the table and create it again. + sql("DROP TABLE refreshTable") + createExternalTable("refreshTable", tempPath.toString, "parquet") + // It is not cached. + assert(!isCached("refreshTable"), "refreshTable should not be cached.") + // Refresh the table. REFRESH TABLE command should not make a uncached + // table cached. + sql("REFRESH TABLE refreshTable") + checkAnswer( + table("refreshTable"), + table("src").unionAll(table("src")).collect()) + // It is not cached. + assert(!isCached("refreshTable"), "refreshTable should not be cached.") + + sql("DROP TABLE refreshTable") + Utils.deleteRecursively(tempPath) + } }