Skip to content

Commit

Permalink
[SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/SPARK-6969

Author: Yin Huai <yhuai@databricks.com>

Closes #5583 from yhuai/refreshTableRefreshDataCache and squashes the following commits:

1e5142b [Yin Huai] Add todo.
92b2498 [Yin Huai] Minor updates.
367df92 [Yin Huai] Recache data in the command of REFRESH TABLE.
  • Loading branch information
yhuai authored and marmbrus committed Apr 21, 2015
1 parent 03fd921 commit 6265cba
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
17 changes: 17 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
  val catalog = sqlContext.catalog

  // Step 1: bring the catalog's metadata for the target table up to date.
  catalog.refreshTable(databaseName, tableName)

  // Step 2: resolve the table again and ask the cache manager whether that
  // plan currently has cached data. lookupCachedData is called directly
  // because this command identifies the table by databaseName as well as
  // tableName.
  val refreshedPlan = catalog.lookupRelation(Seq(databaseName, tableName))
  val cacheManager = sqlContext.cacheManager
  val wasCached = cacheManager.lookupCachedData(refreshedPlan).nonEmpty

  // Step 3: a table that was cached has its old cached version dropped and is
  // registered for caching again; a table that was not cached is left alone.
  if (wasCached) {
    // TODO: Use uncacheTable once it supports database name.
    val refreshedTable = DataFrame(sqlContext, refreshedPlan)
    // Drop the existing cached data, waiting for the removal to finish.
    cacheManager.tryUncacheQuery(refreshedTable, blocking = true)
    // Register the table for caching again under its table name.
    cacheManager.cacheQuery(refreshedTable, Some(tableName))
  }

  // The command produces no result rows.
  Seq.empty[Row]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package org.apache.spark.sql.hive

import java.io.File

import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest}
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils

class CachedTableSuite extends QueryTest {

Expand Down Expand Up @@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest {
assertCached(table("udfTest"))
uncacheTable("udfTest")
}

test("REFRESH TABLE also needs to recache the data (data source tables)") {
  // Only the unique path is needed here; remove the directory itself before
  // the first write goes to that location.
  val tempPath: File = Utils.createTempDir()
  tempPath.delete()

  // Everything that can fail runs inside try so that the external table and
  // the temporary directory are cleaned up even when an assertion fails.
  // Otherwise a failure would leave "refreshTable" and its files behind for
  // the tests that run after this one.
  try {
    table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
    sql("DROP TABLE IF EXISTS refreshTable")
    createExternalTable("refreshTable", tempPath.toString, "parquet")
    checkAnswer(
      table("refreshTable"),
      table("src").collect())
    // Cache the table.
    sql("CACHE TABLE refreshTable")
    assertCached(table("refreshTable"))
    // Append new data.
    table("src").save(tempPath.toString, "parquet", SaveMode.Append)
    // We are still using the old data.
    assertCached(table("refreshTable"))
    checkAnswer(
      table("refreshTable"),
      table("src").collect())
    // Refresh the table.
    sql("REFRESH TABLE refreshTable")
    // We are using the new data.
    assertCached(table("refreshTable"))
    checkAnswer(
      table("refreshTable"),
      table("src").unionAll(table("src")).collect())

    // Drop the table and create it again.
    sql("DROP TABLE refreshTable")
    createExternalTable("refreshTable", tempPath.toString, "parquet")
    // It is not cached.
    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
    // Refresh the table. The REFRESH TABLE command should not make an
    // uncached table cached.
    sql("REFRESH TABLE refreshTable")
    checkAnswer(
      table("refreshTable"),
      table("src").unionAll(table("src")).collect())
    // It is not cached.
    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
  } finally {
    // IF EXISTS: on a failure path the table may or may not exist at this point.
    // The nested finally guarantees the directory is removed even if the
    // drop itself throws.
    try {
      sql("DROP TABLE IF EXISTS refreshTable")
    } finally {
      Utils.deleteRecursively(tempPath)
    }
  }
}
}

0 comments on commit 6265cba

Please sign in to comment.