[SPARK-35332][SQL] Make cache plan disable configs configurable #32482
Conversation
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138303 has finished for PR 32482 at commit
cc: @cloud-fan @c21
.doc("Configurations needs to be turned off, to avoid regression for cached query, so that " +
  "the outputPartitioning of the underlying cached query plan can be leveraged later.")
I think this comment is for developers, so it is difficult for a user to understand. Could you brush it up more?
.checkValue(_.forall(v => sqlConfEntries.containsKey(v) &&
  sqlConfEntries.get(v).defaultValue.exists(_.isInstanceOf[Boolean])),
  "config should be boolean type")
.createWithDefault(Seq(ADAPTIVE_EXECUTION_ENABLED.key, AUTO_BUCKETED_SCAN_ENABLED.key))
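For readers following along, the `checkValue` above rejects any listed key that is not a registered boolean conf. A minimal stand-alone sketch of that predicate (a plain Scala model with an illustrative registry map, not Spark's actual `sqlConfEntries`):

```scala
object ConfValidationSketch {
  // key -> declared default value; stands in for Spark's conf-entry registry.
  // These entries are illustrative, not Spark's real registry contents.
  val entries: Map[String, Option[Any]] = Map(
    "spark.sql.adaptive.enabled" -> Some(true),
    "spark.sql.sources.bucketing.autoBucketedScan.enabled" -> Some(true),
    "spark.sql.shuffle.partitions" -> Some(200) // Int default: not allowed
  )

  // Mirrors the checkValue predicate: every key must exist in the registry
  // and its default value must be a Boolean.
  def isValid(keys: Seq[String]): Boolean =
    keys.forall(k => entries.contains(k) && entries(k).exists(_.isInstanceOf[Boolean]))

  def main(args: Array[String]): Unit = {
    assert(isValid(Seq("spark.sql.adaptive.enabled")))
    // A non-boolean conf would trigger the "config should be boolean type" error.
    assert(!isValid(Seq("spark.sql.shuffle.partitions")))
    assert(!isValid(Seq("no.such.conf")))
    println("validation sketch ok")
  }
}
```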
Any use case to turn off these rules separately? I think it's okay just to use a boolean flag for this, though.
+1, I agree with starting with simpler config format if possible.
@@ -1554,4 +1554,39 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
      assert(!spark.catalog.isCached(viewName))
    }
  }
test("SPARK-35332: Make cache plan disable configs configurable") {
Could you add tests for more patterns? e.g.,
sql("""SET spark.sql.cache.disableConfigs=spark.sql.adaptive.enabled""")
sql("CACHE TABLE test_table1 AS <query 1>")
spark.table("test_table1").explain(true) <= AQE disabled
sql("""SET spark.sql.cache.disableConfigs=""")
sql("CACHE TABLE test_table2 AS <query 2>")
spark.table("test_table2").explain(true) <= AQE enabled
spark.table("test_table1").explain(true) <= AQE disabled
sql("CACHE TABLE test_table3 AS <query 1>")
spark.table("test_table3").explain(true) <= AQE disabled
@@ -1090,6 +1090,18 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val CACHE_DISABLE_CONFIGS =
    buildConf("spark.sql.cache.disableConfigs")
      .doc("Configurations needs to be turned off, to avoid regression for cached query, so that " +
Maybe, internal?
.internal()
When we expose something to the users, we have to consider the side effects when the users make a mistake. What happens when they touch this config in a wrong way? For example, the case of removing ADAPTIVE_EXECUTION_ENABLED from the conf. Is it okay and safe to expose this?
Also, cc @sunchao since this is related to the bucketed scan too.
}.getMessage
assert(msg.contains("config should be boolean type"))

Seq(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "").foreach { disableConfig =>
could we also test for auto bucketed scan? thanks.
+1, I agree with starting with simpler config format if possible.
Thank you @maropu @c21 @dongjoon-hyun. Agreed, the current config seems like overkill for users; it's better to just make it a boolean flag. Refactored this PR to address the comments.
@@ -1175,7 +1175,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
  }

  test("cache supports for intervals") {
    withTable("interval_cache") {
    withTable("interval_cache", "t1") {
Not related to this PR, but it affects the newly added test with t1.
In general, I think it's better to optimize the cached plan more aggressively for better performance, even though it may cause a perf regression due to output partitioning changes, which should be rare. About the config name, how about
cc @viirya as well, as he found the bug and raised the concern about auto bucketed scan for cached queries.
.doc(s"When true, some configs are disabled during executing cache plan that is to avoid " +
  s"performance regression if other queries hit the cached plan. Currently, the disabled " +
nit: don't need s"".
 */
private[sql] def getOrCloneSessionWithConfigsOff(
    session: SparkSession,
    configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
  val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
  if (configsEnabled.isEmpty) {
  if (!session.sessionState.conf.getConf(SQLConf.CACHE_DISABLE_CONFIGS_ENABLED)) {
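For context on the helper being reviewed here: it reuses the session when none of the listed boolean configs are enabled, and otherwise clones the session with those configs turned off. A minimal stand-alone sketch of that clone-or-reuse decision, with a plain `Map` standing in for the session conf (names are illustrative, not Spark's API):

```scala
object SessionCloneSketch {
  // A Map stands in for SparkSession's conf; returning the same reference
  // models "reuse the original session", a modified copy models "clone
  // with the configs turned off".
  def getOrCloneWithConfigsOff(
      conf: Map[String, Boolean],
      configsToDisable: Seq[String]): Map[String, Boolean] = {
    val enabled = configsToDisable.filter(k => conf.getOrElse(k, false))
    if (enabled.isEmpty) conf // nothing to turn off: reuse the original
    else conf ++ enabled.map(_ -> false) // clone with those configs off
  }

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.sql.adaptive.enabled" -> true, "other.flag" -> true)
    val cloned = getOrCloneWithConfigsOff(conf, Seq("spark.sql.adaptive.enabled"))
    assert(!cloned("spark.sql.adaptive.enabled") && cloned("other.flag"))
    // Nothing left enabled: the original reference comes back untouched.
    val same = getOrCloneWithConfigsOff(cloned, Seq("spark.sql.adaptive.enabled"))
    assert(same eq cloned)
    println("session clone sketch ok")
  }
}
```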
If getOrCloneSessionWithConfigsOff is used for disabling other configs, is it also automatically under the control of CACHE_DISABLE_CONFIGS_ENABLED?
Agree, it would introduce a potential issue in the future. Moved to CacheManager.
@@ -1069,21 +1069,26 @@ object SparkSession extends Logging {
  }

  /**
   * Returns a cloned SparkSession with all specified configurations disabled, or
   * the original SparkSession if all configurations are already disabled.
   * When CACHE_DISABLE_CONFIGS_ENABLED is enabled, returns a cloned SparkSession with all
This getOrCloneSessionWithConfigsOff is not claimed to be used only for the cache plan. It is a bit logically wrong to have a so-called cache disable config here.
So it sounds like this wants to have an aggressive option when preparing the cache plan, right? The config name can actually be refined; at first glance it confused me.
sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)")
assert(spark.table("t1").rdd.partitions.length == 2)

sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true")
let's turn each of these SET ... into withSQLConf
Looks fine otherwise.
val CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING =
  buildConf("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning")
    .internal()
    .doc(s"When false, some configs are disabled during executing cache plan that is to avoid " +
How about this?
Whether to forcibly enable some optimization rules that can change the output partitioning
of a cached query when executing it for caching. If it is set to true,
queries may need an extra shuffle to read the cached data. This configuration is disabled by default.
Currently, the optimization rules enabled by this configuration are
${ADAPTIVE_EXECUTION_ENABLED.key} and ${AUTO_BUCKETED_SCAN_ENABLED.key}.
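Folding that suggested wording into the entry, the conf would look roughly like this (a sketch following the conf DSL already shown in this diff, not the exact merged text):

```scala
val CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING =
  buildConf("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning")
    .internal()
    .doc("Whether to forcibly enable some optimization rules that can change the output " +
      "partitioning of a cached query when executing it for caching. If it is set to true, " +
      "queries may need an extra shuffle to read the cached data. This configuration is " +
      "disabled by default. Currently, the optimization rules enabled by this configuration " +
      s"are ${ADAPTIVE_EXECUTION_ENABLED.key} and ${AUTO_BUCKETED_SCAN_ENABLED.key}.")
    .booleanConf
    .createWithDefault(false)
```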
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {

withTempView("t1", "t2", "t3") {
  withCache("t1", "t2", "t3") {
withTempView drops caches, too?
yes, removed it.
sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)")
assert(spark.table("t2").rdd.partitions.length == 1)

withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
why is this one nested?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm expecting something like
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
test1
}
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
test2
}
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
test3
}
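For readers unfamiliar with the helper being requested here: withSQLConf sets the given entries, runs the body, and restores the previous values on exit, which is why three sibling blocks are cleaner than nesting. A stand-alone sketch of that set/run/restore pattern over a plain mutable map (names are illustrative; Spark's real helper lives in its test utilities):

```scala
import scala.collection.mutable

object WithConfSketch {
  val conf = mutable.Map[String, String]()

  // Set the given pairs for the duration of `body`, then restore the previous
  // values, mirroring how withSQLConf isolates each test case.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body finally previous.foreach {
      case (k, Some(v)) => conf(k) = v
      case (k, None)    => conf.remove(k)
    }
  }

  def main(args: Array[String]): Unit = {
    val inside = withConf("x" -> "false") { conf("x") }
    assert(inside == "false")
    assert(!conf.contains("x")) // restored after the block exits
    println("withConf sketch ok")
  }
}
```

Because each block restores state on exit, consecutive sibling blocks start from the same baseline, so nesting them would only obscure which setting each test depends on.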
Updated it.
sql("CACHE TABLE t2")
assert(spark.table("t2").rdd.partitions.length == 1)

withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") {
ditto
thanks, merging to master!
@@ -328,4 +326,15 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    if (needToRefresh) fileIndex.refresh()
    needToRefresh
  }

  /**
   * If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is disabled, just return original session.
"If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is disabled" -> do we mean enabled?
good catch!
Sorry, forgot to fix the comment, create followup #32543.
lgtm with one minor comment.
What changes were proposed in this pull request?
Add a new config to make cache plan disable configs configurable.
Why are the changes needed?
The cache plan disables some configs to avoid performance regressions, but not every query is slower than before with AQE or bucketed scan enabled. It's useful to add a new config so that users can decide whether some configs should be disabled during cache planning.
Does this PR introduce any user-facing change?
Yes, a new config.
How was this patch tested?
Add test.