Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed Feb 28, 2022
1 parent aed6636 commit efd1270
Showing 1 changed file with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions

import org.apache.hadoop.conf.Configuration
import org.apache.hive.service.rpc.thrift.{TCancelDelegationTokenReq, TCancelDelegationTokenResp, TCancelOperationReq, TCancelOperationResp, TCLIService, TCloseOperationReq, TCloseOperationResp, TCloseSessionReq, TCloseSessionResp, TExecuteStatementReq, TExecuteStatementResp, TFetchResultsReq, TFetchResultsResp, TGetCatalogsReq, TGetCatalogsResp, TGetColumnsReq, TGetColumnsResp, TGetCrossReferenceReq, TGetCrossReferenceResp, TGetDelegationTokenReq, TGetDelegationTokenResp, TGetFunctionsReq, TGetFunctionsResp, TGetInfoReq, TGetInfoResp, TGetInfoValue, TGetOperationStatusReq, TGetOperationStatusResp, TGetPrimaryKeysReq, TGetPrimaryKeysResp, TGetResultSetMetadataReq, TGetResultSetMetadataResp, TGetSchemasReq, TGetSchemasResp, TGetTablesReq, TGetTablesResp, TGetTableTypesReq, TGetTableTypesResp, TGetTypeInfoReq, TGetTypeInfoResp, TOpenSessionReq, TOpenSessionResp, TProtocolVersion, TRenewDelegationTokenReq, TRenewDelegationTokenResp, TStatus, TStatusCode}
import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.server.{ServerContext, TServerEventHandler}
import org.apache.thrift.transport.TTransport
Expand Down Expand Up @@ -56,24 +56,55 @@ abstract class TFrontendService(name: String)
protected lazy val authFactory: KyuubiAuthenticationFactory =
new KyuubiAuthenticationFactory(conf, isServer())

/**
* Start the service itself(FE) and its composited (Discovery service, DS) in the order of:
* Start FE ->
* if (success) -> Continue starting DS
* if (success) -> finish
* else -> Stop DS -> Raise Error -> Stop FE -> Raise Error
* else
* Raise Error -> Stop FE -> Raise Error
* This makes sure that the FE has started and ready to serve before exposing through DS.
*/
override def start(): Unit = synchronized {
super.start()
if (!started.getAndSet(true)) {
serverThread.start()
try {
if (started.compareAndSet(false, true)) {
serverThread.start()
}
super.start()
} catch {
case e: Throwable =>
stopInternal()
throw e
}
}

protected def stopServer(): Unit

override def stop(): Unit = synchronized {
super.stop()
if (started.getAndSet(false)) {
/**
* Inner stop progress that will not stop all services composited with this.
*/
private def stopInternal(): Unit = {
if (started.compareAndSet(true, false)) {
serverThread.interrupt()
stopServer()
info(getName + " has stopped")
}
}

/**
* Stop the service itself(FE) and its composited (Discovery service, DS) in the order of:
* Stop DS -> Stop FE
* This makes sure of
* 1. The service stop serving before terminating during stopping
* 2. For engines with group share level, the DS stopping is invoked by a pool in FE,
* so we need to stop DS first in case of interrupting.
*/
override def stop(): Unit = synchronized {
super.stop()
stopInternal()
}

override def connectionUrl: String = {
checkInitialized()
val host = serverHost match {
Expand Down

0 comments on commit efd1270

Please sign in to comment.