Skip to content

Comments

[CELEBORN-1829] Replace waitThreadPoll's thread pool with ScheduledExecutorService in Controller#3059

Closed
zaynt4606 wants to merge 20 commits intoapache:mainfrom
zaynt4606:clb1829
Closed

[CELEBORN-1829] Replace waitThreadPoll's thread pool with ScheduledExecutorService in Controller#3059
zaynt4606 wants to merge 20 commits intoapache:mainfrom
zaynt4606:clb1829

Conversation

@zaynt4606
Copy link
Contributor

@zaynt4606 zaynt4606 commented Jan 9, 2025

What changes were proposed in this pull request?

  1. Replace waitThreadPoll's thread pool with ScheduledExecutorService.
  2. commitFile should reply when shuffleCommitTimeout.

Why are the changes needed?

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Cluster test & UT.

@zaynt4606 zaynt4606 changed the title replace threadPool [CELEBORN-1829] Replace waitThreadPoll's thread pool with ScheduledExecutorService in Controller Jan 9, 2025
@zaynt4606 zaynt4606 marked this pull request as draft January 9, 2025 12:08
@zaynt4606 zaynt4606 marked this pull request as ready for review January 9, 2025 17:43
shuffleKey,
JavaUtils.newConcurrentHashMap[Long, (Int, RpcCallContext)]())
val epochWaitTimeMap = shuffleCommitTime.get(shuffleKey)
epochWaitTimeMap.synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deadlock exists when lock both commitInfo and epochWaitTimeMap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done~
Read and write security of epoch in epochWaitTimeMap is guaranteed by commitInfo's lock so the lock of epochWaitTimeMap is needless.

}
}

private def checkCommitTimeout(shuffleCommitTime: ConcurrentHashMap[
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddUt for checkCommitTimeout

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated with UT

String,
ConcurrentHashMap[Long, (Int, RpcCallContext)]]): Unit = {
val delta = 100
val shuffleCommitTimeout = conf.workerShuffleCommitTimeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conf.workerShuffleCommitTimeout can be used as property of the class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done~

val commitInfo = shuffleCommitInfos.get(shuffleKey).get(epoch)
commitInfo.synchronized {
if (waitTime * delta < shuffleCommitTimeout) {
if (commitInfo.status == CommitInfo.COMMIT_FINISHED) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

COMMIT_FINISHED check can be move ahead of timeout check.
Once the commit is complete, it can return even if it has timed out

var shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] = _
// shuffleKey -> (epoch -> CommitInfo)
var shuffleCommitInfos: ConcurrentHashMap[String, ConcurrentHashMap[Long, CommitInfo]] = _
var shuffleCommitTime: ConcurrentHashMap[String, ConcurrentHashMap[Long, (Int, RpcCallContext)]] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls add description for shuffleCommitTime

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done~

shuffleKey,
JavaUtils.newConcurrentHashMap[Long, (Int, RpcCallContext)]())
val epochWaitTimeMap = shuffleCommitTime.get(shuffleKey)
epochWaitTimeMap.putIfAbsent(epoch, (0, context))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there is already a context here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context is a parameter passed to handleCommitFiles and can not used by non handleCommitFiles internal functions

shuffleCommitTime.asScala.foreach {
case (shuffleKey, epochWaitTimeMap) =>
epochWaitTimeMap.asScala.foreach { case (epoch, (waitTime, context)) =>
val commitInfo = shuffleCommitInfos.get(shuffleKey).get(epoch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should check shuffleCommitInfos.get(shuffleKey) is null or not

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

epochWaitTimeMap.remove(epoch)
} else {
if (waitTime * delta < shuffleCommitTimeout) {
shuffleCommitTime.get(shuffleKey).put(epoch, (waitTime + 1, context))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitTime -> use startTimestamp

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has been updated~

Copy link
Contributor

@RexXiong RexXiong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@FMX FMX left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall except a nit.

commitInfo.response.failedReplicaIds)
shuffleCommitInfos.get(shuffleKey).put(
epoch,
new CommitInfo(replyResponse, CommitInfo.COMMIT_FINISHED))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not create a commit info here because you have synchronized operations on the commitInfo object. I think you can change the value of this commit info object.
However, this won't be a trouble here because the epoch will not be replicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! Modifying commitInfo looks more elegant.
Has been updated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@RexXiong RexXiong closed this in f2751c2 Jan 18, 2025
@RexXiong
Copy link
Contributor

Thanks. merge to main(v0.6.0)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants