Skip to content

Commit

Permalink
linkis-entranc - add support for specified username to use specified …
Browse files Browse the repository at this point in the history
…groupCapacity
  • Loading branch information
Alexkun committed May 13, 2022
1 parent c953223 commit 7b9892e
Showing 1 changed file with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.linkis.scheduler.queue.{Group, GroupFactory, SchedulerEvent}

import java.util
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import scala.collection.JavaConversions._


Expand All @@ -46,9 +47,15 @@ class EntranceGroupFactory extends GroupFactory with Logging {
.maximumSize(EntranceConfiguration.GRORUP_CACHE_MAX.getValue).build()

private val GROUP_MAX_CAPACITY = CommonVars("wds.linkis.entrance.max.capacity", 2000)
private val SPECIFIED_USERNAME_REGEX = CommonVars("wds.linkis.entrance.specified.username.regex", "hduser.*")
private val GROUP_SPECIFIED_USER_MAX_CAPACITY = CommonVars("wds.linkis.entrance.specified.max.capacity", 5000)

private val GROUP_INIT_CAPACITY = CommonVars("wds.linkis.entrance.init.capacity", 100)

private val specifiedUsernameRegexPattern: Pattern = if (StringUtils.isNotBlank(SPECIFIED_USERNAME_REGEX.getValue)) {
Pattern.compile(SPECIFIED_USERNAME_REGEX.getValue)
} else {
null
}

override def getOrCreateGroup(event: SchedulerEvent): Group = {
val (labels, params) = event match {
Expand Down Expand Up @@ -79,7 +86,16 @@ class EntranceGroupFactory extends GroupFactory with Logging {
}("Get user configurations from configuration server failed! Next use the default value to continue.")
val maxRunningJobs = EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue)
val initCapacity = GROUP_INIT_CAPACITY.getValue(keyAndValue)
val maxCapacity = GROUP_MAX_CAPACITY.getValue(keyAndValue)
val maxCapacity = if (null != specifiedUsernameRegexPattern) {
if (specifiedUsernameRegexPattern.matcher(userCreatorLabel.getUser).find()) {
logger.info(s"Set maxCapacity of user ${userCreatorLabel.getUser} to specifiedMaxCapacity : ${GROUP_SPECIFIED_USER_MAX_CAPACITY.getValue(keyAndValue)}")
GROUP_SPECIFIED_USER_MAX_CAPACITY.getValue(keyAndValue)
} else {
GROUP_MAX_CAPACITY.getValue(keyAndValue)
}
} else {
GROUP_MAX_CAPACITY.getValue(keyAndValue)
}
logger.info(s"Got user configurations: groupName=$groupName, maxRunningJobs=$maxRunningJobs, initCapacity=$initCapacity, maxCapacity=$maxCapacity.")
val group = new ParallelGroup(groupName, initCapacity, maxCapacity)
group.setMaxRunningJobs(maxRunningJobs)
Expand Down

0 comments on commit 7b9892e

Please sign in to comment.