From 67d1c5dd513327b350dac3d6855af5223a594087 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 1 Mar 2022 10:39:41 +0800 Subject: [PATCH] [KYUUBI #1969] Fix race on some service during start and stop phase ### _Why are the changes needed?_ #1969 might be fixed by this. ``` 12:18:19.885 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkSQLSessionManager: Session stopped due to shared level is Connection. 12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkSQLEngine: Service: [SparkTBinaryFrontend] is stopping. 12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkTBinaryFrontendService: SparkTBinaryFrontend has stopped 12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkTBinaryFrontendService: Service: [EngineServiceDiscovery] is stopping. 12:18:19.890 SparkTBinaryFrontendHandler-Pool: Thread-1260 DEBUG FailedDeleteManager: Path being added to guaranteed delete set: /kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000 12:18:19.892 ProcessThread(sid:0 cport:40861): INFO PrepRequestProcessor: Got user-level KeeperException when processing sessionid:0x100000e6ac00000 type:delete cxid:0xb zxid:0x9 txntype:-1 reqpath:n/a Error Path:/kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000 Error:KeeperErrorCode = NoNode for /kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000 12:18:19.892 SyncThread:0 DEBUG FinalRequestProcessor: Processing request:: sessionid:0x100000e6ac00000 type:delete cxid:0xa zxid:0x8 txntype:2 reqpath:n/a 12:18:19.893 SparkTBinaryFrontendHandler-Pool: Thread-1260 ERROR ServiceDiscoveryClient: Failed to close the persistent ephemeral znodenull java.io.IOException: java.lang.InterruptedException at 
org.apache.curator.framework.recipes.nodes.PersistentNode.close(PersistentNode.java:296) ~[curator-recipes-2.12.0.jar:?] at org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.deregisterService(ServiceDiscoveryClient.scala:117) ~[kyuubi-ha_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT] at org.apache.kyuubi.ha.client.EngineServiceDiscovery.stop(EngineServiceDiscovery.scala:35) ~[kyuubi-ha_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT] at org.apache.kyuubi.service.CompositeService.$anonfun$stop$2(CompositeService.scala:75) ~[kyuubi-common_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT] ``` For the connection share level, deregistration is triggered by session close and runs in the SparkTBinaryFrontendHandler pool, which might be stopped before the ZooKeeper node deletion finishes. This PR also fixes some other race issues. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #1980 from yaooqinn/conc. 
Closes #1969 efd12707 [Kent Yao] address comments aed6636f [Kent Yao] Fix race on some service during stop phase 63592621 [Kent Yao] Fix race on some service during stop phase Authored-by: Kent Yao Signed-off-by: ulysses-you --- .../kyuubi/engine/flink/FlinkSQLEngine.scala | 5 --- .../events/AbstractEventLoggingService.scala | 2 +- .../kyuubi/service/TFrontendService.scala | 43 ++++++++++++++++--- .../apache/kyuubi/metrics/MetricsSystem.scala | 2 +- 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index a08b461434f..a242df9bb7e 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -58,11 +58,6 @@ case class FlinkSQLEngine(engineContext: DefaultContext) extends Serverable("Fli currentEngine.get.stop() } } - - override def stop(): Unit = { - super.stop() - } - } object FlinkSQLEngine extends Logging { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala index 248d3c7acc2..51de9c83c38 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala @@ -51,7 +51,7 @@ abstract class AbstractEventLoggingService object EventLogging { - private[events] var _service: Option[AbstractEventLoggingService] = None + @volatile private[events] var _service: Option[AbstractEventLoggingService] = None def onEvent(event: KyuubiEvent): Unit = { _service.foreach(_.onEvent(event)) diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala index 957ff2b9671..464901d13a9 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.hadoop.conf.Configuration -import org.apache.hive.service.rpc.thrift.{TCancelDelegationTokenReq, TCancelDelegationTokenResp, TCancelOperationReq, TCancelOperationResp, TCLIService, TCloseOperationReq, TCloseOperationResp, TCloseSessionReq, TCloseSessionResp, TExecuteStatementReq, TExecuteStatementResp, TFetchResultsReq, TFetchResultsResp, TGetCatalogsReq, TGetCatalogsResp, TGetColumnsReq, TGetColumnsResp, TGetCrossReferenceReq, TGetCrossReferenceResp, TGetDelegationTokenReq, TGetDelegationTokenResp, TGetFunctionsReq, TGetFunctionsResp, TGetInfoReq, TGetInfoResp, TGetInfoValue, TGetOperationStatusReq, TGetOperationStatusResp, TGetPrimaryKeysReq, TGetPrimaryKeysResp, TGetResultSetMetadataReq, TGetResultSetMetadataResp, TGetSchemasReq, TGetSchemasResp, TGetTablesReq, TGetTablesResp, TGetTableTypesReq, TGetTableTypesResp, TGetTypeInfoReq, TGetTypeInfoResp, TOpenSessionReq, TOpenSessionResp, TProtocolVersion, TRenewDelegationTokenReq, TRenewDelegationTokenResp, TStatus, TStatusCode} +import org.apache.hive.service.rpc.thrift._ import org.apache.thrift.protocol.TProtocol import org.apache.thrift.server.{ServerContext, TServerEventHandler} import org.apache.thrift.transport.TTransport @@ -56,22 +56,53 @@ abstract class TFrontendService(name: String) protected lazy val authFactory: KyuubiAuthenticationFactory = new KyuubiAuthenticationFactory(conf, isServer()) + /** + * Start the service itself (FE) and the services composited with it (Discovery Service, DS) in the order of: + * Start FE -> + * if (success) -> 
Continue starting DS + * if (success) -> finish + * else -> Stop DS -> Raise Error -> Stop FE -> Raise Error + * else + * Raise Error -> Stop FE -> Raise Error + * This makes sure that the FE has started and is ready to serve before it is exposed through DS. + */ override def start(): Unit = synchronized { - super.start() - if (!started.getAndSet(true)) { - serverThread.start() + try { + if (started.compareAndSet(false, true)) { + serverThread.start() + } + super.start() + } catch { + case e: Throwable => + stopInternal() + throw e } } protected def stopServer(): Unit - override def stop(): Unit = synchronized { - if (started.getAndSet(false)) { + /** + * Internal stop procedure that stops only this FE, not the services composited with it. + */ + private def stopInternal(): Unit = { + if (started.compareAndSet(true, false)) { serverThread.interrupt() stopServer() info(getName + " has stopped") } + } + + /** + * Stop the service itself (FE) and the services composited with it (Discovery Service, DS) in the order of: + * Stop DS -> Stop FE + * This makes sure that + * 1. The service stops serving before it terminates during stopping + * 2. For engines with group share level, the DS stopping is invoked by a pool in FE, + * so DS is stopped first to avoid it being interrupted. NOTE(review): the reported race was logged with the Connection share level; confirm that "group" is intended. 
+ */ + override def stop(): Unit = synchronized { super.stop() + stopInternal() } override def connectionUrl: String = { diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala index 3622ff8d79a..9533a633d29 100644 --- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala +++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala @@ -98,7 +98,7 @@ class MetricsSystem extends CompositeService("MetricsSystem") { object MetricsSystem { - private var maybeSystem: Option[MetricsSystem] = None + @volatile private var maybeSystem: Option[MetricsSystem] = None def tracing[T](func: MetricsSystem => T): Unit = { maybeSystem.foreach(func(_))