[SPARK-31344][CORE] Polish implementation of barrier() and allGather()#28117
[SPARK-31344][CORE] Polish implementation of barrier() and allGather()#28117Ngone51 wants to merge 5 commits intoapache:masterfrom
Conversation
|
This PR tried to address comment #27395 (review) , but please note that we can not simply build rdd.barrier().mapPartitions { iter =>
val context = BarrierTaskContext.get()
val partitionId = context.partitionId
if (partitionId == 0) {
context.barrier()
} else {
context.allGather(partitionId.toString)
}
iter
}ping @sarthfrey @jiangxb1987 @mengxr Please take a look, thanks! |
|
Test build #120795 has finished for PR 28117 at commit
|
|
Test build #120796 has finished for PR 28117 at commit
|
|
Test build #120797 has finished for PR 28117 at commit
|
| private[spark] object RequestMethod extends Enumeration { | ||
| val BARRIER, ALL_GATHER = Value | ||
| } | ||
|
|
There was a problem hiding this comment.
TODO: remove redundant empty line.
| () | ||
| } | ||
| def barrier(): Unit = runBarrier("", RequestMethod.BARRIER) | ||
|
|
|
|
||
| // An Array of allGather messages for barrier tasks that have made a blocking runBarrier() call | ||
| private val allGatherMessages: ArrayBuffer[String] = new Array[String](numTasks).to[ArrayBuffer] | ||
| // Messages from each barrier task that have made a blocking runBarrier() call. And it will be |
There was a problem hiding this comment.
nit: And it -> The messages
|
Test build #120905 has finished for PR 28117 at commit
|
|
Ping @mengxr @jiangxb1987 |
|
LGTM, I'll wait for another couple days before merge this PR. |
|
retest this please |
|
Test build #121307 has finished for PR 28117 at commit
|
|
retest this please |
|
Test build #121332 has finished for PR 28117 at commit
|
|
retest this please |
|
Test build #121372 has finished for PR 28117 at commit
|
|
retest this please |
|
Test build #121382 has finished for PR 28117 at commit
|
|
retest this please |
|
Test build #121384 has finished for PR 28117 at commit
|
|
Thanks, merged to master! |
|
thanks all! |
### What changes were proposed in this pull request? 1. Combine `BarrierRequestToSync` and `AllGatherRequestToSync` into `RequestToSync`, which is distinguished by `RequestMethod` type. 2. Remove unnecessary Json serialization/deserialization 3. Clean up some codes to make runBarrier() and `BarrierCoordinator` more general 4. Remove unused imports. ### Why are the changes needed? To make codes simpler for better maintain in the future. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? This is pure code refactor, so should be covered by existed tests. Closes apache#28117 from Ngone51/refactor_barrier. Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
What changes were proposed in this pull request?
Combine
BarrierRequestToSyncandAllGatherRequestToSyncintoRequestToSync, which is distinguished byRequestMethodtype.Remove unnecessary Json serialization/deserialization
Clean up some codes to make runBarrier() and
BarrierCoordinatormore generalRemove unused imports.
Why are the changes needed?
To make codes simpler for better maintain in the future.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This is pure code refactor, so should be covered by existed tests.