[SPARK-24817][Core] Implement BarrierTaskContext.barrier() #21898
Conversation
Test build #93685 has finished for PR 21898 at commit
Force-pushed from de517f5 to 5c5db85
Test build #93706 has finished for PR 21898 at commit
retest this please
Test build #93730 has finished for PR 21898 at commit
* } catch {
*   case e: Exception => logWarning("...", e)
* }
* context.barrier()
I think there should not be another context.barrier() here?
This is to demonstrate that in one task there is only one call of barrier(), while in other tasks there may be two calls of barrier(). Please refer to the "throw exception if barrier() call mismatched" test in BarrierTaskContextSuite. However, I'm still considering what the most proper behavior for this scenario should be.
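A minimal sketch of the mismatch being discussed, assuming a hypothetical barrier job and the BarrierTaskContext.get() entry point (not the doc example itself):

```scala
rdd.barrier().mapPartitions { iter =>
  val context = BarrierTaskContext.get()
  if (context.partitionId() == 0) {
    context.barrier() // task 0 performs a single global sync...
  } else {
    context.barrier() // ...which pairs with this first call,
    context.barrier() // but this second call waits for task 0 forever
  }
  iter
}
```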
try {
  barrierCoordinator.askSync[Unit](
    message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, barrierEpoch),
    timeout = new RpcTimeout(31536000 /** = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
Use BARRIER_SYNC_TIMEOUT here?
I set a fixed timeout for the RPC intentionally, so users will get a SparkException thrown by the BarrierCoordinator, instead of an RpcTimeoutException from the RPC framework.
You should add an inline comment so readers understand why.
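A sketch of how that inline comment might read over the diff above (the wording is ours, not the merged code):

```scala
barrierCoordinator.askSync[Unit](
  message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, barrierEpoch),
  // Use a fixed, effectively-infinite RPC timeout on purpose: the
  // BarrierCoordinator enforces the real (configurable) barrier timeout and
  // fails the sync with a SparkException, so callers never see a raw
  // RpcTimeoutException from the RPC framework.
  timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
```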
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " + | ||
"global sync successfully, waited for " + | ||
s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch is " + | ||
s"$barrierEpoch.") |
Shall we stop timer for this epoch here if the global sync finished successfully?
Nice catch! just updated.
Test build #93738 has finished for PR 21898 at commit
Test build #93737 has finished for PR 21898 at commit
retest this please
Test build #93742 has finished for PR 21898 at commit
Test build #93743 has finished for PR 21898 at commit
retest this please
Test build #93748 has finished for PR 21898 at commit
Force-pushed from 766381d to cb1861d
Test build #93785 has finished for PR 21898 at commit
// Barrier epoch for each stage attempt, fail a sync request if the barrier epoch in the request
// mismatches the barrier epoch in the coordinator.
private val barrierEpochByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Int]]
is it better to use HashMap[(Int, Int), Int]?
+1.
Also, how about using AtomicLong to remember the epoch?
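A sketch of the flattened, tuple-keyed map being suggested, with hypothetical wrapper names (not the merged code); the compound (stageId, stageAttemptId) key replaces the nested HashMap[Int, HashMap[Int, Int]]:

```scala
import scala.collection.mutable

// Hypothetical wrapper to illustrate the suggestion.
class BarrierEpochs {
  // (stageId, stageAttemptId) -> current barrier epoch.
  private val epochByStageAttempt = mutable.HashMap.empty[(Int, Int), Int]

  // Look up or initialize the epoch for one stage attempt.
  def getOrInit(stageId: Int, stageAttemptId: Int): Int =
    epochByStageAttempt.getOrElseUpdate((stageId, stageAttemptId), 0)

  // Advance the epoch after a successful global sync.
  def increment(stageId: Int, stageAttemptId: Int): Unit =
    epochByStageAttempt((stageId, stageAttemptId)) =
      getOrInit(stageId, stageAttemptId) + 1
}
```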
private[spark] val BARRIER_SYNC_TIMEOUT =
  ConfigBuilder("spark.barrier.sync.timeout")
    .doc("The timeout in milliseconds for each barrier() call from a barrier task. If the " +
will users set this config in milliseconds? I feel seconds should be more common.
+1
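A sketch of the seconds-based definition those +1s point toward, using Spark's ConfigBuilder timeConf; the doc wording, check, and default are our assumptions, not the merged definition:

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.internal.config.ConfigBuilder

// Sketch: would live alongside the other entries in org.apache.spark.internal.config.
private[spark] val BARRIER_SYNC_TIMEOUT =
  ConfigBuilder("spark.barrier.sync.timeout")
    .doc("The timeout in seconds for each barrier() call from a barrier task. If the " +
      "coordinator didn't receive all the sync messages from barrier tasks within the " +
      "configured time, throw a SparkException to fail all the tasks.")
    .timeConf(TimeUnit.SECONDS)
    .checkValue(t => t > 0, "The timeout value must be positive.")
    .createWithDefaultString("365d")
```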
@@ -61,6 +61,9 @@ private[spark] trait TaskScheduler {
   */
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

  // Kill all the running task attempts in a stage.
  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
why do we need this?
I'm also confused here. Is it part of this PR? We should kill all task attempts in case of any task failures in a barrier stage, not limited to context.barrier() failures. Right?
IIRC killing all tasks is just best effort; we can't guarantee the tasks are all killed. Shall we tolerate this in the barrier scheduling?
One high-level comment: move the fail-all-task-attempts change to a separate PR to keep this one minimal.
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

class BarrierCoordinator(
- package private
- add ScalaDoc
private val timer = new Timer("BarrierCoordinator barrier epoch increment timer")

// Barrier epoch for each stage attempt, fail a sync request if the barrier epoch in the request
Epoch counter for each barrier (stage, attempt).
- Remove "fail ..." because it is not implemented by this variable.
// mismatches the barrier epoch in the coordinator.
private val barrierEpochByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Int]]

// Any access to this should be synchronized.
Then shall we switch to Java's ConcurrentHashMap?
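A sketch of what that switch could look like (hypothetical names; assumes Scala 2.12's SAM conversion for the computeIfAbsent lambda). Note the map operations become thread-safe, but each ArrayBuffer value would still need its own synchronization:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rpc.RpcCallContext

object SyncRequests {
  // (stageId, stageAttemptId) -> pending barrier sync requests.
  private val byStageAttempt =
    new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]]()

  // Atomic get-or-init for one stage attempt.
  def getOrInit(stageId: Int, stageAttemptId: Int): ArrayBuffer[RpcCallContext] =
    byStageAttempt.computeIfAbsent(
      (stageId, stageAttemptId), _ => new ArrayBuffer[RpcCallContext]())
}
```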
// Any access to this should be synchronized.
private val syncRequestsByStageIdAndAttempt =
  new HashMap[Int, HashMap[Int, ArrayBuffer[RpcCallContext]]]
Ditto (stage, attempt) -> contexts.
test("global sync by barrier() call") { | ||
val conf = new SparkConf() | ||
.setMaster("local-cluster[4, 1, 1024]") |
should comment why we need a local cluster
val times = rdd2.collect()
// All the tasks shall finish global sync within a short time slot.
assert(times.max - times.min <= 5)
5 ms seems too risky to me. Actually, 1 second is perhaps okay here.
assert(error.contains("within 100 ms"))
}

ignore("throw exception if barrier() call mismatched") {
Why ignored? To create this scenario, we might need to create a new thread to call context.barrier() and then interrupt the thread.
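A hypothetical sketch of that pattern, assuming context is the task's BarrierTaskContext; the extra barrier() call runs on its own thread so the test can interrupt it instead of blocking forever:

```scala
import java.util.concurrent.atomic.AtomicReference

val error = new AtomicReference[Throwable]()
val extraCall = new Thread("extra-barrier-call") {
  override def run(): Unit = {
    try {
      // Mismatched call: the other tasks never reach this barrier.
      context.barrier()
    } catch {
      case e: Throwable => error.set(e)
    }
  }
}
extraCall.start()
extraCall.join(1000)  // let the call block on the global sync for a moment
extraCall.interrupt() // break it out of the barrier wait
extraCall.join()
assert(error.get() != null)
```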
Test build #93890 has finished for PR 21898 at commit
retest this please
Test build #93911 has finished for PR 21898 at commit
private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = {
  val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId))
  if (requests != null) {
    requests.clear()
Is this needed? When we call syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)), the array buffer becomes dangling and will be GCed.
This is just to be safe: in case the requests are held in other places, we can still GC the RpcCallContexts.
Agree with @cloud-fan that this is not necessary. It only explicitly clears the ArrayBuffer object instead of the contexts.
private def getOrInitSyncRequests(
    stageId: Int,
    stageAttemptId: Int,
    numTasks: Int = 0): ArrayBuffer[RpcCallContext] = {
when will we use the default value 0?
if (syncRequests.size == numTasks) {
  syncRequests.foreach(_.reply(()))
  return true
}
nit:
if (...) {
  ...
  true
} else {
  false
}
timer.schedule(new TimerTask {
  override def run(): Unit = {
    // Timeout for current barrier() call, fail all the sync requests.
    val requests = getOrInitSyncRequests(stageId, stageAttemptId)
What if all the sync requests finish before the timeout? Then here we may init the request array again.
we should have some tests for the timeout behavior, by setting a very small timeout.
Em, how about cancelling the TimerTask when the sync request finishes successfully?
Yea, we should do that, but we also need to consider races, like a sync request finishing and the timer triggering at the same time.
We will also remove the internal data on stage completion, so I assume the race condition you mentioned won't cause serious issues; the internal data will be removed after all.
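A hypothetical sketch of the cancel-plus-lock idea: both the success path and the timeout path take the same lock, so only one of them can reply for a given epoch even if they race (names are ours, not the merged code):

```scala
import java.util.{Timer, TimerTask}

// Hypothetical per-stage-attempt coordinator state.
class BarrierState(timer: Timer) {
  private val lock = new Object
  private var timerTask: TimerTask = _

  def scheduleTimeout(timeoutMs: Long)(onTimeout: => Unit): Unit = lock.synchronized {
    timerTask = new TimerTask {
      override def run(): Unit = lock.synchronized {
        // Timeout path: if the success path already ran, this task was
        // cancelled and cleared, so do nothing.
        if (timerTask eq this) {
          onTimeout
          timerTask = null
        }
      }
    }
    timer.schedule(timerTask, timeoutMs)
  }

  def onAllRequestsReceived(replySuccess: => Unit): Unit = lock.synchronized {
    // Success path: stop the pending timeout before replying.
    if (timerTask != null) {
      timerTask.cancel()
      timerTask = null
    }
    replySuccess
  }
}
```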
}, timeout * 1000)
}

syncRequests += context
although very unlikely, shall we add an assert that syncRequests.length == numTasks? Just in case we have a bug and some barrier tasks have a different value of numTasks.
We don't remember the numTasks in BarrierCoordinator; if it's really worth it, then we have to use another map to store the information.
Each barrier task remembers numTasks, so here we can make sure barrier tasks of the same group have the same numTasks.
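A small sketch of that check, assuming the coordinator validates the numTasks carried by each incoming RequestToSync against the first value it saw for the stage attempt (hypothetical names):

```scala
// Hypothetical guard: every RequestToSync for one stage attempt must carry
// the same numTasks; the first request fixes the expected value.
def validateNumTasks(requestNumTasks: Int, expectedNumTasks: Int): Unit =
  require(requestNumTasks == expectedNumTasks,
    s"Number of tasks mismatch: got $requestNumTasks, expected $expectedNumTasks")
```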
Good to see it finish without failure.
They are not - I made the variable
I see. got it, thanks
// Number of tasks of the current barrier stage, a barrier() call must collect enough requests
// from different tasks within the same barrier stage attempt to succeed.
private lazy val numTasks = getTaskInfos().size
this can be a def.
If we change it to a def then we have to call getTaskInfos() every time; the current lazy val only calls getTaskInfos() once.
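A tiny REPL-style demo of the difference (hypothetical names): the lazy val body runs once and is cached, while a def re-runs on every access:

```scala
var calls = 0
def taskInfos(): Seq[Int] = { calls += 1; Seq(1, 2, 3) }

class Demo {
  lazy val numTasksCached = taskInfos().size // evaluated on first access only
  def numTasksFresh = taskInfos().size       // evaluated on every access
}

val d = new Demo
d.numTasksCached; d.numTasksCached // taskInfos() ran once: calls == 1
d.numTasksFresh; d.numTasksFresh   // two more runs: calls == 3
```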
@@ -1930,6 +1930,12 @@ class SparkContext(config: SparkConf) extends Logging {
  Utils.tryLogNonFatalError {
    _executorAllocationManager.foreach(_.stop())
  }
  if (_dagScheduler != null) {
why this change?
This is to fix #21898 (comment); previously LiveListenerBus was stopped before we stopped DAGScheduler.
val callSite = Utils.getCallSite()
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " +
  s"the global sync, current barrier epoch is $barrierEpoch.")
logTrace(s"Current callSite: $callSite")
or simpler: logTrace("Current callSite: " + Utils.getCallSite())
listenerBus: LiveListenerBus,
override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {

private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer")
Will we identify the underlying reason before merging to master?
This is certainly a potential bug in SparkSubmit and not related to the changes made in this PR; I don't feel it should block this PR.
I opened https://issues.apache.org/jira/browse/SPARK-25030 to track the issue.
Add a comment above this line?
ok to test
retest this please
LGTM, pending jenkins
Test build #94279 has finished for PR 21898 at commit
Test build #94286 has finished for PR 21898 at commit
Test build #94277 has finished for PR 21898 at commit
Test build #94275 has finished for PR 21898 at commit
test this please
test this please
Test build #94318 has finished for PR 21898 at commit
retest this please
is there a way to increase the build timeout? cc @shaneknapp
Test build #94326 has finished for PR 21898 at commit
retest this please
test this please
@rxin, it seems we are indeed starting to hit the time limit now.
Test build #94333 has finished for PR 21898 at commit
great, finally tests pass! thanks, merging to master!
Test build #94344 has finished for PR 21898 at commit
Test build #94341 has finished for PR 21898 at commit
What changes were proposed in this pull request?
Implement BarrierTaskContext.barrier(), to support global sync between all the tasks in a barrier stage.
The function sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to the MPI_Barrier function in MPI, the barrier() call blocks until all tasks in the same stage have reached this routine. The global sync finishes immediately once all tasks in the same barrier stage reach the same barrier.
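A usage sketch of the call, assuming the RDD.barrier() entry point that accompanies this feature and an existing SparkContext sc:

```scala
import org.apache.spark.BarrierTaskContext

val rdd = sc.parallelize(1 to 100, numSlices = 4)
val synced = rdd.barrier().mapPartitions { iter =>
  val context = BarrierTaskContext.get()
  // ... per-partition work that must complete on every task first ...
  context.barrier() // blocks until all 4 tasks reach this point
  iter
}
synced.collect()
```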
This PR implements BarrierTaskContext.barrier() on top of the netty-based RPC client, introduces a new BarrierCoordinator and a new BarrierCoordinatorMessage, and adds a new config to handle the timeout issue.
How was this patch tested?
Added BarrierTaskContextSuite to test BarrierTaskContext.barrier().