
[SPARK-33786][SQL] The storage level for a cache should be respected when a table name is altered. #30774

Closed
wants to merge 3 commits

Conversation

imback82
Contributor

### What changes were proposed in this pull request?

This PR proposes to retain the cache's storage level when a table name is altered by `ALTER TABLE ... RENAME TO ...`.

### Why are the changes needed?

Currently, when a table name is altered, the table's cache is refreshed (if it exists), but the storage level is not retained. For example:

```scala
def getStorageLevel(tableName: String): StorageLevel = {
  val table = spark.table(tableName)
  val cachedData = spark.sharedState.cacheManager.lookupCachedData(table).get
  cachedData.cachedRepresentation.cacheBuilder.storageLevel
}

Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
sql(s"CREATE TABLE old USING parquet LOCATION '${path.toURI}'")
sql("CACHE TABLE old OPTIONS('storageLevel' 'MEMORY_ONLY')")
val oldStorageLevel = getStorageLevel("old")

sql("ALTER TABLE old RENAME TO new")
val newStorageLevel = getStorageLevel("new")
```

`oldStorageLevel` will be `StorageLevel(memory, deserialized, 1 replicas)` whereas `newStorageLevel` will be `StorageLevel(disk, memory, deserialized, 1 replicas)`, which is the default storage level.
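For context, these printed forms correspond to the standard constants in `org.apache.spark.storage.StorageLevel`, with `MEMORY_AND_DISK` being the default level used by Spark SQL caching:

```scala
import org.apache.spark.storage.StorageLevel

// What the cache was built with via OPTIONS('storageLevel' 'MEMORY_ONLY'):
StorageLevel.MEMORY_ONLY      // prints: StorageLevel(memory, deserialized, 1 replicas)

// What the rename fell back to (Spark SQL's default cache level):
StorageLevel.MEMORY_AND_DISK  // prints: StorageLevel(disk, memory, deserialized, 1 replicas)
```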

### Does this PR introduce _any_ user-facing change?

Yes, now the storage level for the cache will be retained.

### How was this patch tested?

Added a unit test.
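(For reference, a minimal sketch of what such a test could look like, assuming the usual `SQLTestUtils` helpers `withTempPath`/`withTable`, `import testImplicits._`, and the `getStorageLevel` helper from the example above:)

```scala
test("SPARK-33786: Cache's storage level should be respected when a table name is altered") {
  withTempPath { path =>
    withTable("old", "new") {
      Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
      sql(s"CREATE TABLE old USING parquet LOCATION '${path.toURI}'")
      sql("CACHE TABLE old OPTIONS('storageLevel' 'MEMORY_ONLY')")
      val oldStorageLevel = getStorageLevel("old")

      sql("ALTER TABLE old RENAME TO new")
      val newStorageLevel = getStorageLevel("new")
      // The fix: the storage level must survive the rename.
      assert(oldStorageLevel === newStorageLevel)
    }
  }
}
```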

@github-actions github-actions bot added the SQL label Dec 15, 2020
@@ -195,16 +195,21 @@ case class AlterTableRenameCommand(
DDLUtils.verifyAlterTableType(catalog, table, isView)
// If an exception is thrown here we can just assume the table is uncached;
// this can happen with Hive tables when the underlying catalog is in-memory.
val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
Contributor Author

The existing implementation uses the Catalog API (`isCached`), whereas this PR uses `CacheManager` directly. If this approach is not desired, we can update the Catalog API to expose `StorageLevel`.
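(As a sketch of the difference, assuming Spark 3.1 internals: the Catalog API only exposes a boolean, while `CacheManager` exposes the cached plan and the level it was built with.)

```scala
// Catalog API: cached or not, nothing more.
val isCached: Boolean = spark.catalog.isCached("tbl")

// CacheManager: the CachedData carries the storage level of the cache.
val storageLevel = spark.sharedState.cacheManager
  .lookupCachedData(spark.table("tbl"))
  .map(_.cachedRepresentation.cacheBuilder.storageLevel)
```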

@imback82
Contributor Author

I will also update the v2 implementation (`RenameTableExec`) once this PR is done.

@SparkQA

SparkQA commented Dec 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37400/

@SparkQA

SparkQA commented Dec 15, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37400/

@imback82
Contributor Author

cc @cloud-fan @viirya

sparkSession.table(oldName.unquotedString))
optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
}.getOrElse(None)
optStorageLevel.foreach { _ =>
Contributor

@cloud-fan cloud-fan Dec 15, 2020

nit: `if (optStorageLevel.isDefined)`. It's clearer as a check of whether the table is cached or not.
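(The suggested shape, as a sketch:)

```scala
// Use the Option as an explicit "was the old table cached?" check,
// rather than iterating over it with foreach.
if (optStorageLevel.isDefined) {
  // recache the new table name with the retained storage level
}
```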

Member

+1

Contributor Author

updated.

@SparkQA

SparkQA commented Dec 15, 2020

Test build #132798 has finished for PR 30774 at commit eb60d1e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
if (wasCached) {
// If `optStorageLevel` is defined, the old table was cached.
val optStorageLevel = Try {
Member

Will `lookupCachedData` actually throw an exception? If the table isn't cached, shouldn't it just return `None`?

Contributor Author

`lookupCachedData` doesn't throw an exception. I believe the existing code is wrapped in `Try` because `isCached` calls `spark.table` (which could be related to "this can happen with Hive tables when the underlying catalog is in-memory"). So I was keeping the same behavior.

Do you think the `Try` can be removed?

Contributor

Yea, I think we can remove it.

Member

+1 to remove it. We use `lookupCachedData` in many places, and I don't think we need to add `Try` here.
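(For illustration, a minimal sketch of the lookup without the `Try`, under the assumption discussed above that `lookupCachedData` simply returns `None` for uncached tables:)

```scala
// lookupCachedData returns None when the plan isn't cached, so it needs
// no exception guard of its own; only table resolution could still fail.
val optStorageLevel = sparkSession.sharedState.cacheManager
  .lookupCachedData(sparkSession.table(oldName.unquotedString))
  .map(_.cachedRepresentation.cacheBuilder.storageLevel)
```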

if (wasCached) {
sparkSession.catalog.cacheTable(newName.unquotedString)
optStorageLevel.foreach { storageLevel =>
sparkSession.catalog.cacheTable(newName.unquotedString, storageLevel)
Member

Does this miss the `tableName` if there is one in the original cache?

Contributor Author

Sorry, I didn't get this question. This is creating a new cache with a new table name.

Member

Hm, you can check the change in #30769, especially how it recaches the table. There is a `cacheName` parameter. If the table was cached with a cache name, I think we should keep it when recaching.

    val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
    if (recacheTable && cache.isDefined) {
      // save the cache name and cache level for recreation
      val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
      val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel

      // recache with the same name and cache level.
      val ds = Dataset.ofRows(session, v2Relation)
      session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
    }

Contributor

The previous code also seems to recache with the new name?

Member

No, the v2 refresh table command didn't recache the table before #30769.

Contributor

I mean the previous code in `AlterTableRenameCommand`. We shouldn't change its behavior regarding the cache name in this bug-fix PR.

Member

Hmm, okay. Actually, it also sounds like a bug if the ALTER TABLE command changes the cache name. I'm fine with leaving it unchanged here.

Contributor Author

I believe the cache name is used for debugging purposes only (for the RDD name and `InMemoryTableScanExec`). So if the cache name, which is tied to the table name, doesn't change when the table is renamed, wouldn't it cause confusion, since it would still refer to the old table name? I can do a follow-up PR if this seems like a bug.
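(For illustration, a sketch of what such a follow-up could do, using `CacheManager.cacheQuery`'s optional `tableName` parameter; names follow the v2 snippet quoted above:)

```scala
// Recache under the *new* table name so the RDD name and the
// InMemoryTableScanExec display match the renamed table.
optStorageLevel.foreach { storageLevel =>
  val ds = sparkSession.table(newName.unquotedString)
  sparkSession.sharedState.cacheManager.cacheQuery(
    ds, Some(newName.unquotedString), storageLevel)
}
```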

@SparkQA

SparkQA commented Dec 15, 2020

Test build #132814 has finished for PR 30774 at commit 11603d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37415/

@SparkQA

SparkQA commented Dec 15, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37415/

@cloud-fan
Contributor

GA passed, merging to master/3.1, thanks!

@cloud-fan cloud-fan closed this in ef7f690 Dec 16, 2020
cloud-fan pushed a commit that referenced this pull request Dec 16, 2020
…when a table name is altered


Closes #30774 from imback82/alter_table_rename_cache_fix.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ef7f690)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

@imback82 can you open a backport PR for 3.0? thanks!

@dongjoon-hyun
Member

Thank you for the fix, @imback82 and @cloud-fan .

cc @sunchao

cloud-fan pushed a commit that referenced this pull request Dec 16, 2020
…cted when a table name is altered

### What changes were proposed in this pull request?

This is a backport of #30774.


Closes #30793 from imback82/alter_table_rename_cache_fix_3.0.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>