Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
save

save

support to refersh unlimited users
  • Loading branch information
turboFei committed Feb 17, 2023
1 parent 7538b99 commit a8409d0
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 9 deletions.
3 changes: 2 additions & 1 deletion docs/tools/kyuubi-admin.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ Usage: ``bin/kyuubi-admin refresh config [options] [<configType>]``
- The hadoop conf used for proxy user verification.
* - userDefaultsConf
- Refresh the user defaults configs with key in format in the form of `___{username}___.{config key}` from default property file.

* - unlimitedUsers
- The users without maximin connections limitation.
.. _list_engine:

List Engines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.client.AdminRestApi
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
import org.apache.kyuubi.ctl.cmd.AdminCtlCommand
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, USER_DEFAULTS_CONF}
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, UNLIMITED_USERS, USER_DEFAULTS_CONF}
import org.apache.kyuubi.ctl.opt.CliConfig
import org.apache.kyuubi.ctl.util.{Tabulator, Validator}

Expand All @@ -36,6 +36,7 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String]
normalizedCliConfig.adminConfigOpts.configType match {
case HADOOP_CONF => adminRestApi.refreshHadoopConf()
case USER_DEFAULTS_CONF => adminRestApi.refreshUserDefaultsConf()
case UNLIMITED_USERS => adminRestApi.refreshUnlimitedUsers()
case configType => throw new KyuubiException(s"Invalid config type:$configType")
}
}
Expand All @@ -48,4 +49,5 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String]
object RefreshConfigCommandConfigType {
final val HADOOP_CONF = "hadoopConf"
final val USER_DEFAULTS_CONF = "userDefaultsConf"
final val UNLIMITED_USERS = "unlimitedUsers"
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,6 @@ object AdminCommandLine extends CommonCommandLine {
.optional()
.action((v, c) => c.copy(adminConfigOpts = c.adminConfigOpts.copy(configType = v)))
.text("The valid config type can be one of the following: " +
s"$HADOOP_CONF, $USER_DEFAULTS_CONF."))
s"$HADOOP_CONF, $USER_DEFAULTS_CONF, $UNLIMITED_USERS."))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.kyuubi.ctl

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.ctl.cli.AdminControlCliArguments
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType._
import org.apache.kyuubi.ctl.opt.{ControlAction, ControlObject}

Expand Down Expand Up @@ -63,7 +64,8 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
val opArgs = new AdminControlCliArguments(args)
assert(opArgs.cliConfig.action === ControlAction.REFRESH)
assert(opArgs.cliConfig.resource === ControlObject.CONFIG)
assert(opArgs.cliConfig.adminConfigOpts.configType === "hadoopConf")
assert(
opArgs.cliConfig.adminConfigOpts.configType === RefreshConfigCommandConfigType.HADOOP_CONF)

args = Array(
"refresh",
Expand All @@ -72,7 +74,16 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
val opArgs2 = new AdminControlCliArguments(args)
assert(opArgs2.cliConfig.action === ControlAction.REFRESH)
assert(opArgs2.cliConfig.resource === ControlObject.CONFIG)
assert(opArgs2.cliConfig.adminConfigOpts.configType === "userDefaultsConf")
assert(opArgs2.cliConfig.adminConfigOpts.configType === RefreshConfigCommandConfigType.USER_DEFAULTS_CONF)

args = Array(
"refresh",
"config",
"unlimitedUsers")
val opArgs3 = new AdminControlCliArguments(args)
assert(opArgs3.cliConfig.action === ControlAction.REFRESH)
assert(opArgs3.cliConfig.resource === ControlObject.CONFIG)
assert(opArgs3.cliConfig.adminConfigOpts.configType === RefreshConfigCommandConfigType.UNLIMITED_USERS)

args = Array(
"refresh",
Expand Down Expand Up @@ -147,7 +158,7 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
| Refresh the resource.
|Command: refresh config [<configType>]
| Refresh the config with specified type.
| <configType> The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF.
| <configType> The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF, $UNLIMITED_USERS.
|
| -h, --help Show help message and exit.""".stripMargin
// scalastyle:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public String refreshUserDefaultsConf() {
return this.getClient().post(path, null, client.getAuthHeader());
}

public String refreshUnlimitedUsers() {
String path = String.format("%s/%s", API_BASE_PATH, "refresh/unlimited_users");
return this.getClient().post(path, null, client.getAuthHeader());
}

public String deleteEngine(
String engineType, String shareLevel, String subdomain, String hs2ProxyUser) {
Map<String, Object> params = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery}
import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper

Expand Down Expand Up @@ -128,6 +129,16 @@ object KyuubiServer extends Logging {
info(s"Refreshed user defaults configs with changes of " +
s"unset: $unsetCount, updated: $updatedCount, added: $addedCount")
}

private[kyuubi] def refreshUnlimitedUsers(): Unit = synchronized {
val existingUnlimitedUsers =
kyuubiServer.conf.get(KyuubiConf.SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).toSet
val refreshedUnlimitedUsers = KyuubiConf().loadFileDefaults().get(
KyuubiConf.SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).toSet
kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
.refreshUnlimitedUsers(refreshedUnlimitedUsers)
info(s"Refreshed unlimited users from $existingUnlimitedUsers to $refreshedUnlimitedUsers")
}
}

class KyuubiServer(name: String) extends Serverable(name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,25 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
Response.ok(s"Refresh the user defaults conf successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
description = "refresh the unlimited users")
@POST
@Path("refresh/unlimited_users")
def refreshUnlimitedUser(): Response = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Receive refresh unlimited users request from $userName/$ipAddress")
if (!userName.equals(administrator)) {
throw new NotAllowedException(
s"$userName is not allowed to refresh the unlimited users")
}
info(s"Reloading unlimited users")
KyuubiServer.refreshUnlimitedUsers()
Response.ok(s"Refresh the unlimited users successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
userUnlimitedList)
}

private[kyuubi] def refreshUnlimitedUsers(userUnlimitedList: Set[String]): Unit = {
limiter.foreach(SessionLimiter.resetUnlimitedUsers(_, userUnlimitedList))
batchLimiter.foreach(SessionLimiter.resetUnlimitedUsers(_, userUnlimitedList))
}

private def applySessionLimiter(
userLimit: Int,
ipAddressLimit: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class SessionLimiterWithUnlimitedUsersImpl(
userLimit: Int,
ipAddressLimit: Int,
userIpAddressLimit: Int,
unlimitedUsers: Set[String])
var unlimitedUsers: Set[String])
extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) {
override def increment(userIpAddress: UserIpAddress): Unit = {
if (!unlimitedUsers.contains(userIpAddress.user)) {
Expand All @@ -118,6 +118,10 @@ class SessionLimiterWithUnlimitedUsersImpl(
super.decrement(userIpAddress)
}
}

def setUnlimitedUsers(unlimitedUsers: Set[String]): Unit = {
this.unlimitedUsers = unlimitedUsers
}
}

object SessionLimiter {
Expand All @@ -126,12 +130,18 @@ object SessionLimiter {
userLimit: Int,
ipAddressLimit: Int,
userIpAddressLimit: Int,
userWhiteList: Set[String] = Set.empty): SessionLimiter = {
unlimitedUsers: Set[String] = Set.empty): SessionLimiter = {
new SessionLimiterWithUnlimitedUsersImpl(
userLimit,
ipAddressLimit,
userIpAddressLimit,
userWhiteList)
unlimitedUsers)
}

def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit = {
limiter match {
case l: SessionLimiterWithUnlimitedUsersImpl => l.setUnlimitedUsers(unlimitedUsers)
case _ =>
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(200 == response.getStatus)
}

test("refresh unlimited users the kyuubi server") {
var response = webTarget.path("api/v1/admin/refresh/unlimited_users")
.request()
.post(null)
assert(405 == response.getStatus)

val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
response = webTarget.path("api/v1/admin/refresh/unlimited_users")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
.post(null)
assert(200 == response.getStatus)
}

test("delete engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
Expand Down

0 comments on commit a8409d0

Please sign in to comment.