[SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution #25295
Conversation
...ore/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizedLocalShuffleReader.scala
...core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
We have run functionality and performance tests on 3TB TPC-DS, and the results are shown here. Q82 shows a 1.76x performance improvement with this PR, and no queries show significant performance degradation.
Fixed the conflicts.
@cloud-fan Could you help review when you have time? Thanks very much for your help.
Should this be a general optimization? When a reduce task needs to read some shuffle blocks that happen to exist locally, we can read the shuffle files directly instead of going through the shuffle service.
@cloud-fan Thanks for your review! When the shuffle blocks exist locally, ShuffleBlockFetcherIterator already reads them locally even when going through the shuffle service, I think. Correct me if my understanding is wrong! If so, do we still need to optimize for local reads?
You are right, local shuffle blocks are already optimized. I meant the host-local shuffle blocks. Anyway, it seems not very related to what you are trying to do here.
@@ -180,25 +180,45 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
 case class CoalescedShuffleReaderExec(
     child: QueryStageExec,
-    partitionStartIndices: Array[Int]) extends UnaryExecNode {
+    partitionStartIndices: Array[Int],
+    var isLocal: Boolean = false) extends UnaryExecNode {
`ReduceNumShufflePartitions` and local shuffle reader are two different optimizations, and they conflict: `ReduceNumShufflePartitions` adjusts the numPartitions by assuming the partitions are post-shuffle partitions, whose data size depends on the shuffle blocks they need to read. If we change the shuffle to a local shuffle reader, the partitions become pre-shuffle partitions, and their data size is different.
> If we change the shuffle to local shuffle reader, then the partitions become pre-shuffle partitions, and their data size is different.

@cloud-fan Here the local shuffle reader still optimizes the post-shuffle partitions. I don't understand why the partitions become pre-shuffle partitions?
Without the local shuffle reader, a task of `ShuffledRDD` reads the shuffle blocks map1-reduce1, map2-reduce1, etc. With the local shuffle reader, the task reads map1-reduce1, map1-reduce2, etc. The task output data size is different, so we can't use the algorithm in `ReduceNumShufflePartitions` anymore.

Furthermore, the RDD numPartitions also becomes different after switching to the local shuffle reader, so how can we apply `ReduceNumShufflePartitions`?
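To make the difference concrete, here is a small standalone sketch (plain Scala; the counts and names are illustrative, not Spark APIs): with M map tasks and R reduce partitions, the regular shuffle reader launches R tasks, each reading one reduce partition from all M mappers, while the local shuffle reader launches M tasks, each reading all R blocks written by one mapper.

```scala
// Illustrative only -- these values and names are not Spark APIs.
val numMappers = 4   // pre-shuffle (map-side) partitions
val numReducers = 10 // post-shuffle (reduce-side) partitions

// Regular shuffle reader: one task per reduce partition;
// task k reads blocks map1-reduceK, map2-reduceK, ..., mapM-reduceK.
val regularReaderTasks = numReducers

// Local shuffle reader: one task per mapper;
// task k reads blocks mapK-reduce1, mapK-reduce2, ..., mapK-reduceR.
val localReaderTasks = numMappers

println(s"regular: $regularReaderTasks tasks, local: $localReaderTasks tasks")
```

This is why a partition-coalescing rule sized for R reduce partitions no longer applies once the task count and per-task data sizes follow the M mappers instead.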
OK, got it. To make the code clearer, I will create `LocalShuffleReaderExec` later. Thanks.
I think this is a good idea, but the implementation needs more polishing. What I expect to see is:
I'm a little worried about the invasive changes in the underlying shuffle component. Can you briefly explain how your special
BTW, the local shuffle reader doesn't need to cooperate with other shuffle nodes in the same shuffle stage, so we can adjust the numPartitions of the local shuffle reader freely. This can be a followup.
    var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
    metrics: Map[String, SQLMetric],
    specifiedPartitionStartIndices: Option[Array[Int]] = None,
    specifiedPartitionEndIndices: Option[Array[Int]] = None)
I don't see a usage of specifiedPartitionEndIndices in the current change. Do we need it?
@viirya Currently not. We may need the `specifiedPartitionEndIndices` variable to skip partitions with 0 size in a follow-up optimization, and I will retain and use it when creating `LocalShuffledRowRDD` later.
Then let's add it when you propose that optimization.

From my side, I think it may be beneficial to keep empty tasks, so that the local shuffle reader node can retain the output partitioning of the original plan and help us eliminate shuffles.
@@ -180,25 +180,45 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
 case class CoalescedShuffleReaderExec(
     child: QueryStageExec,
-    partitionStartIndices: Array[Int]) extends UnaryExecNode {
+    partitionStartIndices: Array[Int],
+    var isLocal: Boolean = false) extends UnaryExecNode {

   override def output: Seq[Attribute] = child.output

   override def doCanonicalize(): SparkPlan = child.canonicalized

   override def outputPartitioning: Partitioning = {
Don't we need to override requiredChildDistribution if isLocal is true?

I saw you check whether additional shuffle exchanges are added by EnsureRequirements to decide whether the local shuffle reader works. If we don't change requiredChildDistribution, will EnsureRequirements add additional shuffle exchanges?

Maybe I'm missing something here?
@viirya Maybe we don't need to override `requiredChildDistribution`. Because the `requiredChildDistribution` of `CoalescedShuffleReaderExec` is `UnspecifiedDistribution` whether `isLocal` is `true` or `false`, `EnsureRequirements` will not introduce an additional shuffle exchange.
Er, don't you rely on checking whether EnsureRequirements introduces additional shuffle exchanges to decide whether to do the local shuffle reader?
Yes, I do.
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec}
import org.apache.spark.sql.internal.SQLConf

case class OptimizedLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
nit: this should be a verb, OptimizeLocalShuffleReader
private def setIsLocalToFalse(shuffleStage: QueryStageExec): QueryStageExec = {
  shuffleStage match {
    case stage: ShuffleQueryStageExec =>
      stage.isLocalShuffle = false
If possible, let's not add mutable state to the plan.
}

// Add the new `LocalShuffleReaderExec` node if the value of `isLocalShuffle` is true
val newPlan = plan.transformUp {
Why don't we traverse the tree once?

def isShuffleStage(plan: SparkPlan): Boolean = plan match {
  case _: ShuffleQueryStageExec => true
  case ReusedQueryStageExec(_: ShuffleQueryStageExec) => true
  case _ => false
}

def canUseLocalShuffleReaderLeft(j: BroadcastHashJoinExec): Boolean = {
  j.buildSide == BuildLeft && isShuffleStage(j.left)
}

def canUseLocalShuffleReaderRight ...
...

plan transformDown {
  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderLeft(join) =>
    val localShuffleReader = ...
    join.copy(left = localShuffleReader)
  ...
}
val newPlan = plan.transformUp {
  case stage: ShuffleQueryStageExec if (stage.isLocalShuffle) =>
    LocalShuffleReaderExec(stage)
  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) if (stage.isLocalShuffle) =>
let's not strip the ReusedQueryStageExec
@@ -91,6 +91,7 @@ case class AdaptiveSparkPlanExec(
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     ReuseAdaptiveSubquery(conf, subqueryCache),
+    OptimizedLocalShuffleReader(conf),
Since this may change the number of exchanges, we should put it in queryStagePreparationRules. Then the AQE framework can check the cost and give up the optimization if extra exchanges are introduced.

Note that the current approach (checking the number of exchanges at the end of the rule) is suboptimal. It's possible that the local shuffle reader can avoid exchanges downstream, which changes the stage boundaries.
Already moved it into queryStagePreparationRules.
override def doCanonicalize(): SparkPlan = child.canonicalized

override def outputPartitioning: Partitioning = {
Shouldn't this be child.outputPartitioning?
Here, for the local shuffle reader, the partition number of the Partitioning is the number of mappers, while the partition number of child.outputPartitioning is the number of reducers. So how can it be child.outputPartitioning?
Sorry, I meant child.child.outputPartitioning.
Here the child.child.outputPartitioning is UnknownPartitioning(0), and the partition number is not equal.
}

case class LocalShuffleReaderExec(
    child: QueryStageExec) extends UnaryExecNode {
We can make it a leaf node to hide its `QueryStageExec`. We don't expect any other rules to change the underlying shuffle stage.
Good suggestion. Updated.
extends RDD[InternalRow](dependency.rdd.context, Nil) {

  private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions
  private[this] val numPostShufflePartitions = dependency.rdd.partitions.length
The name is wrong. This is the number of mappers and thus should be called numPreShufflePartitions.
Here, for `LocalShuffledRowRDD`, the number of mappers is the number of post-shuffle partitions.
Hmm, which "shuffle" are we talking about here? If the number of mappers is the number of post-shuffle partitions, then these mappers are also reducers and there is another shuffle upstream.
 * @param dep shuffle dependency object
 * @param startMapId the start map id
 * @param endMapId the end map id
 * @return a sequence of locations that each includes both a host and an executor id on that
"includes both a host and an executor id" is confusing. We can just say task location string (please refer to TaskLocation).
val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
// Connect the InternalRows read by each ShuffleReader
new Iterator[InternalRow] {
  val readers = partitionStartIndices.zip(partitionEndIndices).map { case (start, end) =>
I get your point that some shuffle blocks are empty and we should skip them, but I think this optimization should be done by the shuffle implementation. What we need to do here is simply ask the shuffle implementation to read the data for one mapper, with a simple API (e.g. getMapReader(handle, mapId, ...)).
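For illustration, a minimal sketch of what such a one-mapper read API could look like — the trait name, the signatures, and the in-memory implementation below are all hypothetical, not the real ShuffleManager interface:

```scala
// Hypothetical sketch of the suggested API -- names and signatures are
// illustrative, not the real org.apache.spark.shuffle.ShuffleManager.
trait MapReaderSketch[K, C] {
  // Returns a reader over all blocks produced by a single map task,
  // so skipping empty blocks becomes the shuffle implementation's job.
  def getMapReader(shuffleId: Int, mapId: Int): Iterator[(K, C)]
}

// A toy in-memory implementation, keyed by (shuffleId, mapId), to show usage.
class InMemoryMapReader(blocks: Map[(Int, Int), Seq[(Int, String)]])
    extends MapReaderSketch[Int, String] {
  def getMapReader(shuffleId: Int, mapId: Int): Iterator[(Int, String)] =
    blocks.getOrElse((shuffleId, mapId), Seq.empty).iterator // empty blocks yield an empty iterator
}

val reader = new InMemoryMapReader(Map((0, 1) -> Seq(1 -> "a", 2 -> "b")))
println(reader.getMapReader(0, 1).size)  // 2
println(reader.getMapReader(0, 99).size) // 0
```

With an API of this shape, the reader node only asks for one mapper's data, and the decision to skip empty blocks stays inside the shuffle implementation.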
@cloud-fan Sorry for the delayed response. I have resolved the comments. Please help review again. Thanks.
retest this please
    handle.shuffleId,
    startPartition,
    endPartition)
case (_) => throw new IllegalArgumentException(
nit:

case Some(..) =>
case None =>
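For illustration, the suggested style matches both Option cases explicitly instead of using a catch-all `case (_)`; the function below is a made-up example, not code from this PR:

```scala
// Illustrative only: exhaustive Option matching, as the review nit suggests.
def describeStart(startMapId: Option[Int]): String = startMapId match {
  case Some(id) => s"read from map $id"
  case None     => "no start map id given"
}

println(describeStart(Some(3))) // read from map 3
println(describeStart(None))    // no start map id given
```

Matching Some/None exhaustively lets the compiler warn about unhandled cases, which a wildcard pattern silently swallows.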
There are still 2 code style comments not addressed. I'll merge this PR if tests pass, and we can address the code style comments in a followup.
Test build #112096 has finished for PR 25295 at commit
thanks, merging to master!
### What changes were proposed in this pull request?

A followup of #25295. This PR proposes a few code cleanups:
1. Rename the special `getMapSizesByExecutorId` to `getMapSizesByMapIndex`.
2. Rename the parameter `mapId` to `mapIndex`, as it is really a mapper index.
3. `BlockStoreShuffleReader` should take `blocksByAddress` directly instead of a map id.
4. Rename `getMapReader` to `getReaderForOneMapper` to be clearer.

### Why are the changes needed?

Make the code easier to understand.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #26128 from cloud-fan/followup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
val numExchangeAfter = numExchanges(EnsureRequirements(conf).apply(optimizedPlan))

if (numExchangeAfter > numExchangeBefore) {
  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
logDebug should be enough.

Can we do a quick follow-up to address the minor comments here?
### What changes were proposed in this pull request?

A followup of #25295.
1. Change the logWarning to logDebug in `OptimizeLocalShuffleReader`.
2. Update the test to check whether query stage reuse can work well with the local shuffle reader.

### Why are the changes needed?

Make the code robust.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #26157 from JkSelf/followup-25295.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  }
}

case class LocalShuffleReaderExec(child: QueryStageExec) extends LeafExecNode {
Any reason to make `LocalShuffleReaderExec` a LeafNode?

There's a potential issue here: we made it a leaf node yet do not visit this node in `createQueryStages`. So a stage can be "not complete yet" but be considered complete, and thus trigger the creation of parent stages. This might be the root cause of the flaky tests.
case q: QueryStageExec =>
CreateStageResult(newPlan = q,
allChildStagesMaterialized = q.resultOption.isDefined, newStages = Seq.empty)
case _ =>
if (plan.children.isEmpty) {
CreateStageResult(newPlan = plan, allChildStagesMaterialized = true, newStages = Seq.empty)
} else {
val results = plan.children.map(createQueryStages)
CreateStageResult(
newPlan = plan.withNewChildren(results.map(_.newPlan)),
allChildStagesMaterialized = results.forall(_.allChildStagesMaterialized),
newStages = results.flatMap(_.newStages))
}
@maryannxue Thanks for your good findings. I have created PR #26250 to fix this.

The flaky tests may not be caused by this, though; they occur because of the random build side chosen by the planner for inner joins. Even with PR #26250, the flaky tests still exist. Thanks.
…reader as far as possible in BroadcastHashJoin

### What changes were proposed in this pull request?

#25295 already implements the rule of converting the shuffle reader to a local reader for the `BroadcastHashJoin` on the probe side. This PR supports converting the shuffle reader to a local reader on the build side as well.

### Why are the changes needed?

Improve performance.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #26289 from JkSelf/supportTwoSideLocalReader.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Implement a rule in the new adaptive execution framework introduced in SPARK-23128. This rule optimizes the shuffle reader to a local shuffle reader when a sort-merge join (SMJ) is converted to a broadcast hash join (BHJ) in adaptive execution.
How was this patch tested?
Existing tests