Skip to content

Commit

Permalink
[KYUUBI #2812] [SUB-TASK][KPIP-4] Refine the batch info response
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

Return more batch info for insight: `user`, `name`, `createTime` and `endTime`.

Support to list batches with filter conditions:
- batchState
- batchUser
- createTime
- endTime

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2812 from turboFei/return_more_info.

Closes #2812

972fe88 [Fei Wang] comment
da2ea29 [Fei Wang] return more info for batch

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
turboFei authored and ulysses-you committed Jun 6, 2022
1 parent c56a13a commit 0cb6f16
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 48 deletions.
Expand Up @@ -141,12 +141,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
})
}

def getSessionOption(sessionHandle: SessionHandle): Option[Session] = {
Option(handleToSession.get(sessionHandle))
}

def getSession(sessionHandle: SessionHandle): Session = {
val session = handleToSession.get(sessionHandle)
if (session == null) {
throw KyuubiSQLException(s"Invalid $sessionHandle")
}
session
getSessionOption(sessionHandle).getOrElse(throw KyuubiSQLException(s"Invalid $sessionHandle"))
}

final protected def setSession(sessionHandle: SessionHandle, session: Session): Unit = {
Expand Down
Expand Up @@ -48,9 +48,24 @@ public Batch getBatchById(String batchId) {
return this.getClient().get(path, null, Batch.class, client.getAuthHeader());
}

public GetBatchesResponse listBatches(String batchType, int from, int size) {
public GetBatchesResponse listBatches(
String batchType,
String batchUser,
String batchState,
Long createTime,
Long endTime,
int from,
int size) {
Map<String, Object> params = new HashMap<>();
params.put("batchType", batchType);
params.put("batchUser", batchUser);
params.put("batchState", batchState);
if (null != createTime && createTime >= 0) {
params.put("createTime", createTime);
}
if (null != endTime && endTime >= 0) {
params.put("endTime", endTime);
}
params.put("from", from);
params.put("size", size);
return this.getClient()
Expand Down
Expand Up @@ -23,24 +23,36 @@

public class Batch {
private String id;
private String user;
private String batchType;
private String name;
private Map<String, String> batchInfo;
private String kyuubiInstance;
private String state;
private long createTime;
private long endTime;

public Batch() {}

public Batch(
String id,
String user,
String batchType,
String name,
Map<String, String> batchInfo,
String kyuubiInstance,
String state) {
String state,
long createTime,
long endTime) {
this.id = id;
this.user = user;
this.batchType = batchType;
this.name = name;
this.batchInfo = batchInfo;
this.kyuubiInstance = kyuubiInstance;
this.state = state;
this.createTime = createTime;
this.endTime = endTime;
}

public String getId() {
Expand All @@ -51,6 +63,14 @@ public void setId(String id) {
this.id = id;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getBatchType() {
return batchType;
}
Expand All @@ -59,6 +79,14 @@ public void setBatchType(String batchType) {
this.batchType = batchType;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Map<String, String> getBatchInfo() {
if (null == batchInfo) {
return Collections.emptyMap();
Expand Down Expand Up @@ -86,6 +114,22 @@ public void setState(String state) {
this.state = state;
}

public long getCreateTime() {
return createTime;
}

public void setCreateTime(long createTime) {
this.createTime = createTime;
}

public long getEndTime() {
return endTime;
}

public void setEndTime(long endTime) {
this.endTime = endTime;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Expand Up @@ -111,8 +111,12 @@ public void testNoPasswordBasicClient() {
Batch result = noPasswordBasicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
assertEquals(result.getCreateTime(), expectedBatch.getCreateTime());
assertEquals(result.getEndTime(), expectedBatch.getEndTime());
}

@Test
Expand All @@ -131,8 +135,12 @@ public void testAnonymousBasicClient() {
Batch result = anonymousBasicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
assertEquals(result.getCreateTime(), expectedBatch.getCreateTime());
assertEquals(result.getEndTime(), expectedBatch.getEndTime());
}

@Test
Expand All @@ -154,8 +162,12 @@ public void createBatchTest() {
result = basicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
assertEquals(result.getCreateTime(), expectedBatch.getCreateTime());
assertEquals(result.getEndTime(), expectedBatch.getEndTime());
}

@Test
Expand All @@ -177,8 +189,12 @@ public void getBatchByIdTest() {
result = basicBatchRestApi.getBatchById("71535");

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
assertEquals(result.getCreateTime(), expectedBatch.getCreateTime());
assertEquals(result.getEndTime(), expectedBatch.getEndTime());
}

@Test
Expand All @@ -187,7 +203,8 @@ public void getBatchInfoListTest() {
BatchTestServlet.setAuthSchema(NEGOTIATE_AUTH);

GetBatchesResponse expectedBatchesInfo = generateTestBatchesResponse();
GetBatchesResponse result = spnegoBatchRestApi.listBatches("spark", 0, 10);
GetBatchesResponse result =
spnegoBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", 0L, 0L, 0, 10);

assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size());
assertEquals(expectedBatchesInfo.getFrom(), result.getFrom());
Expand All @@ -197,7 +214,7 @@ public void getBatchInfoListTest() {
BatchTestServlet.setAuthSchema(BASIC_AUTH);
BatchTestServlet.allowAnonymous(false);

result = basicBatchRestApi.listBatches("spark", 0, 10);
result = basicBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", null, null, 0, 10);

assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size());
assertEquals(expectedBatchesInfo.getFrom(), result.getFrom());
Expand Down
Expand Up @@ -35,6 +35,8 @@ public class RestClientTestUtil {
public static final String TEST_USERNAME = "test_user";
public static final String TEST_PASSWORD = "test_password";

public static final Long BATCH_CREATE_TIME = System.currentTimeMillis();

public static Batch generateTestBatch() {
return generateTestBatch("71535");
}
Expand All @@ -58,7 +60,17 @@ public static Batch generateTestBatch(String id) {
+ "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar");
batchInfo.put("state", "RUNNING");

Batch batch = new Batch(id, "spark", batchInfo, "192.168.31.130:64573", "RUNNING");
Batch batch =
new Batch(
id,
TEST_USERNAME,
"spark",
"batch_name",
batchInfo,
"192.168.31.130:64573",
"RUNNING",
BATCH_CREATE_TIME,
0);

return batch;
}
Expand Down
Expand Up @@ -31,9 +31,9 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.Logging
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.operation.FetchOrientation
import org.apache.kyuubi.operation.{FetchOrientation, OperationState}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource.{supportedBatchType, REST_BATCH_PROTOCOL, SUPPORTED_BATCH_TYPES}
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle}
Expand All @@ -44,18 +44,19 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {

private def sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]

private def buildBatch(sessionHandle: SessionHandle): Batch = {
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
}

private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
new Batch(
batchOp.batchId,
session.user,
batchOp.batchType,
batchOp.batchName,
batchOp.currentApplicationState.getOrElse(Map.empty).asJava,
fe.connectionUrl,
batchOp.getStatus.state.toString)
batchOpStatus.state.toString,
session.createTime,
batchOpStatus.completed)
}

@ApiResponse(
Expand Down Expand Up @@ -83,7 +84,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
ipAddress,
request.getConf.asScala.toMap,
request)
buildBatch(sessionHandle)
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
}

@ApiResponse(
Expand All @@ -95,9 +96,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
@GET
@Path("{batchId}")
def batchInfo(@PathParam("batchId") batchId: String): Batch = {
Option(sessionManager.getBatch(batchId)).getOrElse {
error(s"Invalid batchId: $batchId")
throw new NotFoundException(s"Invalid batchId: $batchId")
val sessionHandle = normalizedBatchSessionHandle(batchId)
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
buildBatch(batchSession)
}.getOrElse {
Option(sessionManager.getBatchFromStateStore(batchId)).getOrElse {
error(s"Invalid batchId: $batchId")
throw new NotFoundException(s"Invalid batchId: $batchId")
}
}
}

Expand All @@ -106,14 +112,34 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[GetBatchesResponse]))),
description = "returns the active batch sessions")
description = "returns the batch sessions.")
@GET
@Consumes(Array(MediaType.APPLICATION_JSON))
def getBatchInfoList(
@QueryParam("batchType") batchType: String,
@QueryParam("batchState") batchState: String,
@QueryParam("batchUser") batchUser: String,
@QueryParam("createTime") createTime: Long,
@QueryParam("endTime") endTime: Long,
@QueryParam("from") from: Int,
@QueryParam("size") size: Int): GetBatchesResponse = {
val batches = sessionManager.getBatchesByType(batchType, from, size)
require(
createTime >= 0 && endTime >= 0 && (endTime == 0 || createTime <= endTime),
"Invalid time range")
if (batchState != null) {
require(
validBatchState(batchState),
s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
}
val batches =
sessionManager.getBatchesFromStateStore(
batchType,
batchUser,
batchState,
createTime,
endTime,
from,
size)
new GetBatchesResponse(from, batches.size, batches.asJava)
}

Expand All @@ -129,10 +155,10 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
@PathParam("batchId") batchId: String,
@QueryParam("from") @DefaultValue("-1") from: Int,
@QueryParam("size") size: Int): OperationLog = {
val sessionHandle = normalizedBatchSessionHandle(batchId)
try {
val submissionOpt = sessionManager.getBatchSessionImpl(batchId, REST_BATCH_PROTOCOL)
.batchJobSubmissionOp
val rowSet = submissionOpt.getOperationLogRowSet(
val submissionOp = sessionManager.getBatchSessionImpl(sessionHandle).batchJobSubmissionOp
val rowSet = submissionOp.getOperationLogRowSet(
FetchOrientation.FETCH_NEXT,
from,
size)
Expand All @@ -157,9 +183,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
def closeBatchSession(
@PathParam("batchId") batchId: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): CloseBatchResponse = {
val sessionHandle = normalizedBatchSessionHandle(batchId)
var session: KyuubiBatchSessionImpl = null
try {
val sessionHandle = sessionManager.getBatchSessionHandle(batchId, REST_BATCH_PROTOCOL)
session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
} catch {
case NonFatal(e) =>
Expand Down Expand Up @@ -187,13 +213,33 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
val (success, msg) = session.batchJobSubmissionOp.getKillMessage
new CloseBatchResponse(success, msg)
}

private def normalizedBatchSessionHandle(batchId: String): SessionHandle = {
try {
sessionManager.getBatchSessionHandle(batchId, REST_BATCH_PROTOCOL)
} catch {
case NonFatal(e) =>
error(s"Invalid batchId: $batchId", e)
throw new NotFoundException(s"Invalid batchId: $batchId")
}
}
}

object BatchesResource {
val REST_BATCH_PROTOCOL = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11
val SUPPORTED_BATCH_TYPES = Seq("SPARK")
val VALID_BATCH_STATES = Seq(
OperationState.PENDING,
OperationState.RUNNING,
OperationState.FINISHED,
OperationState.ERROR,
OperationState.CANCELED).map(_.toString)

def supportedBatchType(batchType: String): Boolean = {
Option(batchType).exists(bt => SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT)))
}

def validBatchState(batchState: String): Boolean = {
Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT)))
}
}

0 comments on commit 0cb6f16

Please sign in to comment.