From 0e9f2239836ce132466070f85090f282a3ff4fbe Mon Sep 17 00:00:00 2001 From: guowei2 Date: Wed, 10 Dec 2014 18:25:34 +0800 Subject: [PATCH 1/3] Fix: ThriftServer use only one SessionState to run sql using hive --- .../thriftserver/SparkSQLSessionManager.scala | 21 +++++++++ .../apache/spark/sql/hive/HiveContext.scala | 43 +++++++++---------- 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 6b3275b4eaf04..506f35299a953 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -27,6 +27,8 @@ import org.apache.hive.service.cli.session.SessionManager import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager +import org.apache.hive.service.cli.thrift.TProtocolVersion +import org.apache.hive.service.cli.SessionHandle private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) extends SessionManager @@ -46,4 +48,23 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) initCompositeService(hiveConf) } + + override def openSession( + protocol: TProtocolVersion, + username: String, + password: String, + sessionConf: java.util.Map[String, String], + withImpersonation: Boolean, + delegationToken: String): SessionHandle = { + val sessionHandle = + super.openSession( + protocol, username, password, sessionConf, withImpersonation, delegationToken) + hiveContext.addSessionState(getSession(sessionHandle).getSessionState) + sessionHandle + } + + override def closeSession(sessionHandle: SessionHandle) { + super.closeSession(sessionHandle) + hiveContext.removeSessionState + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 34fc21e61f60f..dda6be63a347c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.types.decimal.Decimal import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand} import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand import org.apache.spark.sql.sources.DataSourceStrategy +import scala.collection.mutable /** * DEPRECATED: Use HiveContext instead. @@ -218,20 +219,22 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } - /** - * SQLConf and HiveConf contracts: - * - * 1. reuse existing started SessionState if any - * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the - * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be - * set in the SQLConf *as well as* in the HiveConf. - */ - @transient protected[hive] lazy val (hiveconf, sessionState) = - Option(SessionState.get()) - .orElse { + // store all of the session states to the thread id, this will be updated when open new session + @transient + protected lazy val hiveSessionStates = mutable.Map[Long, SessionState]() + + def addSessionState(sessionState: SessionState) = + hiveSessionStates(Thread.currentThread().getId) = sessionState + + def removeSessionState = + hiveSessionStates -= Thread.currentThread().getId + + def sessionState: SessionState = + hiveSessionStates.getOrElse(Thread.currentThread().getId, + // if start with cliDriver, no records in sessionStates, use the latest one + Option(SessionState.get()) + .orElse { val newState = new SessionState(new HiveConf(classOf[SessionState])) - // Only starts newly created `SessionState` instance. Any existing `SessionState` instance - // returned by `SessionState.get()` must be the most recently started one. SessionState.start(newState) Some(newState) } @@ -239,9 +242,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { setConf(state.getConf.getAllProperties) if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8") if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8") - (state.getConf, state) + state } - .get + .get) + + def hiveconf = sessionState.getConf override def setConf(key: String, value: String): Unit = { super.setConf(key, value) @@ -291,13 +296,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf) - // Makes sure the session represented by the `sessionState` field is activated. This implies - // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks - // session isolation under multi-user scenarios (i.e. HiveThriftServer2). - // TODO Fix session isolation - if (SessionState.get() != sessionState) { - SessionState.start(sessionState) - } + SessionState.start(sessionState) proc match { case driver: Driver => From cdfda4e24b3443103f5eeeae4cf67f696be2660b Mon Sep 17 00:00:00 2001 From: guowei2 Date: Thu, 11 Dec 2014 10:46:48 +0800 Subject: [PATCH 2/3] update session state when create new operation instead of new session --- .../thriftserver/SparkSQLSessionManager.scala | 15 --------------- .../server/SparkSQLOperationManager.scala | 1 + .../org/apache/spark/sql/hive/HiveContext.scala | 4 ++-- 3 files changed, 3 insertions(+), 17 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 506f35299a953..4d714309aa3c1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -27,7 +27,6 @@ import org.apache.hive.service.cli.session.SessionManager import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager -import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.cli.SessionHandle private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) @@ -49,20 +48,6 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) initCompositeService(hiveConf) } - override def openSession( - protocol: TProtocolVersion, - username: String, - password: String, - sessionConf: java.util.Map[String, String], - withImpersonation: Boolean, - delegationToken: String): SessionHandle = { - val sessionHandle = - super.openSession( - protocol, username, password, sessionConf, withImpersonation, delegationToken) - hiveContext.addSessionState(getSession(sessionHandle).getSessionState) - sessionHandle - } - override def closeSession(sessionHandle: SessionHandle) { super.closeSession(sessionHandle) hiveContext.removeSessionState diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 99c4f46a82b8e..8c0f29be3c7d0 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -48,6 +48,7 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)( hiveContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) + hiveContext.updateSessionState(parentSession.getSessionState) operation } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index dda6be63a347c..4aee8792c2c24 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -219,11 +219,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } - // store all of the session states to the thread id, this will be updated when open new session + // store all of the session states to the thread id, this will be updated as new operation @transient protected lazy val hiveSessionStates = mutable.Map[Long, SessionState]() - def addSessionState(sessionState: SessionState) = + def updateSessionState(sessionState: SessionState) = hiveSessionStates(Thread.currentThread().getId) = sessionState def removeSessionState = From 72e351d5d315f3e8c06358c002b31c0050d849ef Mon Sep 17 00:00:00 2001 From: guowei2 Date: Sun, 14 Dec 2014 14:09:13 +0800 Subject: [PATCH 3/3] fix testsuite failed --- .../thriftserver/SparkSQLSessionManager.scala | 2 +- .../server/SparkSQLOperationManager.scala | 2 +- .../apache/spark/sql/hive/HiveContext.scala | 40 +++++++------------ 3 files changed, 17 insertions(+), 27 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 4d714309aa3c1..25b67cd56ee52 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -50,6 +50,6 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) override def closeSession(sessionHandle: SessionHandle) { super.closeSession(sessionHandle) - hiveContext.removeSessionState + hiveContext.currentSessionState.remove() } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 8c0f29be3c7d0..d9c1e4aa59799 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -48,7 +48,7 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)( hiveContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) - hiveContext.updateSessionState(parentSession.getSessionState) + hiveContext.currentSessionState.set(parentSession.getSessionState) operation } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4aee8792c2c24..3730863b1ac71 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -219,38 +219,26 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } - // store all of the session states to the thread id, this will be updated as new operation - @transient - protected lazy val hiveSessionStates = mutable.Map[Long, SessionState]() - - def updateSessionState(sessionState: SessionState) = - hiveSessionStates(Thread.currentThread().getId) = sessionState - - def removeSessionState = - hiveSessionStates -= Thread.currentThread().getId - - def sessionState: SessionState = - hiveSessionStates.getOrElse(Thread.currentThread().getId, - // if start with cliDriver, no records in sessionStates, use the latest one - Option(SessionState.get()) - .orElse { - val newState = new SessionState(new HiveConf(classOf[SessionState])) - SessionState.start(newState) - Some(newState) - } - .map { state => + @transient lazy val currentSessionState = new ThreadLocal[SessionState]() { + override def initialValue(): SessionState = { + Option(SessionState.get()).orElse { + val state = new SessionState(new HiveConf(classOf[SessionState])) + SessionState.start(state) setConf(state.getConf.getAllProperties) if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8") if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8") - state - } - .get) + Some(state) + }.get + } + } + + def sessionState = currentSessionState.get() def hiveconf = sessionState.getConf override def setConf(key: String, value: String): Unit = { - super.setConf(key, value) runSqlHive(s"SET $key=$value") + super.setConf(key, value) } /* A catalyst metadata catalog that points to the Hive Metastore. */ @@ -296,7 +284,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf) - SessionState.start(sessionState) + if (SessionState.get() != sessionState) { + SessionState.start(sessionState) + } proc match { case driver: Driver =>