Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SUB-TASK][KPIP-4] Refine the batch info response #2812

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
})
}

def getSessionOption(sessionHandle: SessionHandle): Option[Session] = {
Option(handleToSession.get(sessionHandle))
}

def getSession(sessionHandle: SessionHandle): Session = {
val session = handleToSession.get(sessionHandle)
if (session == null) {
throw KyuubiSQLException(s"Invalid $sessionHandle")
}
session
getSessionOption(sessionHandle).getOrElse(throw KyuubiSQLException(s"Invalid $sessionHandle"))
}

final protected def setSession(sessionHandle: SessionHandle, session: Session): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,24 @@ public Batch getBatchById(String batchId) {
return this.getClient().get(path, null, Batch.class, client.getAuthHeader());
}

public GetBatchesResponse listBatches(String batchType, int from, int size) {
public GetBatchesResponse listBatches(
String batchType,
String batchUser,
String batchState,
Long createTime,
Long endTime,
int from,
int size) {
Map<String, Object> params = new HashMap<>();
params.put("batchType", batchType);
params.put("batchUser", batchUser);
params.put("batchState", batchState);
turboFei marked this conversation as resolved.
Show resolved Hide resolved
if (null != createTime && createTime >= 0) {
params.put("createTime", createTime);
}
if (null != endTime && endTime >= 0) {
params.put("endTime", endTime);
}
params.put("from", from);
params.put("size", size);
return this.getClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,36 @@

public class Batch {
private String id;
private String user;
private String batchType;
private String name;
private Map<String, String> batchInfo;
private String kyuubiInstance;
private String state;
private long createTime;
private long endTime;

public Batch() {}

public Batch(
String id,
String user,
String batchType,
String name,
Map<String, String> batchInfo,
String kyuubiInstance,
String state) {
String state,
long createTime,
long endTime) {
this.id = id;
this.user = user;
this.batchType = batchType;
this.name = name;
this.batchInfo = batchInfo;
this.kyuubiInstance = kyuubiInstance;
this.state = state;
this.createTime = createTime;
this.endTime = endTime;
}

public String getId() {
Expand All @@ -51,6 +63,14 @@ public void setId(String id) {
this.id = id;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getBatchType() {
return batchType;
}
Expand All @@ -59,6 +79,14 @@ public void setBatchType(String batchType) {
this.batchType = batchType;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Map<String, String> getBatchInfo() {
if (null == batchInfo) {
return Collections.emptyMap();
Expand Down Expand Up @@ -86,6 +114,22 @@ public void setState(String state) {
this.state = state;
}

public long getCreateTime() {
return createTime;
}

public void setCreateTime(long createTime) {
this.createTime = createTime;
}

public long getEndTime() {
return endTime;
}

public void setEndTime(long endTime) {
this.endTime = endTime;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ public void testNoPasswordBasicClient() {
Batch result = noPasswordBasicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
assertEquals(result.getCreateTime(), expectedBatch.getCreateTime());
assertEquals(result.getEndTime(), expectedBatch.getEndTime());
}

@Test
Expand All @@ -131,8 +135,12 @@ public void testAnonymousBasicClient() {
Batch result = anonymousBasicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
assertEquals(result.getCreateTime(), expectedBatch.getCreateTime());
assertEquals(result.getEndTime(), expectedBatch.getEndTime());
}

@Test
Expand All @@ -154,8 +162,12 @@ public void createBatchTest() {
result = basicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
assertEquals(result.getCreateTime(), expectedBatch.getCreateTime());
assertEquals(result.getEndTime(), expectedBatch.getEndTime());
}

@Test
Expand All @@ -177,8 +189,12 @@ public void getBatchByIdTest() {
result = basicBatchRestApi.getBatchById("71535");

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
assertEquals(result.getCreateTime(), expectedBatch.getCreateTime());
assertEquals(result.getEndTime(), expectedBatch.getEndTime());
}

@Test
Expand All @@ -187,7 +203,8 @@ public void getBatchInfoListTest() {
BatchTestServlet.setAuthSchema(NEGOTIATE_AUTH);

GetBatchesResponse expectedBatchesInfo = generateTestBatchesResponse();
GetBatchesResponse result = spnegoBatchRestApi.listBatches("spark", 0, 10);
GetBatchesResponse result =
spnegoBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", 0L, 0L, 0, 10);

assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size());
assertEquals(expectedBatchesInfo.getFrom(), result.getFrom());
Expand All @@ -197,7 +214,7 @@ public void getBatchInfoListTest() {
BatchTestServlet.setAuthSchema(BASIC_AUTH);
BatchTestServlet.allowAnonymous(false);

result = basicBatchRestApi.listBatches("spark", 0, 10);
result = basicBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", null, null, 0, 10);

assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size());
assertEquals(expectedBatchesInfo.getFrom(), result.getFrom());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class RestClientTestUtil {
public static final String TEST_USERNAME = "test_user";
public static final String TEST_PASSWORD = "test_password";

public static final Long BATCH_CREATE_TIME = System.currentTimeMillis();

public static Batch generateTestBatch() {
return generateTestBatch("71535");
}
Expand All @@ -58,7 +60,17 @@ public static Batch generateTestBatch(String id) {
+ "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar");
batchInfo.put("state", "RUNNING");

Batch batch = new Batch(id, "spark", batchInfo, "192.168.31.130:64573", "RUNNING");
Batch batch =
new Batch(
id,
TEST_USERNAME,
"spark",
"batch_name",
batchInfo,
"192.168.31.130:64573",
"RUNNING",
BATCH_CREATE_TIME,
0);

return batch;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.Logging
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.operation.FetchOrientation
import org.apache.kyuubi.operation.{FetchOrientation, OperationState}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource.{supportedBatchType, REST_BATCH_PROTOCOL, SUPPORTED_BATCH_TYPES}
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle}
Expand All @@ -44,18 +44,19 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {

private def sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]

private def buildBatch(sessionHandle: SessionHandle): Batch = {
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
}

private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
new Batch(
batchOp.batchId,
session.user,
batchOp.batchType,
batchOp.batchName,
batchOp.currentApplicationState.getOrElse(Map.empty).asJava,
fe.connectionUrl,
batchOp.getStatus.state.toString)
batchOpStatus.state.toString,
session.createTime,
batchOpStatus.completed)
}

@ApiResponse(
Expand Down Expand Up @@ -83,7 +84,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
ipAddress,
request.getConf.asScala.toMap,
request)
buildBatch(sessionHandle)
buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
}

@ApiResponse(
Expand All @@ -95,9 +96,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
@GET
@Path("{batchId}")
def batchInfo(@PathParam("batchId") batchId: String): Batch = {
Option(sessionManager.getBatch(batchId)).getOrElse {
error(s"Invalid batchId: $batchId")
throw new NotFoundException(s"Invalid batchId: $batchId")
val sessionHandle = normalizedBatchSessionHandle(batchId)
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
buildBatch(batchSession)
}.getOrElse {
Option(sessionManager.getBatchFromStateStore(batchId)).getOrElse {
error(s"Invalid batchId: $batchId")
throw new NotFoundException(s"Invalid batchId: $batchId")
}
}
}

Expand All @@ -106,14 +112,34 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[GetBatchesResponse]))),
description = "returns the active batch sessions")
description = "returns the batch sessions.")
@GET
@Consumes(Array(MediaType.APPLICATION_JSON))
def getBatchInfoList(
@QueryParam("batchType") batchType: String,
@QueryParam("batchState") batchState: String,
@QueryParam("batchUser") batchUser: String,
@QueryParam("createTime") createTime: Long,
@QueryParam("endTime") endTime: Long,
@QueryParam("from") from: Int,
@QueryParam("size") size: Int): GetBatchesResponse = {
val batches = sessionManager.getBatchesByType(batchType, from, size)
require(
createTime >= 0 && endTime >= 0 && (endTime == 0 || createTime <= endTime),
"Invalid time range")
if (batchState != null) {
require(
validBatchState(batchState),
s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
}
val batches =
sessionManager.getBatchesFromStateStore(
batchType,
batchUser,
batchState,
createTime,
endTime,
from,
size)
new GetBatchesResponse(from, batches.size, batches.asJava)
}

Expand All @@ -129,10 +155,10 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
@PathParam("batchId") batchId: String,
@QueryParam("from") @DefaultValue("-1") from: Int,
@QueryParam("size") size: Int): OperationLog = {
val sessionHandle = normalizedBatchSessionHandle(batchId)
try {
val submissionOpt = sessionManager.getBatchSessionImpl(batchId, REST_BATCH_PROTOCOL)
.batchJobSubmissionOp
val rowSet = submissionOpt.getOperationLogRowSet(
val submissionOp = sessionManager.getBatchSessionImpl(sessionHandle).batchJobSubmissionOp
val rowSet = submissionOp.getOperationLogRowSet(
FetchOrientation.FETCH_NEXT,
from,
size)
Expand All @@ -157,9 +183,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
def closeBatchSession(
@PathParam("batchId") batchId: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): CloseBatchResponse = {
val sessionHandle = normalizedBatchSessionHandle(batchId)
var session: KyuubiBatchSessionImpl = null
try {
val sessionHandle = sessionManager.getBatchSessionHandle(batchId, REST_BATCH_PROTOCOL)
session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
} catch {
case NonFatal(e) =>
Expand Down Expand Up @@ -187,13 +213,33 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
val (success, msg) = session.batchJobSubmissionOp.getKillMessage
new CloseBatchResponse(success, msg)
}

private def normalizedBatchSessionHandle(batchId: String): SessionHandle = {
try {
sessionManager.getBatchSessionHandle(batchId, REST_BATCH_PROTOCOL)
} catch {
case NonFatal(e) =>
error(s"Invalid batchId: $batchId", e)
throw new NotFoundException(s"Invalid batchId: $batchId")
}
}
}

object BatchesResource {
val REST_BATCH_PROTOCOL = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11
val SUPPORTED_BATCH_TYPES = Seq("SPARK")
val VALID_BATCH_STATES = Seq(
OperationState.PENDING,
OperationState.RUNNING,
OperationState.FINISHED,
OperationState.ERROR,
OperationState.CANCELED).map(_.toString)

def supportedBatchType(batchType: String): Boolean = {
Option(batchType).exists(bt => SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT)))
}

def validBatchState(batchState: String): Boolean = {
Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT)))
}
}