Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to refresh the unlimited users for session limiter #4360

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/deployment/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.server.limit.connections.per.ipaddress | <undefined> | Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
| kyuubi.server.limit.connections.per.user | <undefined> | Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
| kyuubi.server.limit.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.6.0 |
| kyuubi.server.limit.connections.user.unlimited.list || The maximin connections of the user in the white list will not be limited. | seq | 1.7.0 |
| kyuubi.server.limit.connections.user.unlimited.list || The maximum connections of the user in the white list will not be limited. | seq | 1.7.0 |
| kyuubi.server.name | <undefined> | The name of Kyuubi Server. | string | 1.5.0 |
| kyuubi.server.redaction.regex | <undefined> | Regex to decide which Kyuubi contain sensitive information. When this regex matches a property key or value, the value is redacted from the various logs. || 1.6.0 |

Expand Down
4 changes: 3 additions & 1 deletion docs/tools/kyuubi-admin.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ Usage: ``bin/kyuubi-admin refresh config [options] [<configType>]``
* - hadoopConf
- The hadoop conf used for proxy user verification.
* - userDefaultsConf
- Refresh the user defaults configs with key in format in the form of `___{username}___.{config key}` from default property file.
- The user defaults configs with key in format in the form of `___{username}___.{config key}` from default property file.
bowenliang123 marked this conversation as resolved.
Show resolved Hide resolved
* - unlimitedUsers
- The users without maximum connections limitation.

.. _list_engine:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,7 @@ object KyuubiConf {

val SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST: ConfigEntry[Seq[String]] =
buildConf("kyuubi.server.limit.connections.user.unlimited.list")
.doc("The maximin connections of the user in the white list will not be limited.")
.doc("The maximum connections of the user in the white list will not be limited.")
.version("1.7.0")
.serverOnly
.stringConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.client.AdminRestApi
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
import org.apache.kyuubi.ctl.cmd.AdminCtlCommand
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, USER_DEFAULTS_CONF}
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, UNLIMITED_USERS, USER_DEFAULTS_CONF}
import org.apache.kyuubi.ctl.opt.CliConfig
import org.apache.kyuubi.ctl.util.{Tabulator, Validator}

Expand All @@ -36,6 +36,7 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String]
normalizedCliConfig.adminConfigOpts.configType match {
case HADOOP_CONF => adminRestApi.refreshHadoopConf()
case USER_DEFAULTS_CONF => adminRestApi.refreshUserDefaultsConf()
case UNLIMITED_USERS => adminRestApi.refreshUnlimitedUsers()
case configType => throw new KyuubiException(s"Invalid config type:$configType")
}
}
Expand All @@ -48,4 +49,5 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String]
object RefreshConfigCommandConfigType {
final val HADOOP_CONF = "hadoopConf"
final val USER_DEFAULTS_CONF = "userDefaultsConf"
final val UNLIMITED_USERS = "unlimitedUsers"
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,6 @@ object AdminCommandLine extends CommonCommandLine {
.optional()
.action((v, c) => c.copy(adminConfigOpts = c.adminConfigOpts.copy(configType = v)))
.text("The valid config type can be one of the following: " +
s"$HADOOP_CONF, $USER_DEFAULTS_CONF."))
s"$HADOOP_CONF, $USER_DEFAULTS_CONF, $UNLIMITED_USERS."))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
val opArgs = new AdminControlCliArguments(args)
assert(opArgs.cliConfig.action === ControlAction.REFRESH)
assert(opArgs.cliConfig.resource === ControlObject.CONFIG)
assert(opArgs.cliConfig.adminConfigOpts.configType === "hadoopConf")
assert(opArgs.cliConfig.adminConfigOpts.configType === HADOOP_CONF)

args = Array(
"refresh",
Expand All @@ -72,7 +72,16 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
val opArgs2 = new AdminControlCliArguments(args)
assert(opArgs2.cliConfig.action === ControlAction.REFRESH)
assert(opArgs2.cliConfig.resource === ControlObject.CONFIG)
assert(opArgs2.cliConfig.adminConfigOpts.configType === "userDefaultsConf")
assert(opArgs2.cliConfig.adminConfigOpts.configType === USER_DEFAULTS_CONF)

args = Array(
"refresh",
"config",
"unlimitedUsers")
val opArgs3 = new AdminControlCliArguments(args)
assert(opArgs3.cliConfig.action === ControlAction.REFRESH)
assert(opArgs3.cliConfig.resource === ControlObject.CONFIG)
assert(opArgs3.cliConfig.adminConfigOpts.configType === UNLIMITED_USERS)

args = Array(
"refresh",
Expand Down Expand Up @@ -147,7 +156,7 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
| Refresh the resource.
|Command: refresh config [<configType>]
| Refresh the config with specified type.
| <configType> The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF.
| <configType> The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF, $UNLIMITED_USERS.
|
| -h, --help Show help message and exit.""".stripMargin
// scalastyle:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public String refreshUserDefaultsConf() {
return this.getClient().post(path, null, client.getAuthHeader());
}

public String refreshUnlimitedUsers() {
String path = String.format("%s/%s", API_BASE_PATH, "refresh/unlimited_users");
return this.getClient().post(path, null, client.getAuthHeader());
}

public String deleteEngine(
String engineType, String shareLevel, String subdomain, String hs2ProxyUser) {
Map<String, Object> params = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery}
import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper

Expand Down Expand Up @@ -128,6 +129,16 @@ object KyuubiServer extends Logging {
info(s"Refreshed user defaults configs with changes of " +
s"unset: $unsetCount, updated: $updatedCount, added: $addedCount")
}

private[kyuubi] def refreshUnlimitedUsers(): Unit = synchronized {
val existingUnlimitedUsers =
kyuubiServer.conf.get(KyuubiConf.SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).toSet
val refreshedUnlimitedUsers = KyuubiConf().loadFileDefaults().get(
KyuubiConf.SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).toSet
kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
.refreshUnlimitedUsers(refreshedUnlimitedUsers)
info(s"Refreshed unlimited users from $existingUnlimitedUsers to $refreshedUnlimitedUsers")
}
}

class KyuubiServer(name: String) extends Serverable(name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,25 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
Response.ok(s"Refresh the user defaults conf successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
description = "refresh the unlimited users")
@POST
@Path("refresh/unlimited_users")
def refreshUnlimitedUser(): Response = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Receive refresh unlimited users request from $userName/$ipAddress")
if (!userName.equals(administrator)) {
throw new NotAllowedException(
s"$userName is not allowed to refresh the unlimited users")
}
info(s"Reloading unlimited users")
KyuubiServer.refreshUnlimitedUsers()
Response.ok(s"Refresh the unlimited users successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
userUnlimitedList)
}

private[kyuubi] def refreshUnlimitedUsers(userUnlimitedList: Set[String]): Unit = {
limiter.foreach(SessionLimiter.resetUnlimitedUsers(_, userUnlimitedList))
batchLimiter.foreach(SessionLimiter.resetUnlimitedUsers(_, userUnlimitedList))
}

private def applySessionLimiter(
userLimit: Int,
ipAddressLimit: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class SessionLimiterWithUnlimitedUsersImpl(
userLimit: Int,
ipAddressLimit: Int,
userIpAddressLimit: Int,
unlimitedUsers: Set[String])
var unlimitedUsers: Set[String])
extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) {
override def increment(userIpAddress: UserIpAddress): Unit = {
if (!unlimitedUsers.contains(userIpAddress.user)) {
Expand All @@ -118,6 +118,10 @@ class SessionLimiterWithUnlimitedUsersImpl(
super.decrement(userIpAddress)
}
}

private[kyuubi] def setUnlimitedUsers(unlimitedUsers: Set[String]): Unit = {
this.unlimitedUsers = unlimitedUsers
}
}

object SessionLimiter {
Expand All @@ -126,12 +130,18 @@ object SessionLimiter {
userLimit: Int,
ipAddressLimit: Int,
userIpAddressLimit: Int,
userWhiteList: Set[String] = Set.empty): SessionLimiter = {
unlimitedUsers: Set[String] = Set.empty): SessionLimiter = {
new SessionLimiterWithUnlimitedUsersImpl(
userLimit,
ipAddressLimit,
userIpAddressLimit,
userWhiteList)
unlimitedUsers)
}

def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit = {
limiter match {
case l: SessionLimiterWithUnlimitedUsersImpl => l.setUnlimitedUsers(unlimitedUsers)
case _ =>
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(200 == response.getStatus)
}

test("refresh unlimited users of the kyuubi server") {
var response = webTarget.path("api/v1/admin/refresh/unlimited_users")
.request()
.post(null)
assert(405 == response.getStatus)

val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
response = webTarget.path("api/v1/admin/refresh/unlimited_users")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
.post(null)
assert(200 == response.getStatus)
}

test("delete engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
Expand Down