Skip to content

Commit

Permalink
return more info for batch
Browse files Browse the repository at this point in the history
save

Refine batch

refactor

save

refine

save
  • Loading branch information
turboFei committed Jun 2, 2022
1 parent bb98aa7 commit b3f2cd8
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ public Batch getBatchById(String batchId) {
return this.getClient().get(path, null, Batch.class, client.getAuthHeader());
}

public GetBatchesResponse listBatches(String batchType, int from, int size) {
public GetBatchesResponse listBatches(
String batchType, String batchUser, String batchState, int from, int size) {
Map<String, Object> params = new HashMap<>();
params.put("batchType", batchType);
params.put("batchUser", batchUser);
params.put("batchState", batchState);
params.put("from", from);
params.put("size", size);
return this.getClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

public class Batch {
private String id;
private String user;
private String batchType;
private String name;
private Map<String, String> batchInfo;
private String kyuubiInstance;
private String state;
Expand All @@ -32,12 +34,16 @@ public Batch() {}

public Batch(
String id,
String user,
String batchType,
String name,
Map<String, String> batchInfo,
String kyuubiInstance,
String state) {
this.id = id;
this.user = user;
this.batchType = batchType;
this.name = name;
this.batchInfo = batchInfo;
this.kyuubiInstance = kyuubiInstance;
this.state = state;
Expand All @@ -51,6 +57,14 @@ public void setId(String id) {
this.id = id;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getBatchType() {
return batchType;
}
Expand All @@ -59,6 +73,14 @@ public void setBatchType(String batchType) {
this.batchType = batchType;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Map<String, String> getBatchInfo() {
if (null == batchInfo) {
return Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public void testNoPasswordBasicClient() {
Batch result = noPasswordBasicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
}

Expand All @@ -130,7 +132,9 @@ public void testAnonymousBasicClient() {
Batch result = anonymousBasicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
}

Expand All @@ -153,7 +157,9 @@ public void createBatchTest() {
result = basicBatchRestApi.createBatch(batchRequest);

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
}

Expand All @@ -176,7 +182,9 @@ public void getBatchByIdTest() {
result = basicBatchRestApi.getBatchById("71535");

assertEquals(result.getId(), expectedBatch.getId());
assertEquals(result.getUser(), expectedBatch.getUser());
assertEquals(result.getBatchType(), expectedBatch.getBatchType());
assertEquals(result.getName(), expectedBatch.getName());
assertEquals(result.getState(), expectedBatch.getState());
}

Expand All @@ -186,7 +194,8 @@ public void getBatchInfoListTest() {
BatchTestServlet.setAuthSchema(NEGOTIATE_AUTH);

GetBatchesResponse expectedBatchesInfo = generateTestBatchesResponse();
GetBatchesResponse result = spnegoBatchRestApi.listBatches("spark", 0, 10);
GetBatchesResponse result =
spnegoBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", 0, 10);

assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size());
assertEquals(expectedBatchesInfo.getFrom(), result.getFrom());
Expand All @@ -196,7 +205,7 @@ public void getBatchInfoListTest() {
BatchTestServlet.setAuthSchema(BASIC_AUTH);
BatchTestServlet.allowAnonymous(false);

result = basicBatchRestApi.listBatches("spark", 0, 10);
result = basicBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", 0, 10);

assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size());
assertEquals(expectedBatchesInfo.getFrom(), result.getFrom());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public static Batch generateTestBatch(String id) {
+ "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar");
batchInfo.put("state", "RUNNING");

Batch batch = new Batch(id, "spark", batchInfo, "192.168.31.130:64573", "RUNNING");
Batch batch =
new Batch(
id, TEST_USERNAME, "spark", "batch_name", batchInfo, "192.168.31.130:64573", "RUNNING");

return batch;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.Logging
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.operation.FetchOrientation
import org.apache.kyuubi.operation.{FetchOrientation, OperationState}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource.{supportedBatchType, REST_BATCH_PROTOCOL, SUPPORTED_BATCH_TYPES}
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.http.authentication.AuthenticationFilter
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle}
Expand All @@ -52,7 +52,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
val batchOp = session.batchJobSubmissionOp
new Batch(
batchOp.batchId,
session.user,
batchOp.batchType,
batchOp.batchName,
batchOp.currentApplicationState.getOrElse(Map.empty).asJava,
fe.connectionUrl,
batchOp.getStatus.state.toString)
Expand Down Expand Up @@ -106,14 +108,23 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[GetBatchesResponse]))),
description = "returns the active batch sessions")
description = "returns the batch sessions. Supports optional use of batchType, batchUser" +
" and batchState to filter batches. The valid batchState can be one of the following:" +
" PENDING,RUNNING,FINISHED,ERROR,CANCELED.")
@GET
@Consumes(Array(MediaType.APPLICATION_JSON))
def getBatchInfoList(
@QueryParam("batchType") batchType: String,
@QueryParam("batchState") batchState: String,
@QueryParam("batchUser") batchUser: String,
@QueryParam("from") from: Int,
@QueryParam("size") size: Int): GetBatchesResponse = {
val batches = sessionManager.getBatchesByType(batchType, from, size)
if (batchState != null) {
require(
validBatchState(batchState),
s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
}
val batches = sessionManager.getBatches(batchType, batchUser, batchState, from, size)
new GetBatchesResponse(from, batches.size, batches.asJava)
}

Expand Down Expand Up @@ -192,8 +203,18 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
object BatchesResource {
val REST_BATCH_PROTOCOL = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11
val SUPPORTED_BATCH_TYPES = Seq("SPARK")
val VALID_BATCH_STATES = Seq(
OperationState.PENDING,
OperationState.RUNNING,
OperationState.FINISHED,
OperationState.ERROR,
OperationState.CANCELED).map(_.toString)

def supportedBatchType(batchType: String): Boolean = {
Option(batchType).exists(bt => SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT)))
}

def validBatchState(batchState: String): Boolean = {
Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,17 @@ class SessionStateStore extends AbstractService("SessionStateStore") {
Option(_stateStore.getMetadata(batchId, true)).map(buildBatch).orNull
}

def getBatchesByType(batchType: String, from: Int, size: Int): Seq[Batch] = {
def getBatches(
batchType: String,
batchUser: String,
batchState: String,
from: Int,
size: Int): Seq[Batch] = {
_stateStore.getMetadataList(
SessionType.BATCH,
batchType,
null,
null,
batchUser,
batchState,
null,
from,
size,
Expand Down Expand Up @@ -112,7 +117,9 @@ class SessionStateStore extends AbstractService("SessionStateStore") {

new Batch(
batchMetadata.identifier,
batchMetadata.username,
batchMetadata.engineType,
batchMetadata.requestName,
batchAppInfo.asJava,
batchMetadata.kyuubiInstance,
batchMetadata.state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,17 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging {
params += userName
}
Option(state).filter(_.nonEmpty).foreach { _ =>
whereConditions += " STATE = ? "
params += state
whereConditions += " state = ? "
params += state.toUpperCase(Locale.ROOT)
}
Option(kyuubiInstance).filter(_.nonEmpty).foreach { _ =>
whereConditions += " KYUUBI_INSTANCE = ? "
whereConditions += " kyuubi_instance = ? "
params += kyuubiInstance
}
if (whereConditions.nonEmpty) {
queryBuilder.append(whereConditions.mkString(" WHERE ", " AND ", " "))
}
queryBuilder.append(" ORDER BY KEY_ID ")
queryBuilder.append(" ORDER BY key_id ")
val query = databaseAdaptor.addLimitAndOffsetToQuery(queryBuilder.toString(), size, from)
withConnection() { connection =>
withResultSet(connection, query, params: _*) { rs =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,13 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
sessionStateStore.getBatch(batchId)
}

def getBatchesByType(batchType: String, from: Int, size: Int): Seq[Batch] = {
sessionStateStore.getBatchesByType(batchType, from, size)
def getBatches(
batchType: String,
batchUser: String,
batchState: String,
from: Int,
size: Int): Seq[Batch] = {
sessionStateStore.getBatches(batchType, batchUser, batchState, from, size)
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
sessionManager.allSessions().foreach { session =>
sessionManager.closeSession(session.handle)
}
sessionManager.getBatchesByType(null, 0, Int.MaxValue).foreach { batch =>
sessionManager.getBatches(null, null, null, 0, Int.MaxValue).foreach { batch =>
sessionManager.applicationManager.killApplication(None, batch.getId)
sessionManager.cleanupMetadata(batch.getId)
}
Expand All @@ -71,6 +71,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
var batch = response.readEntity(classOf[Batch])
assert(batch.getKyuubiInstance === fe.connectionUrl)
assert(batch.getBatchType === "SPARK")
assert(batch.getName === appName)

requestObj.setConf((requestObj.getConf.asScala ++
Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> "root")).asJava)
Expand All @@ -87,6 +88,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
batch = getBatchResponse.readEntity(classOf[Batch])
assert(batch.getKyuubiInstance === fe.connectionUrl)
assert(batch.getBatchType === "SPARK")
assert(batch.getName === appName)

// invalid batchId
getBatchResponse = webTarget.path(s"api/v1/batches/invalidBatchId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ class BatchRestApiSuite extends RestClientTestHelper {
// delete batch
batchRestApi.deleteBatch(batch.getId(), null)

// list batches
var listBatchesResp = batchRestApi.listBatches("SPARK", null, null, 0, Int.MaxValue)
assert(listBatchesResp.getTotal > 0)

// list batches with non-existing user
listBatchesResp = batchRestApi.listBatches("SPARK", "non_existing_user", null, 0, Int.MaxValue)
assert(listBatchesResp.getTotal == 0)

// list batches with invalid batch state
intercept[KyuubiRestException] {
batchRestApi.listBatches("SPARK", null, "BAD_STATE", 0, Int.MaxValue)
}

spnegoKyuubiRestClient.close()
}
}

0 comments on commit b3f2cd8

Please sign in to comment.