diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml index b384f879330..202cdbffd09 100644 --- a/kyuubi-server/pom.xml +++ b/kyuubi-server/pom.xml @@ -328,6 +328,7 @@ **/org/apache/spark/launcher/*.class **/org/apache/spark/deploy/*.class **/org/apache/spark/SparkEnv.class + **/org/apache/spark/SparkEnv$.class **/org/apache/spark/sql/hive/client/*.class diff --git a/kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala b/kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala index 457adebad58..3a95d00acd0 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala @@ -145,7 +145,7 @@ object SparkEnv extends Logging { private[spark] val driverSystemName = "sparkDriver" private[spark] val executorSystemName = "sparkExecutor" - private[this] def user = UserGroupInformation.getCurrentUser.getShortUserName + private def user = UserGroupInformation.getCurrentUser.getShortUserName def set(e: SparkEnv) { if (e == null) { @@ -162,7 +162,7 @@ object SparkEnv extends Logging { */ def get: SparkEnv = { debug(s"Kyuubi: Get SparkEnv for $user") - env.get(user) + env.getOrDefault(user, env.values().iterator().next()) } /** diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/KyuubiSQLException.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/KyuubiSQLException.scala index a10925d6d4c..8ce71db12d2 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/KyuubiSQLException.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/KyuubiSQLException.scala @@ -36,7 +36,7 @@ class KyuubiSQLException(reason: String, sqlState: String, vendorCode: Int, caus def this(reason: String) = this(reason, sqlState = null) - def this(cause: Throwable) = this(reason = null, cause) + def this(cause: Throwable) = this(cause.toString, cause) /** * Converts current object to a [[TStatus]] object @@ -64,26 +64,38 @@ object KyuubiSQLException { tStatus } - def toString( - cause: Throwable, - parent: Array[StackTraceElement] = Array.empty): List[String] = { + + def toString(cause: Throwable): List[String] = { + toString(cause, null) + } + + def toString(cause: Throwable, parent: Array[StackTraceElement]): List[String] = { val trace = cause.getStackTrace - enroll(cause, trace.diff(parent)) ++ + var m = trace.length - 1 + if (parent != null) { + var n = parent.length - 1 + while (m >= 0 && n >=0 && trace(m).equals(parent(n))) { + m = m - 1 + n = n - 1 + } + } + + enroll(cause, trace, m) ++ Option(cause.getCause).map(toString(_, trace)).getOrElse(Nil) } - private[this] def enroll( - ex: Throwable, - trace: Array[StackTraceElement]): List[String] = { + private[this] def enroll(ex: Throwable, + trace: Array[StackTraceElement], max: Int): List[String] = { val builder = new StringBuilder builder.append('*').append(ex.getClass.getName).append(':') - Option(ex.getMessage).map(_.stripSuffix(";")).foreach(msg => builder.append(msg).append(":")) - builder.append(trace.length).append(':').append(trace.length - 1) - List(builder.toString) ++ trace.map { t => + builder.append(ex.getMessage).append(':') + builder.append(trace.length).append(':').append(max) + List(builder.toString) ++ (0 to max).map { i => builder.setLength(0) - builder.append(t.getClassName).append(":").append(t.getMethodName).append(":") - Option(t.getFileName).foreach(builder.append) - builder.append(":").append(t.getLineNumber) + builder.append(trace(i).getClassName).append(":") + builder.append(trace(i).getMethodName).append(":") + builder.append(Option(trace(i).getFileName).getOrElse("")).append(':') + builder.append(trace(i).getLineNumber) builder.toString }.toList } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index cae8b8c8dee..3ce6f932567 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -221,14 +221,14 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging result.schema } - def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { + def getNextRowSet(order: FetchOrientation, rowSetSize: Long): RowSet = { validateDefaultFetchOrientation(order) assertState(FINISHED) setHasResultSet(true) val taken = if (order == FetchOrientation.FETCH_FIRST) { - result.toLocalIterator().asScala.take(maxRowsL.toInt) + result.toLocalIterator().asScala.take(rowSetSize.toInt) } else { - iter.take(maxRowsL.toInt) + iter.take(rowSetSize.toInt) } RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion) } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala index d4e4aa468ad..b5c087f96f0 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala @@ -94,7 +94,7 @@ private[kyuubi] class OperationManager private(name: String) def getOperation(operationHandle: OperationHandle): KyuubiOperation = { val operation = getOperationInternal(operationHandle) if (operation == null) { - throw new KyuubiSQLException("Invalid OperationHandle: " + operationHandle) + throw new KyuubiSQLException("Invalid OperationHandle " + operationHandle) } operation } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index 943e321d415..e71ba554853 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -57,14 +57,14 @@ private[kyuubi] class KyuubiSession( sessionManager: SessionManager, operationManager: OperationManager) extends Logging { - private[this] val sessionHandle: SessionHandle = new SessionHandle(protocol) - private[this] val opHandleSet = new MHSet[OperationHandle] - private[this] var _isOperationLogEnabled = false - private[this] var sessionLogDir: File = _ - @volatile private[this] var lastAccessTime: Long = System.currentTimeMillis() - private[this] var lastIdleTime = 0L - - private[this] val sessionUGI: UserGroupInformation = { + private val sessionHandle: SessionHandle = new SessionHandle(protocol) + private val opHandleSet = new MHSet[OperationHandle] + private var _isOperationLogEnabled = false + private var sessionLogDir: File = _ + @volatile private var lastAccessTime: Long = System.currentTimeMillis() + private var lastIdleTime = 0L + + private val sessionUGI: UserGroupInformation = { val currentUser = UserGroupInformation.getCurrentUser if (withImpersonation) { if (UserGroupInformation.isSecurityEnabled) { @@ -82,16 +82,16 @@ private[kyuubi] class KyuubiSession( } } - private[this] val sparkSessionWithUGI = + private val sparkSessionWithUGI = new SparkSessionWithUGI(sessionUGI, conf, sessionManager.getCacheMgr) - private[this] def acquire(userAccess: Boolean): Unit = { + private def acquire(userAccess: Boolean): Unit = { if (userAccess) { lastAccessTime = System.currentTimeMillis } } - private[this] def release(userAccess: Boolean): Unit = { + private def release(userAccess: Boolean): Unit = { if (userAccess) { lastAccessTime = System.currentTimeMillis } @@ -103,7 +103,7 @@ private[kyuubi] class KyuubiSession( } @throws[KyuubiSQLException] - private[this] def executeStatementInternal(statement: String) = { + private def executeStatementInternal(statement: String): OperationHandle = { acquire(true) val operation = operationManager.newExecuteStatementOperation(this, statement) @@ -121,7 +121,7 @@ private[kyuubi] class KyuubiSession( } } - private[this] def cleanupSessionLogDir(): Unit = { + private def cleanupSessionLogDir(): Unit = { if (_isOperationLogEnabled) { try { FileUtils.forceDelete(sessionLogDir) @@ -152,7 +152,7 @@ private[kyuubi] class KyuubiSession( case GetInfoType.DBMS_VERSION => new GetInfoValue(this.sparkSessionWithUGI.sparkSession.version) case _ => - throw new KyuubiSQLException("Unrecognized GetInfoType value: " + getInfoType.toString) + throw new KyuubiSQLException("Unrecognized GetInfoType value " + getInfoType.toString) } } finally { release(true) @@ -199,7 +199,7 @@ private[kyuubi] class KyuubiSession( FileSystem.closeAllForUGI(sessionUGI) } catch { case ioe: IOException => - throw new KyuubiSQLException("Could not clean up file-system handles for UGI: " + throw new KyuubiSQLException("Could not clean up file-system handles for UGI " + sessionUGI, ioe) } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index ac899c93f09..e37f32987a1 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -247,7 +247,7 @@ private[kyuubi] class SessionManager private( def getSession(sessionHandle: SessionHandle): KyuubiSession = { val session = handleToSession.get(sessionHandle) if (session == null) { - throw new KyuubiSQLException("Invalid SessionHandle: " + sessionHandle) + throw new KyuubiSQLException("Invalid SessionHandle " + sessionHandle) } session } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index 57f174d9a4f..8dfe8c8ae07 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -41,13 +41,13 @@ class SparkSessionWithUGI( user: UserGroupInformation, conf: SparkConf, cache: SparkSessionCacheManager) extends Logging { - private[this] var _sparkSession: SparkSession = _ - private[this] val userName: String = user.getShortUserName - private[this] val promisedSparkContext = Promise[SparkContext]() - private[this] var initialDatabase: Option[String] = None - private[this] var sparkException: Option[Throwable] = None + private var _sparkSession: SparkSession = _ + private val userName: String = user.getShortUserName + private val promisedSparkContext = Promise[SparkContext]() + private var initialDatabase: Option[String] = None + private var sparkException: Option[Throwable] = None - private[this] def newContext(): Thread = { + private def newContext(): Thread = { new Thread(s"Start-SparkContext-$userName") { override def run(): Unit = { try { @@ -64,7 +64,7 @@ class SparkSessionWithUGI( /** * Invoke SparkContext.stop() if not succeed initializing it */ - private[this] def stopContext(): Unit = { + private def stopContext(): Unit = { promisedSparkContext.future.map { sc => warn(s"Error occurred during initializing SparkContext for $userName, stopping") try { @@ -82,7 +82,7 @@ class SparkSessionWithUGI( * * @param sessionConf configurations for user connection string */ - private[this] def configureSparkConf(sessionConf: Map[String, String]): Unit = { + private def configureSparkConf(sessionConf: Map[String, String]): Unit = { for ((key, value) <- sessionConf) { key match { case HIVE_VAR_PREFIX(DEPRECATED_QUEUE) => conf.set(QUEUE, value) @@ -107,7 +107,7 @@ class SparkSessionWithUGI( * * @param sessionConf configurations for user connection string */ - private[this] def configureSparkSession(sessionConf: Map[String, String]): Unit = { + private def configureSparkSession(sessionConf: Map[String, String]): Unit = { for ((key, value) <- sessionConf) { key match { case HIVE_VAR_PREFIX(k) => @@ -122,7 +122,7 @@ class SparkSessionWithUGI( } } - private[this] def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized { + private def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized { val totalRounds = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES).toInt, 15) var checkRound = totalRounds val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL) @@ -148,7 +148,7 @@ class SparkSessionWithUGI( } } - private[this] def create(sessionConf: Map[String, String]): Unit = { + private def create(sessionConf: Map[String, String]): Unit = { info(s"--------- Create new SparkSession for $userName ----------") // kyuubi|user name|canonical host name| port val appName = Seq( @@ -218,7 +218,7 @@ class SparkSessionWithUGI( } object SparkSessionWithUGI { - private[this] val userSparkContextBeingConstruct = new MHSet[String]() + private val userSparkContextBeingConstruct = new MHSet[String]() def setPartiallyConstructed(user: String): Unit = { userSparkContextBeingConstruct.add(user) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala index 10e0785fded..d361f318f61 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala @@ -102,7 +102,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga assert(op === op2) val operationHandle = mock[OperationHandle] val e = intercept[KyuubiSQLException](operationMgr.getOperation(operationHandle)) - e.getMessage should startWith("Invalid OperationHandle:") + e.getMessage should startWith("Invalid OperationHandle") val e2 = intercept[KyuubiSQLException](operationMgr.closeOperation(operationHandle)) e2.getMessage should be("Operation does not exist!") diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala index 6911507deb4..6c501e38c23 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala @@ -91,7 +91,7 @@ class KyuubiSessionSuite extends SparkFunSuite { assert( session.getInfo(GetInfoType.DBMS_VERSION).toTGetInfoValue.getStringValue === spark.version) val e = intercept[KyuubiSQLException](session.getInfo(new GetInfoType {})) - assert(e.getMessage.startsWith("Unrecognized GetInfoType value:")) + assert(e.getMessage.startsWith("Unrecognized GetInfoType value")) } test("get last access time") {