[SPARK-33075][SQL] Enable auto bucketed scan by default (disable only for cached query) #30138
Conversation
cc @cloud-fan, @maropu and @viirya if you guys have time to take a look, thanks. This is the follow-up from #29804.
if (!session.sessionState.conf.adaptiveExecutionEnabled) {
def getOrCloneSessionWithConfigsOff(
    session: SparkSession,
    configurations: Seq[String]): SparkSession = {
nit: to be more type-safe, how about `Seq[ConfigEntry[Boolean]]`?
+1
+1
sure, updated, it's safer.
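For reference, a minimal sketch of the signature change being discussed (the surrounding trait name is made up, just to contrast the two signatures; only the parameter type is the point):

```scala
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.SparkSession

// Hypothetical container, only to show the before/after signatures.
trait SessionConfigHelperSketch {
  // Before: stringly typed, any config key (boolean or not) could be passed in.
  def getOrCloneSessionWithConfigsOffByKey(
      session: SparkSession,
      configurations: Seq[String]): SparkSession

  // After: typed entries, so only boolean SQL configs compile.
  def getOrCloneSessionWithConfigsOff(
      session: SparkSession,
      configurations: Seq[ConfigEntry[Boolean]]): SparkSession
}
```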
withTable("t1") { | ||
withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") { | ||
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") | ||
sql("CACHE TABLE tempTable AS SELECT i FROM t1") |
why not just `CACHE TABLE t1`?
Either way is fine for me; if you think it's too redundant I can also change that.
yea let's be simpler.
it can also save the uncache call at the end, as the table will be dropped anyway.
@cloud-fan - sure, updated.
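A sketch of the simplified test shape after the two suggestions above (assuming the `withTable`/`withSQLConf` helpers and `df1` from the quoted diff; not the exact merged test):

```scala
// Cache the bucketed table directly instead of a derived temp table.
// withTable drops t1 at the end, so no explicit UNCACHE is needed.
withTable("t1") {
  withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
    df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
    sql("CACHE TABLE t1")
    assert(spark.catalog.isCached("t1"))
  }
}
```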
 * 1. AQE
 * 2. Automatic bucketed table scan
 */
private val configsOff = Seq(
nit: How about `configsOff` -> `forceDisableConfigs`?
@maropu - sure, updated.
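Based on the quoted diff and the rename above, the field in `CacheManager` presumably ends up roughly like this (a sketch shown with its imports; the exact comment wording may differ from the merged code):

```scala
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.internal.SQLConf

// Configs force-disabled while compiling the plan to cache, so the cached
// plan keeps a stable outputPartitioning that later queries can leverage:
// 1. AQE
// 2. Automatic bucketed table scan
private val forceDisableConfigs: Seq[ConfigEntry[Boolean]] = Seq(
  SQLConf.ADAPTIVE_EXECUTION_ENABLED,
  SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
```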
val inMemoryRelation = sessionWithAqeOff.withActive {
  val qe = sessionWithAqeOff.sessionState.executePlan(planToCache)
// Turn off configs so that the outputPartitioning of the underlying plan can be leveraged.
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(
nit: it seems we don't need this line break:
`val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(query.sparkSession, configsOff)`
@maropu - updated.
 */
def getOrCloneSessionWithAqeOff[T](session: SparkSession): SparkSession = {
if (!session.sessionState.conf.adaptiveExecutionEnabled) {
def getOrCloneSessionWithConfigsOff(
Since this method is not only for AQE now, could you move it into a more suitable place, e.g., `object SparkSession` or somewhere?
+1, move it to some other general object.
Sounds good, it makes sense to me; moved to `object SparkSession`.
I know this is old, but why must all these configurations (e.g., AQE) be disabled for CacheManager?
That's because a performance regression can happen. Could you check the previous discussion, e.g., #29804 (comment)?
I feel introducing configs to enable them (i.e. allowing users to enable AQE for cached queries, or to enable auto bucketed scan for cached queries) is dangerous, as users can cause correctness bugs in their pipelines if they use them blindly.
@c21 I had the same thought at first, but I can't find a negative case. Can you point out a case that can cause a correctness bug?
If I'm not missing something, it only affects performance via an extra shuffle. For correctness, assume a cached plan with AQE enabled:
- For lazy cache: the AQE framework will ensure the correctness of the new query with the cached plan.
- For force cache: if the output partitioning or ordering of the cached plan has been affected by AQE, then Spark will use `EnsureRequirements` to guarantee correctness.
Is this related to correctness? I thought this was performance-related because they can change output partitions implicitly.
@ulysses-you, @maropu - sorry, my bad. This and AQE are for performance only, not correctness. Then I am fine with either adding or not adding another config.
Could you join the discussion in https://issues.apache.org/jira/browse/SPARK-35332? I think the JIRA ticket is related to this topic.
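To illustrate the performance point above with a standalone example (table and column names are made up; whether the shuffle is actually avoided depends on the chosen join strategy and bucket count):

```scala
// Write a bucketed table, bucketed by column `i` into 8 buckets.
spark.range(100).selectExpr("id AS i", "id AS j")
  .write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_t")

// Cache a query over it. Because AQE and auto bucketed scan are force-disabled
// while compiling the plan to cache, the cached plan keeps the 8-bucket
// hash partitioning on `i`.
spark.sql("CACHE TABLE cached_t AS SELECT i, j FROM bucketed_t")

// A later join on the bucket column can then reuse that partitioning and
// avoid re-shuffling the cached side; if the cached plan's partitioning had
// been changed, EnsureRequirements would insert an extra shuffle instead.
spark.sql("SELECT * FROM cached_t c JOIN bucketed_t b ON c.i = b.i").explain()
```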
Addressed all comments and the PR is ready for review again, thanks. cc @cloud-fan, @maropu and @viirya.
 *
 * @since 3.1.0
 */
def getOrCloneSessionWithConfigsOff(
`private[spark]`.
Curious why we need to add this? What's the issue we are preventing? Also, why `private[spark]` but not `private[sql]`?
Oh, `private[sql]` is better. I don't think we should expose this as public.
@viirya - sure, updated.
@cloud-fan - wondering if you think the PR is ready to go? Thanks.
val sessionWithAqeOff = getOrCloneSessionWithAqeOff(query.sparkSession)
val inMemoryRelation = sessionWithAqeOff.withActive {
  val qe = sessionWithAqeOff.sessionState.executePlan(planToCache)
// Turn off configs so that the outputPartitioning of the underlying plan can be leveraged.
nit: this comment seems duplicated with the one above.
@viirya - removed.
retest this please
 * Returns a cloned SparkSession with all specified configurations disabled, or
 * the original SparkSession if all configurations are already disabled.
 *
 * @since 3.1.0
this is not needed for internal APIs.
@cloud-fan - removed.
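Putting the review feedback together (typed config entries, living in the `SparkSession` companion object, `private[sql]`, no `@since` tag), the helper presumably ends up roughly like this. This is a sketch written as if inside Spark's `org.apache.spark.sql` package, where `sessionState` and `cloneSession()` are accessible; the merged code may differ in details:

```scala
import org.apache.spark.internal.config.ConfigEntry

object SparkSession {
  // ... other companion members ...

  /**
   * Returns a cloned SparkSession with all specified configurations disabled, or
   * the original SparkSession if all configurations are already disabled.
   */
  private[sql] def getOrCloneSessionWithConfigsOff(
      session: SparkSession,
      configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
    val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
    if (configsEnabled.isEmpty) {
      // Nothing to turn off: reuse the session as-is.
      session
    } else {
      // Clone the session and disable only the configs that are currently on.
      val newSession = session.cloneSession()
      configsEnabled.foreach { conf =>
        newSession.sessionState.conf.setConf(conf, false)
      }
      newSession
    }
  }
}
```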
Thanks! Merged to master.
Thanks @maropu, @viirya and @cloud-fan for review!
Closes apache#30138 from c21/enable-auto-bucket. Authored-by: Cheng Su <chengsu@fb.com>. Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>.
What changes were proposed in this pull request?
This PR enables auto bucketed table scan by default, with the exception of disabling it only for cached queries (similar to AQE). The reason for disabling auto scan for cached queries is that the cached query's output partitioning can be leveraged later to avoid shuffle and sort when doing join and aggregate.
Why are the changes needed?
Enabling auto bucketed table scan by default is useful as it can optimize queries automatically under the hood, without user interaction.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added a unit test for cached queries in `DisableUnnecessaryBucketedScanSuite.scala`. Also changed a number of existing unit tests to disable auto bucketed scan so that they keep working.
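The cached-query test added to `DisableUnnecessaryBucketedScanSuite.scala` likely looks something along these lines (a sketch: the test name, data, and assertions are illustrative, and the imports of `SQLConf`, `InMemoryRelation`, and `HashPartitioning` are assumed):

```scala
test("auto bucketed scan is force-disabled for the plan compiled for caching") {
  withTable("t1") {
    withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
      spark.range(10).selectExpr("id AS i").write
        .format("parquet").bucketBy(8, "i").saveAsTable("t1")
      sql("CACHE TABLE t1")
      assert(spark.catalog.isCached("t1"))

      // The cached relation should still expose the bucketed scan's hash
      // partitioning on `i`, so later joins/aggregates on `i` can reuse it
      // without an extra shuffle.
      val relation = spark.table("t1").queryExecution.withCachedData
        .collectFirst { case r: InMemoryRelation => r }
      assert(relation.isDefined)
      assert(relation.get.cachedPlan.outputPartitioning.isInstanceOf[HashPartitioning])
    }
  }
}
```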