Skip to content

Commit

Permalink
[SPARK-33228][SQL] Don't uncache data when replacing a view having th…
Browse files Browse the repository at this point in the history
…e same logical plan

### What changes were proposed in this pull request?

SPARK-30494 updated the `CreateViewCommand` code to implicitly drop the cache when replacing an existing view. However, that change drops the cache even when the replacement view has the same logical plan as the existing one. The following sequence of queries reproduces the problem:
```
// Spark v2.4.6+
scala> val df = spark.range(1).selectExpr("id a", "id b")
scala> df.cache()
scala> df.explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [a#2L, b#3L]
      +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
               +- *(1) Range (0, 1, step=1, splits=4)

scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [a#2L, b#3L]
      +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
               +- *(1) Range (0, 1, step=1, splits=4)

// If one re-runs the same query `df.createOrReplaceTempView("t")`, the cache's swept away
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)

// Until v2.4.6
scala> val df = spark.range(1).selectExpr("id a", "id b")
scala> df.cache()
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
20/10/23 22:33:42 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
== Physical Plan ==
*(1) InMemoryTableScan [a#2L, b#3L]
   +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
            +- *(1) Range (0, 1, step=1, splits=4)

scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) InMemoryTableScan [a#2L, b#3L]
   +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
            +- *(1) Range (0, 1, step=1, splits=4)
```

### Why are the changes needed?

Bug fix: replacing a view with one that has the same logical plan should not drop the cached data.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #30140 from maropu/FixBugInReplaceView.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
maropu committed Oct 27, 2020
1 parent ffda450 commit d1f4a34
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,19 @@ case class CreateViewCommand(

val catalog = sparkSession.sessionState.catalog
if (viewType == LocalTempView) {
if (replace && catalog.getTempView(name.table).isDefined) {
logDebug(s"Try to uncache ${name.quotedString} before replacing.")
if (replace && catalog.getTempView(name.table).isDefined &&
!catalog.getTempView(name.table).get.sameResult(child)) {
logInfo(s"Try to uncache ${name.quotedString} before replacing.")
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
if (replace && catalog.getGlobalTempView(name.table).isDefined) {
if (replace && catalog.getGlobalTempView(name.table).isDefined &&
!catalog.getGlobalTempView(name.table).get.sameResult(child)) {
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val globalTempView = TableIdentifier(name.table, Option(db))
logDebug(s"Try to uncache ${globalTempView.quotedString} before replacing.")
logInfo(s"Try to uncache ${globalTempView.quotedString} before replacing.")
CommandUtils.uncacheTableOrView(sparkSession, globalTempView.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,4 +907,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assert(spark.sharedState.cacheManager.isEmpty)
}
}

// Regression test for SPARK-33228: re-registering a view backed by the same plan must not
// evict that plan's cached data. Each scenario caches a DataFrame, registers it as a view
// twice, and checks after every step that the cache entry is still present.
test("SPARK-33228: Don't uncache data when replacing an existing view having the same plan") {
  // Session-local temp view.
  withTempView("tempView") {
    // Start from an empty cache so the lookups below only see this test's entry.
    spark.catalog.clearCache()
    val df = spark.range(1).selectExpr("id a", "id b")
    df.cache()
    assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
    // First registration creates the view.
    df.createOrReplaceTempView("tempView")
    assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
    // Second registration replaces the view with an identical plan; cache must survive.
    df.createOrReplaceTempView("tempView")
    assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
  }

  // Global temp view. Use `withGlobalTempView` for cleanup: the view is created with
  // `createOrReplaceGlobalTempView`, and `withTempView` only drops session-local temp
  // views, so it would leave this view registered for subsequent tests.
  withGlobalTempView("tempGlobalTempView") {
    spark.catalog.clearCache()
    val df = spark.range(1).selectExpr("id a", "id b")
    df.cache()
    assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
    // First registration creates the global view.
    df.createOrReplaceGlobalTempView("tempGlobalTempView")
    assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
    // Second registration replaces it with an identical plan; cache must survive.
    df.createOrReplaceGlobalTempView("tempGlobalTempView")
    assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
  }
}
}

0 comments on commit d1f4a34

Please sign in to comment.