-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature] Support user's app quota level limit #311
Conversation
Could you modify the description to attach your design document? Do the shuffle server have quota concept? cc @Gustfh |
proto/src/main/proto/Rss.proto
Outdated
@@ -302,6 +302,7 @@ service CoordinatorServer { | |||
|
|||
message AppHeartBeatRequest { | |||
string appId = 1; | |||
string user = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do the heartbeat need the user
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we register the shuffle, we put the user information to the shuffle server. Is it enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it isn't enough. Though user information to the shuffle server, we have to send server heartbeat to coordinator.So we use this app heartBeat to refresh each user' s app number which stored in coordinator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For coordinator, could we pass the user information to the coordinator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the heartbeat of the driver is used to send to the coordinator, the implementation is simple. If the heartbeat of the shuffleServer is used, you need to add a collection attribute in the heartbeat request of the shuffleServer, record the user and the corresponding app list, and then summarize them in the coordinator. Right ? And I haven't found the advantages of doing this for the time being. At present, this pr and our current production environment deployment have enough restrictions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't get your point. I feel it's unnecessary and strange that we put the user information to the heartbeat. We have register the user information to shuffle server. Why do we need heartbeat? We hope we can reduce the repeated information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got your point. The coordinator needs to rely on the user to update the life cycle of the app, so the user is added to the heartbeat of the app. As for the repetitive problem you mentioned, do you have a better solution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the heartbeat of the driver is used to send to the coordinator, the implementation is simple. If the heartbeat of the shuffleServer is used, you need to add a collection attribute in the heartbeat request of the shuffleServer, record the user and the corresponding app list, and then summarize them in the coordinator. Right ? And I haven't found the advantages of doing this for the time being. At present, this pr and our current production environment deployment have enough restrictions.
What's your restrictions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At present,restrictions is the number of apps that each user can submit.
Codecov Report
@@ Coverage Diff @@
## master #311 +/- ##
============================================
- Coverage 61.32% 58.39% -2.94%
- Complexity 1526 1552 +26
============================================
Files 186 193 +7
Lines 9441 10761 +1320
Branches 924 937 +13
============================================
+ Hits 5790 6284 +494
- Misses 3341 4103 +762
- Partials 310 374 +64
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
For now, shuffle server don't have quota concept. |
@@ -203,12 +206,12 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) { | |||
@Override | |||
public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) { | |||
// If yarn enable retry ApplicationMaster, appId will be not unique and shuffle data will be incorrect, | |||
// appId + timestamp can avoid such problem, | |||
// appId + uuid can avoid such problem, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need uuid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't get the appId when we try Access, because the appId is generated after the RssManager is created. In order to support push down, we maintain the uuid as a substitute for the appId, and replace the uuid with the appId after the app heartbeat is reported to the coordinator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can put the appId to the AcessInfo
when we try to access coordinator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean to generate the uuid on the driver?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My fault. We ignore that I can't get the appId in the construct method. Let me think twice.
If we have shuffle server quota, shuffle server will report the some metrics to the coordinator. The metrics will contain user quota info. Maybe we can reuse the information. @Gustfh WDTY? |
Maybe we could add a rpc method called |
You mean to add a request different from |
Yes. |
PTAL. @jerqi |
public void appHeartbeat( | ||
AppHeartBeatRequest request, | ||
StreamObserver<AppHeartBeatResponse> responseObserver) { | ||
public void registerApplicationInfo( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we rename appHeartbeat
to registerApplicationInfo
? I mean that we can add a method registerApplicationInfo
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could add a rpc method called registerApplicationInfo for Coordinator.
Didn't it say to add a new rpc method? If not, the shuffle server still get appHeartbeat contained the user's attribute
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CoordinatorCrpcService
should have heartbeat
and registerApplicationInfo
methods at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean to keep sendAppHeartbeat, and use registerApplicationInfo to send a request to the coordinator only once for recording. And still use sendAppHeartbeat later, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean to keep sendAppHeartbeat, and use registerApplicationInfo to send a request to the coordinator only once for recording. And still use sendAppHeartbeat later, right?
Yes. we use the registerApplicationInfo to coordinator and then send heartbeat to coordinator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -282,6 +328,58 @@ private String getStorageHost(String remoteStoragePath) { | |||
return storageHost; | |||
} | |||
|
|||
public void detectUserResource() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the application manager involve the quota logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we can configure rss.coordinator.quota.default.path
to controls whether to execute the logic of detectUserResource
. If not configured, the logic will not be executed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a quota manager? Or will application manager manage both quota and application?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe quota manager will be better.
@@ -228,7 +254,8 @@ public void accessCluster(AccessClusterRequest request, StreamObserver<AccessClu | |||
new AccessInfo( | |||
request.getAccessId(), | |||
Sets.newHashSet(request.getTagsList()), | |||
request.getExtraPropertiesMap() | |||
request.getExtraPropertiesMap(), | |||
request.getUser() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a compatible feature? What will happen if a old client request a new coordinator service?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in 36fb720
client-spark/spark3/pom.xml
Outdated
@@ -39,6 +39,13 @@ | |||
<artifactId>spark-core_${scala.binary.version}</artifactId> | |||
<version>${spark.version}</version> | |||
<scope>provided</scope> | |||
<exclusions> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot to remove this.
@@ -239,6 +239,7 @@ public Thread newThread(Runnable r) { | |||
long heartbeatInterval = conf.getLong(RssMRConfig.RSS_HEARTBEAT_INTERVAL, | |||
RssMRConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE); | |||
long heartbeatTimeout = conf.getLong(RssMRConfig.RSS_HEARTBEAT_TIMEOUT, heartbeatInterval / 2); | |||
client.registerApplicationInfo(appId, heartbeatTimeout, "user"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we remove this if mr don't need to register application info?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need this because applicationManager
remove expired app need user
. When our Spark does not use the AppQuota checker, it also needs register application info, so it will not have much impact here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that this isn't compatible feature. If we use old client to connect new service, something wrong will happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because the old client does not have user information or method of registerApplicationInfo
, app can only be updated at refreshApp
, which I am compatible with.
@@ -222,6 +225,24 @@ public RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest request) | |||
return response; | |||
} | |||
|
|||
@Override | |||
public RssApplicationInfoResponse sendApplicationInfo(RssApplicationInfoRequest request) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sendApplicationInfo
-> registerApplicationInfo
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, wait for CI
Because in the discussion of the last meeting, the shuffle server also needs to be limited, but for the number of apps, the number of apps on a sever may not directly determine the load, so I have not developed this place. WDYT? @jerqi |
It's ok for me. Wait for a moment. Let me see @Gustfh whether have another suggestion. If he don't reply us, I'll merge this pr tomorrow. |
merged. Thanks @smallzhongfeng |
@jerqi @smallzhongfeng I think this feature should be an option. WDYT? |
Yes, this is an option now. You can see incubator-uniffle/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java Line 76 in dea310b
|
But |
You can raise a pr to fix this issue. |
There are some unreasonable places here. I will fix it. @jerqi @xianjingfeng |
What changes were proposed in this pull request?
For issue #211 and the design document https://docs.google.com/document/d/1MApSMFQgoS1VAoKbZjomqSRm0iTbSuKG1yvKNlWW65c/edit?usp=sharing
Why are the changes needed?
Better isolation of resources between different users.
Does this PR introduce any user-facing change?
Add config
rss.coordinator.quota.default.app.num
to set default app number each user andrss.coordinator.quota.default.path
to set a path to record the number of apps that each user can run.How was this patch tested?
Add uts.