From 6650fcf4ab313c4fb3c48560fd575327b8162845 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 30 Sep 2017 20:13:40 -0700 Subject: [PATCH 1/2] fix --- .../apache/spark/sql/internal/CatalogImpl.scala | 15 +++++++++++---- .../spark/sql/hive/HiveMetadataCacheSuite.scala | 14 +++++++++++--- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 142b005850a49..ff3e95660a892 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -474,13 +474,20 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ override def refreshTable(tableName: String): Unit = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively. - // Non-temp tables: refresh the metadata cache. - sessionCatalog.refreshTable(tableIdent) - + val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent) // If this table is cached as an InMemoryRelation, drop the original // cached version and make the new version cached lazily. val table = sparkSession.table(tableIdent) + + if (tableMetadata.tableType == CatalogTableType.VIEW) { + // Temp or persistent views: refresh (or invalidate) any metadata/data cached + // in the plan recursively. + table.queryExecution.analyzed.foreach(_.refresh()) + } else { + // Non-temp tables: refresh the metadata cache. + sessionCatalog.refreshTable(tableIdent) + } + if (isCached(table)) { // Uncache the logicalPlan. sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala index 0c28a1b609bb8..e71aba72c31fe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala @@ -31,14 +31,22 @@ import org.apache.spark.sql.test.SQLTestUtils class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-16337 temporary view refresh") { - withTempView("view_refresh") { + checkRefreshView(isTemp = true) + } + + test("view refresh") { + checkRefreshView(isTemp = false) + } + + private def checkRefreshView(isTemp: Boolean) { + withView("view_refresh") { withTable("view_table") { // Create a Parquet directory spark.range(start = 0, end = 100, step = 1, numPartitions = 3) .write.saveAsTable("view_table") - // Read the table in - spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh") + val temp = if (isTemp) "TEMPORARY" else "" + spark.sql(s"CREATE $temp VIEW view_refresh AS SELECT * FROM view_table WHERE id > -1") assert(sql("select count(*) from view_refresh").first().getLong(0) == 100) // Delete a file using the Hadoop file system interface since the path returned by From d677c779488dc989bca2f1e3aefb94152227ddf9 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 3 Oct 2017 09:23:06 -0700 Subject: [PATCH 2/2] comment update. --- .../scala/org/apache/spark/sql/internal/CatalogImpl.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index ff3e95660a892..fdd25330c5e67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -475,8 +475,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { override def refreshTable(tableName: String): Unit = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent) - // If this table is cached as an InMemoryRelation, drop the original - // cached version and make the new version cached lazily. val table = sparkSession.table(tableIdent) if (tableMetadata.tableType == CatalogTableType.VIEW) { @@ -488,6 +486,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { sessionCatalog.refreshTable(tableIdent) } + // If this table is cached as an InMemoryRelation, drop the original + // cached version and make the new version cached lazily. if (isCached(table)) { // Uncache the logicalPlan. sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)