Skip to content

Commit 4319df7

Browse files
simon824ulysses-you
authored andcommitted
[KYUUBI #1501] Introduce operationsResource
### _Why are the changes needed?_ #1501 Introduce operationsResource mv parseSessionHandle() to SessionHandle mv parseOperationHandle() to OperationHandle ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1502 from simon824/operationresource. Closes #1501 c4b1b64 [simon] fix 72446d3 [simon] introduce operationsResource 06d1201 [simon] init Authored-by: simon <zhangshiming@cvte.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent bd3d4d7 commit 4319df7

File tree

7 files changed

+201
-145
lines changed

7 files changed

+201
-145
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationHandle.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
package org.apache.kyuubi.operation
1919

20-
import java.util.Objects
20+
import java.util.{Objects, UUID}
2121

2222
import scala.language.implicitConversions
23+
import scala.util.control.NonFatal
2324

2425
import org.apache.hive.service.rpc.thrift.{TOperationHandle, TProtocolVersion}
2526

27+
import org.apache.kyuubi.KyuubiSQLException
2628
import org.apache.kyuubi.cli.{Handle, HandleIdentifier}
2729
import org.apache.kyuubi.operation.OperationType.OperationType
2830

@@ -81,4 +83,23 @@ object OperationHandle {
8183
tOperationHandle.setHasResultSet(handle._hasResultSet)
8284
tOperationHandle
8385
}
86+
87+
def parseOperationHandle(operationHandleStr: String): OperationHandle = {
88+
try {
89+
val operationHandleParts = operationHandleStr.split("\\|")
90+
require(
91+
operationHandleParts.size == 4,
92+
s"Expected 4 parameters but found ${operationHandleParts.size}.")
93+
94+
val handleIdentifier = HandleIdentifier(
95+
UUID.fromString(operationHandleParts(0)),
96+
UUID.fromString(operationHandleParts(1)))
97+
val protocolVersion = TProtocolVersion.findByValue(operationHandleParts(2).toInt)
98+
val operationType = OperationType.withName(operationHandleParts(3))
99+
OperationHandle(handleIdentifier, operationType, protocolVersion)
100+
} catch {
101+
case NonFatal(e) =>
102+
throw KyuubiSQLException(s"Invalid $operationHandleStr", e)
103+
}
104+
}
84105
}

kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionHandle.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package org.apache.kyuubi.session
1919

20-
import java.util.Objects
20+
import java.util.{Objects, UUID}
21+
22+
import scala.util.control.NonFatal
2123

2224
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TSessionHandle}
2325

26+
import org.apache.kyuubi.KyuubiSQLException
2427
import org.apache.kyuubi.cli.{Handle, HandleIdentifier}
2528

2629
case class SessionHandle(
@@ -55,4 +58,22 @@ object SessionHandle {
5558
def apply(protocol: TProtocolVersion): SessionHandle = {
5659
apply(HandleIdentifier(), protocol)
5760
}
61+
62+
def parseSessionHandle(sessionHandleStr: String): SessionHandle = {
63+
try {
64+
val sessionHandleParts = sessionHandleStr.split("\\|")
65+
require(
66+
sessionHandleParts.size == 3,
67+
s"Expected 3 parameters but found ${sessionHandleParts.size}.")
68+
69+
val handleIdentifier = HandleIdentifier(
70+
UUID.fromString(sessionHandleParts(0)),
71+
UUID.fromString(sessionHandleParts(1)))
72+
val protocolVersion = TProtocolVersion.findByValue(sessionHandleParts(2).toInt)
73+
SessionHandle(handleIdentifier, protocolVersion)
74+
} catch {
75+
case NonFatal(e) =>
76+
throw KyuubiSQLException(s"Invalid $sessionHandleStr", e)
77+
}
78+
}
5879
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/ApiRootResource.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ private[v1] class ApiRootResource extends ApiRequestContext {
3737
@Path("sessions")
3838
def sessions: Class[SessionsResource] = classOf[SessionsResource]
3939

40+
@Path("operations")
41+
def operations: Class[OperationsResource] = classOf[OperationsResource]
42+
4043
@GET
4144
@Path("exception")
4245
@Produces(Array(MediaType.TEXT_PLAIN))
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.server.api.v1
19+
20+
import javax.ws.rs.{GET, Path, PathParam, Produces, _}
21+
import javax.ws.rs.core.MediaType
22+
23+
import scala.util.control.NonFatal
24+
25+
import io.swagger.v3.oas.annotations.media.Content
26+
import io.swagger.v3.oas.annotations.responses.ApiResponse
27+
import io.swagger.v3.oas.annotations.tags.Tag
28+
29+
import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
30+
import org.apache.kyuubi.server.api.ApiRequestContext
31+
32+
@Tag(name = "Operation")
33+
@Produces(Array(MediaType.APPLICATION_JSON))
34+
private[v1] class OperationsResource extends ApiRequestContext {
35+
36+
@ApiResponse(
37+
responseCode = "200",
38+
content = Array(new Content(
39+
mediaType = MediaType.APPLICATION_JSON)),
40+
description =
41+
"Get an operation detail with a given session identifier and operation identifier")
42+
@GET
43+
@Path("{operationHandle}")
44+
def getOperationDetail(
45+
@PathParam("operationHandle") operationHandleStr: String): OperationDetail = {
46+
try {
47+
val operation = backendService.sessionManager.operationManager
48+
.getOperation(parseOperationHandle(operationHandleStr))
49+
OperationDetail(operation.shouldRunAsync, operation.isTimedOut, operation.getStatus)
50+
} catch {
51+
case NonFatal(_) =>
52+
throw new NotFoundException(s"Error getting an operation detail")
53+
}
54+
}
55+
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala

Lines changed: 24 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.kyuubi.server.api.v1
1919

20-
import java.util.UUID
2120
import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces, _}
2221
import javax.ws.rs.core.{MediaType, Response}
2322

@@ -30,10 +29,11 @@ import io.swagger.v3.oas.annotations.tags.Tag
3029
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TProtocolVersion}
3130

3231
import org.apache.kyuubi.Utils.error
33-
import org.apache.kyuubi.cli.HandleIdentifier
34-
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
32+
import org.apache.kyuubi.operation.OperationHandle
33+
import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
3534
import org.apache.kyuubi.server.api.ApiRequestContext
3635
import org.apache.kyuubi.session.SessionHandle
36+
import org.apache.kyuubi.session.SessionHandle.parseSessionHandle
3737

3838
@Tag(name = "Session")
3939
@Produces(Array(MediaType.APPLICATION_JSON))
@@ -61,8 +61,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
6161
@GET
6262
@Path("{sessionHandle}")
6363
def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): SessionDetail = {
64-
val sessionHandle = parseSessionHandle(sessionHandleStr)
6564
try {
65+
val sessionHandle = parseSessionHandle(sessionHandleStr)
6666
val session = backendService.sessionManager.getSession(sessionHandle)
6767
SessionDetail(
6868
session.user,
@@ -75,8 +75,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
7575
session.conf)
7676
} catch {
7777
case NonFatal(e) =>
78-
error(s"Invalid $sessionHandle", e)
79-
throw new NotFoundException(s"Invalid $sessionHandle")
78+
error(s"Invalid $sessionHandleStr", e)
79+
throw new NotFoundException(s"Invalid $sessionHandleStr")
8080
}
8181
}
8282

@@ -91,11 +91,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
9191
def getInfo(
9292
@PathParam("sessionHandle") sessionHandleStr: String,
9393
@PathParam("infoType") infoType: Int): InfoDetail = {
94-
val sessionHandle = parseSessionHandle(sessionHandleStr)
95-
val info = TGetInfoType.findByValue(infoType)
96-
9794
try {
98-
val infoValue = backendService.getInfo(sessionHandle, info)
95+
val info = TGetInfoType.findByValue(infoType)
96+
val infoValue = backendService.getInfo(parseSessionHandle(sessionHandleStr), info)
9997
InfoDetail(info.toString, infoValue.getStringValue)
10098
} catch {
10199
case NonFatal(e) =>
@@ -152,8 +150,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
152150
@DELETE
153151
@Path("{sessionHandle}")
154152
def closeSession(@PathParam("sessionHandle") sessionHandleStr: String): Response = {
155-
val sessionHandle = parseSessionHandle(sessionHandleStr)
156-
backendService.closeSession(sessionHandle)
153+
backendService.closeSession(parseSessionHandle(sessionHandleStr))
157154
Response.ok().build()
158155
}
159156

@@ -167,10 +164,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
167164
def executeStatement(
168165
@PathParam("sessionHandle") sessionHandleStr: String,
169166
request: StatementRequest): OperationHandle = {
170-
val sessionHandle = parseSessionHandle(sessionHandleStr)
171167
try {
172168
backendService.executeStatement(
173-
sessionHandle,
169+
parseSessionHandle(sessionHandleStr),
174170
request.statement,
175171
request.runAsync,
176172
request.queryTimeout)
@@ -188,9 +184,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
188184
@POST
189185
@Path("{sessionHandle}/operations/typeInfo")
190186
def getTypeInfo(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
191-
val sessionHandle = parseSessionHandle(sessionHandleStr)
192187
try {
193-
backendService.getTypeInfo(sessionHandle)
188+
backendService.getTypeInfo(parseSessionHandle(sessionHandleStr))
194189
} catch {
195190
case NonFatal(_) =>
196191
throw new NotFoundException(s"Error getting type information")
@@ -205,9 +200,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
205200
@POST
206201
@Path("{sessionHandle}/operations/catalogs")
207202
def getCatalogs(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
208-
val sessionHandle = parseSessionHandle(sessionHandleStr)
209203
try {
210-
backendService.getCatalogs(sessionHandle)
204+
backendService.getCatalogs(parseSessionHandle(sessionHandleStr))
211205
} catch {
212206
case NonFatal(_) =>
213207
throw new NotFoundException(s"Error getting catalogs")
@@ -224,9 +218,11 @@ private[v1] class SessionsResource extends ApiRequestContext {
224218
def getSchemas(
225219
@PathParam("sessionHandle") sessionHandleStr: String,
226220
request: GetSchemasRequest): OperationHandle = {
227-
val sessionHandle = parseSessionHandle(sessionHandleStr)
228221
try {
229-
backendService.getSchemas(sessionHandle, request.catalogName, request.schemaName)
222+
backendService.getSchemas(
223+
parseSessionHandle(sessionHandleStr),
224+
request.catalogName,
225+
request.schemaName)
230226
} catch {
231227
case NonFatal(_) =>
232228
throw new NotFoundException(s"Error getting schemas")
@@ -243,10 +239,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
243239
def getTables(
244240
@PathParam("sessionHandle") sessionHandleStr: String,
245241
request: GetTablesRequest): OperationHandle = {
246-
val sessionHandle = parseSessionHandle(sessionHandleStr)
247242
try {
248243
backendService.getTables(
249-
sessionHandle,
244+
parseSessionHandle(sessionHandleStr),
250245
request.catalogName,
251246
request.schemaName,
252247
request.tableName,
@@ -265,9 +260,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
265260
@POST
266261
@Path("{sessionHandle}/operations/tableTypes")
267262
def getTableTypes(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
268-
val sessionHandle = parseSessionHandle(sessionHandleStr)
269263
try {
270-
backendService.getTableTypes(sessionHandle)
264+
backendService.getTableTypes(parseSessionHandle(sessionHandleStr))
271265
} catch {
272266
case NonFatal(_) =>
273267
throw new NotFoundException(s"Error getting table types")
@@ -284,10 +278,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
284278
def getColumns(
285279
@PathParam("sessionHandle") sessionHandleStr: String,
286280
request: GetColumnsRequest): OperationHandle = {
287-
val sessionHandle = parseSessionHandle(sessionHandleStr)
288281
try {
289282
backendService.getColumns(
290-
sessionHandle,
283+
parseSessionHandle(sessionHandleStr),
291284
request.catalogName,
292285
request.schemaName,
293286
request.tableName,
@@ -308,10 +301,9 @@ private[v1] class SessionsResource extends ApiRequestContext {
308301
def getFunctions(
309302
@PathParam("sessionHandle") sessionHandleStr: String,
310303
request: GetFunctionsRequest): OperationHandle = {
311-
val sessionHandle = parseSessionHandle(sessionHandleStr)
312304
try {
313305
backendService.getFunctions(
314-
sessionHandle,
306+
parseSessionHandle(sessionHandleStr),
315307
request.catalogName,
316308
request.schemaName,
317309
request.functionName)
@@ -331,77 +323,15 @@ private[v1] class SessionsResource extends ApiRequestContext {
331323
def closeOperation(
332324
@PathParam("sessionHandle") sessionHandleStr: String,
333325
@PathParam("operationHandle") operationHandleStr: String): OperationHandle = {
334-
val sessionHandle = parseSessionHandle(sessionHandleStr)
335-
val operationHandle = parseOperationHandle(operationHandleStr)
326+
336327
try {
337-
backendService.sessionManager.getSession(sessionHandle).closeOperation(operationHandle)
328+
val operationHandle = parseOperationHandle(operationHandleStr)
329+
backendService.sessionManager.getSession(parseSessionHandle(sessionHandleStr))
330+
.closeOperation(operationHandle)
338331
operationHandle
339332
} catch {
340333
case NonFatal(_) =>
341334
throw new NotFoundException(s"Error closing an operation")
342335
}
343336
}
344-
345-
@ApiResponse(
346-
responseCode = "200",
347-
content = Array(new Content(
348-
mediaType = MediaType.APPLICATION_JSON)),
349-
description =
350-
"Get an operation detail with a given session identifier and operation identifier")
351-
@GET
352-
@Path("{sessionHandle}/operations/{operationHandle}")
353-
def getOperationHandle(
354-
@PathParam("sessionHandle") sessionHandleStr: String,
355-
@PathParam("operationHandle") operationHandleStr: String): OperationDetail = {
356-
val operationHandle = parseOperationHandle(operationHandleStr)
357-
try {
358-
val operation = backendService.sessionManager.operationManager.getOperation(operationHandle)
359-
OperationDetail(operation.shouldRunAsync, operation.isTimedOut, operation.getStatus)
360-
} catch {
361-
case NonFatal(e) =>
362-
throw new NotFoundException(s"Error closing an operation")
363-
}
364-
}
365-
366-
def parseOperationHandle(operationHandleStr: String): OperationHandle = {
367-
try {
368-
val operationHandleParts = operationHandleStr.split("\\|")
369-
require(
370-
operationHandleParts.size == 4,
371-
s"Expected 4 parameters but found ${operationHandleParts.size}.")
372-
373-
val handleIdentifier = new HandleIdentifier(
374-
UUID.fromString(operationHandleParts(0)),
375-
UUID.fromString(operationHandleParts(1)))
376-
377-
val protocolVersion = TProtocolVersion.findByValue(operationHandleParts(2).toInt)
378-
val operationType = OperationType.withName(operationHandleParts(3))
379-
val operationHandle = new OperationHandle(handleIdentifier, operationType, protocolVersion)
380-
381-
operationHandle
382-
} catch {
383-
case NonFatal(e) =>
384-
error(s"Error getting operationHandle by $operationHandleStr.", e)
385-
throw new NotFoundException(s"Error getting operationHandle by $operationHandleStr.")
386-
}
387-
}
388-
389-
def parseSessionHandle(sessionHandleStr: String): SessionHandle = {
390-
try {
391-
val splitSessionHandle = sessionHandleStr.split("\\|")
392-
val handleIdentifier = new HandleIdentifier(
393-
UUID.fromString(splitSessionHandle(0)),
394-
UUID.fromString(splitSessionHandle(1)))
395-
val protocolVersion = TProtocolVersion.findByValue(splitSessionHandle(2).toInt)
396-
val sessionHandle = new SessionHandle(handleIdentifier, protocolVersion)
397-
398-
// if the sessionHandle is invalid, KyuubiSQLException will be thrown here.
399-
backendService.sessionManager.getSession(sessionHandle)
400-
sessionHandle
401-
} catch {
402-
case NonFatal(e) =>
403-
error(s"Error getting sessionHandle by $sessionHandleStr.", e)
404-
throw new NotFoundException(s"Error getting sessionHandle by $sessionHandleStr.")
405-
}
406-
}
407337
}

0 commit comments

Comments
 (0)