Skip to content

Commit

Permalink
[KYUUBI #6172][TASK][EASY] Support to interrupt the thrift request im…
Browse files Browse the repository at this point in the history
…mediately after marking the engine not alive

Support to interrupt the thrift request immediately after marking the engine not alive

# 🔍 Description
## Issue References 🔗

This pull request fixes #6172

## Describe Your Solution 🔧

https://github.com/apache/kyuubi/blob/12c5568c9b020b1212c9514f046c67fcb267467e/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala#L103-L110

When probe fails and exceeds engineAliveTimeout, not interrupt the thrift request immediately, only marked `remoteEngineBroken` and wait next `engineAliveProbeInterval` to interrupt.
Unit test `KyuubiOperationPerConnectionSuite` assert timeout 3s.

https://github.com/apache/kyuubi/blob/12c5568c9b020b1212c9514f046c67fcb267467e/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala#L344-L346

Exception log
```
03:25:15.125 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11886 WARN KyuubiSyncThriftClient: The engine[local-1714879506640] alive probe fails
org.apache.kyuubi.shaded.thrift.transport.TTransportException: Socket is closed by peer.
	...
03:25:16.126 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11886 WARN KyuubiSyncThriftClient: The engine[local-1714879506640] alive probe fails
org.apache.kyuubi.shaded.thrift.transport.TTransportException: java.net.SocketException: Broken pipe (Write failed)
	...
03:25:16.126 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11886 ERROR KyuubiSyncThriftClient: Mark the engine[local-1714879506640] not alive with no recent alive probe success: 2001 ms exceeds timeout 1000 ms
```

Success log
```
16:57:46.859 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11609 WARN KyuubiSyncThriftClient: The engine[local-1715101059872] alive probe fails
...
16:57:46.860 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11609 ERROR KyuubiSyncThriftClient: Mark the engine[local-1715101059872] not alive with no recent alive probe success: 1001 ms exceeds timeout 1000 ms
16:57:47.860 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11609 WARN KyuubiSyncThriftClient: Removing Clients for TSessionHandle(sessionId:THandleIdentifier(guid:9D AA D5 C2 9B E4 43 D7 BE 81 D0 99 EA 5B 9E 37, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38))
```

## Types of changes :bookmark:

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request :coffin:

#### Behavior With This Pull Request :tada:

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6375 from beryllw/kyuubi_6172.

Closes #6172

991798b [wangjunbo] [KYUUBI #6172][TASK][EASY] Support to interrupt the thrift request immediately after marking the engine not alive

Authored-by: wangjunbo <wangjunbo@qiyi.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
  • Loading branch information
beryllw authored and turboFei committed May 9, 2024
1 parent 5b6ab1a commit 1fc2b35
Showing 1 changed file with 19 additions and 13 deletions.
Expand Up @@ -87,6 +87,23 @@ class KyuubiSyncThriftClient private (
private def startEngineAliveProbe(): Unit = {
engineAliveThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"engine-alive-probe-" + _aliveProbeSessionHandle)

def closeClient(): Unit = {
warn(s"Removing Clients for ${_remoteSessionHandle}")
Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { tProtocol =>
Utils.tryLogNonFatalError {
if (tProtocol.getTransport.isOpen) {
tProtocol.getTransport.close()
}
}
}
clientClosedByAliveProbe = true
shutdownAsyncRequestExecutor()
Option(engineAliveThreadPool).foreach { pool =>
ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
}
}

val task = new Runnable {
override def run(): Unit = {
if (!remoteEngineBroken && !engineConnectionClosed) {
Expand All @@ -108,23 +125,12 @@ class KyuubiSyncThriftClient private (
error(s"Mark the engine[$engineIdStr] not alive with no recent alive probe" +
s" success: ${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms")
remoteEngineBroken = true
closeClient()
}
}
}
} else {
warn(s"Removing Clients for ${_remoteSessionHandle}")
Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { tProtocol =>
Utils.tryLogNonFatalError {
if (tProtocol.getTransport.isOpen) {
tProtocol.getTransport.close()
}
}
}
clientClosedByAliveProbe = true
shutdownAsyncRequestExecutor()
Option(engineAliveThreadPool).foreach { pool =>
ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
}
closeClient()
}
}
}
Expand Down

0 comments on commit 1fc2b35

Please sign in to comment.