17
17
18
18
package org .apache .kyuubi .service
19
19
20
- import java .util
20
+ import java .time . Duration
21
21
22
22
import scala .collection .JavaConverters ._
23
23
24
24
import org .apache .hive .service .rpc .thrift ._
25
- import org .apache .thrift .protocol .TBinaryProtocol
26
- import org .apache .thrift .transport .TSocket
25
+ import org .scalatest .time ._
27
26
28
27
import org .apache .kyuubi .{KyuubiFunSuite , KyuubiSQLException , Utils }
29
28
import org .apache .kyuubi .config .KyuubiConf
30
29
import org .apache .kyuubi .config .KyuubiConf .{FRONTEND_BIND_HOST , FRONTEND_CONNECTION_URL_USE_HOSTNAME , FRONTEND_THRIFT_BINARY_BIND_HOST , FRONTEND_THRIFT_BINARY_BIND_PORT }
31
- import org .apache .kyuubi .operation .{OperationHandle , OperationType }
30
+ import org .apache .kyuubi .operation .{OperationHandle , OperationType , TClientTestUtils }
32
31
import org .apache .kyuubi .service .TFrontendService .{FeServiceServerContext , SERVER_VERSION }
33
- import org .apache .kyuubi .service .authentication .PlainSASLHelper
34
- import org .apache .kyuubi .session .SessionHandle
32
+ import org .apache .kyuubi .session .{AbstractSession , SessionHandle }
35
33
36
- class ThriftFrontendServiceSuite extends KyuubiFunSuite {
34
+ class TFrontendServiceSuite extends KyuubiFunSuite {
37
35
38
36
protected val server = new NoopTBinaryFrontendServer ()
39
37
protected val conf = KyuubiConf ()
40
38
.set(KyuubiConf .FRONTEND_THRIFT_BINARY_BIND_PORT , 0 )
41
39
.set(" kyuubi.test.server.should.fail" , " false" )
42
-
43
- val user : String = System .getProperty(" user.name" )
44
- val sessionConf : util.Map [String , String ] = new util.HashMap ()
40
+ .set(KyuubiConf .SESSION_CHECK_INTERVAL , Duration .ofSeconds(5 ).toMillis)
41
+ .set(KyuubiConf .SESSION_IDLE_TIMEOUT , Duration .ofSeconds(5 ).toMillis)
42
+ .set(KyuubiConf .OPERATION_IDLE_TIMEOUT , Duration .ofSeconds(20 ).toMillis)
43
+ .set(KyuubiConf .SESSION_CONF_RESTRICT_LIST , Seq (" spark.*" ))
44
+ .set(KyuubiConf .SESSION_CONF_IGNORE_LIST , Seq (" session.engine.*" ))
45
+
46
+ private def withSessionHandle (f : (TCLIService .Iface , TSessionHandle ) => Unit ): Unit = {
47
+ TClientTestUtils .withSessionHandle(server.frontendServices.head.connectionUrl, Map .empty)(f)
48
+ }
45
49
46
50
override def beforeAll (): Unit = {
47
51
server.initialize(conf)
@@ -54,45 +58,6 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite {
54
58
super .afterAll()
55
59
}
56
60
57
- protected def withThriftClient (f : TCLIService .Iface => Unit ): Unit = {
58
- val hostAndPort = server.frontendServices.head.connectionUrl.split(" :" )
59
- val host = hostAndPort.head
60
- val port = hostAndPort(1 ).toInt
61
- val socket = new TSocket (host, port)
62
- val transport = PlainSASLHelper .getPlainTransport(Utils .currentUser, " anonymous" , socket)
63
-
64
- val protocol = new TBinaryProtocol (transport)
65
- val client = new TCLIService .Client (protocol)
66
- transport.open()
67
- try {
68
- f(client)
69
- } finally {
70
- socket.close()
71
- }
72
- }
73
-
74
- protected def withSessionHandle (f : (TCLIService .Iface , TSessionHandle ) => Unit ): Unit = {
75
- withThriftClient { client =>
76
- val req = new TOpenSessionReq ()
77
- req.setUsername(user)
78
- req.setPassword(" anonymous" )
79
- req.setConfiguration(sessionConf)
80
- val resp = client.OpenSession (req)
81
- val handle = resp.getSessionHandle
82
-
83
- try {
84
- f(client, handle)
85
- } finally {
86
- val tCloseSessionReq = new TCloseSessionReq (handle)
87
- try {
88
- client.CloseSession (tCloseSessionReq)
89
- } catch {
90
- case e : Exception => error(s " Failed to close $handle" , e)
91
- }
92
- }
93
- }
94
- }
95
-
96
61
private def checkOperationResult (
97
62
client : TCLIService .Iface ,
98
63
handle : TOperationHandle ): Unit = {
@@ -151,24 +116,26 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite {
151
116
}
152
117
153
118
test(" open session" ) {
154
- withThriftClient { client =>
155
- val req = new TOpenSessionReq ()
156
- req.setUsername(user)
157
- req.setPassword(" anonymous" )
158
- val resp = client.OpenSession (req)
159
- val handle = resp.getSessionHandle
160
- assert(handle != null )
161
- assert(resp.getStatus.getStatusCode == TStatusCode .SUCCESS_STATUS )
162
-
163
- req.setConfiguration(Map (" kyuubi.test.should.fail" -> " true" ).asJava)
164
- val resp1 = client.OpenSession (req)
165
- assert(resp1.getSessionHandle === null )
166
- assert(resp1.getStatus.getStatusCode === TStatusCode .ERROR_STATUS )
167
- val cause = KyuubiSQLException .toCause(resp1.getStatus.getInfoMessages.asScala)
168
- assert(cause.isInstanceOf [KyuubiSQLException ])
169
- assert(cause.getMessage === " Asked to fail" )
119
+ TClientTestUtils .withThriftClient(server.frontendServices.head) {
120
+ client =>
121
+ val req = new TOpenSessionReq ()
122
+ req.setUsername(Utils .currentUser)
123
+ req.setPassword(" anonymous" )
124
+ val resp = client.OpenSession (req)
125
+ val handle = resp.getSessionHandle
126
+ assert(handle != null )
127
+ assert(resp.getStatus.getStatusCode == TStatusCode .SUCCESS_STATUS )
128
+
129
+ req.setConfiguration(Map (" kyuubi.test.should.fail" -> " true" ).asJava)
130
+ val resp1 = client.OpenSession (req)
131
+ assert(resp1.getSessionHandle === null )
132
+ assert(resp1.getStatus.getStatusCode === TStatusCode .ERROR_STATUS )
133
+ val cause = KyuubiSQLException .toCause(resp1.getStatus.getInfoMessages.asScala)
134
+ assert(cause.isInstanceOf [KyuubiSQLException ])
135
+ assert(cause.getMessage === " Asked to fail" )
136
+
137
+ assert(resp1.getStatus.getErrorMessage === " Asked to fail" )
170
138
171
- assert(resp1.getStatus.getErrorMessage === " Asked to fail" )
172
139
}
173
140
}
174
141
@@ -513,4 +480,58 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite {
513
480
" Delegation token is not supported" )
514
481
}
515
482
}
483
+
484
+ test(" close expired operations" ) {
485
+ withSessionHandle { (client, handle) =>
486
+ val req = new TCancelOperationReq ()
487
+ val req1 = new TGetSchemasReq (handle)
488
+ val resp1 = client.GetSchemas (req1)
489
+
490
+ val sessionManager = server.backendService.sessionManager
491
+ val session = sessionManager
492
+ .getSession(SessionHandle (handle))
493
+ .asInstanceOf [AbstractSession ]
494
+ var lastAccessTime = session.lastAccessTime
495
+ assert(sessionManager.getOpenSessionCount == 1 )
496
+ assert(session.lastIdleTime > 0 )
497
+
498
+ resp1.getOperationHandle
499
+ req.setOperationHandle(resp1.getOperationHandle)
500
+ val resp2 = client.CancelOperation (req)
501
+ assert(resp2.getStatus.getStatusCode === TStatusCode .SUCCESS_STATUS )
502
+ assert(sessionManager.getOpenSessionCount == 1 )
503
+ assert(session.lastIdleTime == 0 )
504
+ assert(lastAccessTime < session.lastAccessTime)
505
+ lastAccessTime = session.lastAccessTime
506
+
507
+ eventually(timeout(Span (60 , Seconds )), interval(Span (1 , Seconds ))) {
508
+ assert(session.lastIdleTime > lastAccessTime)
509
+ }
510
+
511
+ info(" operation is terminated" )
512
+ assert(lastAccessTime == session.lastAccessTime)
513
+ assert(sessionManager.getOpenSessionCount == 1 )
514
+
515
+ eventually(timeout(Span (60 , Seconds )), interval(Span (1 , Seconds ))) {
516
+ assert(session.lastAccessTime > lastAccessTime)
517
+ }
518
+ assert(sessionManager.getOpenSessionCount == 0 )
519
+ }
520
+ }
521
+
522
+ test(" test validate and normalize config" ) {
523
+ val sessionManager = server.backendService.sessionManager
524
+ // test restrict
525
+ intercept[KyuubiSQLException ] {
526
+ sessionManager.validateAndNormalizeConf(Map (" spark.driver.memory" -> " 2G" ))
527
+ }
528
+
529
+ // test ignore
530
+ val conf = sessionManager.validateAndNormalizeConf(
531
+ Map (
532
+ " session.engine.spark.main.resource" -> " org.apahce.kyuubi.test" ,
533
+ " session.check.interval" -> " 10000" ))
534
+ assert(conf.size == 1 )
535
+ assert(conf(" session.check.interval" ) == " 10000" )
536
+ }
516
537
}
0 commit comments