Skip to content

Commit

Permalink
[KYUUBI #1516] Implement api: /${version}/operations/${operation_iden…
Browse files Browse the repository at this point in the history
…tifier}

### _Why are the changes needed?_
This is a subtask of umbrella issue #KPIP-1

/${version}/operations/${operation_identifier}
- desc: apply an action for an operation based on a given session identifier and operation identifier
- method: PUT
- params:
- action: "CANCEL" or "ADD" or "CLOSE"
- returns: status code

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1517 from simon824/oa.

Closes #1516

94742c6 [simon] fix desc
fffe3e0 [simon] fix desc
f941550 [simon] fix
f455995 [simon] operationAction
72bcf5f [simon] init

Authored-by: simon <zhangshiming@cvte.com>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
simon824 authored and yaooqinn committed Dec 8, 2021
1 parent 520bd26 commit 81b14ea
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 30 deletions.
Expand Up @@ -18,14 +18,15 @@
package org.apache.kyuubi.server.api.v1

import javax.ws.rs.{GET, Path, PathParam, Produces, _}
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.{MediaType, Response}

import scala.util.control.NonFatal

import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
import org.apache.kyuubi.server.api.ApiRequestContext

Expand All @@ -52,4 +53,30 @@ private[v1] class OperationsResource extends ApiRequestContext {
throw new NotFoundException(s"Error getting an operation detail")
}
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description =
"apply an action for an operation")
@PUT
@Path("{operationHandle}")
def applyOpAction(
request: OpActionRequest,
@PathParam("operationHandle") operationHandleStr: String): Response = {
try {
val operationHandle = parseOperationHandle(operationHandleStr)
request.action.toLowerCase() match {
case "cancel" => backendService.cancelOperation(operationHandle)
case "close" => backendService.closeOperation(operationHandle)
case _ => throw KyuubiSQLException(s"Invalid action ${request.action}")
}
Response.ok().build()
} catch {
case NonFatal(_) =>
throw new NotFoundException(s"Error applying ${request.action} " +
s"for operation handle $operationHandleStr")
}
}
}
Expand Up @@ -83,3 +83,5 @@ case class OperationDetail(
shouldRunAsync: Boolean,
isTimedOut: Boolean,
operationStatus: OperationStatus)

case class OpActionRequest(action: String)
Expand Up @@ -17,56 +17,85 @@

package org.apache.kyuubi.server.api.v1

import javax.ws.rs.client.Entity
import javax.ws.rs.client.{Entity, WebTarget}
import javax.ws.rs.core.{MediaType, Response}

import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
import org.apache.kyuubi.operation.{OperationHandle, OperationState, OperationType}
import org.apache.kyuubi.operation.{OperationHandle, OperationState}
import org.apache.kyuubi.session.SessionHandle

class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {

test("test get an operation detail by identifier") {
val requestObj = SessionOpenRequest(
1,
"admin",
"123456",
"localhost",
Map("testConfig" -> "testValue"))

withKyuubiRestServer { (_, _, _, webTarget) =>
var response: Response = webTarget.path("api/v1/sessions")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
val opHandleStr = getOpHandleStr(webTarget, "catalogs")

var response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE).get()
val operationDetail = response.readEntity(classOf[OperationDetail])
assert(200 == response.getStatus)
assert(operationDetail.operationStatus.state == OperationState.FINISHED)

// Invalid operationHandleStr
val invalidOperationHandle = opHandleStr.replaceAll("GET_CATALOGS", "GET_TYPE_INFO")
response = webTarget.path(s"api/v1/operations/$invalidOperationHandle")
.request(MediaType.APPLICATION_JSON_TYPE).get()
assert(404 == response.getStatus)

val sessionHandle = response.readEntity(classOf[SessionHandle])
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"
}
}

response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/operations/catalogs")
test("test apply an action for an operation") {
withKyuubiRestServer { (_, _, _, webTarget: WebTarget) =>
val opHandleStr = getOpHandleStr(webTarget, "catalogs")

var response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
.put(Entity.entity(OpActionRequest("cancel"), MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
var operationHandle = response.readEntity(classOf[OperationHandle])
assert(operationHandle.typ == OperationType.GET_CATALOGS)

val serializedOperationHandle = s"${operationHandle.identifier.publicId}|" +
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
s"${operationHandle.typ.toString}"

response = webTarget.path(s"api/v1/operations/$serializedOperationHandle")
response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE).get()
val operationDetail = response.readEntity(classOf[OperationDetail])
assert(operationDetail.operationStatus.state == OperationState.FINISHED ||
operationDetail.operationStatus.state == OperationState.CANCELED)

response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE)
.put(Entity.entity(OpActionRequest("close"), MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
assert(operationDetail.operationStatus.state == OperationState.FINISHED)

// Invalid operationHandleStr
val invalidOperationHandle = s"${operationHandle.identifier.publicId}|" +
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|GET_TYPE_INFO"
response = webTarget.path(s"api/v1/operations/$invalidOperationHandle")
response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE).get()
assert(404 == response.getStatus)

}
}

def getOpHandleStr(webTarget: WebTarget, operationType: String): String = {
val requestObj = SessionOpenRequest(
1,
"admin",
"123456",
"localhost",
Map("testConfig" -> "testValue"))

var response: Response = webTarget.path("api/v1/sessions")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
val sessionHandle = response.readEntity(classOf[SessionHandle])
val serializedSessionHandle = s"${sessionHandle.identifier.publicId}|" +
s"${sessionHandle.identifier.secretId}|${sessionHandle.protocol.getValue}"

response = webTarget.path(s"api/v1/sessions/$serializedSessionHandle/operations/$operationType")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
val operationHandle = response.readEntity(classOf[OperationHandle])

s"${operationHandle.identifier.publicId}|" +
s"${operationHandle.identifier.secretId}|${operationHandle.protocol.getValue}|" +
s"${operationHandle.typ.toString}"

}
}

0 comments on commit 81b14ea

Please sign in to comment.