Skip to content

Commit

Permalink
Use ConcurrentHashMap instead of Scala Map
Browse files Browse the repository at this point in the history
  • Loading branch information
SaintBacchus committed Aug 8, 2016
1 parent 4af58bc commit a333269
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 13 deletions.
Expand Up @@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation(
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true)
(sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String])
(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with Logging {

Expand Down Expand Up @@ -206,15 +206,16 @@ private[hive] class SparkExecuteStatementOperation(
statementId,
parentSession.getUsername)
sqlContext.sparkContext.setJobGroup(statementId, statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
if(null != pool) {
sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
try {
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
sessionToActivePool(parentSession.getSessionHandle) = value
sessionToActivePool.put(parentSession.getSessionHandle, value)
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
Expand Down
Expand Up @@ -79,14 +79,14 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
sqlContext.newSession()
}
ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
sessionHandle
}

override def closeSession(sessionHandle: SessionHandle) {
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
super.closeSession(sessionHandle)
sparkSqlOperationManager.sessionToActivePool -= sessionHandle
sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
}
}
Expand Up @@ -17,14 +17,12 @@

package org.apache.spark.sql.hive.thriftserver.server

import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}

import scala.collection.mutable

import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveSessionState
Expand All @@ -39,17 +37,19 @@ private[thriftserver] class SparkSQLOperationManager()
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

val sessionToActivePool = new mutable.HashMap[SessionHandle, String]()
with mutable.SynchronizedMap[SessionHandle, String]
val sessionToContexts = new mutable.HashMap[SessionHandle, SQLContext]()
with mutable.SynchronizedMap[SessionHandle, SQLContext]
val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
val sqlContext = sessionToContexts(parentSession.getSessionHandle)
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
if (null == sqlContext) {
throw new HiveSQLException(s"Session handle: ${parentSession.getSessionHandle} has not been" +
s" initialed or had already closed.")
}
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
val runInBackground = async && sessionState.hiveThriftServerAsync
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
Expand Down

0 comments on commit a333269

Please sign in to comment.