[SPARK-30494][SQL] Fix cached data leakage during replacing an existing view #27185
Conversation
```scala
@@ -125,6 +132,9 @@ case class CreateViewCommand(
      val viewIdent = tableMetadata.identifier
      checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)

      // uncache the cached data before replacing an existing view
      sparkSession.catalog.uncacheTable(viewIdent.quotedString)
```
FYI: there is a condition `} else if (replace) {` at line 130.
Test build #116607 has finished for PR 27185 at commit
retest it please
Retest this please
Test build #116629 has finished for PR 27185 at commit
Gentle ping @cloud-fan @gengliangwang @maropu @dongjoon-hyun @kiszk @wangyum
```scala
@@ -108,9 +108,16 @@ case class CreateViewCommand(
    verifyTemporaryObjectsNotExists(catalog)

    if (viewType == LocalTempView) {
      if (replace && catalog.getTempView(name.table).isDefined) {
        sparkSession.catalog.uncacheTable(name.quotedString)
```
How about showing an INFO log for this case instead of implicitly uncaching?
I prefer adding an INFO log, but not removing the `uncacheTable` here, because after replacing the view there is no way to drop the old cached data.
I didn't mean that; it's ok to just add logging.
Yes. Added.
Test build #116668 has finished for PR 27185 at commit
Gentle ping @cloud-fan @HyukjinKwon @maropu @dongjoon-hyun @kiszk @srowen
Seems reasonable to me?
I think it makes sense, but we should follow how similar things are done in DROP TABLE.
I think they are similar to DROP TABLE:

```scala
try {
  sparkSession.sharedState.cacheManager.uncacheQuery(
    sparkSession.table(tableName), cascade = !isTempView)
} catch {
  case NonFatal(e) => log.warn(e.toString, e)
}
```

I added the
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala (line 114 in 69ab94f)

```scala
override def uncacheTable(tableName: String): Unit = {
  val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  val cascade = !sessionCatalog.isTemporaryTable(tableIdent)
  sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName), cascade)
}
```

Can I just use
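The `cascade` flag in the snippets above controls whether cached plans built on top of the uncached table are dropped as well (true for persistent tables, false for temporary views). As a rough illustration of that semantics, here is a toy cache model in Python; `ToyCacheManager` and its methods are hypothetical stand-ins, not Spark's API:

```python
# Toy cache manager: each cached entry remembers which tables its plan reads.
class ToyCacheManager:
    def __init__(self):
        self.entries = {}  # entry name -> set of table names the cached plan depends on

    def cache(self, name, deps):
        self.entries[name] = set(deps)

    def uncache(self, table, cascade):
        # Always drop the cache entry for the table itself.
        self.entries.pop(table, None)
        if cascade:
            # Also drop every cached entry whose plan reads the table.
            for name in [n for n, deps in self.entries.items() if table in deps]:
                del self.entries[name]

cm = ToyCacheManager()
cm.cache("t", {"t"})
cm.cache("view_on_t", {"t"})       # a cached view built on top of t

cm.uncache("t", cascade=True)      # persistent table: dependents are dropped too
assert cm.entries == {}

cm.cache("t", {"t"})
cm.cache("view_on_t", {"t"})
cm.uncache("t", cascade=False)     # temporary view: dependents stay cached
assert "view_on_t" in cm.entries
```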
Test build #117717 has finished for PR 27185 at commit
Shall we further generalize it? Currently we un-cache tables in several commands like DROP TABLE, TRUNCATE TABLE, etc., and now we find more missing places like CREATE VIEW. Instead of un-caching tables in the commands, I feel it's better to do it in low-level basic operations like
To do that,
Oh, checked code.
Ah, I see.
I think
Can we move sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala to the catalyst package?
The entry point of
Yes. Another way is adding an
But I am not sure whether it is worth touching the interface.
retest please
retest this please
Test build #119788 has finished for PR 27185 at commit
Test build #120127 has finished for PR 27185 at commit
Test build #120128 has finished for PR 27185 at commit
```diff
@@ -21,7 +21,6 @@ import java.util.Locale

 import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
```
Shall we revert this change in this file from this PR? This kind of irrelevant change makes the backport difficult.
Sure.
Also, @LantaoJin, please fix the PR description. For example, the following doesn't work.
```scala
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
sql("cache table tempView")
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
```
This seems to be redundant due to line 1133.
Removed.
```scala
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
sql("cache table global_temp.tempGlobalTempView")
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isEmpty)
```
This seems to be redundant due to line 1145.
```scala
// |
// + Project[1 AS 1]
spark.sharedState.cacheManager.uncacheQuery(spark.table("view1"), cascade = false)
assert(spark.sharedState.cacheManager.isEmpty)
```
This looks like an invalid test scope, because this is testing uncacheQuery at line 1164 instead of CREATE OR REPLACE VIEW. Shall we revise this whole test case?
Lines 1164 and 1165 make sure our patch is working: no cached data leaks for a persisted view. Without this patch, the assert at line 1165 fails. So I don't think it's an invalid test.
```scala
@@ -1122,4 +1122,47 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
    assert(!spark.catalog.isCached("t1"))
  }
}

  test("SPARK-30494 avoid duplicated cached RDD when replace an existing view") {
```
This can be misleading because the cached RDD is not duplicated. The two cached RDDs are independent.
OK, how about "Fix the leak of cached data when replacing an existing view"?
Hi, @LantaoJin. Could you revise the PR once more? Thanks!
Ah, you meant the
Ah, I get it.
Test build #120160 has finished for PR 27185 at commit
+1, LGTM. Thank you, @LantaoJin and all!
Merged to master/3.0.
…ng view

### What changes were proposed in this pull request?
The cached RDD for the plan "select 1" stays in memory forever until the session closes. This cached data cannot be used since the view temp1 has been replaced by another plan. It's a memory leak. We can reproduce it with the commands below:
```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("create or replace temporary view temp1 as select 1")
scala> spark.sql("cache table temp1")
scala> spark.sql("create or replace temporary view temp1 as select 1, 2")
scala> spark.sql("cache table temp1")
scala> assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1, 2")).isDefined)
scala> assert(spark.sharedState.cacheManager.lookupCachedData(sql("select 1")).isDefined)
```

### Why are the changes needed?
Fix the memory leak, especially for long-running mode.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Add a unit test.

Closes #27185 from LantaoJin/SPARK-30494.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 929b794)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Hi, @LantaoJin. Could you make a backport PR against
…xisting view

This is a backport of #27185 to branch-2.4; the change and description are the same as the original PR above.

Closes #28000 from LantaoJin/SPARK-30494_2.4.

Lead-authored-by: lajin <lajin@ebay.com>
Co-authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
The cached RDD for the plan "select 1" stays in memory forever until the session closes. This cached data cannot be used since the view temp1 has been replaced by another plan. It's a memory leak.
We can reproduce it with the commands below:
Why are the changes needed?
Fix the memory leak, especially for long-running mode.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Add a unit test.
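For illustration, the failure mode fixed here can be sketched outside Spark: the cache manager keys entries by the view's defining plan, so replacing a view without uncaching leaves the old plan's entry alive but no longer reachable through the view name. A minimal Python model of the bug and the fix; all names here (`ToyCacheManager`, `create_or_replace_view`) are hypothetical, not Spark's API:

```python
class ToyCacheManager:
    """Caches keyed by a view's defining plan, as Spark keys by logical plan."""
    def __init__(self):
        self.cached = {}   # plan (here just a SQL string) -> cached data

    def cache_plan(self, plan):
        self.cached[plan] = f"cached RDD for {plan!r}"

    def uncache_plan(self, plan):
        self.cached.pop(plan, None)

views = {}
cm = ToyCacheManager()

def create_or_replace_view(name, plan, fixed=False):
    if fixed and name in views:
        cm.uncache_plan(views[name])  # the fix: drop the old plan's cache first
    views[name] = plan

# Buggy behavior: the entry for the old plan "select 1" leaks.
create_or_replace_view("temp1", "select 1")
cm.cache_plan(views["temp1"])
create_or_replace_view("temp1", "select 1, 2")
cm.cache_plan(views["temp1"])
assert "select 1" in cm.cached        # leaked: unreachable but still cached

# Fixed behavior: replacing the view uncaches the previous definition.
views.clear(); cm.cached.clear()
create_or_replace_view("temp1", "select 1", fixed=True)
cm.cache_plan(views["temp1"])
create_or_replace_view("temp1", "select 1, 2", fixed=True)
cm.cache_plan(views["temp1"])
assert "select 1" not in cm.cached    # old entry dropped
assert "select 1, 2" in cm.cached     # only the current definition is cached
```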