New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-16941]Use concurrentHashMap instead of scala Map in SparkSQLOperationManager. #14534
Conversation
…id concurrency problem.
Test build #63346 has finished for PR 14534 at commit
|
@@ -39,8 +39,10 @@ private[thriftserver] class SparkSQLOperationManager() | |||
val handleToOperation = ReflectionUtils | |||
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") | |||
|
|||
val sessionToActivePool = Map[SessionHandle, String]() | |||
val sessionToContexts = Map[SessionHandle, SQLContext]() | |||
val sessionToActivePool = new mutable.HashMap[SessionHandle, String]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the best practice is to use Java's ConcurrentHashMap
. I seem to recall Scala's trait is deprecated or discouraged. Is this amount of synchronization sufficient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct; SynchronizedMap has been deprecated since Scala 2.11.0 with this comment in the API docs: "Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentHashMap as an alternative."
The title of this PR must be updated to match what is actually being done after the switch to use ConcurrentHashMap since we don't want the misleading "Add SynchronizedMap trait" to persist in the commit history.
Test build #63353 has finished for PR 14534 at commit
|
Test build #63354 has finished for PR 14534 at commit
|
|
||
import scala.collection.mutable.Map | ||
import java.util.concurrent.ConcurrentHashMap | ||
import java.util.Map |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's rename this -- otherwise it is very confusing whether we are using a scala Map or a java Map
Test build #63400 has finished for PR 14534 at commit
|
cc/ @srowen Is this OK? |
@@ -206,15 +206,16 @@ private[hive] class SparkExecuteStatementOperation( | |||
statementId, | |||
parentSession.getUsername) | |||
sqlContext.sparkContext.setJobGroup(statementId, statement) | |||
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => | |||
val pool = sessionToActivePool.get(parentSession.getSessionHandle) | |||
if(null != pool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (pool != null)
Test build #63433 has finished for PR 14534 at commit
|
any other comment? |
val sqlContext = sessionToContexts(parentSession.getSessionHandle) | ||
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) | ||
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + | ||
s" initialed or had already closed.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last tiny nit: initaled -> initialized
Test build #63531 has finished for PR 14534 at commit
|
Merged to master |
What changes were proposed in this pull request?
ThriftServer will have some thread-safe problem in SparkSQLOperationManager.
Add a SynchronizedMap trait for the maps in it to avoid this problem.
Details in SPARK-16941
How was this patch tested?
NA