From 4af58bc3c9e3ff436e6258aff96a663cf55aa8ba Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Mon, 8 Aug 2016 12:06:17 +0800 Subject: [PATCH 1/6] Add SynchronizedMap trait with Map in SparkSQLOperationManager to avoid concurrency problem. --- .../thriftserver/server/SparkSQLOperationManager.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 79625239dea0e..4931d639a7a85 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver.server import java.util.{Map => JMap} -import scala.collection.mutable.Map +import scala.collection.mutable import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} @@ -39,8 +39,10 @@ private[thriftserver] class SparkSQLOperationManager() val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") - val sessionToActivePool = Map[SessionHandle, String]() - val sessionToContexts = Map[SessionHandle, SQLContext]() + val sessionToActivePool = new mutable.HashMap[SessionHandle, String]() + with mutable.SynchronizedMap[SessionHandle, String] + val sessionToContexts = new mutable.HashMap[SessionHandle, SQLContext]() + with mutable.SynchronizedMap[SessionHandle, SQLContext] override def newExecuteStatementOperation( parentSession: HiveSession, From a333269325c28720f817a515443cd8e88718ae1b Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Mon, 8 Aug 2016 16:50:43 +0800 Subject: [PATCH 2/6] Use ConcurrentHashMap instead of Scala Map --- .../SparkExecuteStatementOperation.scala | 7 ++++--- .../thriftserver/SparkSQLSessionManager.scala | 4 ++-- .../server/SparkSQLOperationManager.scala | 16 ++++++++-------- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index e8bcdd76efd7a..d69802431da25 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation( statement: String, confOverlay: JMap[String, String], runInBackground: Boolean = true) - (sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String]) + (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with Logging { @@ -206,7 +206,8 @@ private[hive] class SparkExecuteStatementOperation( statementId, parentSession.getUsername) sqlContext.sparkContext.setJobGroup(statementId, statement) - sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => + val pool = sessionToActivePool.get(parentSession.getSessionHandle) + if(null != pool) { sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } try { @@ -214,7 +215,7 @@ private[hive] class SparkExecuteStatementOperation( logDebug(result.queryExecution.toString()) result.queryExecution.logical match { case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) => - sessionToActivePool(parentSession.getSessionHandle) = value + sessionToActivePool.put(parentSession.getSessionHandle, value) logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") case _ => } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 1e4c4790856be..6a5117aea492d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -79,14 +79,14 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: sqlContext.newSession() } ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) - sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx + sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx) sessionHandle } override def closeSession(sessionHandle: SessionHandle) { HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString) super.closeSession(sessionHandle) - sparkSqlOperationManager.sessionToActivePool -= sessionHandle + sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle) sparkSqlOperationManager.sessionToContexts.remove(sessionHandle) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 4931d639a7a85..2c559316750ee 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -17,14 +17,12 @@ package org.apache.spark.sql.hive.thriftserver.server +import java.util.concurrent.ConcurrentHashMap import java.util.{Map => JMap} -import scala.collection.mutable - import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} import org.apache.hive.service.cli.session.HiveSession - import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveSessionState @@ -39,17 +37,19 @@ private[thriftserver] class SparkSQLOperationManager() val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") - val sessionToActivePool = new mutable.HashMap[SessionHandle, String]() - with mutable.SynchronizedMap[SessionHandle, String] - val sessionToContexts = new mutable.HashMap[SessionHandle, SQLContext]() - with mutable.SynchronizedMap[SessionHandle, SQLContext] + val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() + val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - val sqlContext = sessionToContexts(parentSession.getSessionHandle) + val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) + if (null == sqlContext) { + throw new HiveSQLException(s"Session handle: ${parentSession.getSessionHandle} has not been" + + s" initialed or had already closed.") + } val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] val runInBackground = async && sessionState.hiveThriftServerAsync val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, From 2591b50d7126128aa71bfe3a6a19c3c2d79ddb9a Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Mon, 8 Aug 2016 17:03:24 +0800 Subject: [PATCH 3/6] Fix scala style. --- .../thriftserver/SparkExecuteStatementOperation.scala | 8 ++++---- .../thriftserver/server/SparkSQLOperationManager.scala | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index d69802431da25..f210064771903 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.sql.{Date, Timestamp} -import java.util.{Arrays, Map => JMap, UUID} +import java.util.{Arrays, Map, UUID} import java.util.concurrent.RejectedExecutionException import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, Map => SMap} +import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import org.apache.hadoop.hive.metastore.api.FieldSchema @@ -43,9 +43,9 @@ import org.apache.spark.util.{Utils => SparkUtils} private[hive] class SparkExecuteStatementOperation( parentSession: HiveSession, statement: String, - confOverlay: JMap[String, String], + confOverlay: Map[String, String], runInBackground: Boolean = true) - (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String]) + (sqlContext: SQLContext, sessionToActivePool: Map[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 2c559316750ee..63340eec8323f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -18,11 +18,12 @@ package org.apache.spark.sql.hive.thriftserver.server import java.util.concurrent.ConcurrentHashMap -import java.util.{Map => JMap} +import java.util.Map import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} import org.apache.hive.service.cli.session.HiveSession + import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveSessionState @@ -35,7 +36,7 @@ private[thriftserver] class SparkSQLOperationManager() extends OperationManager with Logging { val handleToOperation = ReflectionUtils - .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") + .getSuperField[Map[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() @@ -43,7 +44,7 @@ private[thriftserver] class SparkSQLOperationManager() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, - confOverlay: JMap[String, String], + confOverlay: Map[String, String], async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) if (null == sqlContext) { From 592d8170ee21c2b28f336fb7dad4fa540e44011f Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Tue, 9 Aug 2016 09:04:37 +0800 Subject: [PATCH 4/6] Rename Java map --- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 6 +++--- .../hive/thriftserver/server/SparkSQLOperationManager.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index f210064771903..ac1044dce50f0 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.sql.{Date, Timestamp} -import java.util.{Arrays, Map, UUID} +import java.util.{Arrays, Map => JMap, UUID} import java.util.concurrent.RejectedExecutionException import scala.collection.JavaConverters._ @@ -43,9 +43,9 @@ import org.apache.spark.util.{Utils => SparkUtils} private[hive] class SparkExecuteStatementOperation( parentSession: HiveSession, statement: String, - confOverlay: Map[String, String], + confOverlay: JMap[String, String], runInBackground: Boolean = true) - (sqlContext: SQLContext, sessionToActivePool: Map[SessionHandle, String]) + (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 63340eec8323f..2de02fcf788f1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.hive.thriftserver.server +import java.util.{Map => JMap} import java.util.concurrent.ConcurrentHashMap -import java.util.Map import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} @@ -36,7 +36,7 @@ private[thriftserver] class SparkSQLOperationManager() extends OperationManager with Logging { val handleToOperation = ReflectionUtils - .getSuperField[Map[OperationHandle, Operation]](this, "handleToOperation") + .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]() val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]() @@ -44,7 +44,7 @@ private[thriftserver] class SparkSQLOperationManager() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, - confOverlay: Map[String, String], + confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) if (null == sqlContext) { From 0a436a0a911151e0cc823a81974473f89e8bb966 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Tue, 9 Aug 2016 17:07:42 +0800 Subject: [PATCH 5/6] Commit the comment. --- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- .../hive/thriftserver/server/SparkSQLOperationManager.scala | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index ac1044dce50f0..ba5bd5c309cc1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -207,7 +207,7 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getUsername) sqlContext.sparkContext.setJobGroup(statementId, statement) val pool = sessionToActivePool.get(parentSession.getSessionHandle) - if(null != pool) { + if (pool != null) { sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } try { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 2de02fcf788f1..40c5b3298a87f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -47,10 +47,8 @@ private[thriftserver] class SparkSQLOperationManager() confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) - if (null == sqlContext) { - throw new HiveSQLException(s"Session handle: ${parentSession.getSessionHandle} has not been" + - s" initialed or had already closed.") - } + require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + s" initialed or had already closed.") val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] val runInBackground = async && sessionState.hiveThriftServerAsync val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, From 4f2726136cf8d71898c4cfd30a6db141b30d6d0f Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Wed, 10 Aug 2016 20:45:09 +0800 Subject: [PATCH 6/6] Fix typo --- .../sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 40c5b3298a87f..49ab664009341 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -48,7 +48,7 @@ private[thriftserver] class SparkSQLOperationManager() async: Boolean): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + - s" initialed or had already closed.") + s" initialized or had already closed.") val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] val runInBackground = async && sessionState.hiveThriftServerAsync val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,