Skip to content

Commit 7cede6f

Browse files
committed
[KYUUBI #1987] Support preserve user context in group/server share level
### _Why are the changes needed?_ close #1987 note that, this PR is only for the Spark engine ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #2435 from ulysses-you/kyuubi-1987. Closes #1987 85826a1 [ulysses-you] style 9ddcc0c [ulysses-you] shutdown a568054 [ulysses-you] release d133905 [ulysses-you] address comment 8e958ff [ulysses-you] docs dd9b442 [ulysses-you] Support preserve user context in group share level Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: ulysses-you <ulysses-you@apache.org>
1 parent 9fd62a2 commit 7cede6f

File tree

8 files changed

+233
-32
lines changed

8 files changed

+233
-32
lines changed

docs/deployment/settings.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,9 @@ Key | Default | Meaning | Type | Since
228228
<code>kyuubi.engine.ui.retainedSessions</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of SQL client sessions kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
229229
<code>kyuubi.engine.ui.retainedStatements</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of statements kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
230230
<code>kyuubi.engine.ui.stop.enabled</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, allows Kyuubi engine to be killed from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
231+
<code>kyuubi.engine.user.isolated.spark.session</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including: the temporary views, function registries, SQL configuration and the current database. Note that, it does not affect if the share level is connection or user.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.6.0</div>
232+
<code>kyuubi.engine.user.isolated.spark.session.idle.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The interval to check if the user isolated spark session is timeout.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
233+
<code>kyuubi.engine.user.isolated.spark.session.idle.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT6H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>If kyuubi.engine.user.isolated.spark.session is false, we will release the spark session if its corresponding user is inactive after this configured timeout.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
231234

232235

233236
### Frontend

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ import org.apache.spark.SparkContext
2525
import org.apache.spark.sql.SparkSession
2626
import org.apache.spark.util.kvstore.KVIndex
2727

28-
import org.apache.kyuubi.Utils
28+
import org.apache.kyuubi.{Logging, Utils}
2929

30-
object KyuubiSparkUtil {
30+
object KyuubiSparkUtil extends Logging {
3131

3232
type KVIndexParam = KVIndex @getter
3333

@@ -36,6 +36,21 @@ object KyuubiSparkUtil {
3636

3737
def globalSparkContext: SparkContext = SparkSession.active.sparkContext
3838

39+
/**
 * Executes each of the given SQL statements on the supplied [[SparkSession]], tagging
 * them with a dedicated job group so they can be identified (and cancelled) from the
 * Spark UI. The job group is always cleared afterwards, even when a statement fails.
 *
 * @param spark the session on which the statements are executed
 * @param initializationSQLs the statements to run, in order
 */
def initializeSparkSession(spark: SparkSession, initializationSQLs: Seq[String]): Unit = {
  val sc = spark.sparkContext
  for (sql <- initializationSQLs) {
    sc.setJobGroup(
      "initialization sql queries",
      sql,
      interruptOnCancel = true)
    debug(s"Execute initialization sql: $sql")
    try {
      // `isEmpty` forces eager execution of the statement without collecting rows.
      spark.sql(sql).isEmpty
    } finally {
      sc.clearJobGroup()
    }
  }
}
53+
3954
def engineId: String = globalSparkContext.applicationId
4055

4156
lazy val diagnostics: String = {

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -187,16 +187,9 @@ object SparkSQLEngine extends Logging {
187187

188188
/**
 * Builds (or reuses) the engine-level [[SparkSession]] from `_sparkConf`, then runs
 * both the engine-level and the session-level initialization SQL against it before
 * returning it.
 */
def createSpark(): SparkSession = {
  val session = SparkSession.builder.config(_sparkConf).getOrCreate
  val initSQLs =
    kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL)
  KyuubiSparkUtil.initializeSparkSession(session, initSQLs)
  session
}
202195

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala

Lines changed: 99 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,20 @@
1717

1818
package org.apache.kyuubi.engine.spark.session
1919

20+
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
21+
2022
import org.apache.hive.service.rpc.thrift.TProtocolVersion
2123
import org.apache.spark.sql.SparkSession
2224

2325
import org.apache.kyuubi.{KyuubiSQLException, Utils}
2426
import org.apache.kyuubi.config.KyuubiConf
2527
import org.apache.kyuubi.config.KyuubiConf._
2628
import org.apache.kyuubi.engine.ShareLevel
27-
import org.apache.kyuubi.engine.spark.SparkSQLEngine
29+
import org.apache.kyuubi.engine.ShareLevel._
30+
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
2831
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
2932
import org.apache.kyuubi.session._
33+
import org.apache.kyuubi.util.ThreadUtils
3034

3135
/**
3236
* A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with
@@ -50,6 +54,87 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
5054
val operationManager = new SparkSQLOperationManager()
5155

5256
private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
57+
private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))
58+
59+
private lazy val userIsolatedSparkSession = conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION)
60+
private lazy val userIsolatedIdleInterval =
61+
conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL)
62+
private lazy val userIsolatedIdleTimeout =
63+
conf.get(ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT)
64+
private val userIsolatedCacheLock = new Object
65+
private lazy val userIsolatedCache = new java.util.HashMap[String, SparkSession]()
66+
private lazy val userIsolatedCacheCount =
67+
new java.util.HashMap[String, (Integer, java.lang.Long)]()
68+
private var userIsolatedSparkSessionThread: Option[ScheduledExecutorService] = None
69+
70+
/**
 * When user isolation is disabled (sessions are shared per user), starts a single
 * daemon thread that periodically evicts cached per-user SparkSessions whose
 * reference count has dropped to zero and whose last access is older than
 * `userIsolatedIdleTimeout`. No-op when user isolation is enabled.
 */
private def startUserIsolatedCacheChecker(): Unit = {
  if (!userIsolatedSparkSession) {
    val checker =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("user-isolated-cache-checker")
    userIsolatedSparkSessionThread = Some(checker)
    checker.scheduleWithFixedDelay(
      () => {
        userIsolatedCacheLock.synchronized {
          val entries = userIsolatedCacheCount.entrySet().iterator()
          while (entries.hasNext) {
            val entry = entries.next()
            val refCount = entry.getValue._1
            val lastAccess = entry.getValue._2
            // Evict only sessions with no open connections that have idled past the timeout.
            if (refCount == 0 &&
              lastAccess + userIsolatedIdleTimeout < System.currentTimeMillis()) {
              userIsolatedCache.remove(entry.getKey)
              entries.remove()
            }
          }
        }
      },
      userIsolatedIdleInterval,
      userIsolatedIdleInterval,
      TimeUnit.MILLISECONDS)
  }
}
95+
96+
/** Starts the idle-cache eviction thread (when applicable) before starting the manager. */
override def start(): Unit = {
  // The checker must be up before sessions can populate the per-user cache.
  startUserIsolatedCacheChecker()
  super.start()
}
100+
101+
/** Stops the session manager first, then shuts down the eviction thread if it was started. */
override def stop(): Unit = {
  super.stop()
  userIsolatedSparkSessionThread.foreach(_.shutdown())
}
105+
106+
/**
 * Resolves the [[SparkSession]] to back a new connection for `user`:
 *  - the single shared root session when `singleSparkSession` is set or the share
 *    level is CONNECTION (the engine hosts only one session anyway);
 *  - a fresh isolated child session for USER level, or for GROUP/SERVER level when
 *    user isolation is enabled;
 *  - otherwise (GROUP/SERVER with isolation disabled) a per-user cached session,
 *    created on first use and reference-counted for idle eviction.
 */
private def getOrNewSparkSession(user: String): SparkSession = {
  if (singleSparkSession) {
    spark
  } else {
    shareLevel match {
      // it's unnecessary to create a new spark session in connection share level
      // since the session is only one
      case CONNECTION => spark
      case USER => newSparkSession(spark)
      case GROUP | SERVER if userIsolatedSparkSession => newSparkSession(spark)
      case GROUP | SERVER =>
        userIsolatedCacheLock.synchronized {
          // NOTE(review): initialization SQL runs while holding the cache lock on a
          // cache miss, stalling other users' lookups — confirm this is acceptable.
          Option(userIsolatedCache.get(user)) match {
            case Some(cached) =>
              val (refCount, _) = userIsolatedCacheCount.get(user)
              userIsolatedCacheCount.put(user, (refCount + 1, System.currentTimeMillis()))
              cached
            case None =>
              userIsolatedCacheCount.put(user, (1, System.currentTimeMillis()))
              val created = newSparkSession(spark)
              userIsolatedCache.put(user, created)
              created
          }
        }
    }
  }
}
132+
133+
private def newSparkSession(rootSparkSession: SparkSession): SparkSession = {
134+
val newSparkSession = rootSparkSession.newSession()
135+
KyuubiSparkUtil.initializeSparkSession(newSparkSession, conf.get(ENGINE_SESSION_INITIALIZE_SQL))
136+
newSparkSession
137+
}
53138

54139
override protected def createSession(
55140
protocol: TProtocolVersion,
@@ -60,21 +145,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
60145
val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
61146
val sparkSession =
62147
try {
63-
if (singleSparkSession) {
64-
spark
65-
} else {
66-
val ss = spark.newSession()
67-
this.conf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sqlStr =>
68-
ss.sparkContext.setJobGroup(
69-
"engine_initializing_queries",
70-
sqlStr,
71-
interruptOnCancel = true)
72-
debug(s"Execute session initializing sql: $sqlStr")
73-
ss.sql(sqlStr).isEmpty
74-
ss.sparkContext.clearJobGroup()
75-
}
76-
ss
77-
}
148+
getOrNewSparkSession(user)
78149
} catch {
79150
case e: Exception => throw KyuubiSQLException(e)
80151
}
@@ -91,8 +162,19 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
91162
}
92163

93164
/**
 * Closes the session identified by `sessionHandle`. When the per-user session cache is
 * active (user isolation disabled), first decrements the user's reference count and
 * refreshes its last-access timestamp so the idle checker can evict unused sessions.
 * In CONNECTION share level the whole engine is stopped once the session closes.
 */
override def closeSession(sessionHandle: SessionHandle): Unit = {
  if (!userIsolatedSparkSession) {
    // NOTE(review): relies on getSession returning null (not throwing) for an
    // unknown handle — confirm against the SessionManager contract.
    Option(getSession(sessionHandle)).foreach { session =>
      userIsolatedCacheLock.synchronized {
        if (userIsolatedCacheCount.containsKey(session.user)) {
          val (refCount, _) = userIsolatedCacheCount.get(session.user)
          userIsolatedCacheCount.put(session.user, (refCount - 1, System.currentTimeMillis()))
        }
      }
    }
  }
  super.closeSession(sessionHandle)
  if (shareLevel == ShareLevel.CONNECTION) {
    info("Session stopped due to shared level is Connection.")
    stopSession()
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.spark.session
19+
20+
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq}
21+
22+
import org.apache.kyuubi.config.KyuubiConf._
23+
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
24+
import org.apache.kyuubi.operation.HiveJDBCTestHelper
25+
26+
class UserIsolatedSessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
27+
28+
override def withKyuubiConf: Map[String, String] = {
29+
Map(
30+
ENGINE_SHARE_LEVEL.key -> "GROUP",
31+
ENGINE_USER_ISOLATED_SPARK_SESSION.key -> "false",
32+
ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL.key -> "100",
33+
ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT.key -> "5000")
34+
}
35+
36+
override protected def jdbcUrl: String =
37+
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;#spark.ui.enabled=false"
38+
39+
private def executeSetStatement(user: String, statement: String): String = {
40+
withThriftClient(Some(user)) { client =>
41+
val req = new TOpenSessionReq()
42+
req.setUsername(user)
43+
req.setPassword("anonymous")
44+
val tOpenSessionResp = client.OpenSession(req)
45+
val tExecuteStatementReq = new TExecuteStatementReq()
46+
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
47+
tExecuteStatementReq.setStatement(statement)
48+
tExecuteStatementReq.setRunAsync(false)
49+
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
50+
51+
val operationHandle = tExecuteStatementResp.getOperationHandle
52+
val tFetchResultsReq = new TFetchResultsReq()
53+
tFetchResultsReq.setOperationHandle(operationHandle)
54+
tFetchResultsReq.setFetchType(0)
55+
tFetchResultsReq.setMaxRows(1)
56+
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
57+
tFetchResultsResp.getResults.getColumns.get(1).getStringVal.getValues.get(0)
58+
}
59+
}
60+
61+
test("isolated user spark session") {
62+
executeSetStatement("user1", "set a=1")
63+
assert(executeSetStatement("user1", "set a") == "1")
64+
assert(executeSetStatement("user1", "set a") == "1")
65+
assert(executeSetStatement("user2", "set a") == "<undefined>")
66+
executeSetStatement("user2", "set a=2")
67+
assert(executeSetStatement("user1", "set a") == "1")
68+
assert(executeSetStatement("user2", "set a") == "2")
69+
70+
Thread.sleep(6000)
71+
assert(executeSetStatement("user1", "set a") == "<undefined>")
72+
assert(executeSetStatement("user2", "set a") == "<undefined>")
73+
}
74+
}

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,6 +1143,31 @@ object KyuubiConf {
11431143
.booleanConf
11441144
.createWithDefault(false)
11451145

1146+
val ENGINE_USER_ISOLATED_SPARK_SESSION: ConfigEntry[Boolean] =
1147+
buildConf("kyuubi.engine.user.isolated.spark.session")
1148+
.doc("When set to false, if the engine is running in a group or server share level, " +
1149+
"all the JDBC/ODBC connections will be isolated against the user. Including: " +
1150+
"the temporary views, function registries, SQL configuration and the current database. " +
1151+
"Note that, it does not affect if the share level is connection or user.")
1152+
.version("1.6.0")
1153+
.booleanConf
1154+
.createWithDefault(true)
1155+
1156+
val ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_TIMEOUT: ConfigEntry[Long] =
1157+
buildConf("kyuubi.engine.user.isolated.spark.session.idle.timeout")
1158+
.doc(s"If ${ENGINE_USER_ISOLATED_SPARK_SESSION.key} is false, we will release the " +
1159+
s"spark session if its corresponding user is inactive after this configured timeout.")
1160+
.version("1.6.0")
1161+
.timeConf
1162+
.createWithDefault(Duration.ofHours(6).toMillis)
1163+
1164+
val ENGINE_USER_ISOLATED_SPARK_SESSION_IDLE_INTERVAL: ConfigEntry[Long] =
1165+
buildConf("kyuubi.engine.user.isolated.spark.session.idle.interval")
1166+
.doc(s"The interval to check if the user isolated spark session is timeout.")
1167+
.version("1.6.0")
1168+
.timeConf
1169+
.createWithDefault(Duration.ofMinutes(1).toMillis)
1170+
11461171
val SERVER_EVENT_JSON_LOG_PATH: ConfigEntry[String] =
11471172
buildConf("kyuubi.backend.server.event.json.log.path")
11481173
.doc("The location of server events go for the builtin JSON logger")

kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,13 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
7777
}
7878

7979
/** Connects a thrift client as the current OS user and passes it to `f`. */
def withThriftClient[T](f: TCLIService.Iface => T): T = withThriftClient()(f)

/**
 * Connects a thrift client to the host:port extracted from [[jdbcUrl]], optionally
 * authenticating as `user` (defaults to the current OS user), and passes it to `f`.
 */
def withThriftClient[T](user: Option[String] = None)(f: TCLIService.Iface => T): T = {
  val hostPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
  TClientTestUtils.withThriftClient(hostPort, user)(f)
}
8288

8389
def withSessionHandle[T](f: (TCLIService.Iface, TSessionHandle) => T): T = {

kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@ import org.apache.kyuubi.service.authentication.PlainSASLHelper
3030

3131
object TClientTestUtils extends Logging {
3232

33-
def withThriftClient[T](url: String)(f: Iface => T): T = {
33+
def withThriftClient[T](url: String, user: Option[String] = None)(f: Iface => T): T = {
3434
val hostport = url.split(':')
3535
val socket = new TSocket(hostport.head, hostport.last.toInt)
36-
val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
36+
val transport = PlainSASLHelper.getPlainTransport(
37+
user.getOrElse(Utils.currentUser),
38+
"anonymous",
39+
socket)
3740
val protocol = new TBinaryProtocol(transport)
3841
val client = new TCLIService.Client(protocol)
3942
transport.open()

0 commit comments

Comments
 (0)