Skip to content

Commit

Permalink
[SPARK-33786][SQL] The storage level for a cache should be respected …
Browse files Browse the repository at this point in the history
…when a table name is altered

### What changes were proposed in this pull request?

This PR proposes to retain the cache's storage level when a table name is altered by `ALTER TABLE ... RENAME TO ...`.

### Why are the changes needed?

Currently, when a table name is altered, the table's cache is refreshed (if exists), but the storage level is not retained. For example:
```scala
        def getStorageLevel(tableName: String): StorageLevel = {
          val table = spark.table(tableName)
          val cachedData = spark.sharedState.cacheManager.lookupCachedData(table).get
          cachedData.cachedRepresentation.cacheBuilder.storageLevel
        }

        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
        sql(s"CREATE TABLE old USING parquet LOCATION '${path.toURI}'")
        sql("CACHE TABLE old OPTIONS('storageLevel' 'MEMORY_ONLY')")
        val oldStorageLevel = getStorageLevel("old")

        sql("ALTER TABLE old RENAME TO new")
        val newStorageLevel = getStorageLevel("new")
```
`oldStorageLevel` will be `StorageLevel(memory, deserialized, 1 replicas)` whereas `newStorageLevel` will be `StorageLevel(disk, memory, deserialized, 1 replicas)`, which is the default storage level.

### Does this PR introduce _any_ user-facing change?

Yes, now the storage level for the cache will be retained.

### How was this patch tested?

Added a unit test.

Closes apache#30774 from imback82/alter_table_rename_cache_fix.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ef7f690)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
imback82 authored and cloud-fan committed Dec 16, 2020
1 parent 50e72dc commit f3709a0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.net.{URI, URISyntaxException}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
Expand Down Expand Up @@ -193,18 +192,19 @@ case class AlterTableRenameCommand(
} else {
val table = catalog.getTableMetadata(oldName)
DDLUtils.verifyAlterTableType(catalog, table, isView)
// If an exception is thrown here we can just assume the table is uncached;
// this can happen with Hive tables when the underlying catalog is in-memory.
val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
if (wasCached) {
// If `optStorageLevel` is defined, the old table was cached.
val optCachedData = sparkSession.sharedState.cacheManager.lookupCachedData(
sparkSession.table(oldName.unquotedString))
val optStorageLevel = optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
if (optStorageLevel.isDefined) {
CommandUtils.uncacheTableOrView(sparkSession, oldName.unquotedString)
}
// Invalidate the table last, otherwise uncaching the table would load the logical plan
// back into the hive metastore cache
catalog.refreshTable(oldName)
catalog.renameTable(oldName, newName)
if (wasCached) {
sparkSession.catalog.cacheTable(newName.unquotedString)
optStorageLevel.foreach { storageLevel =>
sparkSession.catalog.cacheTable(newName.unquotedString, storageLevel)
}
}
Seq.empty[Row]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1285,4 +1285,24 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
}
}

test("SPARK-33786: Cache's storage level should be respected when a table name is altered.") {
withTable("old", "new") {
withTempPath { path =>
def getStorageLevel(tableName: String): StorageLevel = {
val table = spark.table(tableName)
val cachedData = spark.sharedState.cacheManager.lookupCachedData(table).get
cachedData.cachedRepresentation.cacheBuilder.storageLevel
}
Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
sql(s"CREATE TABLE old USING parquet LOCATION '${path.toURI}'")
sql("CACHE TABLE old OPTIONS('storageLevel' 'MEMORY_ONLY')")
val oldStorageLevel = getStorageLevel("old")

sql("ALTER TABLE old RENAME TO new")
val newStorageLevel = getStorageLevel("new")
assert(oldStorageLevel === newStorageLevel)
}
}
}
}

0 comments on commit f3709a0

Please sign in to comment.