From b3f2cd864fa8ff6ec608ba91a90161f712c26176 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Thu, 2 Jun 2022 21:15:39 +0800 Subject: [PATCH] return more info for batch save Refine batch refactor save refine save --- .../apache/kyuubi/client/BatchRestApi.java | 5 +++- .../kyuubi/client/api/v1/dto/Batch.java | 22 ++++++++++++++ .../kyuubi/client/BatchRestClientTest.java | 13 +++++++-- .../kyuubi/client/RestClientTestUtil.java | 4 ++- .../server/api/v1/BatchesResource.scala | 29 ++++++++++++++++--- .../server/statestore/SessionStateStore.scala | 13 +++++++-- .../statestore/jdbc/JDBCStateStore.scala | 8 ++--- .../kyuubi/session/KyuubiSessionManager.scala | 9 ++++-- .../server/api/v1/BatchesResourceSuite.scala | 4 ++- .../rest/client/BatchRestApiSuite.scala | 13 +++++++++ 10 files changed, 102 insertions(+), 18 deletions(-) diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java index 94a7287b676..066413e279a 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java @@ -47,9 +47,12 @@ public Batch getBatchById(String batchId) { return this.getClient().get(path, null, Batch.class, client.getAuthHeader()); } - public GetBatchesResponse listBatches(String batchType, int from, int size) { + public GetBatchesResponse listBatches( + String batchType, String batchUser, String batchState, int from, int size) { Map params = new HashMap<>(); params.put("batchType", batchType); + params.put("batchUser", batchUser); + params.put("batchState", batchState); params.put("from", from); params.put("size", size); return this.getClient() diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java index e318279e3e3..83369b8ccd8 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java @@ -23,7 +23,9 @@ public class Batch { private String id; + private String user; private String batchType; + private String name; private Map batchInfo; private String kyuubiInstance; private String state; @@ -32,12 +34,16 @@ public Batch() {} public Batch( String id, + String user, String batchType, + String name, Map batchInfo, String kyuubiInstance, String state) { this.id = id; + this.user = user; this.batchType = batchType; + this.name = name; this.batchInfo = batchInfo; this.kyuubiInstance = kyuubiInstance; this.state = state; @@ -51,6 +57,14 @@ public void setId(String id) { this.id = id; } + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + public String getBatchType() { return batchType; } @@ -59,6 +73,14 @@ public void setBatchType(String batchType) { this.batchType = batchType; } + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + public Map getBatchInfo() { if (null == batchInfo) { return Collections.emptyMap(); diff --git a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java index 3fa05e9f800..7ec8a5ea408 100644 --- a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java +++ b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java @@ -110,7 +110,9 @@ public void testNoPasswordBasicClient() { Batch result = noPasswordBasicBatchRestApi.createBatch(batchRequest); assertEquals(result.getId(), expectedBatch.getId()); + assertEquals(result.getUser(), expectedBatch.getUser()); assertEquals(result.getBatchType(), expectedBatch.getBatchType()); + assertEquals(result.getName(), expectedBatch.getName()); assertEquals(result.getState(), expectedBatch.getState()); } @@ -130,7 +132,9 @@ public void testAnonymousBasicClient() { Batch result = anonymousBasicBatchRestApi.createBatch(batchRequest); assertEquals(result.getId(), expectedBatch.getId()); + assertEquals(result.getUser(), expectedBatch.getUser()); assertEquals(result.getBatchType(), expectedBatch.getBatchType()); + assertEquals(result.getName(), expectedBatch.getName()); assertEquals(result.getState(), expectedBatch.getState()); } @@ -153,7 +157,9 @@ public void createBatchTest() { result = basicBatchRestApi.createBatch(batchRequest); assertEquals(result.getId(), expectedBatch.getId()); + assertEquals(result.getUser(), expectedBatch.getUser()); assertEquals(result.getBatchType(), expectedBatch.getBatchType()); + assertEquals(result.getName(), expectedBatch.getName()); assertEquals(result.getState(), expectedBatch.getState()); } @@ -176,7 +182,9 @@ public void getBatchByIdTest() { result = basicBatchRestApi.getBatchById("71535"); assertEquals(result.getId(), expectedBatch.getId()); + assertEquals(result.getUser(), expectedBatch.getUser()); assertEquals(result.getBatchType(), expectedBatch.getBatchType()); + assertEquals(result.getName(), expectedBatch.getName()); assertEquals(result.getState(), expectedBatch.getState()); } @@ -186,7 +194,8 @@ public void getBatchInfoListTest() { BatchTestServlet.setAuthSchema(NEGOTIATE_AUTH); GetBatchesResponse expectedBatchesInfo = generateTestBatchesResponse(); - GetBatchesResponse result = spnegoBatchRestApi.listBatches("spark", 0, 10); + GetBatchesResponse result = + spnegoBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", 0, 10); assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size()); assertEquals(expectedBatchesInfo.getFrom(), result.getFrom()); @@ -196,7 +205,7 @@ public void getBatchInfoListTest() { BatchTestServlet.setAuthSchema(BASIC_AUTH); BatchTestServlet.allowAnonymous(false); - result = basicBatchRestApi.listBatches("spark", 0, 10); + result = basicBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", 0, 10); assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size()); assertEquals(expectedBatchesInfo.getFrom(), result.getFrom()); diff --git a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtil.java b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtil.java index 88d0d5441de..18b14662ad1 100644 --- a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtil.java +++ b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtil.java @@ -53,7 +53,9 @@ public static Batch generateTestBatch(String id) { + "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar"); batchInfo.put("state", "RUNNING"); - Batch batch = new Batch(id, "spark", batchInfo, "192.168.31.130:64573", "RUNNING"); + Batch batch = + new Batch( + id, TEST_USERNAME, "spark", "batch_name", batchInfo, "192.168.31.130:64573", "RUNNING"); return batch; } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index ba53e14d594..6c2d005baa3 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -31,9 +31,9 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.Logging import org.apache.kyuubi.client.api.v1.dto._ -import org.apache.kyuubi.operation.FetchOrientation +import org.apache.kyuubi.operation.{FetchOrientation, OperationState} import org.apache.kyuubi.server.api.ApiRequestContext -import org.apache.kyuubi.server.api.v1.BatchesResource.{supportedBatchType, REST_BATCH_PROTOCOL, SUPPORTED_BATCH_TYPES} +import org.apache.kyuubi.server.api.v1.BatchesResource._ import org.apache.kyuubi.server.http.authentication.AuthenticationFilter import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle} @@ -52,7 +52,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { val batchOp = session.batchJobSubmissionOp new Batch( batchOp.batchId, + session.user, batchOp.batchType, + batchOp.batchName, batchOp.currentApplicationState.getOrElse(Map.empty).asJava, fe.connectionUrl, batchOp.getStatus.state.toString) @@ -106,14 +108,23 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { content = Array(new Content( mediaType = MediaType.APPLICATION_JSON, schema = new Schema(implementation = classOf[GetBatchesResponse]))), - description = "returns the active batch sessions") + description = "returns the batch sessions. Supports optional use of batchType, batchUser" + + " and batchState to filter batches. The valid batchState can be one of the following:" + + " PENDING,RUNNING,FINISHED,ERROR,CANCELED.") @GET @Consumes(Array(MediaType.APPLICATION_JSON)) def getBatchInfoList( @QueryParam("batchType") batchType: String, + @QueryParam("batchState") batchState: String, + @QueryParam("batchUser") batchUser: String, @QueryParam("from") from: Int, @QueryParam("size") size: Int): GetBatchesResponse = { - val batches = sessionManager.getBatchesByType(batchType, from, size) + if (batchState != null) { + require( + validBatchState(batchState), + s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}") + } + val batches = sessionManager.getBatches(batchType, batchUser, batchState, from, size) new GetBatchesResponse(from, batches.size, batches.asJava) } @@ -192,8 +203,18 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { object BatchesResource { val REST_BATCH_PROTOCOL = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11 val SUPPORTED_BATCH_TYPES = Seq("SPARK") + val VALID_BATCH_STATES = Seq( + OperationState.PENDING, + OperationState.RUNNING, + OperationState.FINISHED, + OperationState.ERROR, + OperationState.CANCELED).map(_.toString) def supportedBatchType(batchType: String): Boolean = { Option(batchType).exists(bt => SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT))) } + + def validBatchState(batchState: String): Boolean = { + Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT))) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala index 40b3b281fc5..42775b6fa6f 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala @@ -62,12 +62,17 @@ class SessionStateStore extends AbstractService("SessionStateStore") { Option(_stateStore.getMetadata(batchId, true)).map(buildBatch).orNull } - def getBatchesByType(batchType: String, from: Int, size: Int): Seq[Batch] = { + def getBatches( + batchType: String, + batchUser: String, + batchState: String, + from: Int, + size: Int): Seq[Batch] = { _stateStore.getMetadataList( SessionType.BATCH, batchType, - null, - null, + batchUser, + batchState, null, from, size, @@ -112,7 +117,9 @@ class SessionStateStore extends AbstractService("SessionStateStore") { new Batch( batchMetadata.identifier, + batchMetadata.username, batchMetadata.engineType, + batchMetadata.requestName, batchAppInfo.asJava, batchMetadata.kyuubiInstance, batchMetadata.state) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala index 2ded66cc30b..69662f40b7e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala @@ -195,17 +195,17 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging { params += userName } Option(state).filter(_.nonEmpty).foreach { _ => - whereConditions += " STATE = ? " - params += state + whereConditions += " state = ? " + params += state.toUpperCase(Locale.ROOT) } Option(kyuubiInstance).filter(_.nonEmpty).foreach { _ => - whereConditions += " KYUUBI_INSTANCE = ? " + whereConditions += " kyuubi_instance = ? " params += kyuubiInstance } if (whereConditions.nonEmpty) { queryBuilder.append(whereConditions.mkString(" WHERE ", " AND ", " ")) } - queryBuilder.append(" ORDER BY KEY_ID ") + queryBuilder.append(" ORDER BY key_id ") val query = databaseAdaptor.addLimitAndOffsetToQuery(queryBuilder.toString(), size, from) withConnection() { connection => withResultSet(connection, query, params: _*) { rs => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index dfc4a966264..690626a8731 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -183,8 +183,13 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { sessionStateStore.getBatch(batchId) } - def getBatchesByType(batchType: String, from: Int, size: Int): Seq[Batch] = { - sessionStateStore.getBatchesByType(batchType, from, size) + def getBatches( + batchType: String, + batchUser: String, + batchState: String, + from: Int, + size: Int): Seq[Batch] = { + sessionStateStore.getBatches(batchType, batchUser, batchState, from, size) } @VisibleForTesting diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 339be62ae3f..3e8fa1e7e8b 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -44,7 +44,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { sessionManager.allSessions().foreach { session => sessionManager.closeSession(session.handle) } - sessionManager.getBatchesByType(null, 0, Int.MaxValue).foreach { batch => + sessionManager.getBatches(null, null, null, 0, Int.MaxValue).foreach { batch => sessionManager.applicationManager.killApplication(None, batch.getId) sessionManager.cleanupMetadata(batch.getId) } @@ -71,6 +71,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { var batch = response.readEntity(classOf[Batch]) assert(batch.getKyuubiInstance === fe.connectionUrl) assert(batch.getBatchType === "SPARK") + assert(batch.getName === appName) requestObj.setConf((requestObj.getConf.asScala ++ Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> "root")).asJava) @@ -87,6 +88,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { batch = getBatchResponse.readEntity(classOf[Batch]) assert(batch.getKyuubiInstance === fe.connectionUrl) assert(batch.getBatchType === "SPARK") + assert(batch.getName === appName) // invalid batchId getBatchResponse = webTarget.path(s"api/v1/batches/invalidBatchId") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala index 91edc3b3f2b..415d2eb5368 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala @@ -127,6 +127,19 @@ class BatchRestApiSuite extends RestClientTestHelper { // delete batch batchRestApi.deleteBatch(batch.getId(), null) + // list batches + var listBatchesResp = batchRestApi.listBatches("SPARK", null, null, 0, Int.MaxValue) + assert(listBatchesResp.getTotal > 0) + + // list batches with non-existing user + listBatchesResp = batchRestApi.listBatches("SPARK", "non_existing_user", null, 0, Int.MaxValue) + assert(listBatchesResp.getTotal == 0) + + // list batches with invalid batch state + intercept[KyuubiRestException] { + batchRestApi.listBatches("SPARK", null, "BAD_STATE", 0, Int.MaxValue) + } + spnegoKyuubiRestClient.close() } }