diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala index 84c28ad6592..0a7c28fabc5 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala @@ -141,12 +141,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) { }) } + def getSessionOption(sessionHandle: SessionHandle): Option[Session] = { + Option(handleToSession.get(sessionHandle)) + } + def getSession(sessionHandle: SessionHandle): Session = { - val session = handleToSession.get(sessionHandle) - if (session == null) { - throw KyuubiSQLException(s"Invalid $sessionHandle") - } - session + getSessionOption(sessionHandle).getOrElse(throw KyuubiSQLException(s"Invalid $sessionHandle")) } final protected def setSession(sessionHandle: SessionHandle, session: Session): Unit = { diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java index 76f016d92e0..121579607f2 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java @@ -48,9 +48,24 @@ public Batch getBatchById(String batchId) { return this.getClient().get(path, null, Batch.class, client.getAuthHeader()); } - public GetBatchesResponse listBatches(String batchType, int from, int size) { + public GetBatchesResponse listBatches( + String batchType, + String batchUser, + String batchState, + Long createTime, + Long endTime, + int from, + int size) { Map params = new HashMap<>(); params.put("batchType", batchType); + params.put("batchUser", batchUser); + params.put("batchState", batchState); + if (null != createTime && createTime >= 0) { + params.put("createTime", createTime); + } + if (null != endTime && endTime >= 0) { + params.put("endTime", endTime); + } params.put("from", from); params.put("size", size); return this.getClient() diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java index e318279e3e3..dddb4f9c34e 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Batch.java @@ -23,24 +23,36 @@ public class Batch { private String id; + private String user; private String batchType; + private String name; private Map batchInfo; private String kyuubiInstance; private String state; + private long createTime; + private long endTime; public Batch() {} public Batch( String id, + String user, String batchType, + String name, Map batchInfo, String kyuubiInstance, - String state) { + String state, + long createTime, + long endTime) { this.id = id; + this.user = user; this.batchType = batchType; + this.name = name; this.batchInfo = batchInfo; this.kyuubiInstance = kyuubiInstance; this.state = state; + this.createTime = createTime; + this.endTime = endTime; } public String getId() { @@ -51,6 +63,14 @@ public void setId(String id) { this.id = id; } + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + public String getBatchType() { return batchType; } @@ -59,6 +79,14 @@ public void setBatchType(String batchType) { this.batchType = batchType; } + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + public Map getBatchInfo() { if (null == batchInfo) { return Collections.emptyMap(); @@ -86,6 +114,22 @@ public void setState(String state) { this.state = state; } + public long getCreateTime() { + return createTime; + } + + public void setCreateTime(long createTime) { + this.createTime = createTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java index 8f97f73a9e8..a039351ef6a 100644 --- a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java +++ b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java @@ -111,8 +111,12 @@ public void testNoPasswordBasicClient() { Batch result = noPasswordBasicBatchRestApi.createBatch(batchRequest); assertEquals(result.getId(), expectedBatch.getId()); + assertEquals(result.getUser(), expectedBatch.getUser()); assertEquals(result.getBatchType(), expectedBatch.getBatchType()); + assertEquals(result.getName(), expectedBatch.getName()); assertEquals(result.getState(), expectedBatch.getState()); + assertEquals(result.getCreateTime(), expectedBatch.getCreateTime()); + assertEquals(result.getEndTime(), expectedBatch.getEndTime()); } @Test @@ -131,8 +135,12 @@ public void testAnonymousBasicClient() { Batch result = anonymousBasicBatchRestApi.createBatch(batchRequest); assertEquals(result.getId(), expectedBatch.getId()); + assertEquals(result.getUser(), expectedBatch.getUser()); assertEquals(result.getBatchType(), expectedBatch.getBatchType()); + assertEquals(result.getName(), expectedBatch.getName()); assertEquals(result.getState(), expectedBatch.getState()); + assertEquals(result.getCreateTime(), expectedBatch.getCreateTime()); + assertEquals(result.getEndTime(), expectedBatch.getEndTime()); } @Test @@ -154,8 +162,12 @@ public void createBatchTest() { result = basicBatchRestApi.createBatch(batchRequest); assertEquals(result.getId(), expectedBatch.getId()); + assertEquals(result.getUser(), expectedBatch.getUser()); assertEquals(result.getBatchType(), expectedBatch.getBatchType()); + assertEquals(result.getName(), expectedBatch.getName()); assertEquals(result.getState(), expectedBatch.getState()); + assertEquals(result.getCreateTime(), expectedBatch.getCreateTime()); + assertEquals(result.getEndTime(), expectedBatch.getEndTime()); } @Test @@ -177,8 +189,12 @@ public void getBatchByIdTest() { result = basicBatchRestApi.getBatchById("71535"); assertEquals(result.getId(), expectedBatch.getId()); + assertEquals(result.getUser(), expectedBatch.getUser()); assertEquals(result.getBatchType(), expectedBatch.getBatchType()); + assertEquals(result.getName(), expectedBatch.getName()); assertEquals(result.getState(), expectedBatch.getState()); + assertEquals(result.getCreateTime(), expectedBatch.getCreateTime()); + assertEquals(result.getEndTime(), expectedBatch.getEndTime()); } @Test @@ -187,7 +203,8 @@ public void getBatchInfoListTest() { BatchTestServlet.setAuthSchema(NEGOTIATE_AUTH); GetBatchesResponse expectedBatchesInfo = generateTestBatchesResponse(); - GetBatchesResponse result = spnegoBatchRestApi.listBatches("spark", 0, 10); + GetBatchesResponse result = + spnegoBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", 0L, 0L, 0, 10); assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size()); assertEquals(expectedBatchesInfo.getFrom(), result.getFrom()); @@ -197,7 +214,7 @@ public void getBatchInfoListTest() { BatchTestServlet.setAuthSchema(BASIC_AUTH); BatchTestServlet.allowAnonymous(false); - result = basicBatchRestApi.listBatches("spark", 0, 10); + result = basicBatchRestApi.listBatches("spark", TEST_USERNAME, "RUNNING", null, null, 0, 10); assertEquals(expectedBatchesInfo.getBatches().size(), result.getBatches().size()); assertEquals(expectedBatchesInfo.getFrom(), result.getFrom()); diff --git a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtil.java b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtil.java index 44043017296..d2f9ec8d35c 100644 --- a/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtil.java +++ b/kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/RestClientTestUtil.java @@ -35,6 +35,8 @@ public class RestClientTestUtil { public static final String TEST_USERNAME = "test_user"; public static final String TEST_PASSWORD = "test_password"; + public static final Long BATCH_CREATE_TIME = System.currentTimeMillis(); + public static Batch generateTestBatch() { return generateTestBatch("71535"); } @@ -58,7 +60,17 @@ public static Batch generateTestBatch(String id) { + "/MySpace/kyuubi-spark-sql-engine_2.12-1.6.0-SNAPSHOT.jar"); batchInfo.put("state", "RUNNING"); - Batch batch = new Batch(id, "spark", batchInfo, "192.168.31.130:64573", "RUNNING"); + Batch batch = + new Batch( + id, + TEST_USERNAME, + "spark", + "batch_name", + batchInfo, + "192.168.31.130:64573", + "RUNNING", + BATCH_CREATE_TIME, + 0); return batch; } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index ba53e14d594..89fd60bbc36 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -31,9 +31,9 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.Logging import org.apache.kyuubi.client.api.v1.dto._ -import org.apache.kyuubi.operation.FetchOrientation +import org.apache.kyuubi.operation.{FetchOrientation, OperationState} import org.apache.kyuubi.server.api.ApiRequestContext -import org.apache.kyuubi.server.api.v1.BatchesResource.{supportedBatchType, REST_BATCH_PROTOCOL, SUPPORTED_BATCH_TYPES} +import org.apache.kyuubi.server.api.v1.BatchesResource._ import org.apache.kyuubi.server.http.authentication.AuthenticationFilter import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager, SessionHandle} @@ -44,18 +44,19 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { private def sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager] - private def buildBatch(sessionHandle: SessionHandle): Batch = { - buildBatch(sessionManager.getBatchSessionImpl(sessionHandle)) - } - private def buildBatch(session: KyuubiBatchSessionImpl): Batch = { val batchOp = session.batchJobSubmissionOp + val batchOpStatus = batchOp.getStatus new Batch( batchOp.batchId, + session.user, batchOp.batchType, + batchOp.batchName, batchOp.currentApplicationState.getOrElse(Map.empty).asJava, fe.connectionUrl, - batchOp.getStatus.state.toString) + batchOpStatus.state.toString, + session.createTime, + batchOpStatus.completed) } @ApiResponse( @@ -83,7 +84,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { ipAddress, request.getConf.asScala.toMap, request) - buildBatch(sessionHandle) + buildBatch(sessionManager.getBatchSessionImpl(sessionHandle)) } @ApiResponse( @@ -95,9 +96,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { @GET @Path("{batchId}") def batchInfo(@PathParam("batchId") batchId: String): Batch = { - Option(sessionManager.getBatch(batchId)).getOrElse { - error(s"Invalid batchId: $batchId") - throw new NotFoundException(s"Invalid batchId: $batchId") + val sessionHandle = normalizedBatchSessionHandle(batchId) + Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession => + buildBatch(batchSession) + }.getOrElse { + Option(sessionManager.getBatchFromStateStore(batchId)).getOrElse { + error(s"Invalid batchId: $batchId") + throw new NotFoundException(s"Invalid batchId: $batchId") + } } } @@ -106,14 +112,34 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { content = Array(new Content( mediaType = MediaType.APPLICATION_JSON, schema = new Schema(implementation = classOf[GetBatchesResponse]))), - description = "returns the active batch sessions") + description = "returns the batch sessions.") @GET @Consumes(Array(MediaType.APPLICATION_JSON)) def getBatchInfoList( @QueryParam("batchType") batchType: String, + @QueryParam("batchState") batchState: String, + @QueryParam("batchUser") batchUser: String, + @QueryParam("createTime") createTime: Long, + @QueryParam("endTime") endTime: Long, @QueryParam("from") from: Int, @QueryParam("size") size: Int): GetBatchesResponse = { - val batches = sessionManager.getBatchesByType(batchType, from, size) + require( + createTime >= 0 && endTime >= 0 && (endTime == 0 || createTime <= endTime), + "Invalid time range") + if (batchState != null) { + require( + validBatchState(batchState), + s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}") + } + val batches = + sessionManager.getBatchesFromStateStore( + batchType, + batchUser, + batchState, + createTime, + endTime, + from, + size) new GetBatchesResponse(from, batches.size, batches.asJava) } @@ -129,10 +155,10 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { @PathParam("batchId") batchId: String, @QueryParam("from") @DefaultValue("-1") from: Int, @QueryParam("size") size: Int): OperationLog = { + val sessionHandle = normalizedBatchSessionHandle(batchId) try { - val submissionOpt = sessionManager.getBatchSessionImpl(batchId, REST_BATCH_PROTOCOL) - .batchJobSubmissionOp - val rowSet = submissionOpt.getOperationLogRowSet( + val submissionOp = sessionManager.getBatchSessionImpl(sessionHandle).batchJobSubmissionOp + val rowSet = submissionOp.getOperationLogRowSet( FetchOrientation.FETCH_NEXT, from, size) @@ -157,9 +183,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { def closeBatchSession( @PathParam("batchId") batchId: String, @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): CloseBatchResponse = { + val sessionHandle = normalizedBatchSessionHandle(batchId) var session: KyuubiBatchSessionImpl = null try { - val sessionHandle = sessionManager.getBatchSessionHandle(batchId, REST_BATCH_PROTOCOL) session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl] } catch { case NonFatal(e) => @@ -187,13 +213,33 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { val (success, msg) = session.batchJobSubmissionOp.getKillMessage new CloseBatchResponse(success, msg) } + + private def normalizedBatchSessionHandle(batchId: String): SessionHandle = { + try { + sessionManager.getBatchSessionHandle(batchId, REST_BATCH_PROTOCOL) + } catch { + case NonFatal(e) => + error(s"Invalid batchId: $batchId", e) + throw new NotFoundException(s"Invalid batchId: $batchId") + } + } } object BatchesResource { val REST_BATCH_PROTOCOL = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11 val SUPPORTED_BATCH_TYPES = Seq("SPARK") + val VALID_BATCH_STATES = Seq( + OperationState.PENDING, + OperationState.RUNNING, + OperationState.FINISHED, + OperationState.ERROR, + OperationState.CANCELED).map(_.toString) def supportedBatchType(batchType: String): Boolean = { Option(batchType).exists(bt => SUPPORTED_BATCH_TYPES.contains(bt.toUpperCase(Locale.ROOT))) } + + def validBatchState(batchState: String): Boolean = { + Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT))) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala index 40b3b281fc5..0ef14b00cbf 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala @@ -62,13 +62,22 @@ class SessionStateStore extends AbstractService("SessionStateStore") { Option(_stateStore.getMetadata(batchId, true)).map(buildBatch).orNull } - def getBatchesByType(batchType: String, from: Int, size: Int): Seq[Batch] = { + def getBatches( + batchType: String, + batchUser: String, + batchState: String, + createTime: Long, + endTime: Long, + from: Int, + size: Int): Seq[Batch] = { _stateStore.getMetadataList( SessionType.BATCH, batchType, + batchUser, + batchState, null, - null, - null, + createTime, + endTime, from, size, true).map(buildBatch) @@ -112,10 +121,14 @@ class SessionStateStore extends AbstractService("SessionStateStore") { new Batch( batchMetadata.identifier, + batchMetadata.username, batchMetadata.engineType, + batchMetadata.requestName, batchAppInfo.asJava, batchMetadata.kyuubiInstance, - batchMetadata.state) + batchMetadata.state, + batchMetadata.createTime, + batchMetadata.endTime) } private def startStateStoreCleaner(): Unit = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/StateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/StateStore.scala index 17cb12d0763..bcf710382ca 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/StateStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/StateStore.scala @@ -44,6 +44,8 @@ trait StateStore extends Closeable { * @param userName the user name. * @param state the state. * @param kyuubiInstance the kyuubi instance. + * @param createTime the metadata create time. + * @param endTime the end time. * @param from the batch offset. * @param size the batch size to get. * @param stateOnly only return the state related column values. @@ -55,6 +57,8 @@ trait StateStore extends Closeable { userName: String, state: String, kyuubiInstance: String, + createTime: Long, + endTime: Long, from: Int, size: Int, stateOnly: Boolean): Seq[SessionMetadata] diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala index 2ded66cc30b..3dbb09cd1f1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStore.scala @@ -171,6 +171,8 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging { userName: String, state: String, kyuubiInstance: String, + createTime: Long, + endTime: Long, from: Int, size: Int, stateOnly: Boolean): Seq[SessionMetadata] = { @@ -195,17 +197,26 @@ class JDBCStateStore(conf: KyuubiConf) extends StateStore with Logging { params += userName } Option(state).filter(_.nonEmpty).foreach { _ => - whereConditions += " STATE = ? " - params += state + whereConditions += " state = ? " + params += state.toUpperCase(Locale.ROOT) } Option(kyuubiInstance).filter(_.nonEmpty).foreach { _ => - whereConditions += " KYUUBI_INSTANCE = ? " + whereConditions += " kyuubi_instance = ? " params += kyuubiInstance } + if (createTime > 0) { + whereConditions += " create_time >= ? " + params += createTime + } + if (endTime > 0) { + whereConditions += " end_time > 0 " + whereConditions += " end_time <= ? " + params += endTime + } if (whereConditions.nonEmpty) { queryBuilder.append(whereConditions.mkString(" WHERE ", " AND ", " ")) } - queryBuilder.append(" ORDER BY KEY_ID ") + queryBuilder.append(" ORDER BY key_id ") val query = databaseAdaptor.addLimitAndOffsetToQuery(queryBuilder.toString(), size, from) withConnection() { connection => withResultSet(connection, query, params: _*) { rs => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index dfc4a966264..957233e69bb 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -159,12 +159,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { SessionHandle(HandleIdentifier(UUID.fromString(batchId), STATIC_BATCH_SECRET_UUID), protocol) } - def getBatchSessionImpl(batchId: String, protocol: TProtocolVersion): KyuubiBatchSessionImpl = { - getSession(getBatchSessionHandle(batchId, protocol)).asInstanceOf[KyuubiBatchSessionImpl] - } - def getBatchSessionImpl(sessionHandle: SessionHandle): KyuubiBatchSessionImpl = { - getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl] + getSessionOption(sessionHandle).map(_.asInstanceOf[KyuubiBatchSessionImpl]).orNull } def insertMetadata(metadata: SessionMetadata): Unit = { @@ -179,12 +175,19 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { sessionStateStore.updateBatchMetadata(batchId, state.toString, applicationStatus, endTime) } - def getBatch(batchId: String): Batch = { + def getBatchFromStateStore(batchId: String): Batch = { sessionStateStore.getBatch(batchId) } - def getBatchesByType(batchType: String, from: Int, size: Int): Seq[Batch] = { - sessionStateStore.getBatchesByType(batchType, from, size) + def getBatchesFromStateStore( + batchType: String, + batchUser: String, + batchState: String, + createTime: Long, + endTime: Long, + from: Int, + size: Int): Seq[Batch] = { + sessionStateStore.getBatches(batchType, batchUser, batchState, createTime, endTime, from, size) } @VisibleForTesting diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 339be62ae3f..d32ee10d7c2 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -44,9 +44,10 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { sessionManager.allSessions().foreach { session => sessionManager.closeSession(session.handle) } - sessionManager.getBatchesByType(null, 0, Int.MaxValue).foreach { batch => - sessionManager.applicationManager.killApplication(None, batch.getId) - sessionManager.cleanupMetadata(batch.getId) + sessionManager.getBatchesFromStateStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach { + batch => + sessionManager.applicationManager.killApplication(None, batch.getId) + sessionManager.cleanupMetadata(batch.getId) } } @@ -71,6 +72,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { var batch = response.readEntity(classOf[Batch]) assert(batch.getKyuubiInstance === fe.connectionUrl) assert(batch.getBatchType === "SPARK") + assert(batch.getName === appName) + assert(batch.getCreateTime > 0) + assert(batch.getEndTime === 0) requestObj.setConf((requestObj.getConf.asScala ++ Map(KyuubiAuthenticationFactory.HS2_PROXY_USER -> "root")).asJava) @@ -87,6 +91,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { batch = getBatchResponse.readEntity(classOf[Batch]) assert(batch.getKyuubiInstance === fe.connectionUrl) assert(batch.getBatchType === "SPARK") + assert(batch.getName === appName) + assert(batch.getCreateTime > 0) + assert(batch.getEndTime === 0) // invalid batchId getBatchResponse = webTarget.path(s"api/v1/batches/invalidBatchId") @@ -185,6 +192,10 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { val response = webTarget.path("api/v1/batches") .queryParam("batchType", "spark") + .queryParam("batchUser", "anonymous") + .queryParam("batchState", "RUNNING") + .queryParam("createTime", "0") + .queryParam("endTime", "0") .queryParam("from", "0") .queryParam("size", "2") .request(MediaType.APPLICATION_JSON_TYPE) @@ -298,6 +309,16 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { val getBatchListResponse6 = response6.readEntity(classOf[GetBatchesResponse]) assert(getBatchListResponse6.getTotal == 1) sessionManager.allSessions().map(_.close()) + + val queryCreateTime = System.currentTimeMillis() + val response7 = webTarget.path("api/v1/batches") + .queryParam("createTime", queryCreateTime.toString) + .queryParam("endTime", (queryCreateTime - 1).toString) + .queryParam("from", "2") + .queryParam("size", "2") + .request(MediaType.APPLICATION_JSON_TYPE) + .get() + assert(response7.getStatus === 500) } test("negative request") { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala index 1485c3112d2..3115f5bd05e 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchRestApiSuite.scala @@ -129,6 +129,38 @@ class BatchRestApiSuite extends RestClientTestHelper { val closeResp = batchRestApi.deleteBatch(batch.getId(), null) assert(closeResp.isSuccess) + // list batches + var listBatchesResp = batchRestApi.listBatches("SPARK", null, null, null, null, 0, Int.MaxValue) + assert(listBatchesResp.getTotal > 0) + + listBatchesResp = + batchRestApi.listBatches( + "SPARK", + null, + null, + Long.MaxValue - 1, + Long.MaxValue, + 0, + Int.MaxValue) + assert(listBatchesResp.getTotal === 0) + + listBatchesResp = + batchRestApi.listBatches("SPARK", null, null, Long.MaxValue, null, 0, Int.MaxValue) + assert(listBatchesResp.getTotal === 0) + + listBatchesResp = batchRestApi.listBatches("SPARK", null, null, null, 1000, 0, Int.MaxValue) + assert(listBatchesResp.getTotal === 0) + + // list batches with non-existing user + listBatchesResp = + batchRestApi.listBatches("SPARK", "non_existing_user", null, 0, 0, 0, Int.MaxValue) + assert(listBatchesResp.getTotal == 0) + + // list batches with invalid batch state + intercept[KyuubiRestException] { + batchRestApi.listBatches("SPARK", null, "BAD_STATE", 0, 0, 0, Int.MaxValue) + } + spnegoKyuubiRestClient.close() } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreSuite.scala index 0955c08568e..25fe6520ec7 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/statestore/jdbc/JDBCStateStoreSuite.scala @@ -39,7 +39,17 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { override def afterAll(): Unit = { super.afterAll() - jdbcStateStore.getMetadataList(null, null, null, null, null, 0, Int.MaxValue, true).foreach { + jdbcStateStore.getMetadataList( + null, + null, + null, + null, + null, + 0, + 0, + 0, + Int.MaxValue, + true).foreach { batch => jdbcStateStore.cleanupMetadataByIdentifier(batch.identifier) } @@ -96,7 +106,7 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { jdbcStateStore.insertMetadata(batchState2) var batches = - jdbcStateStore.getMetadataList(SessionType.BATCH, "Spark", null, null, null, 0, 1, true) + jdbcStateStore.getMetadataList(SessionType.BATCH, "Spark", null, null, null, 0, 0, 0, 1, true) assert(batches == Seq(batchStateOnlyMetadata)) batches = jdbcStateStore.getMetadataList( @@ -106,6 +116,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { null, null, 0, + 0, + 0, Int.MaxValue, true) assert(batches == Seq(batchStateOnlyMetadata, batchState2)) @@ -120,6 +132,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "PENDING", null, 0, + 0, + 0, Int.MaxValue, true) assert(batches.isEmpty) @@ -132,6 +146,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "PENDING", null, 0, + 0, + 0, Int.MaxValue, true) assert(batches == Seq(batchStateOnlyMetadata)) @@ -144,6 +160,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "RUNNING", null, 0, + 0, + 0, Int.MaxValue, true) assert(batches.isEmpty) @@ -156,6 +174,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "PENDING", null, 0, + 0, + 0, Int.MaxValue, true) assert(batches.isEmpty) @@ -167,6 +187,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "PENDING", null, 0, + 0, + 0, Int.MaxValue, true) assert(batches == Seq(batchStateOnlyMetadata)) @@ -178,6 +200,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { null, null, 0, + 0, + 0, Int.MaxValue, true) assert(batches == Seq(batchStateOnlyMetadata)) @@ -189,6 +213,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "PENDING", kyuubiInstance, 0, + 0, + 0, Int.MaxValue, false) assert(batchesToRecover == Seq(batchMetadata)) @@ -200,6 +226,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "RUNNING", kyuubiInstance, 0, + 0, + 0, Int.MaxValue, false) assert(batchesToRecover.isEmpty) @@ -226,6 +254,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "PENDING", kyuubiInstance, 0, + 0, + 0, Int.MaxValue, false).isEmpty) @@ -236,6 +266,8 @@ class JDBCStateStoreSuite extends KyuubiFunSuite { "RUNNING", kyuubiInstance, 0, + 0, + 0, Int.MaxValue, false).isEmpty)