Skip to content

Commit

Permalink
[KYUUBI #3950] Fix the batch metadata in-consistent issue on open bat…
Browse files Browse the repository at this point in the history
…ch session failure

### _Why are the changes needed?_

I found that, there is some confused batch metadata.

The metadata state in mysql is PENDING, but the batch session state is `ERROR`.

I think the RC is that:
- the metadata insert is in KyuubiBatchSessionImpl
- If open batch session failed, the BatchJobSubmission opHandle has not been added into batch session operation set.
- then the close session will not update the batch metadata

In this pr, I re-order the code when opening batch session.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3950 from turboFei/update_batch_state.

Closes #3950

49a8e07 [fwang12] try to kill
fd495bc [fwang12] fix ut
6d3bd69 [fwang12] add ut
bb2bf89 [fwang12] add more log
0c9f32b [fwang12] check recovery metadata if failed before run batch op
23f8fef [fwang12] comments
e45b192 [fwang12] re-order
0f0ad84 [fwang12] set launchEngineOp to lazy to prevent the OP CONN metrics failure
0eed14d [fwang12] add batch op
c39da31 [fwang12] Revert "insert metadata when run operation"
61b40c4 [fwang12] insert metadata when run operation

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
  • Loading branch information
turboFei committed Dec 9, 2022
1 parent 8eac513 commit e86df1a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,17 @@ class BatchJobSubmission(
}

MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))

if (!builder.processLaunched) {
builder.close()
if (recoveryMetadata.isDefined) {
killMessage = killBatchApplication()
}
setState(OperationState.CANCELED)
updateBatchMetadata()
return
}

// fast fail
if (isTerminalState(state)) {
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,15 @@ class KyuubiBatchSessionImpl(

checkSessionAccessPathURIs()

// we should call super.open before running batch job submission operation
// create the operation root directory before running batch job submission operation
super.open()

runOperation(batchJobSubmissionOp)
}

override def close(): Unit = {
super.close()
batchJobSubmissionOp.close()
waitMetadataRequestsRetryCompletion()
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl): SessionHandle = {
val user = batchSession.user
val ipAddress = batchSession.ipAddress
val handle = batchSession.handle
try {
val handle = batchSession.handle
batchSession.open()
setSession(handle, batchSession)
info(s"$user's batch session with $handle is opened, current opening sessions" +
Expand All @@ -153,14 +153,15 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
batchSession.close()
} catch {
case t: Throwable =>
warn(s"Error closing batch session for $user client ip: $ipAddress", t)
warn(s"Error closing batch session[$handle] for $user client ip: $ipAddress", t)
}
MetricsSystem.tracing { ms =>
ms.incCount(CONN_FAIL)
ms.incCount(MetricRegistry.name(CONN_FAIL, user))
}
throw KyuubiSQLException(
s"Error opening batch session for $user client ip $ipAddress, due to ${e.getMessage}",
s"Error opening batch session[$handle] for $user client ip $ipAddress," +
s" due to ${e.getMessage}",
e)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.server.api.v1

import java.net.InetAddress
import java.nio.file.Paths
import java.util.{Base64, UUID}
import javax.ws.rs.client.Entity
import javax.ws.rs.core.MediaType
Expand Down Expand Up @@ -49,6 +50,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
.set(
KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER,
classOf[UserDefinedEngineSecuritySecretProvider].getName)
.set(
KyuubiConf.SESSION_LOCAL_DIR_ALLOW_LIST,
Seq(Paths.get(sparkBatchTestResource.get).getParent.toString))

override def afterEach(): Unit = {
val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
Expand Down Expand Up @@ -219,7 +223,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
Map.empty,
newBatchRequest(
"spark",
"",
sparkBatchTestResource.get,
"",
""))
sessionManager.openSession(
Expand All @@ -241,7 +245,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
Map.empty,
newBatchRequest(
"spark",
"",
sparkBatchTestResource.get,
"",
""))
sessionManager.openBatchSession(
Expand All @@ -251,7 +255,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
Map.empty,
newBatchRequest(
"spark",
"",
sparkBatchTestResource.get,
"",
""))

Expand Down Expand Up @@ -628,4 +632,22 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
val counterName = s"${MetricsConstants.OPERATION_STATE}.$opType.${state.toString.toLowerCase}"
MetricsSystem.meterValue(counterName).getOrElse(0L)
}

test("the batch session should be consistent on open session failure") {
val sessionManager = server.frontendServices.head
.be.sessionManager.asInstanceOf[KyuubiSessionManager]

val e = intercept[Exception] {
sessionManager.openBatchSession(
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map.empty,
newSparkBatchRequest(Map("spark.jars" -> "disAllowPath")))
}
val sessionHandleRegex = "\\[[\\S]*\\]".r
val batchId = sessionHandleRegex.findFirstMatchIn(e.getMessage).get.group(0)
.replaceAll("\\[", "").replaceAll("\\]", "")
assert(sessionManager.getBatchMetadata(batchId).state == "CANCELED")
}
}

0 comments on commit e86df1a

Please sign in to comment.