Skip to content

Commit 5589406

Browse files
turboFeipan3793
authored andcommitted
[KYUUBI #3318] Transfer the TGetInfoReq to kyuubi engine side to check the connection valid and keep connection alive
### _Why are the changes needed?_ Now the connection path is: `client`-> `kyuubiServer` -> `kyuubiEngine` Maybe the connection between `client -> kyuubiServer` is valid, but the connection between `kyuubiServer -> kyuubiEngine` is not. So we need check the whole path. BTW, before, if customer invoke KyuubiConnection::isValid, it can make the connection between client and kyuubiServer keep alive. Now, it can make the whole connection path keep alive. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3318 from turboFei/is_valid. Closes #3318 af4d6b7 [Fei Wang] Transfer the TGetInfoReq to kyuubi engine side to keep the kyuubi connection alive Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Fei Wang <fwang12@ebay.com>
1 parent fd16dd7 commit 5589406

File tree

4 files changed

+41
-1
lines changed

4 files changed

+41
-1
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ abstract class AbstractSession(
7575
}
7676
}
7777

78-
private def withAcquireRelease[T](userAccess: Boolean = true)(f: => T): T = {
78+
protected def withAcquireRelease[T](userAccess: Boolean = true)(f: => T): T = {
7979
acquire(userAccess)
8080
try f
8181
finally release(userAccess)

kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,13 @@ class KyuubiSyncThriftClient private (
227227
resp.getOperationHandle
228228
}
229229

230+
def getInfo(infoType: TGetInfoType): TGetInfoResp = {
231+
val req = new TGetInfoReq(_remoteSessionHandle, infoType)
232+
val resp = withLockAcquiredAsyncRequest(GetInfo(req))
233+
ThriftUtils.verifyTStatus(resp.getStatus)
234+
resp
235+
}
236+
230237
def getTypeInfo: TOperationHandle = {
231238
val req = new TGetTypeInfoReq(_remoteSessionHandle)
232239
val resp = withLockAcquiredAsyncRequest(GetTypeInfo(req))

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,4 +197,14 @@ class KyuubiSessionImpl(
197197
MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
198198
}
199199
}
200+
201+
override def getInfo(infoType: TGetInfoType): TGetInfoValue = {
202+
if (client != null) {
203+
withAcquireRelease() {
204+
client.getInfo(infoType).getInfoValue
205+
}
206+
} else {
207+
super.getInfo(infoType)
208+
}
209+
}
200210
}

kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
2929
import org.apache.kyuubi.WithKyuubiServer
3030
import org.apache.kyuubi.config.KyuubiConf
3131
import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
32+
import org.apache.kyuubi.engine.ApplicationState
3233
import org.apache.kyuubi.jdbc.KyuubiHiveDriver
3334
import org.apache.kyuubi.jdbc.hive.KyuubiConnection
3435
import org.apache.kyuubi.plugin.SessionConfAdvisor
36+
import org.apache.kyuubi.session.KyuubiSessionManager
3537

3638
/**
3739
* UT with Connection level engine shared cost much time, only run basic jdbc tests.
@@ -210,6 +212,27 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
210212
assert(kyuubiConnection.isClosed)
211213
}
212214
}
215+
216+
test("transfer the TGetInfoReq to kyuubi engine side to verify the connection valid") {
217+
withSessionConf(Map.empty)(Map(KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"))() {
218+
withJdbcStatement() { statement =>
219+
val conn = statement.getConnection.asInstanceOf[KyuubiConnection]
220+
assert(conn.isValid(3000))
221+
val sessionManager = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
222+
eventually(timeout(10.seconds)) {
223+
assert(sessionManager.allSessions().size === 1)
224+
}
225+
val engineId = sessionManager.allSessions().head.handle.identifier.toString
226+
// kill the engine application and wait the engine terminate
227+
sessionManager.applicationManager.killApplication(None, engineId)
228+
eventually(timeout(30.seconds), interval(100.milliseconds)) {
229+
assert(sessionManager.applicationManager.getApplicationInfo(None, engineId)
230+
.exists(_.state == ApplicationState.NOT_FOUND))
231+
}
232+
assert(!conn.isValid(3000))
233+
}
234+
}
235+
}
213236
}
214237

215238
class TestSessionConfAdvisor extends SessionConfAdvisor {

0 commit comments

Comments
 (0)