Skip to content

Commit 3aba64d

Browse files
yaooqinn and ulysses-you
authored and committed
[KYUUBI #1785] Stop Discovery services properly
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> This fixes the following issues: 1. EngineServiceDiscovery calls deregister twice. 2. When the connection to the zk-server is LOST, the delay before the graceful stop shall be calculated from the configured retry policy, not as retrywait * maxWaitMs. - Engines use the n-time retry policy by default, so we can avoid unnecessarily caching engines after the whole test file has finished and the server has stopped. - Otherwise, we are very likely to hit the GA memory limits: ``` KyuubiOperationPerGroupSuite: ./build/mvn: line 99: 1725 Killed ${MVN_BIN} $MAVEN_CLI_OPTS "$" Error: Process completed with exit code 137. ``` 3. When the connection is LOST, we shall skip connection retries, closing the znode, and closing the client. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #1785 from yaooqinn/stop. 
Closes #1785 afdd4b4 [Kent Yao] Stop Discovery services properly 5efa493 [Kent Yao] Stop Discovery services properly 45f9312 [Kent Yao] Stop Discovery services properly d899012 [Kent Yao] Stop Discovery services properly 925ff3e [Kent Yao] Stop Discovery services properly 1f76611 [Kent Yao] Stop Discovery services properly 0d25f6a [Kent Yao] Stop Discovery services properly ff83375 [Kent Yao] Stop Discovery services properly 55d64c0 [Kent Yao] Stop Discovery services properly Authored-by: Kent Yao <yao@apache.org> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent 012fc75 commit 3aba64d

File tree

9 files changed

+191
-50
lines changed

9 files changed

+191
-50
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ object KyuubiConf {
649649
.doc("The check interval for engine timeout")
650650
.version("1.0.0")
651651
.timeConf
652-
.checkValue(_ >= Duration.ofSeconds(3).toMillis, "Minimum 3 seconds")
652+
.checkValue(_ >= Duration.ofSeconds(1).toMillis, "Minimum 1 seconds")
653653
.createWithDefault(Duration.ofMinutes(1).toMillis)
654654

655655
val ENGINE_IDLE_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.idle.timeout")

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,27 @@ class EngineServiceDiscovery(
3131
fe: FrontendService) extends ServiceDiscovery("EngineServiceDiscovery", fe) {
3232

3333
override def stop(): Unit = synchronized {
34-
discoveryClient.deregisterService()
35-
conf.get(ENGINE_SHARE_LEVEL) match {
36-
// For connection level, we should clean up the namespace in zk in case the disk stress.
37-
case "CONNECTION" =>
38-
try {
39-
if (discoveryClient.postDeregisterService) {
40-
info("Clean up discovery service due to this is connection share level.")
34+
if (!isServerLost.get()) {
35+
discoveryClient.deregisterService()
36+
conf.get(ENGINE_SHARE_LEVEL) match {
37+
// For connection level, we should clean up the namespace in zk in case the disk stress.
38+
case "CONNECTION" =>
39+
try {
40+
if (discoveryClient.postDeregisterService) {
41+
info("Clean up discovery service due to this is connection share level.")
42+
}
43+
} catch {
44+
case NonFatal(e) =>
45+
warn("Failed to clean up Spark engine before stop.", e)
4146
}
42-
} catch {
43-
case NonFatal(e) =>
44-
warn("Failed to clean up Spark engine before stop.", e)
45-
}
4647

47-
case _ =>
48+
case _ =>
49+
}
50+
discoveryClient.closeClient()
51+
} else {
52+
warn(s"The Zookeeper ensemble is LOST")
4853
}
4954
super.stop()
5055
}
56+
5157
}

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/KyuubiServiceDiscovery.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,15 @@ import org.apache.kyuubi.service.FrontendService
2626
* @param fe the frontend service to publish for service discovery
2727
*/
2828
class KyuubiServiceDiscovery(
29-
fe: FrontendService) extends ServiceDiscovery("KyuubiServiceDiscovery", fe)
29+
fe: FrontendService) extends ServiceDiscovery("KyuubiServiceDiscovery", fe) {
30+
31+
override def stop(): Unit = synchronized {
32+
if (!isServerLost.get()) {
33+
discoveryClient.deregisterService()
34+
discoveryClient.closeClient()
35+
} else {
36+
warn(s"The Zookeeper ensemble is LOST")
37+
}
38+
super.stop()
39+
}
40+
}

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.kyuubi.ha.client
1919

2020
import java.nio.charset.StandardCharsets
21+
import java.util.concurrent.atomic.AtomicBoolean
2122

2223
import scala.collection.JavaConverters._
2324

@@ -42,6 +43,8 @@ abstract class ServiceDiscovery(
4243
name: String,
4344
val fe: FrontendService) extends AbstractService(name) {
4445

46+
protected val isServerLost = new AtomicBoolean(false)
47+
4548
private var _discoveryClient: ServiceDiscoveryClient = _
4649

4750
def discoveryClient: ServiceDiscoveryClient = _discoveryClient
@@ -60,20 +63,15 @@ abstract class ServiceDiscovery(
6063
super.start()
6164
}
6265

63-
override def stop(): Unit = {
64-
discoveryClient.closeClient()
65-
super.stop()
66-
}
67-
6866
// stop the server genteelly
69-
def stopGracefully(): Unit = {
70-
stop()
67+
def stopGracefully(isLost: Boolean = false): Unit = {
7168
while (fe.be != null && fe.be.sessionManager.getOpenSessionCount > 0) {
69+
debug(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown")
7270
Thread.sleep(1000 * 60)
7371
}
72+
isServerLost.set(isLost)
7473
fe.serverable.stop()
7574
}
76-
7775
}
7876

7977
object ServiceDiscovery extends Logging {

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package org.apache.kyuubi.ha.client
2020
import java.io.{File, IOException}
2121
import javax.security.auth.login.Configuration
2222

23+
import scala.util.Random
24+
2325
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
2426
import org.apache.curator.retry._
2527
import org.apache.hadoop.security.UserGroupInformation
@@ -71,6 +73,27 @@ object ZooKeeperClientProvider extends Logging {
7173
builder.build()
7274
}
7375

76+
def getGracefulStopThreadDelay(conf: KyuubiConf): Long = {
77+
val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
78+
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
79+
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
80+
val retryPolicyName = conf.get(HA_ZK_CONN_RETRY_POLICY)
81+
RetryPolicies.withName(retryPolicyName) match {
82+
case ONE_TIME => baseSleepTime
83+
case N_TIME => maxRetries * baseSleepTime
84+
case BOUNDED_EXPONENTIAL_BACKOFF =>
85+
(0 until maxRetries).map { retryCount =>
86+
val retryWait = baseSleepTime * Math.max(1, Random.nextInt(1 << (retryCount + 1)))
87+
Math.min(retryWait, maxSleepTime)
88+
}.sum
89+
case UNTIL_ELAPSED => maxSleepTime
90+
case EXPONENTIAL_BACKOFF =>
91+
(0 until maxRetries).map { retryCount =>
92+
baseSleepTime * Math.max(1, Random.nextInt(1 << (retryCount + 1)))
93+
}.sum
94+
}
95+
}
96+
7497
/**
7598
* Creates a zookeeper client before calling `f` and close it after calling `f`.
7699
*/

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,12 @@ import org.apache.kyuubi.KYUUBI_VERSION
4141
import org.apache.kyuubi.KyuubiException
4242
import org.apache.kyuubi.Logging
4343
import org.apache.kyuubi.config.KyuubiConf
44-
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRIES
45-
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRY_WAIT
4644
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
4745
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
4846
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
4947
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
5048
import org.apache.kyuubi.ha.client.ServiceDiscovery
51-
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.buildZookeeperClient
49+
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.{buildZookeeperClient, getGracefulStopThreadDelay}
5250
import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.connectionChecker
5351
import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
5452
import org.apache.kyuubi.util.KyuubiHadoopUtils
@@ -69,8 +67,6 @@ class ServiceDiscoveryClient(serviceDiscovery: ServiceDiscovery) extends Logging
6967

7068
def createClient(conf: KyuubiConf): Unit = {
7169
_namespace = conf.get(HA_ZK_NAMESPACE)
72-
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
73-
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
7470
zkClient = buildZookeeperClient(conf)
7571
zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
7672
private val isConnected = new AtomicBoolean(false)
@@ -81,13 +77,13 @@ class ServiceDiscoveryClient(serviceDiscovery: ServiceDiscovery) extends Logging
8177
case CONNECTED | RECONNECTED => isConnected.set(true)
8278
case LOST =>
8379
isConnected.set(false)
84-
val delay = maxRetries.toLong * maxSleepTime
80+
val delay = getGracefulStopThreadDelay(conf)
8581
connectionChecker.schedule(
8682
new Runnable {
8783
override def run(): Unit = if (!isConnected.get()) {
8884
error(s"Zookeeper client connection state changed to: $newState, but failed to" +
89-
s" reconnect in ${delay / 1000} seconds. Give up retry. ")
90-
serviceDiscovery.stopGracefully()
85+
s" reconnect in ${delay / 1000} seconds. Give up retry and stop gracefully . ")
86+
serviceDiscovery.stopGracefully(true)
9187
}
9288
},
9389
delay,
@@ -120,7 +116,7 @@ class ServiceDiscoveryClient(serviceDiscovery: ServiceDiscovery) extends Logging
120116
try {
121117
serviceNode.close()
122118
} catch {
123-
case e: IOException =>
119+
case e @ (_: IOException | _: KeeperException) =>
124120
error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
125121
} finally {
126122
serviceNode = null
@@ -130,15 +126,20 @@ class ServiceDiscoveryClient(serviceDiscovery: ServiceDiscovery) extends Logging
130126

131127
def postDeregisterService(): Boolean = {
132128
if (namespace != null) {
133-
zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
134-
true
129+
try {
130+
zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
131+
true
132+
} catch {
133+
case e: KeeperException =>
134+
warn(s"Failed to delete $namespace", e)
135+
false
136+
}
135137
} else {
136138
false
137139
}
138140
}
139141

140142
def closeClient(): Unit = {
141-
deregisterService()
142143
if (zkClient != null) zkClient.close()
143144
}
144145

kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.kyuubi.ha.client
2020
import java.io.{File, IOException}
2121
import java.net.InetAddress
2222
import java.util
23+
import java.util.concurrent.atomic.AtomicBoolean
2324
import javax.security.auth.login.Configuration
2425

2526
import scala.collection.JavaConverters._
@@ -32,7 +33,7 @@ import org.scalatest.time.SpanSugar._
3233
import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
3334
import org.apache.kyuubi.config.KyuubiConf
3435
import org.apache.kyuubi.ha.HighAvailabilityConf._
35-
import org.apache.kyuubi.service.{NoopTBinaryFrontendServer, Serverable, ServiceState}
36+
import org.apache.kyuubi.service._
3637
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
3738

3839
class ServiceDiscoverySuite extends KerberizedTestHelper {
@@ -66,17 +67,21 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
6667
.set(HA_ZK_NAMESPACE, namespace)
6768
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
6869

69-
val server: Serverable = new NoopTBinaryFrontendServer()
70+
var serviceDiscovery: KyuubiServiceDiscovery = null
71+
val server: Serverable = new NoopTBinaryFrontendServer() {
72+
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
73+
new NoopTBinaryFrontendService(this) {
74+
override val discoveryService: Option[Service] = {
75+
serviceDiscovery = new KyuubiServiceDiscovery(this)
76+
Some(serviceDiscovery)
77+
}
78+
})
79+
}
7080
server.initialize(conf)
7181
server.start()
72-
7382
val znodeRoot = s"/$namespace"
74-
val serviceDiscovery = new KyuubiServiceDiscovery(server.frontendServices.head)
7583
withZkClient(conf) { framework =>
7684
try {
77-
serviceDiscovery.initialize(conf)
78-
serviceDiscovery.start()
79-
8085
assert(framework.checkExists().forPath("/abc") === null)
8186
assert(framework.checkExists().forPath(znodeRoot) !== null)
8287
val children = framework.getChildren.forPath(znodeRoot).asScala
@@ -87,13 +92,12 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
8792
children.foreach { child =>
8893
framework.delete().forPath(s"""$znodeRoot/$child""")
8994
}
90-
eventually(timeout(5.seconds), interval(1.second)) {
95+
eventually(timeout(5.seconds), interval(100.millis)) {
9196
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
9297
assert(server.getServiceState === ServiceState.STOPPED)
9398
}
9499
} finally {
95100
server.stop()
96-
serviceDiscovery.stop()
97101
}
98102
}
99103
}
@@ -165,16 +169,22 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
165169
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
166170
.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
167171

168-
val server: Serverable = new NoopTBinaryFrontendServer()
172+
var serviceDiscovery: KyuubiServiceDiscovery = null
173+
val server: Serverable = new NoopTBinaryFrontendServer() {
174+
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
175+
new NoopTBinaryFrontendService(this) {
176+
override val discoveryService: Option[Service] = {
177+
serviceDiscovery = new KyuubiServiceDiscovery(this)
178+
Some(serviceDiscovery)
179+
}
180+
})
181+
}
169182
server.initialize(conf)
170183
server.start()
171184

172185
val znodeRoot = s"/$namespace"
173-
val serviceDiscovery = new EngineServiceDiscovery(server.frontendServices.head)
174186
withZkClient(conf) { framework =>
175187
try {
176-
serviceDiscovery.initialize(conf)
177-
serviceDiscovery.start()
178188

179189
assert(framework.checkExists().forPath("/abc") === null)
180190
assert(framework.checkExists().forPath(znodeRoot) !== null)
@@ -186,7 +196,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
186196
children.foreach { child =>
187197
framework.delete().forPath(s"""$znodeRoot/$child""")
188198
}
189-
eventually(timeout(5.seconds), interval(1.second)) {
199+
eventually(timeout(5.seconds), interval(100.millis)) {
190200
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
191201
assert(server.getServiceState === ServiceState.STOPPED)
192202
val msg = s"This Kyuubi instance ${server.frontendServices.head.connectionUrl}" +
@@ -217,4 +227,45 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
217227
assert(host === host2)
218228
assert(port === port2)
219229
}
230+
231+
test("stop engine in time while zk ensemble terminates") {
232+
val zkServer = new EmbeddedZookeeper()
233+
val conf = KyuubiConf()
234+
.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
235+
try {
236+
zkServer.initialize(conf)
237+
zkServer.start()
238+
var serviceDiscovery: EngineServiceDiscovery = null
239+
val server = new NoopTBinaryFrontendServer() {
240+
override val frontendServices: Seq[NoopTBinaryFrontendService] = Seq(
241+
new NoopTBinaryFrontendService(this) {
242+
override val discoveryService: Option[Service] = {
243+
serviceDiscovery = new EngineServiceDiscovery(this)
244+
Some(serviceDiscovery)
245+
}
246+
})
247+
}
248+
conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
249+
.set(HA_ZK_CONN_BASE_RETRY_WAIT, 1)
250+
.set(HA_ZK_QUORUM, zkServer.getConnectString)
251+
.set(HA_ZK_SESSION_TIMEOUT, 2000)
252+
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
253+
server.initialize(conf)
254+
server.start()
255+
assert(server.getServiceState === ServiceState.STARTED)
256+
257+
zkServer.stop()
258+
val isServerLostM = serviceDiscovery.getClass.getSuperclass.getDeclaredField("isServerLost")
259+
isServerLostM.setAccessible(true)
260+
val isServerLost = isServerLostM.get(serviceDiscovery)
261+
262+
eventually(timeout(10.seconds), interval(100.millis)) {
263+
assert(isServerLost.asInstanceOf[AtomicBoolean].get())
264+
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
265+
assert(server.getServiceState === ServiceState.STOPPED)
266+
}
267+
} finally {
268+
zkServer.stop()
269+
}
270+
}
220271
}

0 commit comments

Comments
 (0)