Skip to content

Commit

Permalink
Merge pull request #119 from yaooqinn/KYUUBI-118
Browse files Browse the repository at this point in the history
[KYUUBI-118]fix npe bug with incremental result collection
  • Loading branch information
yaooqinn committed Nov 7, 2018
2 parents 34af4d0 + 5a957ef commit d1d34e5
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 50 deletions.
1 change: 1 addition & 0 deletions kyuubi-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@
<exclude>**/org/apache/spark/launcher/*.class</exclude>
<exclude>**/org/apache/spark/deploy/*.class</exclude>
<exclude>**/org/apache/spark/SparkEnv.class</exclude>
<exclude>**/org/apache/spark/SparkEnv$.class</exclude>
<exclude>**/org/apache/spark/sql/hive/client/*.class</exclude>
</excludes>
</configuration>
Expand Down
4 changes: 2 additions & 2 deletions kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ object SparkEnv extends Logging {
private[spark] val driverSystemName = "sparkDriver"
private[spark] val executorSystemName = "sparkExecutor"

private[this] def user = UserGroupInformation.getCurrentUser.getShortUserName
private def user = UserGroupInformation.getCurrentUser.getShortUserName

def set(e: SparkEnv) {
if (e == null) {
Expand All @@ -162,7 +162,7 @@ object SparkEnv extends Logging {
*/
def get: SparkEnv = {
debug(s"Kyuubi: Get SparkEnv for $user")
env.get(user)
env.getOrDefault(user, env.values().iterator().next())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class KyuubiSQLException(reason: String, sqlState: String, vendorCode: Int, caus

def this(reason: String) = this(reason, sqlState = null)

def this(cause: Throwable) = this(reason = null, cause)
def this(cause: Throwable) = this(cause.toString, cause)

/**
* Converts current object to a [[TStatus]] object
Expand Down Expand Up @@ -64,26 +64,38 @@ object KyuubiSQLException {
tStatus
}

def toString(
cause: Throwable,
parent: Array[StackTraceElement] = Array.empty): List[String] = {

def toString(cause: Throwable): List[String] = {
toString(cause, null)
}

def toString(cause: Throwable, parent: Array[StackTraceElement]): List[String] = {
val trace = cause.getStackTrace
enroll(cause, trace.diff(parent)) ++
var m = trace.length - 1
if (parent != null) {
var n = parent.length - 1
while (m >= 0 && n >=0 && trace(m).equals(parent(n))) {
m = m - 1
n = n - 1
}
}

enroll(cause, trace, m) ++
Option(cause.getCause).map(toString(_, trace)).getOrElse(Nil)
}

private[this] def enroll(
ex: Throwable,
trace: Array[StackTraceElement]): List[String] = {
private[this] def enroll(ex: Throwable,
trace: Array[StackTraceElement], max: Int): List[String] = {
val builder = new StringBuilder
builder.append('*').append(ex.getClass.getName).append(':')
Option(ex.getMessage).map(_.stripSuffix(";")).foreach(msg => builder.append(msg).append(":"))
builder.append(trace.length).append(':').append(trace.length - 1)
List(builder.toString) ++ trace.map { t =>
builder.append(ex.getMessage).append(':')
builder.append(trace.length).append(':').append(max)
List(builder.toString) ++ (0 to max).map { i =>
builder.setLength(0)
builder.append(t.getClassName).append(":").append(t.getMethodName).append(":")
Option(t.getFileName).foreach(builder.append)
builder.append(":").append(t.getLineNumber)
builder.append(trace(i).getClassName).append(":")
builder.append(trace(i).getMethodName).append(":")
builder.append(Option(trace(i).getFileName).getOrElse("")).append(':')
builder.append(trace(i).getLineNumber)
builder.toString
}.toList
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,14 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
result.schema
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
def getNextRowSet(order: FetchOrientation, rowSetSize: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(FINISHED)
setHasResultSet(true)
val taken = if (order == FetchOrientation.FETCH_FIRST) {
result.toLocalIterator().asScala.take(maxRowsL.toInt)
result.toLocalIterator().asScala.take(rowSetSize.toInt)
} else {
iter.take(maxRowsL.toInt)
iter.take(rowSetSize.toInt)
}
RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private[kyuubi] class OperationManager private(name: String)
def getOperation(operationHandle: OperationHandle): KyuubiOperation = {
val operation = getOperationInternal(operationHandle)
if (operation == null) {
throw new KyuubiSQLException("Invalid OperationHandle: " + operationHandle)
throw new KyuubiSQLException("Invalid OperationHandle " + operationHandle)
}
operation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ private[kyuubi] class KyuubiSession(
sessionManager: SessionManager,
operationManager: OperationManager) extends Logging {

private[this] val sessionHandle: SessionHandle = new SessionHandle(protocol)
private[this] val opHandleSet = new MHSet[OperationHandle]
private[this] var _isOperationLogEnabled = false
private[this] var sessionLogDir: File = _
@volatile private[this] var lastAccessTime: Long = System.currentTimeMillis()
private[this] var lastIdleTime = 0L

private[this] val sessionUGI: UserGroupInformation = {
private val sessionHandle: SessionHandle = new SessionHandle(protocol)
private val opHandleSet = new MHSet[OperationHandle]
private var _isOperationLogEnabled = false
private var sessionLogDir: File = _
@volatile private var lastAccessTime: Long = System.currentTimeMillis()
private var lastIdleTime = 0L

private val sessionUGI: UserGroupInformation = {
val currentUser = UserGroupInformation.getCurrentUser
if (withImpersonation) {
if (UserGroupInformation.isSecurityEnabled) {
Expand All @@ -82,16 +82,16 @@ private[kyuubi] class KyuubiSession(
}
}

private[this] val sparkSessionWithUGI =
private val sparkSessionWithUGI =
new SparkSessionWithUGI(sessionUGI, conf, sessionManager.getCacheMgr)

private[this] def acquire(userAccess: Boolean): Unit = {
private def acquire(userAccess: Boolean): Unit = {
if (userAccess) {
lastAccessTime = System.currentTimeMillis
}
}

private[this] def release(userAccess: Boolean): Unit = {
private def release(userAccess: Boolean): Unit = {
if (userAccess) {
lastAccessTime = System.currentTimeMillis
}
Expand All @@ -103,7 +103,7 @@ private[kyuubi] class KyuubiSession(
}

@throws[KyuubiSQLException]
private[this] def executeStatementInternal(statement: String) = {
private def executeStatementInternal(statement: String): OperationHandle = {
acquire(true)
val operation =
operationManager.newExecuteStatementOperation(this, statement)
Expand All @@ -121,7 +121,7 @@ private[kyuubi] class KyuubiSession(
}
}

private[this] def cleanupSessionLogDir(): Unit = {
private def cleanupSessionLogDir(): Unit = {
if (_isOperationLogEnabled) {
try {
FileUtils.forceDelete(sessionLogDir)
Expand Down Expand Up @@ -152,7 +152,7 @@ private[kyuubi] class KyuubiSession(
case GetInfoType.DBMS_VERSION =>
new GetInfoValue(this.sparkSessionWithUGI.sparkSession.version)
case _ =>
throw new KyuubiSQLException("Unrecognized GetInfoType value: " + getInfoType.toString)
throw new KyuubiSQLException("Unrecognized GetInfoType value " + getInfoType.toString)
}
} finally {
release(true)
Expand Down Expand Up @@ -199,7 +199,7 @@ private[kyuubi] class KyuubiSession(
FileSystem.closeAllForUGI(sessionUGI)
} catch {
case ioe: IOException =>
throw new KyuubiSQLException("Could not clean up file-system handles for UGI: "
throw new KyuubiSQLException("Could not clean up file-system handles for UGI "
+ sessionUGI, ioe)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ private[kyuubi] class SessionManager private(
def getSession(sessionHandle: SessionHandle): KyuubiSession = {
val session = handleToSession.get(sessionHandle)
if (session == null) {
throw new KyuubiSQLException("Invalid SessionHandle: " + sessionHandle)
throw new KyuubiSQLException("Invalid SessionHandle " + sessionHandle)
}
session
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ class SparkSessionWithUGI(
user: UserGroupInformation,
conf: SparkConf,
cache: SparkSessionCacheManager) extends Logging {
private[this] var _sparkSession: SparkSession = _
private[this] val userName: String = user.getShortUserName
private[this] val promisedSparkContext = Promise[SparkContext]()
private[this] var initialDatabase: Option[String] = None
private[this] var sparkException: Option[Throwable] = None
private var _sparkSession: SparkSession = _
private val userName: String = user.getShortUserName
private val promisedSparkContext = Promise[SparkContext]()
private var initialDatabase: Option[String] = None
private var sparkException: Option[Throwable] = None

private[this] def newContext(): Thread = {
private def newContext(): Thread = {
new Thread(s"Start-SparkContext-$userName") {
override def run(): Unit = {
try {
Expand All @@ -64,7 +64,7 @@ class SparkSessionWithUGI(
/**
* Invoke SparkContext.stop() if not succeed initializing it
*/
private[this] def stopContext(): Unit = {
private def stopContext(): Unit = {
promisedSparkContext.future.map { sc =>
warn(s"Error occurred during initializing SparkContext for $userName, stopping")
try {
Expand All @@ -82,7 +82,7 @@ class SparkSessionWithUGI(
*
* @param sessionConf configurations for user connection string
*/
private[this] def configureSparkConf(sessionConf: Map[String, String]): Unit = {
private def configureSparkConf(sessionConf: Map[String, String]): Unit = {
for ((key, value) <- sessionConf) {
key match {
case HIVE_VAR_PREFIX(DEPRECATED_QUEUE) => conf.set(QUEUE, value)
Expand All @@ -107,7 +107,7 @@ class SparkSessionWithUGI(
*
* @param sessionConf configurations for user connection string
*/
private[this] def configureSparkSession(sessionConf: Map[String, String]): Unit = {
private def configureSparkSession(sessionConf: Map[String, String]): Unit = {
for ((key, value) <- sessionConf) {
key match {
case HIVE_VAR_PREFIX(k) =>
Expand All @@ -122,7 +122,7 @@ class SparkSessionWithUGI(
}
}

private[this] def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized {
private def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized {
val totalRounds = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES).toInt, 15)
var checkRound = totalRounds
val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL)
Expand All @@ -148,7 +148,7 @@ class SparkSessionWithUGI(
}
}

private[this] def create(sessionConf: Map[String, String]): Unit = {
private def create(sessionConf: Map[String, String]): Unit = {
info(s"--------- Create new SparkSession for $userName ----------")
// kyuubi|user name|canonical host name| port
val appName = Seq(
Expand Down Expand Up @@ -218,7 +218,7 @@ class SparkSessionWithUGI(
}

object SparkSessionWithUGI {
private[this] val userSparkContextBeingConstruct = new MHSet[String]()
private val userSparkContextBeingConstruct = new MHSet[String]()

def setPartiallyConstructed(user: String): Unit = {
userSparkContextBeingConstruct.add(user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
assert(op === op2)
val operationHandle = mock[OperationHandle]
val e = intercept[KyuubiSQLException](operationMgr.getOperation(operationHandle))
e.getMessage should startWith("Invalid OperationHandle:")
e.getMessage should startWith("Invalid OperationHandle")

val e2 = intercept[KyuubiSQLException](operationMgr.closeOperation(operationHandle))
e2.getMessage should be("Operation does not exist!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class KyuubiSessionSuite extends SparkFunSuite {
assert(
session.getInfo(GetInfoType.DBMS_VERSION).toTGetInfoValue.getStringValue === spark.version)
val e = intercept[KyuubiSQLException](session.getInfo(new GetInfoType {}))
assert(e.getMessage.startsWith("Unrecognized GetInfoType value:"))
assert(e.getMessage.startsWith("Unrecognized GetInfoType value"))
}

test("get last access time") {
Expand Down

0 comments on commit d1d34e5

Please sign in to comment.