[SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution #24978
Conversation
Test build #106949 has finished for PR 24978 at commit
```scala
    .doc("When true, enable runtime query re-optimization.")
    .booleanConf
    .createWithDefault(false)

  val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
```
This is an umbrella config; let's create a new config for `ReduceNumShufflePartitions`. How about `spark.sql.adaptive.reducePostShufflePartitions.enabled`?
+1. Being a bit pedantic here, but let's try to do orthogonal changes like removing the `RUNTIME_REOPTIMIZATION_ENABLED` flag in separate PRs. That lowers the cognitive load of reviewing, and it also improves git blame.
Agreed to add a config for `ReduceNumShufflePartitions`. I was removing `RUNTIME_REOPTIMIZATION_ENABLED` because `RUNTIME_REOPTIMIZATION_ENABLED` and `ADAPTIVE_EXECUTION_ENABLED` now look like the same config. I didn't see a way to enable `ReduceNumShufflePartitions` but disable runtime re-optimization.
The argument for not changing `RUNTIME_REOPTIMIZATION_ENABLED` to `ADAPTIVE_EXECUTION_ENABLED` is just to keep this PR independent and smaller.
Whatever the name is, we should have an umbrella flag that turns everything off. We could add another flag just to turn off the SMJ -> BHJ runtime optimization, although we might have to do it in a slightly trickier way, given that this optimization comes naturally with the framework itself.
All right, I will use `RUNTIME_REOPTIMIZATION_ENABLED` as the umbrella flag in this PR.
```scala
    var newPartitioning: Partitioning,
    child: SparkPlan,
    @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
    desiredPartitioning: Partitioning,
```
`outputPartitioning`?
Updated.
```scala
  def minNumPostShufflePartitions: Int =
    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

  def maxNumPostShufflePartitions: Int =
    getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
```
We may want to set the default value here to something much higher (`Int.MaxValue`?). The upper bound here is determined by the shuffles for which we are trying to reduce the number of partitions.
We actually use this as the initial shuffle partition number, which will be set in the `ShuffleExchangeExec`. So this is expected to be a reasonable value, not `Int.MaxValue`. It could be a little misleading if a user is not aware that it is used as the initial shuffle partition number.
Why would we have two confs for adaptive and non-adaptive execution?
People usually have already tuned `spark.sql.shuffle.partitions` for each workload they run periodically. The initial shuffle partition number for adaptive execution lets users set a global value (relatively large) that works for all queries. If a user doesn't set it, it defaults to `spark.sql.shuffle.partitions`.
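The fallback described above can be sketched as follows. This is a minimal illustration in Python, not Spark's actual Scala implementation; the config names mirror the ones discussed in this PR, and the dictionary-based lookup is an assumption for the sketch:

```python
def initial_shuffle_partitions(conf: dict) -> int:
    """Pick the initial (pre-coalescing) shuffle partition number.

    Adaptive execution starts from a deliberately large partition count and
    later coalesces small partitions. If the adaptive maximum is not
    configured, fall back to the classic spark.sql.shuffle.partitions
    (whose Spark default is 200).
    """
    fallback = conf.get("spark.sql.shuffle.partitions", 200)
    return conf.get("spark.sql.adaptive.maxNumPostShufflePartitions", fallback)

# Without the adaptive conf, the tuned per-workload value is used.
print(initial_shuffle_partitions({"spark.sql.shuffle.partitions": 100}))     # 100
# With the adaptive conf, the global (relatively large) value wins.
print(initial_shuffle_partitions(
    {"spark.sql.shuffle.partitions": 100,
     "spark.sql.adaptive.maxNumPostShufflePartitions": 1000}))               # 1000
```

This mirrors the `getOrElse(numShufflePartitions)` fallback in the diff above.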
```scala
        ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
    }

    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) {
```
How about first collecting all the leaves, then checking whether they are all `QueryStageExec` nodes, and, if they are, collecting the map stats outputs? That seems more efficient.
+1
Good point!
```scala
      case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
        val metricsFuture = stage.mapOutputStatisticsFuture
        assert(metricsFuture.isCompleted, "ShuffleQueryStageExec should already be ready")
        ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
```
Here the shuffle query stage is already ready, so why do we still call the `ThreadUtils.awaitResult` method?
Because we need to get the value out of a `Future`...
Got it. Thanks for your explanation.
`future.get()`?
It is a Scala `Future`, and `future.get()` seems to be available only for a Java `Future`.
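The pattern of unwrapping an already-completed future with a zero timeout can be illustrated with Python's `concurrent.futures`. This is only an analogy to `ThreadUtils.awaitResult(metricsFuture, Duration.Zero)`, not Spark code:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as pool:
    # Stand-in for mapOutputStatisticsFuture: the stage's work has been submitted.
    fut = pool.submit(lambda: 40 + 2)

# Leaving the `with` block shuts the pool down and waits for completion,
# so by this point the future is guaranteed to be done.
assert fut.done(), "future should already be completed"

# Analogous to awaitResult with Duration.Zero: with timeout=0 this does not
# block, it only unwraps the value that is already available.
value = fut.result(timeout=0)
print(value)
```

A zero timeout would raise a `TimeoutError` if the future were not yet done, which matches the intent of the `assert(metricsFuture.isCompleted, ...)` guard in the Scala snippet.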
```scala
        ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
    }

    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) {
```
When the leaf node is a `BroadcastQueryStageExec`, do we need to adjust the reducer number?
We are ignoring `BroadcastQueryStageExec`, right?
We can have situations with BHJ where:
- The BHJ is from compile time and one side is a broadcast stage;
- The BHJ is from a former AQE optimization, in which case one side is a shuffle stage and the other is a broadcast stage.
I believe scenario 2 may still be applicable here, so maybe we can change the condition to `isAllQueryStage && shuffleStageCount > 0`?
Obviously we don't want to adjust the number of shuffle partitions if there is no shuffle in this stage, so +1 to changing the condition to `isAllQueryStage && shuffleStageCount > 0`.
I believe we are safe here. After checking `isAllQueryStage`, we get shuffle metrics from the shuffle stages, and we only adjust the number of shuffle partitions if the number of shuffle metrics > 0.
```scala
    // we should skip it when calculating the `partitionStartIndices`.
    val validMetrics = shuffleMetrics.filter(_ != null)
    if (validMetrics.nonEmpty) {
      val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
```
We may need to add a check that all the pre-shuffle partition counts are the same before calling the `estimatePartitionStartIndices` method, to avoid the assertion exception at Line 130.
Good catch. I'll add the check and the unit test.
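The guard plus the estimation logic can be sketched in Python. This is a simplified illustration of what `estimatePartitionStartIndices` does (greedily packing contiguous post-shuffle partitions up to a target size); the real implementation is Scala, and the default 64 MB target below is an assumed value for the sketch:

```python
def estimate_partition_start_indices(bytes_by_partition_id, target_size=64 * 1024 * 1024):
    """Greedily pack contiguous post-shuffle partitions until target_size.

    bytes_by_partition_id: one list per shuffle, giving the bytes each
    pre-shuffle (map-output) partition wrote, i.e. bytesByPartitionId.
    Returns the start index of each coalesced post-shuffle partition.
    """
    # Guard discussed above: all shuffles must agree on the partition count,
    # otherwise coalescing them with shared indices makes no sense.
    lengths = {len(stats) for stats in bytes_by_partition_id}
    assert len(lengths) == 1, "pre-shuffle partition counts must match"
    num_partitions = lengths.pop()

    start_indices = [0]
    current = 0  # bytes accumulated in the coalesced partition being built
    for i in range(num_partitions):
        # Total bytes reducer i would read across all participating shuffles.
        size = sum(stats[i] for stats in bytes_by_partition_id)
        if current + size > target_size and current > 0:
            start_indices.append(i)  # start a new coalesced partition at i
            current = 0
        current += size
    return start_indices

# Two shuffles with 4 partitions each, target 100 bytes:
# combined sizes per partition are [60, 80, 20, 120].
print(estimate_partition_start_indices([[30, 40, 10, 60], [30, 40, 10, 60]], 100))
# -> [0, 1, 3]
```

Note that Spark's actual rule also considers a minimum partition count and other details; this sketch only shows the size-based packing and the same-partition-count guard.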
```scala
      // number of output partitions.
      case stage: ShuffleQueryStageExec =>
        CoalescedShuffleReaderExec(stage, partitionStartIndices)
      case reused: ReusedQueryStageExec if reused.plan.isInstanceOf[ShuffleQueryStageExec] =>
```
Can we use pattern matching here as in other places too?
Alternatively, since we already have `BroadcastQueryStageExec.isBroadcastQueryStageExec`, we could accordingly add `ShuffleQueryStageExec.isShuffleQueryStageExec` and replace all such occurrences with just one `case`.
Good idea.
Test build #107043 has finished for PR 24978 at commit
Test build #107045 has finished for PR 24978 at commit
Test build #107047 has finished for PR 24978 at commit
test this please
Test build #107053 has finished for PR 24978 at commit
```scala
  }
  private def defaultNumPreShufflePartitions: Int =
    if (conf.adaptiveExecutionEnabled) {
      conf.maxNumPostShufflePartitions
```
We should mention in the config doc that this config is used as the initial number of pre-shuffle partitions.
Yes, added that.
LGTM
Some general comments:
@carsonwang @maryannxue I encountered the following exception with q1 of TPC-DS in YARN mode when enabling or disabling
The full log info is in the Google doc.
```scala
      return plan
    }
    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) {
      // If not all leaf nodes are query stages, it's not safe to reduce the number of
```
Does this mean a case like https://github.com/Intel-bigdata/spark-adaptive/pull/54?
nit: Maybe we can say "If not all leaf nodes are query stages (for example, when some depending stage is already bucketed, there isn't a shuffle or query stage for that branch)" to be clear, because normally all leaves are query stages.
Yes, that case is covered. There are many cases where the leaves are not query stages; for example, in a stage that does a table scan, the leaf is a table scan operator. The comment seems to cover these.
```scala
    val distinctNumPreShufflePartitions =
      validMetrics.map(stats => stats.bytesByPartitionId.length).distinct

    if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
```
If a user calls `val datasetAfter = datasetBefore.repartition(300)`, they want the post-shuffle partition number to be exactly 300. But in this case we will go inside this branch, and then the partition merging on the reduce side will break the user's expectation. cc @cloud-fan
Should we check the original operation, and if it's a `repartition`, not do adaptive execution?
This was also discussed in the original PR. For `df.repartition(500)`, AE does use the specified number for the repartition. That is, each map stage writes its output with 500 partitions. However, in the following stage, AE may launch fewer than 500 tasks, as one task can process multiple contiguous blocks. I think we can still do adaptive execution when it is possible. We can add an option to disable AE for repartition later if that is necessary.
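The "one task can process multiple contiguous blocks" behavior can be sketched as follows. This is Python for illustration only, a simplified stand-in for how computed partition start indices map to per-task read ranges (roughly what `CoalescedShuffleReaderExec` arranges), not Spark's actual implementation:

```python
def coalesced_partition_ranges(start_indices, num_pre_shuffle_partitions):
    """Turn partition start indices into (start, end) block ranges.

    Each returned range is read by one reduce task, covering a contiguous
    run of map-output partitions.
    """
    ends = list(start_indices[1:]) + [num_pre_shuffle_partitions]
    return list(zip(start_indices, ends))

# df.repartition(500) still writes 500 map-output partitions, but with
# start indices like [0, 120, 260, 400] only 4 reduce tasks are launched,
# each reading a contiguous slice of the 500 blocks.
print(coalesced_partition_ranges([0, 120, 260, 400], 500))
# -> [(0, 120), (120, 260), (260, 400), (400, 500)]
```

So the map-side partition count honors the user's number, while the reduce-side task count may shrink.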
It would be good to add a config entry; I think it's best to make it internal (hidden). At the same time, we could add a function called `repartitionWithAdvice`; the difference between it and `repartition` would be whether the newly added config entry is set to true. `repartitionWithAdvice` means enabling adaptive execution, as opposed to the exact `repartition`.
Do we have this problem in `ExchangeCoordinator`? At the physical phase we have no idea whether the shuffle comes from a `df.repartition` or was added by `EnsureRequirements`. We may need to add a boolean flag to `ShuffleExchangeExec` to indicate that this shuffle can't change its number of partitions. Maybe we can fix it in a follow-up.
```scala
    // We may get different pre-shuffle partition number if user calls repartition manually.
    // We don't reduce shuffle partition number in that case.
    val distinctNumPreShufflePartitions =
      validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
```
There are two conditions under which we don't do adaptive execution, so `plan` is returned directly.
Maybe better to aggregate these conditions?
What do you mean by aggregate?
Never mind~
@maryannxue I am curious why we need a separate switch for SMJ->BHJ. In the new adaptive execution with re-optimization logic, there can be tons of switches we could add...
@JkSelf did you use the external shuffle service? The master branch is incompatible with external shuffle service 2.x.
@cloud-fan Thanks for your quick reply. Yes, I used the external shuffle service, and the above exception is gone when the external shuffle service is disabled. I am curious why the master branch is incompatible with the external shuffle service. Thanks.
```scala
      // have same number of output partitions.
      plan
    } else {
      val shuffleStages = plan.collect {
```
This does not work well with `Union`, right? Is there a way we can identify subtrees in the stage where all the input partitions are the same?
Right, `Union` may have child stages with different partition numbers; see the unit test added. Currently we check the shuffle metrics to ensure the pre-shuffle partition numbers of the stages are the same.
This can get pretty complicated if we have multiple `Union` and `Join` operators in one stage. For now I'm OK with this simple approach, where we always require all shuffles in one stage to have the same number of partitions. BTW, this is an existing issue.
```scala
  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.reducePostShufflePartitionsEnabled) {
      return plan
```
`return` is not needed.
Without `return`, I'd need an `else` with more indentation. I saw a few rules written like this, so I was just copying that.
```scala
        nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
        j += 1
      }
```
```scala
var nextShuffleInputSize = 0L
var j = 0
while (j < mapOutputStatistics.length) {
  nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
  j += 1
}
```
=>
```scala
val nextShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId(i)).sum
```
This code was originally there in `ExchangeCoordinator`. I'm not sure whether it was written this way on purpose, to avoid creating the additional arrays the proposed version would allocate. So let's leave it as is.
```scala
@@ -92,6 +94,30 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
    }
  }

  test("Change merge join to broadcast join and reduce number of shuffle partitions") {
```
@maryannxue, added an end-to-end test where SMJ->BHJ and reduce-shuffle-partitions happen at the same time. Actually, `REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED` is enabled by default, so other test cases also reduce shuffle partition numbers if we don't disable it. But that doesn't seem to be a problem.
Test build #107133 has finished for PR 24978 at commit
Test build #107137 has finished for PR 24978 at commit
test this please
Test build #107157 has finished for PR 24978 at commit
```scala
@@ -82,6 +83,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
  test("Change merge join to broadcast join") {
    withSQLConf(
      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
      SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "false",
```
Any reason for this?
I added the same test with `REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED` set to true and checked the post-shuffle partitions, so I set it to false in this test.
I don't think it's necessary, but if you do want to test it with "false", how about doing a combo: `Seq("true", "false").foreach`?
OK, let me just remove this for now.
Test this please
Test build #107206 has finished for PR 24978 at commit
thanks, merging to master!
Thank you all!
…xecution

This is to implement a ReduceNumShufflePartitions rule in the new adaptive execution framework introduced in apache#24706. This rule is used to adjust the post-shuffle partitions based on the map output statistics.

Added ReduceNumShufflePartitionsSuite

Closes apache#24978 from carsonwang/reduceNumShufflePartitions.

Authored-by: Carson Wang <carson.wang@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This is to implement a ReduceNumShufflePartitions rule in the new adaptive execution framework introduced in #24706. This rule is used to adjust the post-shuffle partitions based on the map output statistics.
How was this patch tested?
Added ReduceNumShufflePartitionsSuite