Skip to content

Commit

Permalink
[KYUUBI #1969] Fix race on some service during start and stop phase
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

#1969 might be fixed by this.

```
12:18:19.885 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkSQLSessionManager: Session stopped due to shared level is Connection.
12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkSQLEngine: Service: [SparkTBinaryFrontend] is stopping.
12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkTBinaryFrontendService: SparkTBinaryFrontend has stopped
12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkTBinaryFrontendService: Service: [EngineServiceDiscovery] is stopping.
12:18:19.890 SparkTBinaryFrontendHandler-Pool: Thread-1260 DEBUG FailedDeleteManager: Path being added to guaranteed delete set: /kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000
12:18:19.892 ProcessThread(sid:0 cport:40861): INFO PrepRequestProcessor: Got user-level KeeperException when processing sessionid:0x100000e6ac00000 type:delete cxid:0xb zxid:0x9 txntype:-1 reqpath:n/a Error Path:/kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000 Error:KeeperErrorCode = NoNode for /kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000
12:18:19.892 SyncThread:0 DEBUG FinalRequestProcessor: Processing request:: sessionid:0x100000e6ac00000 type:delete cxid:0xa zxid:0x8 txntype:2 reqpath:n/a
12:18:19.893 SparkTBinaryFrontendHandler-Pool: Thread-1260 ERROR ServiceDiscoveryClient: Failed to close the persistent ephemeral znodenull
java.io.IOException: java.lang.InterruptedException
	at org.apache.curator.framework.recipes.nodes.PersistentNode.close(PersistentNode.java:296) ~[curator-recipes-2.12.0.jar:?]
	at org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.deregisterService(ServiceDiscoveryClient.scala:117) ~[kyuubi-ha_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
	at org.apache.kyuubi.ha.client.EngineServiceDiscovery.stop(EngineServiceDiscovery.scala:35) ~[kyuubi-ha_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
	at org.apache.kyuubi.service.CompositeService.$anonfun$stop$2(CompositeService.scala:75) ~[kyuubi-common_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
```

the deregister for connection-level happens in session close and SparkTBinaryFrontendHandler pool, which might be stopped before finish executing zk node deletion.

this PR contains some other race issues too

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1980 from yaooqinn/conc.

Closes #1969

efd1270 [Kent Yao] address comments
aed6636 [Kent Yao] Fix race on some service during stop phase
6359262 [Kent Yao] Fix race on some service during stop phase

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
yaooqinn authored and ulysses-you committed Mar 1, 2022
1 parent 7faefb8 commit 67d1c5d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ case class FlinkSQLEngine(engineContext: DefaultContext) extends Serverable("Fli
currentEngine.get.stop()
}
}

override def stop(): Unit = {
super.stop()
}

}

object FlinkSQLEngine extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ abstract class AbstractEventLoggingService

object EventLogging {

private[events] var _service: Option[AbstractEventLoggingService] = None
@volatile private[events] var _service: Option[AbstractEventLoggingService] = None

def onEvent(event: KyuubiEvent): Unit = {
_service.foreach(_.onEvent(event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions

import org.apache.hadoop.conf.Configuration
import org.apache.hive.service.rpc.thrift.{TCancelDelegationTokenReq, TCancelDelegationTokenResp, TCancelOperationReq, TCancelOperationResp, TCLIService, TCloseOperationReq, TCloseOperationResp, TCloseSessionReq, TCloseSessionResp, TExecuteStatementReq, TExecuteStatementResp, TFetchResultsReq, TFetchResultsResp, TGetCatalogsReq, TGetCatalogsResp, TGetColumnsReq, TGetColumnsResp, TGetCrossReferenceReq, TGetCrossReferenceResp, TGetDelegationTokenReq, TGetDelegationTokenResp, TGetFunctionsReq, TGetFunctionsResp, TGetInfoReq, TGetInfoResp, TGetInfoValue, TGetOperationStatusReq, TGetOperationStatusResp, TGetPrimaryKeysReq, TGetPrimaryKeysResp, TGetResultSetMetadataReq, TGetResultSetMetadataResp, TGetSchemasReq, TGetSchemasResp, TGetTablesReq, TGetTablesResp, TGetTableTypesReq, TGetTableTypesResp, TGetTypeInfoReq, TGetTypeInfoResp, TOpenSessionReq, TOpenSessionResp, TProtocolVersion, TRenewDelegationTokenReq, TRenewDelegationTokenResp, TStatus, TStatusCode}
import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.server.{ServerContext, TServerEventHandler}
import org.apache.thrift.transport.TTransport
Expand Down Expand Up @@ -56,22 +56,53 @@ abstract class TFrontendService(name: String)
protected lazy val authFactory: KyuubiAuthenticationFactory =
new KyuubiAuthenticationFactory(conf, isServer())

/**
* Start the service itself(FE) and its composited (Discovery service, DS) in the order of:
* Start FE ->
* if (success) -> Continue starting DS
* if (success) -> finish
* else -> Stop DS -> Raise Error -> Stop FE -> Raise Error
* else
* Raise Error -> Stop FE -> Raise Error
* This makes sure that the FE has started and ready to serve before exposing through DS.
*/
override def start(): Unit = synchronized {
super.start()
if (!started.getAndSet(true)) {
serverThread.start()
try {
if (started.compareAndSet(false, true)) {
serverThread.start()
}
super.start()
} catch {
case e: Throwable =>
stopInternal()
throw e
}
}

protected def stopServer(): Unit

override def stop(): Unit = synchronized {
if (started.getAndSet(false)) {
/**
* Inner stop progress that will not stop all services composited with this.
*/
private def stopInternal(): Unit = {
if (started.compareAndSet(true, false)) {
serverThread.interrupt()
stopServer()
info(getName + " has stopped")
}
}

/**
* Stop the service itself(FE) and its composited (Discovery service, DS) in the order of:
* Stop DS -> Stop FE
* This makes sure of
* 1. The service stop serving before terminating during stopping
* 2. For engines with group share level, the DS stopping is invoked by a pool in FE,
* so we need to stop DS first in case of interrupting.
*/
override def stop(): Unit = synchronized {
super.stop()
stopInternal()
}

override def connectionUrl: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class MetricsSystem extends CompositeService("MetricsSystem") {

object MetricsSystem {

private var maybeSystem: Option[MetricsSystem] = None
@volatile private var maybeSystem: Option[MetricsSystem] = None

def tracing[T](func: MetricsSystem => T): Unit = {
maybeSystem.foreach(func(_))
Expand Down

0 comments on commit 67d1c5d

Please sign in to comment.