Skip to content

Commit

Permalink
[KYUUBI #3922] Only the ApplicationInfo with non-empty id is valid for BatchJobSubmission
Browse files Browse the repository at this point in the history

### _Why are the changes needed?_

![image](https://user-images.githubusercontent.com/6757692/206124635-2e43542a-337f-455b-bccc-f7907d5e26db.png)

The batch state is updated to running, but the app state is NOT_FOUND.

We shall check the ApplicationInfo returned by applicationManager.

This is because, for YarnApplicationOperation, the result of getApplicationInfoByTag is always non-empty, even when no application is found:
```
      if (reports.isEmpty) {
        debug(s"Application with tag $tag not found")
        ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND)
      }
```

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3922 from turboFei/not_found.

Closes #3922

f83c67c [fwang12] set not found state if finally state is empty
62e619b [fwang12] fix ut
53c76d2 [fwang12] fix ut
ff5c36b [fwang12] check id none empty
7b1180c [fwang12] add ut
4843f8a [fwang12] do not update not found as batch state

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
  • Loading branch information
turboFei committed Dec 8, 2022
1 parent 27b525b commit 8eac513
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class BatchJobSubmission(
}

override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
// only the ApplicationInfo with non-empty id is valid for the operation
applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
}

private[kyuubi] def killBatchApplication(): KillResponse = {
Expand All @@ -113,6 +114,13 @@ class BatchJobSubmission(
0L
}

if (isTerminalState(state)) {
if (applicationInfo.isEmpty) {
applicationInfo =
Option(ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND))
}
}

applicationInfo.foreach { status =>
val metadataToUpdate = Metadata(
identifier = batchId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
assert(appInfo.exists(_.id.startsWith("application_")))
}

eventually(timeout(10.seconds)) {
val metadata = session.sessionManager.getBatchMetadata(session.handle.identifier.toString)
assert(metadata.state === "RUNNING")
assert(metadata.engineId.startsWith("application_"))
}

val killResponse = yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
assert(killResponse._1)
assert(killResponse._2 startsWith "Succeeded to terminate:")
Expand Down Expand Up @@ -169,8 +175,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp

eventually(timeout(3.minutes), interval(50.milliseconds)) {
assert(batchJobSubmissionOp.currentApplicationInfo.isDefined)
assert(batchJobSubmissionOp.currentApplicationInfo.get.id == null)
assert(batchJobSubmissionOp.currentApplicationInfo.isEmpty)
assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
assert(session1.createTime === batchMetadata.createTime)
assert(session2.createTime === batchMetadata2.createTime)

eventually(timeout(5.seconds)) {
eventually(timeout(10.seconds)) {
assert(session1.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING ||
session1.batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
assert(session1.batchJobSubmissionOp.builder.processLaunched)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime

import org.apache.kyuubi.{BatchTestHelper, RestClientTestHelper, Utils}
import org.apache.kyuubi.ctl.TestPrematureExit
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ctl.{CtlConf, TestPrematureExit}
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.session.KyuubiSessionManager

Expand All @@ -37,6 +38,10 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
val basePath: String = Utils.getCodeSourceLocation(getClass)
val batchFile: String = s"${basePath}/batch.yaml"

override protected val otherConfigs: Map[String, String] = {
Map(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL.key -> "100")
}

override def beforeAll(): Unit = {
super.beforeAll()

Expand Down Expand Up @@ -234,12 +239,12 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
"--password",
ldapUserPasswd,
"--waitCompletion",
"false")
"false",
"--conf",
s"${CtlConf.CTL_BATCH_LOG_QUERY_INTERVAL.key}=100")
val result = testPrematureExitForControlCli(submitArgs, "")
assert(result.contains(s"/bin/spark-submit"))
assert(!result.contains("ShutdownHookManager: Shutdown hook called"))
val numberOfRows = result.split("\n").length
assert(numberOfRows <= 100)
}

test("list batch test") {
Expand Down

0 comments on commit 8eac513

Please sign in to comment.