Skip to content

Commit

Permalink
override start method
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed Jun 16, 2020
1 parent 9c6ac83 commit 5e79ad9
Showing 1 changed file with 43 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import java.util.{List => JList}
import javax.security.auth.login.LoginException

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.shims.Utils
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hive.service.{AbstractService, Service, ServiceException}
import org.apache.hive.service.{AbstractService, CompositeService, Service, ServiceException}
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
Expand Down Expand Up @@ -94,6 +95,12 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
initCompositeService(hiveConf)
}

/**
* the super class [[CLIService#start]] starts a useless dummy metastore client, skip it and call
* the ancestor [[CompositeService#start]] directly.
*/
override def start(): Unit = startCompositeService()

override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
getInfoType match {
case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL")
Expand All @@ -105,6 +112,19 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
}

private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>

private val logInfo = (msg: String) => if (HiveUtils.isHive23) {
getAncestorField[Logger](this, 3, "LOG").info(msg)
} else {
getAncestorField[Log](this, 3, "LOG").info(msg)
}

private val logError = (msg: String, e: Throwable) => if (HiveUtils.isHive23) {
getAncestorField[Logger](this, 3, "LOG").error(msg, e)
} else {
getAncestorField[Log](this, 3, "LOG").error(msg, e)
}

def initCompositeService(hiveConf: HiveConf): Unit = {
// Emulating `CompositeService.init(hiveConf)`
val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
Expand All @@ -114,10 +134,28 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
setAncestorField(this, 3, "hiveConf", hiveConf)
invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
if (HiveUtils.isHive23) {
getAncestorField[Logger](this, 3, "LOG").info(s"Service: $getName is inited.")
} else {
getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
logInfo(s"Service: $getName is inited.")
}

def startCompositeService(): Unit = {
// Emulating `CompositeService.init(hiveConf)`
val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
var serviceStartCount = 0
try {
serviceList.asScala.foreach { service =>
service.start()
serviceStartCount += 1
}
} catch {
case NonFatal(e) =>
logError(s"Error starting services $getName", e)
invoke(classOf[CompositeService], this, "stop",
classOf[Int] -> new Integer(serviceStartCount))
}

// Emulating `AbstractService.start`
invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.INITED)
invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.STARTED)
logInfo(s"Service: $getName is started.")
}
}

0 comments on commit 5e79ad9

Please sign in to comment.