Skip to content

Commit 73ac1b6

Browse files
simon824yaooqinn
authored andcommitted
[KYUUBI #1445] [Kyuubi #1444] Implement api: /${version}/sessions/${session_identifier}/operations/${operation_identifier}
### _Why are the changes needed?_ This is a subtask of umbrella issue #KPIP-1 /${version}/sessions/${session_identifier}/operations/${operation_identifier} - desc: get an operation with a given session identifier and operation identifier - method: GET - params: none - returns: an instance of OperationHandle /${version}/sessions/${session_identifier}/operations/${operation_identifier} - desc: remove operation based on a given session identifier and operation identifier - method: DELETE - params: none - returns: an instance of OperationHandle ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1445 from simon824/r5. Closes #1445 196ea4c [simon] codestyle df1c20e [simon] remove isHasOperation 6bdebab [simon] fix 609aba1 [simon] codestyle dbae387 [simon] codestyle 7eb9eeb [simon] Merge remote-tracking branch 'upstream/master' into r5 8a9e78c [simon] codestyle 15b00aa [simon] operation b154cbc [simon] init Authored-by: simon <zhangshiming@cvte.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 97c3835 commit 73ac1b6

File tree

3 files changed

+174
-16
lines changed

3 files changed

+174
-16
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
package org.apache.kyuubi.server.api.v1
1919

2020
import java.util.UUID
21-
import javax.ws.rs._
22-
import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces}
21+
import javax.ws.rs.{Consumes, DELETE, GET, Path, PathParam, POST, Produces, _}
2322
import javax.ws.rs.core.{MediaType, Response}
2423

2524
import scala.collection.JavaConverters._
@@ -32,7 +31,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TProtocolVersion}
3231

3332
import org.apache.kyuubi.Utils.error
3433
import org.apache.kyuubi.cli.HandleIdentifier
35-
import org.apache.kyuubi.operation.OperationHandle
34+
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
3635
import org.apache.kyuubi.server.api.ApiRequestContext
3736
import org.apache.kyuubi.session.SessionHandle
3837

@@ -62,7 +61,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
6261
@GET
6362
@Path("{sessionHandle}")
6463
def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): SessionDetail = {
65-
val sessionHandle = getSessionHandle(sessionHandleStr)
64+
val sessionHandle = parseSessionHandle(sessionHandleStr)
6665
try {
6766
val session = backendService.sessionManager.getSession(sessionHandle)
6867
SessionDetail(
@@ -92,7 +91,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
9291
def getInfo(
9392
@PathParam("sessionHandle") sessionHandleStr: String,
9493
@PathParam("infoType") infoType: Int): InfoDetail = {
95-
val sessionHandle = getSessionHandle(sessionHandleStr)
94+
val sessionHandle = parseSessionHandle(sessionHandleStr)
9695
val info = TGetInfoType.findByValue(infoType)
9796

9897
try {
@@ -153,7 +152,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
153152
@DELETE
154153
@Path("{sessionHandle}")
155154
def closeSession(@PathParam("sessionHandle") sessionHandleStr: String): Response = {
156-
val sessionHandle = getSessionHandle(sessionHandleStr)
155+
val sessionHandle = parseSessionHandle(sessionHandleStr)
157156
backendService.closeSession(sessionHandle)
158157
Response.ok().build()
159158
}
@@ -168,7 +167,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
168167
def executeStatement(
169168
@PathParam("sessionHandle") sessionHandleStr: String,
170169
request: StatementRequest): OperationHandle = {
171-
val sessionHandle = getSessionHandle(sessionHandleStr)
170+
val sessionHandle = parseSessionHandle(sessionHandleStr)
172171
try {
173172
backendService.executeStatement(
174173
sessionHandle,
@@ -189,7 +188,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
189188
@POST
190189
@Path("{sessionHandle}/operations/typeInfo")
191190
def getTypeInfo(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
192-
val sessionHandle = getSessionHandle(sessionHandleStr)
191+
val sessionHandle = parseSessionHandle(sessionHandleStr)
193192
try {
194193
backendService.getTypeInfo(sessionHandle)
195194
} catch {
@@ -206,7 +205,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
206205
@POST
207206
@Path("{sessionHandle}/operations/catalogs")
208207
def getCatalogs(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
209-
val sessionHandle = getSessionHandle(sessionHandleStr)
208+
val sessionHandle = parseSessionHandle(sessionHandleStr)
210209
try {
211210
backendService.getCatalogs(sessionHandle)
212211
} catch {
@@ -225,7 +224,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
225224
def getSchemas(
226225
@PathParam("sessionHandle") sessionHandleStr: String,
227226
request: GetSchemasRequest): OperationHandle = {
228-
val sessionHandle = getSessionHandle(sessionHandleStr)
227+
val sessionHandle = parseSessionHandle(sessionHandleStr)
229228
try {
230229
backendService.getSchemas(sessionHandle, request.catalogName, request.schemaName)
231230
} catch {
@@ -244,7 +243,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
244243
def getTables(
245244
@PathParam("sessionHandle") sessionHandleStr: String,
246245
request: GetTablesRequest): OperationHandle = {
247-
val sessionHandle = getSessionHandle(sessionHandleStr)
246+
val sessionHandle = parseSessionHandle(sessionHandleStr)
248247
try {
249248
backendService.getTables(
250249
sessionHandle,
@@ -266,7 +265,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
266265
@POST
267266
@Path("{sessionHandle}/operations/tableTypes")
268267
def getTableTypes(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
269-
val sessionHandle = getSessionHandle(sessionHandleStr)
268+
val sessionHandle = parseSessionHandle(sessionHandleStr)
270269
try {
271270
backendService.getTableTypes(sessionHandle)
272271
} catch {
@@ -285,7 +284,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
285284
def getColumns(
286285
@PathParam("sessionHandle") sessionHandleStr: String,
287286
request: GetColumnsRequest): OperationHandle = {
288-
val sessionHandle = getSessionHandle(sessionHandleStr)
287+
val sessionHandle = parseSessionHandle(sessionHandleStr)
289288
try {
290289
backendService.getColumns(
291290
sessionHandle,
@@ -309,7 +308,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
309308
def getFunctions(
310309
@PathParam("sessionHandle") sessionHandleStr: String,
311310
request: GetFunctionsRequest): OperationHandle = {
312-
val sessionHandle = getSessionHandle(sessionHandleStr)
311+
val sessionHandle = parseSessionHandle(sessionHandleStr)
313312
try {
314313
backendService.getFunctions(
315314
sessionHandle,
@@ -322,7 +321,72 @@ private[v1] class SessionsResource extends ApiRequestContext {
322321
}
323322
}
324323

325-
def getSessionHandle(sessionHandleStr: String): SessionHandle = {
324+
@ApiResponse(
325+
responseCode = "200",
326+
content = Array(new Content(
327+
mediaType = MediaType.APPLICATION_JSON)),
328+
description = "Close an operation")
329+
@DELETE
330+
@Path("{sessionHandle}/operations/{operationHandle}")
331+
def closeOperation(
332+
@PathParam("sessionHandle") sessionHandleStr: String,
333+
@PathParam("operationHandle") operationHandleStr: String): OperationHandle = {
334+
val sessionHandle = parseSessionHandle(sessionHandleStr)
335+
val operationHandle = parseOperationHandle(operationHandleStr)
336+
try {
337+
backendService.sessionManager.getSession(sessionHandle).closeOperation(operationHandle)
338+
operationHandle
339+
} catch {
340+
case NonFatal(_) =>
341+
throw new NotFoundException(s"Error closing an operation")
342+
}
343+
}
344+
345+
@ApiResponse(
346+
responseCode = "200",
347+
content = Array(new Content(
348+
mediaType = MediaType.APPLICATION_JSON)),
349+
description =
350+
"Get an operation detail with a given session identifier and operation identifier")
351+
@GET
352+
@Path("{sessionHandle}/operations/{operationHandle}")
353+
def getOperationHandle(
354+
@PathParam("sessionHandle") sessionHandleStr: String,
355+
@PathParam("operationHandle") operationHandleStr: String): OperationDetail = {
356+
val operationHandle = parseOperationHandle(operationHandleStr)
357+
try {
358+
val operation = backendService.sessionManager.operationManager.getOperation(operationHandle)
359+
OperationDetail(operation.shouldRunAsync, operation.isTimedOut, operation.getStatus)
360+
} catch {
361+
case NonFatal(e) =>
362+
throw new NotFoundException(s"Error closing an operation")
363+
}
364+
}
365+
366+
def parseOperationHandle(operationHandleStr: String): OperationHandle = {
367+
try {
368+
val operationHandleParts = operationHandleStr.split("\\|")
369+
require(
370+
operationHandleParts.size == 4,
371+
s"Expected 4 parameters but found ${operationHandleParts.size}.")
372+
373+
val handleIdentifier = new HandleIdentifier(
374+
UUID.fromString(operationHandleParts(0)),
375+
UUID.fromString(operationHandleParts(1)))
376+
377+
val protocolVersion = TProtocolVersion.findByValue(operationHandleParts(2).toInt)
378+
val operationType = OperationType.withName(operationHandleParts(3))
379+
val operationHandle = new OperationHandle(handleIdentifier, operationType, protocolVersion)
380+
381+
operationHandle
382+
} catch {
383+
case NonFatal(e) =>
384+
error(s"Error getting operationHandle by $operationHandleStr.", e)
385+
throw new NotFoundException(s"Error getting operationHandle by $operationHandleStr.")
386+
}
387+
}
388+
389+
def parseSessionHandle(sessionHandleStr: String): SessionHandle = {
326390
try {
327391
val splitSessionHandle = sessionHandleStr.split("\\|")
328392
val handleIdentifier = new HandleIdentifier(

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kyuubi.server.api.v1
1919

20+
import org.apache.kyuubi.operation.OperationStatus
2021
import org.apache.kyuubi.session.SessionHandle
2122

2223
case class SessionOpenCount(openSessionCount: Int)
@@ -77,3 +78,8 @@ case class GetFunctionsRequest(
7778
catalogName: String,
7879
schemaName: String,
7980
functionName: String)
81+
82+
case class OperationDetail(
83+
shouldRunAsync: Boolean,
84+
isTimedOut: Boolean,
85+
operationStatus: OperationStatus)

kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import javax.ws.rs.core.{MediaType, Response}
2424
import scala.concurrent.duration._
2525

2626
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
27-
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
27+
import org.apache.kyuubi.operation.{OperationHandle, OperationState, OperationType}
2828
import org.apache.kyuubi.session.SessionHandle
2929

3030
class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@@ -300,4 +300,92 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
300300
assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
301301
}
302302
}
303+
304+
test("test get an operation status by identifier") {
305+
val requestObj = SessionOpenRequest(
306+
1,
307+
"admin",
308+
"123456",
309+
"localhost",
310+
Map("testConfig" -> "testValue"))
311+
312+
withKyuubiRestServer { (_, _, _, webTarget) =>
313+
var response: Response = webTarget.path("api/v1/sessions")
314+
.request(MediaType.APPLICATION_JSON_TYPE)
315+
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
316+
317+
val sessionHandle = response.readEntity(classOf[SessionHandle])
318+
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
319+
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
320+
321+
val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
322+
323+
response = webTarget.path(s"$pathPrefix/operations/catalogs")
324+
.request(MediaType.APPLICATION_JSON_TYPE)
325+
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
326+
assert(200 == response.getStatus)
327+
var operationHandle = response.readEntity(classOf[OperationHandle])
328+
assert(operationHandle.typ == OperationType.GET_CATALOGS)
329+
330+
val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
331+
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
332+
s"${operationHandle.typ.toString}"
333+
334+
response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
335+
.request(MediaType.APPLICATION_JSON_TYPE).get()
336+
val operationDetail = response.readEntity(classOf[OperationDetail])
337+
assert(200 == response.getStatus)
338+
assert(operationDetail.operationStatus.state == OperationState.FINISHED)
339+
340+
// Invalid operationHandleStr
341+
val invalidOperationHandle = s"${operationHandle.identifier.publicId}|" +
342+
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|GET_TYPE_INFO"
343+
response = webTarget.path(s"$pathPrefix/operations/$invalidOperationHandle")
344+
.request(MediaType.APPLICATION_JSON_TYPE).get()
345+
assert(404 == response.getStatus)
346+
347+
}
348+
}
349+
350+
test("test close an operation") {
351+
val requestObj = SessionOpenRequest(
352+
1,
353+
"admin",
354+
"123456",
355+
"localhost",
356+
Map("testConfig" -> "testValue"))
357+
358+
withKyuubiRestServer { (_, _, _, webTarget) =>
359+
var response: Response = webTarget.path("api/v1/sessions")
360+
.request(MediaType.APPLICATION_JSON_TYPE)
361+
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
362+
363+
val sessionHandle = response.readEntity(classOf[SessionHandle])
364+
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
365+
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
366+
367+
val pathPrefix = s"api/v1/sessions/$serializedSessionHandle"
368+
369+
response = webTarget.path(s"$pathPrefix/operations/catalogs")
370+
.request(MediaType.APPLICATION_JSON_TYPE)
371+
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
372+
assert(200 == response.getStatus)
373+
var operationHandle = response.readEntity(classOf[OperationHandle])
374+
assert(operationHandle.typ == OperationType.GET_CATALOGS)
375+
376+
val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
377+
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
378+
s"${operationHandle.typ.toString}"
379+
380+
response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
381+
.request(MediaType.APPLICATION_JSON_TYPE).delete()
382+
assert(200 == response.getStatus)
383+
384+
// verify operationHandle
385+
response = webTarget.path(s"$pathPrefix/operations/$serializedOperationHandle")
386+
.request(MediaType.APPLICATION_JSON_TYPE).get()
387+
assert(404 == response.getStatus)
388+
389+
}
390+
}
303391
}

0 commit comments

Comments
 (0)