Skip to content

Commit c210fda

Browse files
committed
[KYUUBI #2686] Fix lock bug if engine initialization timeout
### _Why are the changes needed?_ closes #2686 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2687 from ulysses-you/lock. Closes #2686 e5dcacd [ulysses-you] finally 868f95b [ulysses-you] address comment 1ba6538 [ulysses-you] flaky test 11fad96 [ulysses-you] Fix lock bug if engine initialization timeout Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent 010a34d commit c210fda

File tree

5 files changed

+138
-14
lines changed

5 files changed

+138
-14
lines changed

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.kyuubi.ha.client
1919

20-
import java.util.concurrent.TimeUnit
21-
2220
import org.apache.kyuubi.Logging
2321
import org.apache.kyuubi.config.KyuubiConf
2422

@@ -79,11 +77,10 @@ trait DiscoveryClient extends Logging {
7977
/**
8078
* The distributed lock path used to ensure that only one engine is created for non-CONNECTION
8179
* share level.
80+
* @param timeout the timeout for acquiring the lock, in milliseconds
81+
* @throws KyuubiSQLException if acquiring the lock times out or any exception occurs meanwhile
8282
*/
83-
def tryWithLock[T](
84-
lockPath: String,
85-
timeout: Long,
86-
unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T
83+
def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T
8784

8885
/**
8986
* Get the engine address and port from engine space.

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,18 +134,40 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
134134
})
135135
}
136136

137-
def tryWithLock[T](
138-
lockPath: String,
139-
timeout: Long,
140-
unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T = {
137+
def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T = {
141138
var lock: InterProcessSemaphoreMutex = null
142139
try {
143140
try {
144141
lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
145142
// Acquire a lease. If no leases are available, this method blocks until either the
146-
// maximum number of leases is increased or another client/process closes a lease
147-
lock.acquire(timeout, unit)
143+
// maximum number of leases is increased or another client/process closes a lease.
144+
//
145+
// Here, we should throw an exception if acquiring the lock times out.
146+
// Let's say we have three clients requesting the same lock through two kyuubi server instances.
147+
//
148+
// client A ---> kyuubi X -- first acquired \
149+
// client B ---> kyuubi X -- second acquired -- zookeeper
150+
// client C ---> kyuubi Y -- third acquired /
151+
//
152+
// The first client A acquired the lock, so B and C are blocked until A releases the lock,
153+
// then, depending on the state of the engine created by A:
154+
// - SUCCESS
155+
// B acquires the lock, then gets the engine ref and releases the lock.
156+
// C acquires the lock, then gets the engine ref and releases the lock.
157+
// - FAILED or TIMEOUT
158+
// B acquires the lock, then tries to create the engine again if it has not timed out.
159+
// C should time out and throw an exception back to the client. This fast fail
160+
// avoids making clients wait too long under concurrency.
161+
162+
// A return value of false means that acquiring the lock timed out
163+
val acquired = lock.acquire(timeout, TimeUnit.MILLISECONDS)
164+
if (!acquired) {
165+
throw KyuubiSQLException(s"Timeout to lock on path [$lockPath] after " +
166+
s"$timeout ms. There would be some problem that other session may " +
167+
s"create engine timeout.")
168+
}
148169
} catch {
170+
case e: KyuubiSQLException => throw e
149171
case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e)
150172
}
151173
f

kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,8 @@ object MetricsSystem {
113113
tracing(_.updateTimer(name, System.nanoTime() - startTime, TimeUnit.NANOSECONDS))
114114
}
115115
}
116+
117+
def counterValue(name: String): Option[Long] = {
118+
maybeSystem.map(_.registry.counter(name).getCount)
119+
}
116120
}

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ private[kyuubi] class EngineRef(
7171

7272
private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)
7373

74+
// In case multiple kyuubi instances have a small gap between their timeouts, here we add
75+
// a small amount of extra time to the lock timeout
76+
private val LOCK_TIMEOUT_SPAN_FACTOR = 0.1
77+
7478
private var builder: ProcBuilder = _
7579

7680
@VisibleForTesting
@@ -147,10 +151,12 @@ private[kyuubi] class EngineRef(
147151
case _ =>
148152
val lockPath =
149153
DiscoveryPaths.makePath(
150-
s"${serverSpace}_$shareLevel",
154+
s"${serverSpace}_${shareLevel}_$engineType",
151155
"lock",
152156
Array(appUser, subdomain))
153-
discoveryClient.tryWithLock(lockPath, timeout, TimeUnit.MILLISECONDS)(f)
157+
discoveryClient.tryWithLock(
158+
lockPath,
159+
timeout + (LOCK_TIMEOUT_SPAN_FACTOR * timeout).toLong)(f)
154160
}
155161

156162
private def create(

kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.kyuubi.engine
1919

2020
import java.util.UUID
21+
import java.util.concurrent.Executors
2122

2223
import org.apache.hadoop.security.UserGroupInformation
2324
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
@@ -28,6 +29,8 @@ import org.apache.kyuubi.config.KyuubiConf._
2829
import org.apache.kyuubi.ha.HighAvailabilityConf
2930
import org.apache.kyuubi.ha.client.DiscoveryClientProvider
3031
import org.apache.kyuubi.ha.client.DiscoveryPaths
32+
import org.apache.kyuubi.metrics.MetricsConstants.ENGINE_TOTAL
33+
import org.apache.kyuubi.metrics.MetricsSystem
3134
import org.apache.kyuubi.util.NamedThreadFactory
3235
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
3336

@@ -37,6 +40,7 @@ class EngineRefSuite extends KyuubiFunSuite {
3740
private val zkServer = new EmbeddedZookeeper
3841
private val conf = KyuubiConf()
3942
private val user = Utils.currentUser
43+
private val metricsSystem = new MetricsSystem
4044

4145
override def beforeAll(): Unit = {
4246
val zkData = Utils.createTempDir()
@@ -45,17 +49,22 @@ class EngineRefSuite extends KyuubiFunSuite {
4549
.set("spark.sql.catalogImplementation", "in-memory")
4650
zkServer.initialize(conf)
4751
zkServer.start()
52+
metricsSystem.initialize(conf)
53+
metricsSystem.start()
4854
super.beforeAll()
4955
}
5056

5157
override def afterAll(): Unit = {
58+
metricsSystem.stop()
5259
zkServer.stop()
5360
super.afterAll()
5461
}
5562

5663
override def beforeEach(): Unit = {
5764
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN)
5865
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN)
66+
conf.unset(KyuubiConf.ENGINE_POOL_SIZE)
67+
conf.unset(KyuubiConf.ENGINE_POOL_NAME)
5968
super.beforeEach()
6069
}
6170

@@ -247,4 +256,90 @@ class EngineRefSuite extends KyuubiFunSuite {
247256
assert(port2 == port1, "engine shared")
248257
}
249258
}
259+
260+
test("different engine type should use its own lock") {
261+
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
262+
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
263+
conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
264+
conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test1")
265+
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
266+
val conf1 = conf.clone
267+
conf1.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
268+
val conf2 = conf.clone
269+
conf2.set(KyuubiConf.ENGINE_TYPE, HIVE_SQL.toString)
270+
271+
val start = System.currentTimeMillis()
272+
val times = new Array[Long](2)
273+
val executor = Executors.newFixedThreadPool(2)
274+
try {
275+
executor.execute(() => {
276+
DiscoveryClientProvider.withDiscoveryClient(conf1) { client =>
277+
try {
278+
new EngineRef(conf1, user, UUID.randomUUID().toString, null)
279+
.getOrCreate(client)
280+
} finally {
281+
times(0) = System.currentTimeMillis()
282+
}
283+
}
284+
})
285+
executor.execute(() => {
286+
DiscoveryClientProvider.withDiscoveryClient(conf2) { client =>
287+
try {
288+
new EngineRef(conf2, user, UUID.randomUUID().toString, null)
289+
.getOrCreate(client)
290+
} finally {
291+
times(1) = System.currentTimeMillis()
292+
}
293+
}
294+
})
295+
296+
eventually(timeout(10.seconds), interval(200.milliseconds)) {
297+
assert(times.forall(_ > start))
298+
// ENGINE_INIT_TIMEOUT is 3000ms
299+
assert(times.max - times.min < 2500)
300+
}
301+
} finally {
302+
executor.shutdown()
303+
}
304+
}
305+
306+
test("three same lock request with initialization timeout") {
307+
val id = UUID.randomUUID().toString
308+
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
309+
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
310+
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
311+
conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
312+
conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test2")
313+
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
314+
315+
val beforeEngines = MetricsSystem.counterValue(ENGINE_TOTAL).getOrElse(0L)
316+
val start = System.currentTimeMillis()
317+
val times = new Array[Long](3)
318+
val executor = Executors.newFixedThreadPool(3)
319+
try {
320+
(0 until (3)).foreach { i =>
321+
val cloned = conf.clone
322+
executor.execute(() => {
323+
DiscoveryClientProvider.withDiscoveryClient(cloned) { client =>
324+
try {
325+
new EngineRef(cloned, user, id, null).getOrCreate(client)
326+
} finally {
327+
times(i) = System.currentTimeMillis()
328+
}
329+
}
330+
})
331+
}
332+
333+
eventually(timeout(20.seconds), interval(200.milliseconds)) {
334+
assert(times.forall(_ > start))
335+
// ENGINE_INIT_TIMEOUT is 3000ms
336+
assert(times.max - times.min > 2800)
337+
}
338+
339+
// we should only submit two engines; the last request should time out and fail
340+
assert(MetricsSystem.counterValue(ENGINE_TOTAL).get - beforeEngines == 2)
341+
} finally {
342+
executor.shutdown()
343+
}
344+
}
250345
}

0 commit comments

Comments
 (0)