Skip to content

Commit

Permalink
compatible with lower version clients
Browse files Browse the repository at this point in the history
  • Loading branch information
smallzhongfeng committed Nov 21, 2022
1 parent 7fa69e8 commit 36fb720
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 22 deletions.
7 changes: 0 additions & 7 deletions client-spark/spark3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>

</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,23 @@ public AccessCheckResult check(AccessInfo accessInfo) {
COUNTER.increment();
final String uuid = hostIp.hashCode() + "-" + COUNTER.sum();
final String user = accessInfo.getUser();
Map<String, Map<String, Long>> currentUserApps = applicationManager.getCurrentUserApps();
Map<String, Long> appAndTimes = currentUserApps.computeIfAbsent(user, x -> Maps.newConcurrentMap());
Integer defaultAppNum = applicationManager.getDefaultUserApps().getOrDefault(user,
conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM));
int currentAppNum = appAndTimes.size();
if (currentAppNum >= defaultAppNum) {
String msg = "Denied by AccessClusterLoadChecker => "
+ "User: " + user + ", current app num is: " + currentAppNum
+ ", default app num is: " + defaultAppNum + ". We will reject this app[uuid=" + uuid + "].";
LOG.error(msg);
CoordinatorMetrics.counterTotalQuotaDeniedRequest.inc();
return new AccessCheckResult(false, msg);
// low version client user attribute is an empty string
if (!"".equals(user)) {
Map<String, Map<String, Long>> currentUserApps = applicationManager.getCurrentUserApps();
Map<String, Long> appAndTimes = currentUserApps.computeIfAbsent(user, x -> Maps.newConcurrentMap());
Integer defaultAppNum = applicationManager.getDefaultUserApps().getOrDefault(user,
conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM));
int currentAppNum = appAndTimes.size();
if (currentAppNum >= defaultAppNum) {
String msg = "Denied by AccessClusterLoadChecker => "
+ "User: " + user + ", current app num is: " + currentAppNum
+ ", default app num is: " + defaultAppNum + ". We will reject this app[uuid=" + uuid + "].";
LOG.error(msg);
CoordinatorMetrics.counterTotalQuotaDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}
appAndTimes.put(uuid, System.currentTimeMillis());
}
appAndTimes.put(uuid, System.currentTimeMillis());
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE, uuid);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,14 @@ public void registerApplicationInfo(String appId, String user) {
}

public void refreshAppId(String appId) {
Map<String, Long> appAndTime = currentUserAndApp.get(appIdToUser.get(appId));
appAndTime.put(appId, System.currentTimeMillis());
String user = appIdToUser.get(appId);
// compatible with lower version clients
if (user == null) {
registerApplicationInfo(appId, "");
} else {
Map<String, Long> appAndTime = currentUserAndApp.get(user);
appAndTime.put(appId, System.currentTimeMillis());
}
}

public void refreshRemoteStorage(String remoteStoragePath, String remoteStorageConf) {
Expand Down

0 comments on commit 36fb720

Please sign in to comment.