Skip to content

Commit

Permalink
[KYUUBI #2331] Add createSession method to further abstract openSession
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

close #2331

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2332 from wForget/KYUUBI-2331.

Closes #2331

ce7f32e [wforget] fix
b74efd8 [wforget] [KYUUBI-2331] Add createSession method to further abstract openSession

Authored-by: wforget <643348094@qq.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
wForget authored and ulysses-you committed Apr 13, 2022
1 parent 55c4cae commit 8a525e5
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 95 deletions.
Expand Up @@ -21,10 +21,9 @@ import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.session.{SessionHandle, SessionManager}
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}

class FlinkSQLSessionManager(engineContext: DefaultContext)
extends SessionManager("FlinkSQLSessionManager") {
Expand All @@ -39,20 +38,20 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
executor.start()
}

override def openSession(
override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
conf: Map[String, String]): Session = {

val sessionHandle = SessionHandle(protocol)
val sessionId = sessionHandle.identifier.toString

executor.openSession(sessionId)
val sessionContext = FlinkEngineUtils.getSessionContext(executor, sessionId)

val sessionImpl = new FlinkSessionImpl(
new FlinkSessionImpl(
protocol,
user,
password,
Expand All @@ -61,18 +60,6 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
this,
sessionHandle,
sessionContext)

try {
sessionImpl.open()
setSession(sessionHandle, sessionImpl)
info(s"$user's session with $sessionHandle is opened, current opening sessions" +
s" $getOpenSessionCount")
sessionHandle
} catch {
case e: Exception =>
sessionImpl.close()
throw KyuubiSQLException(e)
}
}

override def closeSession(sessionHandle: SessionHandle): Unit = {
Expand Down
Expand Up @@ -25,13 +25,12 @@ import org.apache.hive.service.cli.{SessionHandle => ImportedSessionHandle}
import org.apache.hive.service.cli.session.{HiveSessionImpl => ImportedHiveSessionImpl, SessionManager => ImportedHiveSessionManager}
import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.engine.hive.operation.HiveOperationManager
import org.apache.kyuubi.operation.OperationManager
import org.apache.kyuubi.session.{CLIENT_IP_KEY, SessionHandle, SessionManager}
import org.apache.kyuubi.session.{CLIENT_IP_KEY, Session, SessionHandle, SessionManager}

class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSessionManager") {
override protected def isServer: Boolean = false
Expand Down Expand Up @@ -65,15 +64,14 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
}
}

override def openSession(
override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
conf: Map[String, String]): Session = {
val sessionHandle = SessionHandle(protocol)
val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
info(s"Opening session for $user@$clientIp")
val hive = new ImportedHiveSessionImpl(
new ImportedSessionHandle(sessionHandle.toTSessionHandle, protocol),
protocol,
Expand All @@ -85,7 +83,7 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
hive.setSessionManager(internalSessionManager)
hive.setOperationManager(internalSessionManager.getOperationManager)
operationLogRoot.foreach(dir => hive.setOperationLogSessionDir(new File(dir)))
val session = new HiveSessionImpl(
new HiveSessionImpl(
protocol,
user,
password,
Expand All @@ -95,17 +93,6 @@ class HiveSessionManager(engine: HiveSQLEngine) extends SessionManager("HiveSess
this,
sessionHandle,
hive)
try {
session.open()
setSession(sessionHandle, session)
info(s"$user's session with $sessionHandle is opened, current opening sessions" +
s" $getOpenSessionCount")
sessionHandle
} catch {
case e: Exception =>
session.close()
throw KyuubiSQLException(e)
}
}

override def closeSession(sessionHandle: SessionHandle): Unit = {
Expand Down
Expand Up @@ -51,14 +51,13 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)

private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)

override def openSession(
override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
conf: Map[String, String]): Session = {
val clientIp = conf.getOrElse(CLIENT_IP_KEY, ipAddress)
info(s"Opening session for $user@$clientIp")
val sparkSession =
try {
if (singleSparkSession) {
Expand All @@ -80,7 +79,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
case e: Exception => throw KyuubiSQLException(e)
}

val session = new SparkSessionImpl(
new SparkSessionImpl(
protocol,
user,
password,
Expand All @@ -89,18 +88,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
conf,
this,
sparkSession)
try {
val handle = session.handle
session.open()
setSession(handle, session)
info(s"$user's session with $handle is opened, current opening sessions" +
s" $getOpenSessionCount")
handle
} catch {
case e: Exception =>
session.close()
throw KyuubiSQLException(e)
}
}

override def closeSession(sessionHandle: SessionHandle): Unit = {
Expand Down
Expand Up @@ -19,16 +19,14 @@ package org.apache.kyuubi.engine.trino.session

import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_LOG_DIR_ROOT
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.trino.TrinoSqlEngine
import org.apache.kyuubi.engine.trino.operation.TrinoOperationManager
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.session.SessionManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}

class TrinoSessionManager
extends SessionManager("TrinoSessionManager") {
Expand All @@ -41,28 +39,13 @@ class TrinoSessionManager
super.initialize(conf)
}

override def openSession(
override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
info(s"Opening session for $user@$ipAddress")
val sessionImpl =
new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)

try {
val handle = sessionImpl.handle
sessionImpl.open()
setSession(handle, sessionImpl)
info(s"$user's trino session with $handle is opened, current opening sessions" +
s" $getOpenSessionCount")
handle
} catch {
case e: Exception =>
sessionImpl.close()
throw KyuubiSQLException(e)
}
conf: Map[String, String]): Session = {
new TrinoSessionImpl(protocol, user, password, ipAddress, conf, this)
}

override def closeSession(sessionHandle: SessionHandle): Unit = {
Expand Down
Expand Up @@ -77,12 +77,39 @@ abstract class SessionManager(name: String) extends CompositeService(name) {

def operationManager: OperationManager

protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): Session

def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle
conf: Map[String, String]): SessionHandle = {
info(s"Opening session for $user@$ipAddress")
val session = createSession(protocol, user, password, ipAddress, conf)
try {
val handle = session.handle
session.open()
setSession(handle, session)
info(s"$user's session with $handle is opened, current opening sessions" +
s" $getOpenSessionCount")
handle
} catch {
case e: Exception =>
try {
session.close()
} catch {
case t: Throwable =>
warn(s"Error closing session for $user client ip: $ipAddress", t)
}
throw KyuubiSQLException(e)
}
}

def closeSession(sessionHandle: SessionHandle): Unit = {
_latestLogoutTime = System.currentTimeMillis()
Expand Down
Expand Up @@ -35,18 +35,16 @@ class NoopSessionManager extends SessionManager("noop") {
super.initialize(conf)
}

override def openSession(
override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
conf: Map[String, String]): Session = {
if (conf.get("kyuubi.test.should.fail").exists(_.toBoolean)) {
throw KyuubiSQLException("Asked to fail")
}
val session = new NoopSessionImpl(protocol, user, password, ipAddress, conf, this)
setSession(session.handle, session)
session.handle
new NoopSessionImpl(protocol, user, password, ipAddress, conf, this)
}

override protected def isServer: Boolean = true
Expand Down
Expand Up @@ -45,43 +45,39 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
super.initialize(conf)
}

override def openSession(
override protected def createSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {

val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
conf: Map[String, String]): Session = {
// inject client ip into session conf
val newConf = conf + (CLIENT_IP_KEY -> ipAddress)
val sessionImpl = new KyuubiSessionImpl(
new KyuubiSessionImpl(
protocol,
username,
user,
password,
ipAddress,
newConf,
this,
this.getConf.getUserDefaults(user))
}

override def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
try {
sessionImpl.open()
val handle = sessionImpl.handle
setSession(handle, sessionImpl)
info(s"$username's session with $handle is opened, current opening sessions" +
s" $getOpenSessionCount")
handle
super.openSession(protocol, username, password, ipAddress, conf)
} catch {
case e: Throwable =>
MetricsSystem.tracing { ms =>
ms.incCount(CONN_FAIL)
ms.incCount(MetricRegistry.name(CONN_FAIL, user))
}
try {
sessionImpl.close()
} catch {
case t: Throwable =>
warn(s"Error closing session for $username client ip: $ipAddress", t)
}
throw KyuubiSQLException(
s"Error opening session for $username client ip $ipAddress, due to ${e.getMessage}",
e)
Expand Down

0 comments on commit 8a525e5

Please sign in to comment.