Skip to content

Commit

Permalink
[SPARK-4756][SQL] FIX: sessionToActivePool grow infinitely, even as s…
Browse files Browse the repository at this point in the history
…essions expire

**sessionToActivePool** in **SparkSQLOperationManager** grow infinitely, even as sessions expire.
we should remove the pool value when the session closed, even though **sessionToActivePool** would not exist in all of sessions.

Author: guowei2 <guowei2@asiainfo.com>

Closes apache#3617 from guowei2/SPARK-4756 and squashes the following commits:

e9b97b8 [guowei2] fix compile bug with Shim12
cf0f521 [guowei2] Merge remote-tracking branch 'apache/master' into SPARK-4756
e070998 [guowei2] fix: remove active pool of the session when it expired
  • Loading branch information
guowei2 authored and marmbrus committed Dec 19, 2014
1 parent b68bc6d commit 22ddb6e
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ import org.apache.hive.service.cli.session.SessionManager
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
import org.apache.hive.service.cli.SessionHandle

private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
extends SessionManager
with ReflectedCompositeService {

private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)

override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)

Expand All @@ -40,10 +43,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
getAncestorField[Log](this, 3, "LOG").info(
s"HiveServer2: Async execution pool size $backgroundPoolSize")

val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
setSuperField(this, "operationManager", sparkSqlOperationManager)
addService(sparkSqlOperationManager)

initCompositeService(hiveConf)
}

override def closeSession(sessionHandle: SessionHandle) {
super.closeSession(sessionHandle)
sparkSqlOperationManager.sessionToActivePool -= sessionHandle
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

// TODO: Currenlty this will grow infinitely, even as sessions expire
val sessionToActivePool = Map[HiveSession, String]()
val sessionToActivePool = Map[SessionHandle, String]()

override def newExecuteStatementOperation(
parentSession: HiveSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private[hive] class SparkExecuteStatementOperation(
statement: String,
confOverlay: JMap[String, String])(
hiveContext: HiveContext,
sessionToActivePool: SMap[HiveSession, String])
sessionToActivePool: SMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {

private var result: SchemaRDD = _
Expand Down Expand Up @@ -191,14 +191,14 @@ private[hive] class SparkExecuteStatementOperation(
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
sessionToActivePool(parentSession) = value
sessionToActivePool(parentSession.getSessionHandle) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}

val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
sessionToActivePool.get(parentSession).foreach { pool =>
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private[hive] class SparkExecuteStatementOperation(
confOverlay: JMap[String, String],
runInBackground: Boolean = true)(
hiveContext: HiveContext,
sessionToActivePool: SMap[HiveSession, String])
sessionToActivePool: SMap[SessionHandle, String])
// NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {

Expand Down Expand Up @@ -162,14 +162,14 @@ private[hive] class SparkExecuteStatementOperation(
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
sessionToActivePool(parentSession) = value
sessionToActivePool(parentSession.getSessionHandle) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}

val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
sessionToActivePool.get(parentSession).foreach { pool =>
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
Expand Down

0 comments on commit 22ddb6e

Please sign in to comment.