# [SPARK-16941] Use ConcurrentHashMap instead of scala Map in SparkSQLOperationManager

## What changes were proposed in this pull request?
The ThriftServer has thread-safety problems in **SparkSQLOperationManager**: its session-tracking maps (`sessionToActivePool`, `sessionToContexts`) are unsynchronized Scala mutable `Map`s that multiple session threads read and write concurrently. Replace them with `java.util.concurrent.ConcurrentHashMap` to avoid this problem.

Details in [SPARK-16941](https://issues.apache.org/jira/browse/SPARK-16941)
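
To make the hazard concrete, here is a minimal runnable sketch (hypothetical names and keys, not code from this patch) of what can go wrong when several session threads mutate an unsynchronized `scala.collection.mutable.Map` the way `sessionToActivePool` was mutated:

```scala
import scala.collection.mutable

// Hypothetical reproduction of the hazard (not code from the patch):
// scala.collection.mutable.Map is not thread-safe, so concurrent writes
// from multiple session threads can lose updates or corrupt the map.
object UnsafeMapDemo extends App {
  val sessionToActivePool = mutable.Map[Int, String]()

  val threads = (1 to 8).map { id =>
    new Thread(new Runnable {
      def run(): Unit = {
        var i = 0
        while (i < 10000) {
          sessionToActivePool.put(id * 100000 + i, "pool") // unsynchronized write
          sessionToActivePool.remove(id * 100000 + i)      // unsynchronized remove
          i += 1
        }
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())

  // A thread-safe map would reliably leave 0 entries; with mutable.Map the
  // result can be nonzero, or the run can throw, depending on interleaving.
  println(s"leftover entries: ${sessionToActivePool.size}")
}
```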

## How was this patch tested?
N/A

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #14534 from SaintBacchus/SPARK-16941.
SaintBacchus authored and srowen committed Aug 11, 2016
Parent: 8a6b703 · Commit: a45fefd
Showing 3 changed files with 13 additions and 11 deletions.
**SparkExecuteStatementOperation.scala**

```diff
@@ -23,7 +23,7 @@ import java.util.{Arrays, Map => JMap, UUID}
 import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation(
     statement: String,
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)
-    (sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String])
+    (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
   extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
   with Logging {
@@ -215,15 +215,16 @@
       statementId,
       parentSession.getUsername)
     sqlContext.sparkContext.setJobGroup(statementId, statement)
-    sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
+    val pool = sessionToActivePool.get(parentSession.getSessionHandle)
+    if (pool != null) {
       sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
     }
     try {
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
         case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
-          sessionToActivePool(parentSession.getSessionHandle) = value
+          sessionToActivePool.put(parentSession.getSessionHandle, value)
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
       }
```
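
The `Option.foreach` pattern above disappears because `sessionToActivePool` is now typed as a `java.util.Map`, whose `get` returns the value or `null` rather than an `Option`. A small side-by-side sketch of the two lookup idioms (hypothetical keys and pools, not from the patch):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object LookupIdioms extends App {
  // Scala mutable.Map: get returns Option, so the absent case is a no-op.
  val smap = mutable.Map("session-1" -> "poolA")
  smap.get("session-1").foreach(pool => println(s"scala: $pool"))

  // java.util.Map / ConcurrentHashMap: get returns null when the key is
  // absent, hence the explicit null check the patch introduces.
  val jmap = new ConcurrentHashMap[String, String]()
  jmap.put("session-1", "poolA")
  val pool = jmap.get("session-1")
  if (pool != null) println(s"java: $pool")
}
```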
**SparkSQLSessionManager.scala**

```diff
@@ -79,14 +79,14 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
       sqlContext.newSession()
     }
     ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
-    sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
+    sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
     sessionHandle
   }
 
   override def closeSession(sessionHandle: SessionHandle) {
     HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
     super.closeSession(sessionHandle)
-    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+    sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
     sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
   }
 }
```
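
The operator sugar `+=` and `-=` is Scala-collection API with no counterpart on a Java map, so these call sites switch to `put` and `remove` (each individually atomic on a `ConcurrentHashMap`). If Scala-style syntax were ever wanted back, `JavaConverters` offers a thread-safe wrapper — shown here only as a side note, not something the patch does:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object ConcurrentWrapperNote extends App {
  val jmap = new ConcurrentHashMap[String, String]()

  // asScala on a ConcurrentHashMap yields scala.collection.concurrent.Map,
  // which keeps thread safety while restoring the Scala operators.
  val smap = jmap.asScala
  smap += "session-1" -> "poolA" // delegates to the underlying put
  smap -= "session-1"            // delegates to the underlying remove
  println(s"entries left: ${jmap.size()}") // 0
}
```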
**SparkSQLOperationManager.scala**

```diff
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.hive.thriftserver.server
 
 import java.util.{Map => JMap}
-
-import scala.collection.mutable.Map
+import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
@@ -39,15 +38,17 @@ private[thriftserver] class SparkSQLOperationManager()
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
-  val sessionToActivePool = Map[SessionHandle, String]()
-  val sessionToContexts = Map[SessionHandle, SQLContext]()
+  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
+  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
 
   override def newExecuteStatementOperation(
       parentSession: HiveSession,
       statement: String,
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {
-    val sqlContext = sessionToContexts(parentSession.getSessionHandle)
+    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
+    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
+      s" initialized or had already closed.")
     val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
     val runInBackground = async && sessionState.hiveThriftServerAsync
    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
```
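
One behavioral nuance: the old apply-style lookup `sessionToContexts(...)` threw a bare `NoSuchElementException` for an unknown session, whereas the new `get` returns `null` and the added `require` converts that into a descriptive `IllegalArgumentException`. A hypothetical sketch of the guard's behavior (not a test from the patch):

```scala
import java.util.concurrent.ConcurrentHashMap

object RequireGuardDemo extends App {
  val sessionToContexts = new ConcurrentHashMap[String, AnyRef]()

  val ctx = sessionToContexts.get("no-such-session") // null, no exception yet
  try {
    require(ctx != null,
      "Session handle: no-such-session has not been initialized or had already closed.")
  } catch {
    // require throws IllegalArgumentException("requirement failed: <message>")
    case e: IllegalArgumentException => println(e.getMessage)
  }
}
```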
