Skip to content

Commit

Permalink
[SPARK-22178][SQL] Refresh Persistent Views by REFRESH TABLE Command
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
The underlying tables of a persistent view are not refreshed when users issue the REFRESH TABLE command against that view. This PR makes REFRESH TABLE on a persistent view recursively refresh (or invalidate) any metadata/data cached in the view's plan.

## How was this patch tested?
Added a test case that exercises REFRESH TABLE on both temporary and persistent views.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19405 from gatorsmile/refreshView.
  • Loading branch information
gatorsmile committed Oct 3, 2017
1 parent 4c5158e commit e65b6b7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,20 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
// Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)
val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
val table = sparkSession.table(tableIdent)

if (tableMetadata.tableType == CatalogTableType.VIEW) {
// Temp or persistent views: refresh (or invalidate) any metadata/data cached
// in the plan recursively.
table.queryExecution.analyzed.foreach(_.refresh())
} else {
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)
}

// If this table is cached as an InMemoryRelation, drop the original
// cached version and make the new version cached lazily.
val table = sparkSession.table(tableIdent)
if (isCached(table)) {
// Uncache the logicalPlan.
sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@ import org.apache.spark.sql.test.SQLTestUtils
class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

test("SPARK-16337 temporary view refresh") {
withTempView("view_refresh") {
checkRefreshView(isTemp = true)
}

test("view refresh") {
checkRefreshView(isTemp = false)
}

private def checkRefreshView(isTemp: Boolean) {
withView("view_refresh") {
withTable("view_table") {
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.saveAsTable("view_table")

// Read the table in
spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh")
val temp = if (isTemp) "TEMPORARY" else ""
spark.sql(s"CREATE $temp VIEW view_refresh AS SELECT * FROM view_table WHERE id > -1")
assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)

// Delete a file using the Hadoop file system interface since the path returned by
Expand Down

0 comments on commit e65b6b7

Please sign in to comment.