Skip to content

Commit

Permalink
[KYUUBI #1691] Implement api: /${version}/operations/${operation_iden…
Browse files Browse the repository at this point in the history
…tifier}/rowset

### _Why are the changes needed?_
closes #1691
/${version}/operations/${operation_identifier}/rowset?fetchorientation=${fetchorientation}&maxrows=${maxrows}
mapping: ICLIService#fetchResults
desc: get the row set via the given operation identifier
method: GET
params: none
returns: an instance of RowSet

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1692 from simon824/getNextRowSet.

Closes #1691

611aafe [simon] desc
e6712b1 [simon] getNextRowSet

Authored-by: simon <zhangshiming@cvte.com>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
simon824 authored and yaooqinn committed Jan 7, 2022
1 parent e886524 commit c43d3a7
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
Expand Up @@ -26,7 +26,7 @@ import scala.util.control.NonFatal
import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.hive.service.rpc.thrift.TTypeQualifierValue
import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.events.KyuubiOperationEvent
Expand Down Expand Up @@ -146,4 +146,51 @@ private[v1] class OperationsResource extends ApiRequestContext {
s"Error getting operation log for operation handle $operationHandleStr")
}
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description =
"get result row set")
@GET
@Path("{operationHandle}/rowset")
def getNextRowSet(
@PathParam("operationHandle") operationHandleStr: String,
@QueryParam("maxrows") maxRows: Int,
@QueryParam("fetchorientation") fetchOrientation: String): ResultRowSet = {
try {
val rowSet = fe.be.sessionManager.operationManager.getOperationNextRowSet(
parseOperationHandle(operationHandleStr),
FetchOrientation.withName(fetchOrientation),
maxRows)
val rows = rowSet.getRows.asScala.map(i => {
Row(i.getColVals.asScala.map(i => {
Field(
i.getSetField.name(),
i.getSetField match {
case TColumnValue._Fields.STRING_VAL =>
i.getStringVal.getFieldValue(TStringValue._Fields.VALUE)
case TColumnValue._Fields.BOOL_VAL =>
i.getBoolVal.getFieldValue(TBoolValue._Fields.VALUE)
case TColumnValue._Fields.BYTE_VAL =>
i.getByteVal.getFieldValue(TByteValue._Fields.VALUE)
case TColumnValue._Fields.DOUBLE_VAL =>
i.getDoubleVal.getFieldValue(TDoubleValue._Fields.VALUE)
case TColumnValue._Fields.I16_VAL =>
i.getI16Val.getFieldValue(TI16Value._Fields.VALUE)
case TColumnValue._Fields.I32_VAL =>
i.getI32Val.getFieldValue(TI32Value._Fields.VALUE)
case TColumnValue._Fields.I64_VAL =>
i.getI64Val.getFieldValue(TI64Value._Fields.VALUE)
})
}))
})
ResultRowSet(rows, rows.size)
} catch {
case NonFatal(_) =>
throw new NotFoundException(
s"Error getting result row set for operation handle $operationHandleStr")
}
}
}
Expand Up @@ -81,3 +81,9 @@ case class ColumnDesc(
comment: String)

case class OperationLog(logRowSet: Seq[String], rowCount: Int)

case class ResultRowSet(rows: Seq[Row], rowCount: Int)

case class Row(fields: Seq[Field])

case class Field(dataType: String, value: Any)
Expand Up @@ -62,7 +62,6 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper
val opHandleStr = s"${op.getHandle.identifier.publicId}|" +
s"${op.getHandle.identifier.secretId}|${op.getHandle.protocol.getValue}|" +
s"${op.getHandle.typ.toString}"

var response = webTarget.path(s"api/v1/operations/$opHandleStr")
.request(MediaType.APPLICATION_JSON_TYPE)
.put(Entity.entity(OpActionRequest("cancel"), MediaType.APPLICATION_JSON_TYPE))
Expand Down Expand Up @@ -101,7 +100,22 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper
assert(logRowSet.rowCount === 10)
}

def getOpHandleStr(typ: OperationType): String = {
test("test get result row set") {
val opHandleStr =
getOpHandleStr(OperationType.EXECUTE_STATEMENT, "select \"test\", 1, 0.32d, true")
checkOpState(opHandleStr, FINISHED)
val response = webTarget.path(
s"api/v1/operations/$opHandleStr/rowset")
.queryParam("maxrows", "2")
.queryParam("fetchorientation", "FETCH_NEXT")
.request(MediaType.APPLICATION_JSON).get()
assert(200 == response.getStatus)
val logRowSet = response.readEntity(classOf[ResultRowSet])
assert("test".equals(logRowSet.rows.head.fields.head.value))
assert(logRowSet.rowCount == 1)
}

def getOpHandleStr(typ: OperationType, statement: String = "show tables"): String = {
val sessionHandle = fe.be.openSession(
HIVE_CLI_SERVICE_PROTOCOL_V2,
"admin",
Expand All @@ -111,7 +125,7 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper

val op = typ match {
case OperationType.EXECUTE_STATEMENT =>
fe.be.executeStatement(sessionHandle, "show tables", runAsync = true, 3000)
fe.be.executeStatement(sessionHandle, statement, runAsync = true, 3000)
case OperationType.GET_CATALOGS => fe.be.getCatalogs(sessionHandle)
}

Expand Down

0 comments on commit c43d3a7

Please sign in to comment.