[CELEBORN-1597][CIP-11] Implement TagsManager #2739
s0nskar wants to merge 7 commits into apache:main
master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
    type Tag = String
    type WorkerId = String

    type TagsStore = ConcurrentHashMap[Tag, Set[WorkerId]]
Since the values of TagsStore are not exposed outside TagsManager for either reads or writes, could we make this a mutable, thread-safe data structure, for example ConcurrentHashMap.newKeySet() or something similar?
It would avoid performance degradation if mutations keep happening.
Suggested change:
    - type TagsStore = ConcurrentHashMap[Tag, Set[WorkerId]]
    + import java.util.{Set => JSet}
    + type TagsStore = ConcurrentHashMap[Tag, JSet[WorkerId]]
> It will avoid perf degradation if mutations keep happening.

@mridulm I've updated the code to use JSet. I would like to read more about this, though; are there any references for it?
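
To make the trade-off in this thread concrete, here is a minimal sketch of the two value types under discussion. `TagsStoreSketch`, `addImmutable`, and `addMutable` are hypothetical names for illustration, not code from the PR. With an immutable Scala `Set` as the map value, every add produces a new `Set` and replaces the map entry; with `ConcurrentHashMap.newKeySet()`, the existing set is updated in place.

```scala
import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{BiFunction, Function => JFunction}

// Hypothetical sketch for illustration; names are not from the PR.
object TagsStoreSketch {
  // Option A: immutable Scala Set values.
  // Each add builds a new Set value and swaps it into the map entry.
  val immutableStore = new ConcurrentHashMap[String, Set[String]]()

  private val union = new BiFunction[Set[String], Set[String], Set[String]] {
    override def apply(existing: Set[String], added: Set[String]): Set[String] =
      existing ++ added
  }

  def addImmutable(tag: String, workerId: String): Unit = {
    // merge applies `union` atomically for this key
    immutableStore.merge(tag, Set(workerId), union)
  }

  // Option B: a mutable, thread-safe Java set as the value.
  // The set is created once per tag and then mutated in place.
  val mutableStore = new ConcurrentHashMap[String, JSet[String]]()

  private val newWorkerSet = new JFunction[String, JSet[String]] {
    override def apply(tag: String): JSet[String] =
      ConcurrentHashMap.newKeySet[String]()
  }

  def addMutable(tag: String, workerId: String): Unit = {
    mutableStore.computeIfAbsent(tag, newWorkerSet).add(workerId)
  }
}
```

Option B only works because the sets never leave the manager: callers cannot observe a set while it is being mutated, which is the condition the reviewer points out.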
master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
    }

    def getTagsForWorker(worker: WorkerInfo): Set[Tag] = {
      tagStore.asScala.filter(_._2.contains(worker.host)).keySet.toSet
This can be relatively expensive. Are we expecting it to be a common operation?

No, this won't be a common operation; it will only be used if someone calls the list-tags-for-worker API. The cost is upper bounded by O(tags) * O(log(numWorkers)).

IMO, if this is not a common operation, we can keep this code.
However, if this API is used frequently, scanning every tag on each call, combined with a large number of Celeborn workers, may cause performance degradation. We can use additional data structures to assist in querying.

Agreed, we can build a reverse index mapping worker -> tags if needed in the future.
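
A reverse index of the kind mentioned here could look roughly like the sketch below. It is an illustration under assumptions, not the PR's implementation: `IndexedTagsSketch` and all of its members are hypothetical, worker ids are plain strings, and the two maps are updated one after the other rather than atomically.

```scala
import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
import scala.collection.JavaConverters._

// Hypothetical sketch: forward index (tag -> workers) plus reverse index (worker -> tags).
class IndexedTagsSketch {
  private val tagToWorkers = new ConcurrentHashMap[String, JSet[String]]()
  private val workerToTags = new ConcurrentHashMap[String, JSet[String]]()

  private val newSet = new JFunction[String, JSet[String]] {
    override def apply(key: String): JSet[String] = ConcurrentHashMap.newKeySet[String]()
  }

  // Note: the two indexes are updated separately, so a concurrent reader may
  // briefly see one updated and not the other.
  def addTagToWorker(tag: String, workerId: String): Unit = {
    tagToWorkers.computeIfAbsent(tag, newSet).add(workerId)
    workerToTags.computeIfAbsent(workerId, newSet).add(tag)
  }

  def removeTagFromWorker(tag: String, workerId: String): Unit = {
    Option(tagToWorkers.get(tag)).foreach(_.remove(workerId))
    Option(workerToTags.get(workerId)).foreach(_.remove(tag))
  }

  // Reverse-index lookup: a single map access instead of a scan over all tags.
  def getTagsForWorker(workerId: String): Set[String] =
    Option(workerToTags.get(workerId)).map(_.asScala.toSet).getOrElse(Set.empty[String])

  // Scan-based lookup in the style of the diff above, kept for comparison.
  def getTagsForWorkerByScan(workerId: String): Set[String] =
    tagToWorkers.asScala.filter(_._2.contains(workerId)).keySet.toSet
}
```

The cost of the faster lookup is extra memory and a second structure to keep consistent on every add and remove, which is why deferring it until the API is actually hot seems reasonable.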
master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala
    }

    def removeTagFromWorker(tag: Tag, worker: WorkerInfo): Unit = {
      val workerId = worker.host
The worker's host cannot be used as the unique identifier of a worker. It is more appropriate to use the WorkerInfo#toUniqueId method.

Or we can just store the WorkerInfo itself; it's comparable.

@zwangsheng That's a good point, but WorkerInfo#toUniqueId contains too much information. IMO, for external interfaces such as File / DB / API, passing the worker unique id can become cumbersome for admins to manage, while host names are easy to manage.
Another option for uniquely identifying a worker is workerHost:rpcPort, which will also be unique.

@s0nskar Sorry for the late reply. Using just the host may cause problems in some users' cases.
Using workerHost:rpcPort as the unique key seems reasonable to me.

Updated the PR to use toUniqueId, as it is predefined.
Filed a follow-up task to decide whether we should use workerHost:rpcPort or something better.
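
The identity problem discussed in this thread can be illustrated with a small sketch. `WorkerRef` is a hypothetical stand-in for WorkerInfo that models only the host and RPC port, and the host name and port numbers are made up; the real WorkerInfo#toUniqueId format is not reproduced here.

```scala
// Hypothetical stand-in for WorkerInfo; only host and rpcPort are modelled.
final case class WorkerRef(host: String, rpcPort: Int) {
  // Key used by the first version of the diff: host only.
  def hostOnlyId: String = host
  // Alternative proposed in the thread: workerHost:rpcPort.
  def hostPortId: String = s"$host:$rpcPort"
}

object WorkerIdSketch {
  // Two workers on the same machine, listening on different (made-up) ports.
  val first = WorkerRef("node-1", 9097)
  val second = WorkerRef("node-1", 9098)

  // With host-only keys the two workers are indistinguishable,
  // so tagging one would implicitly tag the other.
  val hostKeysCollide: Boolean = first.hostOnlyId == second.hostOnlyId

  // With host:port keys they stay distinct.
  val hostPortKeysDiffer: Boolean = first.hostPortId != second.hostPortId
}
```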
Co-authored-by: Mridul Muralidharan <1591700+mridulm@users.noreply.github.com>
Codecov Report
All modified and coverable lines are covered by tests ✅

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2739      +/-   ##
==========================================
+ Coverage   33.11%   33.13%   +0.02%
==========================================
  Files         314      314
  Lines       18373    18435      +62
  Branches     1683     1691       +8
==========================================
+ Hits         6082     6106      +24
- Misses      11951    11988      +37
- Partials      340      341       +1
    import org.apache.celeborn.service.deploy.master.tags.TagsManager.{Tag, TagsStore, WorkerId}

    object TagsManager {
      type Tag = String
I think these three type aliases are not needed. The underlying types are not complex, so the aliases won't help others understand this code.

ACK, will remove these.
    type Tag = String
    type WorkerId = String

    type TagsStore = ConcurrentHashMap[Tag, JSet[WorkerId]]
The element type of the JSet should be WorkerInfo.

I think there is a slight disconnect. TagsManager stores the mapping from each tag to a set of worker ids. It is possible that a tagged worker is not present or running at the moment.
TagsManager's responsibility is just to filter the tagged workers out of a given list of WorkerInfo.
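
The responsibility described here, storing ids and filtering a caller-supplied worker list, might be sketched as follows. `WorkerStub`, `TagFilterSketch`, and `getTaggedWorkers` are hypothetical names chosen for illustration and are not taken from the PR.

```scala
import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical stand-in for WorkerInfo; only a unique id is modelled.
final case class WorkerStub(uniqueId: String)

// Hypothetical sketch: the store holds ids only, so a tag can refer to a
// worker that is not currently registered or running.
class TagFilterSketch {
  private val tagStore = new ConcurrentHashMap[String, JSet[String]]()

  private val newSet = new JFunction[String, JSet[String]] {
    override def apply(tag: String): JSet[String] = ConcurrentHashMap.newKeySet[String]()
  }

  def addTagToWorker(tag: String, workerId: String): Unit = {
    tagStore.computeIfAbsent(tag, newSet).add(workerId)
  }

  // Keep only those currently available workers whose id carries the tag.
  def getTaggedWorkers(tag: String, available: Seq[WorkerStub]): Seq[WorkerStub] = {
    val ids = tagStore.get(tag)
    if (ids == null) Seq.empty[WorkerStub]
    else available.filter(w => ids.contains(w.uniqueId))
  }
}
```

Because the store never holds WorkerInfo objects, a tag assigned to an offline worker simply matches nothing until that worker shows up in the list passed in.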
    val workerId = worker.host
    val workers = tagStore.computeIfAbsent(tag, addNewTagFunc)
    logInfo(s"Adding Tag $tag to worker $workerId")
    workers.add(workerId)
You already have the worker, so just storing the worker would be better. WorkerInfo overrides the equals method, so it can be put into a set.

@FMX This will be managed by external interfaces as well, where the user will have to supply a unique identifier for a worker. We are comparing the worker with that identifier. WDYT?

Look at your method declaration: you already have the WorkerInfo you need. In this tag manager, storing a string as a worker's identifier is unnecessary.

I understand that you want to convey that we need to find the right worker information. However, finding the correct worker info can be accomplished outside of this tag manager, for example by using a worker's host or a worker's unique ID.

Oh, I implemented the wrong definition. It should have been
def addTagToWorker(tag: Tag, worker: WorkerId)
I will update this and the other functions accordingly.

@mridulm @FMX @zwangsheng I've updated the PR, PTAL.
FMX left a comment

LGTM. Although this implementation may have performance issues, that's OK. We can optimize performance if we encounter them.

Merged into main (v0.6.0).

@FMX can you mention the issues? I will try to address them in a follow-up PR.
### What changes were proposed in this pull request?
Added tags manager which will be responsible for managing worker tags. This will be used in the follow up PRs

### Why are the changes needed?
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn

### Does this PR introduce _any_ user-facing change?
NA

### How was this patch tested?
Added UTs

Closes apache#2739 from s0nskar/tags_manager.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>