Skip to content

Commit

Permalink
[KYUUBI #6278] Update DB state when the query fallback to resource ma…
Browse files Browse the repository at this point in the history
…nager and the batch app is terminal (#6284)

* [KYUUBI #6278] Update DB state when the query fallback to yarn and the batch app is terminal

* update all engine info

---------

Co-authored-by: Wang, Fei <fwang12@ebay.com>
  • Loading branch information
yanghua and turboFei committed Apr 15, 2024
1 parent 9aa5269 commit 1591157
Showing 1 changed file with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import scala.util.control.NonFatal
import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.commons.lang3.StringUtils
import org.glassfish.jersey.media.multipart.{FormDataContentDisposition, FormDataParam}

import org.apache.kyuubi.{Logging, Utils}
Expand All @@ -39,7 +40,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys._
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationState, KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.api.ApiRequestContext
Expand Down Expand Up @@ -315,8 +316,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
buildBatch(batchSession)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
val isOperationTerminated = (StringUtils.isNotBlank(metadata.state)
&& OperationState.isTerminal(OperationState.withName(metadata.state)))
val isApplicationTerminated = (StringUtils.isNotBlank(metadata.engineState)
&& ApplicationState.isTerminated(ApplicationState.withName(metadata.engineState)))

if (batchV2Enabled(metadata.requestConf) ||
OperationState.isTerminal(OperationState.withName(metadata.state)) ||
isOperationTerminated ||
isApplicationTerminated ||
metadata.kyuubiInstance == fe.connectionUrl) {
MetadataManager.buildBatch(metadata)
} else {
Expand All @@ -332,6 +339,17 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
Some(userName),
// prevent that the batch be marked as terminated if application state is NOT_FOUND
Some(metadata.engineOpenTime).filter(_ > 0).orElse(Some(System.currentTimeMillis)))
// if the batch app is terminated, update the metadata in db.
if (BatchJobSubmission.applicationTerminated(batchAppStatus)) {
val appInfo = batchAppStatus.get
sessionManager.updateMetadata(Metadata(
identifier = batchId,
engineId = appInfo.id,
engineName = appInfo.name,
engineUrl = appInfo.url.orNull,
engineState = appInfo.state.toString,
engineError = appInfo.error))
}
buildBatch(metadata, batchAppStatus)
}
}
Expand Down

0 comments on commit 1591157

Please sign in to comment.