
[SPARK-16941] Use ConcurrentHashMap instead of Scala Map in SparkSQLOperationManager. #14534

Status: Closed. Wants to merge 6 commits. Showing changes from 4 commits.
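The heart of this change is replacing a plain `scala.collection.mutable.Map`, which is not safe for unsynchronized concurrent mutation, with `java.util.concurrent.ConcurrentHashMap`. A minimal sketch of the replacement's semantics in plain Java (the class name and the session/pool values are invented for the demo; only the field name `sessionToActivePool` mirrors the diff):

```java
import java.util.concurrent.ConcurrentHashMap;

public class SessionPoolDemo {
    public static void main(String[] args) {
        // sessionToActivePool maps a session handle to a scheduler pool name,
        // mirroring the field in the diff; ConcurrentHashMap is safe to read
        // and mutate from multiple Thrift handler threads without locking.
        ConcurrentHashMap<String, String> sessionToActivePool = new ConcurrentHashMap<>();

        sessionToActivePool.put("session-1", "fair-pool");

        // Unlike Scala's Map#get (which returns an Option), java.util.Map#get
        // returns null for a missing key -- hence the null checks in the diff.
        System.out.println(sessionToActivePool.get("session-2") == null); // true
        System.out.println(sessionToActivePool.get("session-1"));        // fair-pool

        sessionToActivePool.remove("session-1");
        System.out.println(sessionToActivePool.isEmpty()); // true
    }
}
```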
@@ -23,7 +23,7 @@ import java.util.{Arrays, Map => JMap, UUID}
import java.util.concurrent.RejectedExecutionException

import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation(
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true)
-    (sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String])
+    (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with Logging {

@@ -206,15 +206,16 @@ private[hive] class SparkExecuteStatementOperation(
statementId,
parentSession.getUsername)
sqlContext.sparkContext.setJobGroup(statementId, statement)
-    sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
+    val pool = sessionToActivePool.get(parentSession.getSessionHandle)
+    if(null != pool) {
Member: if (pool != null)
sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
try {
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
-          sessionToActivePool(parentSession.getSessionHandle) = value
+          sessionToActivePool.put(parentSession.getSessionHandle, value)
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
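One behavioral difference the hunk above works around: `scala.collection.mutable.Map#get` returns an `Option`, while `java.util.Map#get` returns `null` for a missing key, so the `.foreach { pool => ... }` becomes an explicit null check. A hedged Java sketch (class name and map contents are invented):

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class NullVsOptionDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> sessionToActivePool = new ConcurrentHashMap<>();
        sessionToActivePool.put("open-session", "fair-pool");

        // The Scala `.foreach { pool => ... }` over an Option becomes an
        // explicit null check once the map is a java.util.Map:
        String pool = sessionToActivePool.get("closed-session");
        if (pool != null) {
            System.out.println("would set spark.scheduler.pool=" + pool); // not reached
        }

        // Optional.ofNullable recovers Option-like ergonomics if preferred.
        Optional.ofNullable(sessionToActivePool.get("open-session"))
                .ifPresent(p -> System.out.println("pool=" + p)); // prints pool=fair-pool
    }
}
```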
@@ -79,14 +79,14 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
sqlContext.newSession()
}
ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
-    sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
+    sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
sessionHandle
}

override def closeSession(sessionHandle: SessionHandle) {
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
super.closeSession(sessionHandle)
-    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+    sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
}
}
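The openSession/closeSession pair above mutates the shared maps from concurrent Thrift handler threads; ConcurrentHashMap keeps those mutations consistent without external locking. A small Java stress sketch (thread count, key range, and class name are arbitrary, not from the PR):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentSessionsDemo {
    public static void main(String[] args) throws Exception {
        // Many handler threads registering sessions at once -- the situation
        // the PR hardens against.
        ConcurrentHashMap<Integer, String> sessionToContexts = new ConcurrentHashMap<>();
        ExecutorService handlers = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 1000; i++) {
            final int id = i;
            handlers.submit(() -> sessionToContexts.put(id, "ctx-" + id));
        }
        handlers.shutdown();
        handlers.awaitTermination(10, TimeUnit.SECONDS);
        // Every put is visible and none are lost; a plain HashMap (or an
        // unsynchronized scala.collection.mutable.Map) gives no such guarantee.
        System.out.println(sessionToContexts.size()); // 1000
    }
}
```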
@@ -18,8 +18,7 @@
package org.apache.spark.sql.hive.thriftserver.server

import java.util.{Map => JMap}
-
-import scala.collection.mutable.Map
+import java.util.concurrent.ConcurrentHashMap

import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
@@ -39,15 +38,19 @@ private[thriftserver] class SparkSQLOperationManager()
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

-  val sessionToActivePool = Map[SessionHandle, String]()
-  val sessionToContexts = Map[SessionHandle, SQLContext]()
+  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
Member: While we're here, make them private for a bit more future-proofing of access to these fields.

Contributor Author: The whole class is already private; is it necessary to make the field private too?

Member: It's only private[thriftserver]. It's minor, and a whole lot of stuff in Spark that should be private isn't, but I wondered if it was worth it here because you're concerned with synchronizing access to this object, and therefore possibly with what is accessing it. The usages you changed look sufficiently protected, but are there others, by the way?

Contributor Author: sessionToActivePool and sessionToContexts are used by SparkSQLSessionManager in its openSession and closeSession methods. Making these fields private would require adding new accessor functions here.

Member: Ah, OK, leave it. It'd be nice to encapsulate access a bit more, but it's not that important.
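For illustration only, a hypothetical Java sketch of the encapsulation the reviewer floated: keep the map private and expose only the operations the session manager needs. The class and method names here are invented, not part of the PR:

```java
import java.util.concurrent.ConcurrentHashMap;

public class EncapsulationDemo {
    // Hypothetical encapsulated manager: callers can no longer touch the
    // underlying map directly, only the operations it chooses to expose.
    static class OperationManagerSketch {
        private final ConcurrentHashMap<String, String> sessionToActivePool =
                new ConcurrentHashMap<>();

        void registerPool(String sessionHandle, String pool) {
            sessionToActivePool.put(sessionHandle, pool);
        }

        String activePool(String sessionHandle) {
            return sessionToActivePool.get(sessionHandle);
        }

        void closeSession(String sessionHandle) {
            sessionToActivePool.remove(sessionHandle);
        }
    }

    public static void main(String[] args) {
        OperationManagerSketch mgr = new OperationManagerSketch();
        mgr.registerPool("s1", "fair");
        System.out.println(mgr.activePool("s1")); // fair
        mgr.closeSession("s1");
        System.out.println(mgr.activePool("s1")); // null
    }
}
```

The trade-off discussed in the thread: extra accessor methods for tighter control over who can mutate the maps.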

+  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
-    val sqlContext = sessionToContexts(parentSession.getSessionHandle)
+    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
+    if (null == sqlContext) {
Member: Does this have to be HiveSQLException? I'd just use require to generate an IllegalArgumentException.

+      throw new HiveSQLException(s"Session handle: ${parentSession.getSessionHandle} has not been" +
+        s" initialized or is already closed.")
}
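The reviewer's suggestion maps onto Scala's require(cond, msg), which throws IllegalArgumentException when the condition fails. A hypothetical plain-Java equivalent of that check (the helper name lookupContext is invented; the map stands in for sessionToContexts):

```java
import java.util.concurrent.ConcurrentHashMap;

public class RequireDemo {
    // Hypothetical helper mirroring Scala's require(sqlContext != null, msg):
    // a null lookup is rejected with IllegalArgumentException instead of
    // HiveSQLException.
    static Object lookupContext(ConcurrentHashMap<String, Object> contexts, String handle) {
        Object ctx = contexts.get(handle);
        if (ctx == null) {
            throw new IllegalArgumentException(
                "Session handle: " + handle + " has not been initialized or is already closed.");
        }
        return ctx;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Object> contexts = new ConcurrentHashMap<>();
        contexts.put("s1", new Object());
        System.out.println(lookupContext(contexts, "s1") != null); // true
        try {
            lookupContext(contexts, "s2");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```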
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
val runInBackground = async && sessionState.hiveThriftServerAsync
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,