Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI-118]fix npe bug with incremental result collection #119

Merged
merged 2 commits into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kyuubi-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@
<exclude>**/org/apache/spark/launcher/*.class</exclude>
<exclude>**/org/apache/spark/deploy/*.class</exclude>
<exclude>**/org/apache/spark/SparkEnv.class</exclude>
<exclude>**/org/apache/spark/SparkEnv$.class</exclude>
<exclude>**/org/apache/spark/sql/hive/client/*.class</exclude>
</excludes>
</configuration>
Expand Down
4 changes: 2 additions & 2 deletions kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ object SparkEnv extends Logging {
private[spark] val driverSystemName = "sparkDriver"
private[spark] val executorSystemName = "sparkExecutor"

private[this] def user = UserGroupInformation.getCurrentUser.getShortUserName
private def user = UserGroupInformation.getCurrentUser.getShortUserName

def set(e: SparkEnv) {
if (e == null) {
Expand All @@ -162,7 +162,7 @@ object SparkEnv extends Logging {
*/
def get: SparkEnv = {
debug(s"Kyuubi: Get SparkEnv for $user")
env.get(user)
env.getOrDefault(user, env.values().iterator().next())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class KyuubiSQLException(reason: String, sqlState: String, vendorCode: Int, caus

def this(reason: String) = this(reason, sqlState = null)

def this(cause: Throwable) = this(reason = null, cause)
def this(cause: Throwable) = this(cause.toString, cause)

/**
* Converts current object to a [[TStatus]] object
Expand Down Expand Up @@ -64,26 +64,38 @@ object KyuubiSQLException {
tStatus
}

def toString(
cause: Throwable,
parent: Array[StackTraceElement] = Array.empty): List[String] = {

def toString(cause: Throwable): List[String] = {
toString(cause, null)
}

def toString(cause: Throwable, parent: Array[StackTraceElement]): List[String] = {
val trace = cause.getStackTrace
enroll(cause, trace.diff(parent)) ++
var m = trace.length - 1
if (parent != null) {
var n = parent.length - 1
while (m >= 0 && n >=0 && trace(m).equals(parent(n))) {
m = m - 1
n = n - 1
}
}

enroll(cause, trace, m) ++
Option(cause.getCause).map(toString(_, trace)).getOrElse(Nil)
}

private[this] def enroll(
ex: Throwable,
trace: Array[StackTraceElement]): List[String] = {
private[this] def enroll(ex: Throwable,
trace: Array[StackTraceElement], max: Int): List[String] = {
val builder = new StringBuilder
builder.append('*').append(ex.getClass.getName).append(':')
Option(ex.getMessage).map(_.stripSuffix(";")).foreach(msg => builder.append(msg).append(":"))
builder.append(trace.length).append(':').append(trace.length - 1)
List(builder.toString) ++ trace.map { t =>
builder.append(ex.getMessage).append(':')
builder.append(trace.length).append(':').append(max)
List(builder.toString) ++ (0 to max).map { i =>
builder.setLength(0)
builder.append(t.getClassName).append(":").append(t.getMethodName).append(":")
Option(t.getFileName).foreach(builder.append)
builder.append(":").append(t.getLineNumber)
builder.append(trace(i).getClassName).append(":")
builder.append(trace(i).getMethodName).append(":")
builder.append(Option(trace(i).getFileName).getOrElse("")).append(':')
builder.append(trace(i).getLineNumber)
builder.toString
}.toList
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,14 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
result.schema
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
def getNextRowSet(order: FetchOrientation, rowSetSize: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(FINISHED)
setHasResultSet(true)
val taken = if (order == FetchOrientation.FETCH_FIRST) {
result.toLocalIterator().asScala.take(maxRowsL.toInt)
result.toLocalIterator().asScala.take(rowSetSize.toInt)
} else {
iter.take(maxRowsL.toInt)
iter.take(rowSetSize.toInt)
}
RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private[kyuubi] class OperationManager private(name: String)
def getOperation(operationHandle: OperationHandle): KyuubiOperation = {
val operation = getOperationInternal(operationHandle)
if (operation == null) {
throw new KyuubiSQLException("Invalid OperationHandle: " + operationHandle)
throw new KyuubiSQLException("Invalid OperationHandle " + operationHandle)
}
operation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ private[kyuubi] class KyuubiSession(
sessionManager: SessionManager,
operationManager: OperationManager) extends Logging {

private[this] val sessionHandle: SessionHandle = new SessionHandle(protocol)
private[this] val opHandleSet = new MHSet[OperationHandle]
private[this] var _isOperationLogEnabled = false
private[this] var sessionLogDir: File = _
@volatile private[this] var lastAccessTime: Long = System.currentTimeMillis()
private[this] var lastIdleTime = 0L

private[this] val sessionUGI: UserGroupInformation = {
private val sessionHandle: SessionHandle = new SessionHandle(protocol)
private val opHandleSet = new MHSet[OperationHandle]
private var _isOperationLogEnabled = false
private var sessionLogDir: File = _
@volatile private var lastAccessTime: Long = System.currentTimeMillis()
private var lastIdleTime = 0L

private val sessionUGI: UserGroupInformation = {
val currentUser = UserGroupInformation.getCurrentUser
if (withImpersonation) {
if (UserGroupInformation.isSecurityEnabled) {
Expand All @@ -82,16 +82,16 @@ private[kyuubi] class KyuubiSession(
}
}

private[this] val sparkSessionWithUGI =
private val sparkSessionWithUGI =
new SparkSessionWithUGI(sessionUGI, conf, sessionManager.getCacheMgr)

private[this] def acquire(userAccess: Boolean): Unit = {
private def acquire(userAccess: Boolean): Unit = {
if (userAccess) {
lastAccessTime = System.currentTimeMillis
}
}

private[this] def release(userAccess: Boolean): Unit = {
private def release(userAccess: Boolean): Unit = {
if (userAccess) {
lastAccessTime = System.currentTimeMillis
}
Expand All @@ -103,7 +103,7 @@ private[kyuubi] class KyuubiSession(
}

@throws[KyuubiSQLException]
private[this] def executeStatementInternal(statement: String) = {
private def executeStatementInternal(statement: String): OperationHandle = {
acquire(true)
val operation =
operationManager.newExecuteStatementOperation(this, statement)
Expand All @@ -121,7 +121,7 @@ private[kyuubi] class KyuubiSession(
}
}

private[this] def cleanupSessionLogDir(): Unit = {
private def cleanupSessionLogDir(): Unit = {
if (_isOperationLogEnabled) {
try {
FileUtils.forceDelete(sessionLogDir)
Expand Down Expand Up @@ -152,7 +152,7 @@ private[kyuubi] class KyuubiSession(
case GetInfoType.DBMS_VERSION =>
new GetInfoValue(this.sparkSessionWithUGI.sparkSession.version)
case _ =>
throw new KyuubiSQLException("Unrecognized GetInfoType value: " + getInfoType.toString)
throw new KyuubiSQLException("Unrecognized GetInfoType value " + getInfoType.toString)
}
} finally {
release(true)
Expand Down Expand Up @@ -199,7 +199,7 @@ private[kyuubi] class KyuubiSession(
FileSystem.closeAllForUGI(sessionUGI)
} catch {
case ioe: IOException =>
throw new KyuubiSQLException("Could not clean up file-system handles for UGI: "
throw new KyuubiSQLException("Could not clean up file-system handles for UGI "
+ sessionUGI, ioe)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ private[kyuubi] class SessionManager private(
def getSession(sessionHandle: SessionHandle): KyuubiSession = {
val session = handleToSession.get(sessionHandle)
if (session == null) {
throw new KyuubiSQLException("Invalid SessionHandle: " + sessionHandle)
throw new KyuubiSQLException("Invalid SessionHandle " + sessionHandle)
}
session
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ class SparkSessionWithUGI(
user: UserGroupInformation,
conf: SparkConf,
cache: SparkSessionCacheManager) extends Logging {
private[this] var _sparkSession: SparkSession = _
private[this] val userName: String = user.getShortUserName
private[this] val promisedSparkContext = Promise[SparkContext]()
private[this] var initialDatabase: Option[String] = None
private[this] var sparkException: Option[Throwable] = None
private var _sparkSession: SparkSession = _
private val userName: String = user.getShortUserName
private val promisedSparkContext = Promise[SparkContext]()
private var initialDatabase: Option[String] = None
private var sparkException: Option[Throwable] = None

private[this] def newContext(): Thread = {
private def newContext(): Thread = {
new Thread(s"Start-SparkContext-$userName") {
override def run(): Unit = {
try {
Expand All @@ -64,7 +64,7 @@ class SparkSessionWithUGI(
/**
* Invoke SparkContext.stop() if not succeed initializing it
*/
private[this] def stopContext(): Unit = {
private def stopContext(): Unit = {
promisedSparkContext.future.map { sc =>
warn(s"Error occurred during initializing SparkContext for $userName, stopping")
try {
Expand All @@ -82,7 +82,7 @@ class SparkSessionWithUGI(
*
* @param sessionConf configurations for user connection string
*/
private[this] def configureSparkConf(sessionConf: Map[String, String]): Unit = {
private def configureSparkConf(sessionConf: Map[String, String]): Unit = {
for ((key, value) <- sessionConf) {
key match {
case HIVE_VAR_PREFIX(DEPRECATED_QUEUE) => conf.set(QUEUE, value)
Expand All @@ -107,7 +107,7 @@ class SparkSessionWithUGI(
*
* @param sessionConf configurations for user connection string
*/
private[this] def configureSparkSession(sessionConf: Map[String, String]): Unit = {
private def configureSparkSession(sessionConf: Map[String, String]): Unit = {
for ((key, value) <- sessionConf) {
key match {
case HIVE_VAR_PREFIX(k) =>
Expand All @@ -122,7 +122,7 @@ class SparkSessionWithUGI(
}
}

private[this] def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized {
private def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized {
val totalRounds = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES).toInt, 15)
var checkRound = totalRounds
val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL)
Expand All @@ -148,7 +148,7 @@ class SparkSessionWithUGI(
}
}

private[this] def create(sessionConf: Map[String, String]): Unit = {
private def create(sessionConf: Map[String, String]): Unit = {
info(s"--------- Create new SparkSession for $userName ----------")
// kyuubi|user name|canonical host name| port
val appName = Seq(
Expand Down Expand Up @@ -218,7 +218,7 @@ class SparkSessionWithUGI(
}

object SparkSessionWithUGI {
private[this] val userSparkContextBeingConstruct = new MHSet[String]()
private val userSparkContextBeingConstruct = new MHSet[String]()

def setPartiallyConstructed(user: String): Unit = {
userSparkContextBeingConstruct.add(user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
assert(op === op2)
val operationHandle = mock[OperationHandle]
val e = intercept[KyuubiSQLException](operationMgr.getOperation(operationHandle))
e.getMessage should startWith("Invalid OperationHandle:")
e.getMessage should startWith("Invalid OperationHandle")

val e2 = intercept[KyuubiSQLException](operationMgr.closeOperation(operationHandle))
e2.getMessage should be("Operation does not exist!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class KyuubiSessionSuite extends SparkFunSuite {
assert(
session.getInfo(GetInfoType.DBMS_VERSION).toTGetInfoValue.getStringValue === spark.version)
val e = intercept[KyuubiSQLException](session.getInfo(new GetInfoType {}))
assert(e.getMessage.startsWith("Unrecognized GetInfoType value:"))
assert(e.getMessage.startsWith("Unrecognized GetInfoType value"))
}

test("get last access time") {
Expand Down