Skip to content

Commit

Permalink
[KYUUBI #1536] Add session conf advisor
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
closes #1536

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1892 from ulysses-you/KYUUBI-1536.

Closes #1536

f0c450a [ulysses-you] codecov
35fac4f [ulysses-you] fix
f34cfbf [ulysses-you] comment
78a54bc [ulysses-you] address comment
5cb574b [ulysses-you] fix test
1033909 [ulysses-you] fix
3c925e5 [ulysses-you] address comment
a74988c [ulysses-you] style
649f02d [ulysses-you] Add session conf advisor
debd3ad [ulysses-you] Add session conf advisor

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
ulysses-you committed Feb 15, 2022
1 parent d90c664 commit 6811530
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 8 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/style.yml
Expand Up @@ -41,6 +41,12 @@ jobs:
with:
distribution: zulu
java-version: 8
- name: Install
run: >-
build/mvn clean install -V -Pflink-provided,spark-provided -Dorg.slf4j.simpleLogger.defaultLogLevel=warn \
-Dmaven.javadoc.skip=true -Drat.skip=true -Dscalastyle.skip=true -Dspotless.check.skip -DskipTests \
-Pflink-provided,spark-provided \
-pl kyuubi-ctl,kyuubi-server,kyuubi-assembly -am
- name: Scalastyle with Maven
run: build/mvn scalastyle:check ${{ matrix.profiles }}
- name: Upload scalastyle report
Expand All @@ -49,9 +55,4 @@ jobs:
- name: JavaStyle with Maven
run: build/mvn spotless:check ${{ matrix.profiles }}
- name: Check dependency list
run: >-
build/mvn clean install -V -Pflink-provided,spark-provided -Dorg.slf4j.simpleLogger.defaultLogLevel=warn \
-Dmaven.javadoc.skip=true -Drat.skip=true -Dscalastyle.skip=true -Dspotless.check.skip -DskipTests \
-Pflink-provided,spark-provided \
-pl kyuubi-ctl,kyuubi-server,kyuubi-assembly -am
build/dependency.sh
run: build/dependency.sh
6 changes: 6 additions & 0 deletions dev/kyuubi-codecov/pom.xml
Expand Up @@ -68,6 +68,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-sql-engine_${scala.binary.version}</artifactId>
Expand Down
1 change: 1 addition & 0 deletions docs/deployment/settings.md
Expand Up @@ -313,6 +313,7 @@ Key | Default | Meaning | Type | Since
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
<code>kyuubi.session.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT5M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The check interval for session timeout.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.session.conf.advisor</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A config advisor plugin for Kyuubi Server. This plugin can provide some custom configs for different user or session configs and overwrite the session configs before open a new session. This config value should be a class which is a child of 'org.apache.kyuubi.plugin.SessionConfAdvisor' which has zero-arg constructor.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
<code>kyuubi.session.conf.ignore.list</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
<code>kyuubi.session.conf.restrict.list</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
<code>kyuubi.session.engine.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The check interval for engine timeout</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
Expand Down
Expand Up @@ -1172,6 +1172,16 @@ object KyuubiConf {
.checkValues(OperationLanguages.values.map(_.toString))
.createWithDefault(OperationLanguages.SQL.toString)

val SESSION_CONF_ADVISOR: OptionalConfigEntry[String] =
buildConf("session.conf.advisor")
.doc("A config advisor plugin for Kyuubi Server. This plugin can provide some custom " +
"configs for different user or session configs and overwrite the session configs before " +
"open a new session. This config value should be a class which is a child of " +
"'org.apache.kyuubi.plugin.SessionConfAdvisor' which has zero-arg constructor.")
.version("1.5.0")
.stringConf
.createOptional

val SERVER_NAME: OptionalConfigEntry[String] =
buildConf("server.name")
.doc("The name of Kyuubi Server.")
Expand Down
44 changes: 44 additions & 0 deletions kyuubi-server-plugin/pom.xml
@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>kyuubi-parent</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kyuubi-server-plugin</artifactId>
<name>Kyuubi Project Server Plugin</name>
<packaging>jar</packaging>

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin;

public class DefaultSessionConfAdvisor implements SessionConfAdvisor {}
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin;

import java.util.Collections;
import java.util.Map;

/** Provide the session configuration according to the user and session configuration. */
public interface SessionConfAdvisor {
/** The returned conf will overwrite the session conf */
@SuppressWarnings("unchecked")
default Map<String, String> getConfOverlay(String user, Map<String, String> sessionConf) {
return Collections.EMPTY_MAP;
}
}
6 changes: 6 additions & 0 deletions kyuubi-server/pom.xml
Expand Up @@ -37,6 +37,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
Expand Down
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin

import scala.util.control.NonFatal

import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.config.KyuubiConf

private[kyuubi] object PluginLoader {

def loadSessionConfAdvisor(conf: KyuubiConf): SessionConfAdvisor = {
val advisorClass = conf.get(KyuubiConf.SESSION_CONF_ADVISOR)
if (advisorClass.isEmpty) {
return new DefaultSessionConfAdvisor()
}

try {
Class.forName(advisorClass.get).getConstructor().newInstance()
.asInstanceOf[SessionConfAdvisor]
} catch {
case _: ClassCastException =>
throw new KyuubiException(
s"Class ${advisorClass.get} is not a child of '${classOf[SessionConfAdvisor].getName}'.")
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '${advisorClass.get}':", e)
}
}
}
Expand Up @@ -17,6 +17,8 @@

package org.apache.kyuubi.session

import scala.collection.JavaConverters._

import com.codahale.metrics.MetricRegistry
import org.apache.hive.service.rpc.thrift._

Expand Down Expand Up @@ -45,8 +47,20 @@ class KyuubiSessionImpl(
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle = SessionHandle(protocol)

private[kyuubi] val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
user,
normalizedConf.asJava)
if (confOverlay != null) {
normalizedConf ++ confOverlay.asScala
} else {
warn(s"the server plugin return null value for user: $user, ignore it")
normalizedConf
}
}

// TODO: needs improve the hardcode
normalizedConf.foreach {
optimizedConf.foreach {
case ("use:database", _) =>
case ("kyuubi.engine.pool.size.threshold", _) =>
case (key, value) => sessionConf.set(key, value)
Expand Down Expand Up @@ -90,7 +104,7 @@ class KyuubiSessionImpl(
Option(password).filter(_.nonEmpty).getOrElse("anonymous")
}
_client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
_engineSessionHandle = _client.openSession(protocol, user, passwd, normalizedConf)
_engineSessionHandle = _client.openSession(protocol, user, passwd, optimizedConf)
logSessionInfo(s"Connected to engine [$host:$port] with ${_engineSessionHandle}")
sessionEvent.openedTime = System.currentTimeMillis()
sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
Expand Down
Expand Up @@ -27,13 +27,16 @@ import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.KyuubiOperationManager
import org.apache.kyuubi.plugin.{PluginLoader, SessionConfAdvisor}

class KyuubiSessionManager private (name: String) extends SessionManager(name) {

def this() = this(classOf[KyuubiSessionManager].getSimpleName)

val operationManager = new KyuubiOperationManager()
val credentialsManager = new HadoopCredentialsManager()
// this lazy is must be specified since the conf is null when the class initialization
lazy val sessionConfAdvisor: SessionConfAdvisor = PluginLoader.loadSessionConfAdvisor(conf)

override def initialize(conf: KyuubiConf): Unit = {
addService(credentialsManager)
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.operation

import java.sql.SQLException
import java.util
import java.util.Properties

import scala.collection.JavaConverters._
Expand All @@ -27,7 +28,9 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime

import org.apache.kyuubi.{Utils, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
import org.apache.kyuubi.jdbc.KyuubiHiveDriver
import org.apache.kyuubi.plugin.SessionConfAdvisor

/**
* UT with Connection level engine shared cost much time, only run basic jdbc tests.
Expand All @@ -38,6 +41,7 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe

override protected val conf: KyuubiConf = {
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection")
.set(SESSION_CONF_ADVISOR.key, classOf[TestSessionConfAdvisor].getName)
}

test("KYUUBI #647 - async query causes engine crash") {
Expand Down Expand Up @@ -181,4 +185,30 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
assert(resultSet.head.getStringVal.getValues.get(0).contains("kyuubi.operation.language"))
}
}

test("test session conf plugin") {
withSessionConf()(Map())(Map("spark.k1" -> "v0", "spark.k3" -> "v4")) {
withJdbcStatement() { statement =>
val r1 = statement.executeQuery("set spark.k1")
assert(r1.next())
assert(r1.getString(2) == "v0")

val r2 = statement.executeQuery("set spark.k3")
assert(r2.next())
assert(r2.getString(2) == "v3")

val r3 = statement.executeQuery("set spark.k4")
assert(r3.next())
assert(r3.getString(2) == "v4")
}
}
}
}

class TestSessionConfAdvisor extends SessionConfAdvisor {
override def getConfOverlay(
user: String,
sessionConf: util.Map[String, String]): util.Map[String, String] = {
Map("spark.k3" -> "v3", "spark.k4" -> "v4").asJava
}
}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin

import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf

class PluginLoaderSuite extends KyuubiFunSuite {

test("test engine conf advisor wrong class") {
val conf = new KyuubiConf(false)
assert(PluginLoader.loadSessionConfAdvisor(conf).isInstanceOf[DefaultSessionConfAdvisor])

conf.set(KyuubiConf.SESSION_CONF_ADVISOR, classOf[FakeAdvisor].getName)
val msg1 = intercept[KyuubiException] {
PluginLoader.loadSessionConfAdvisor(conf)
}.getMessage
assert(msg1.contains(s"is not a child of '${classOf[SessionConfAdvisor].getName}'"))

conf.set(KyuubiConf.SESSION_CONF_ADVISOR, "non.exists")
val msg2 = intercept[IllegalArgumentException] {
PluginLoader.loadSessionConfAdvisor(conf)
}.getMessage
assert(msg2.startsWith("Error while instantiating 'non.exists'"))
}
}

class FakeAdvisor {}
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -80,6 +80,7 @@
<module>kyuubi-hive-jdbc-shaded</module>
<module>kyuubi-metrics</module>
<module>kyuubi-server</module>
<module>kyuubi-server-plugin</module>
<module>kyuubi-zookeeper</module>
</modules>

Expand Down

0 comments on commit 6811530

Please sign in to comment.