Skip to content

Commit

Permalink
[KYUUBI #2800] Refine batch mode code path
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

1. rename `Metadata` to `SessionMetadata`
2. wrap `CloseBatchResponse` for closeBatchSession
3. remove the `killApp` parameter from closeBatchSession, so every close request kills the app by default
4. correct when the metadata is updated
5. cleanupMetadataByAge should check the state and only clean up metadata that is in a terminal state
6. correct the batch session state; we should never get a closed state

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2800 from ulysses-you/followup.

Closes #2800

a132439 [ulysses-you] comments
0f1c3aa [ulysses-you] fix
e79ec50 [ulysses-you] fix
e8862f8 [ulysses-you] improve
facad17 [ulysses-you] comments
777f00c [ulysses-you] Merge branch 'master' of https://github.com/apache/incubator-kyuubi into followup
df71d59 [ulysses-you] address comment
28010cb [ulysses-you] docs
dacdc87 [ulysses-you] lock
6516d32 [ulysses-you] cleanup

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
  • Loading branch information
ulysses-you authored and turboFei committed Jun 2, 2022
1 parent 2184526 commit bb98aa7
Show file tree
Hide file tree
Showing 19 changed files with 234 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ object OperationState extends Enumeration {
val INITIALIZED, PENDING, RUNNING, COMPILED, FINISHED, TIMEOUT, CANCELED, CLOSED, ERROR, UNKNOWN =
Value

val terminalStates: Seq[OperationState] = Seq(FINISHED, TIMEOUT, CANCELED, CLOSED, ERROR)

implicit def toTOperationState(from: OperationState): TOperationState = from match {
case INITIALIZED => INITIALIZED_STATE
case PENDING => PENDING_STATE
Expand Down Expand Up @@ -59,8 +61,7 @@ object OperationState extends Enumeration {
}
}

def isTerminal(state: OperationState): Boolean = state match {
case FINISHED | TIMEOUT | CANCELED | CLOSED | ERROR => true
case _ => false
def isTerminal(state: OperationState): Boolean = {
terminalStates.contains(state)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ public OperationLog getBatchLocalLog(String batchId, int from, int size) {
return this.getClient().get(path, params, OperationLog.class, client.getAuthHeader());
}

public void deleteBatch(String batchId, boolean killApp, String hs2ProxyUser) {
public void deleteBatch(String batchId, String hs2ProxyUser) {
Map<String, Object> params = new HashMap<>();
params.put("killApp", killApp);
params.put("hive.server2.proxy.user", hs2ProxyUser);

String path = String.format("%s/%s", API_BASE_PATH, batchId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.client.api.v1.dto;

import java.util.Objects;

public class CloseBatchResponse {
private boolean success = false;
private String msg = null;

public CloseBatchResponse() {}

public CloseBatchResponse(boolean success, String msg) {
this.success = success;
this.msg = msg;
}

public boolean isSuccess() {
return success;
}

public void setSuccess(boolean success) {
this.success = success;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CloseBatchResponse that = (CloseBatchResponse) o;
return success == that.success && Objects.equals(msg, that.msg);
}

@Override
public int hashCode() {
return Objects.hash(success, msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ public void getOperationLogTest() {
public void deleteBatchTest() {
// test spnego auth
BatchTestServlet.setAuthSchema(NEGOTIATE_AUTH);
spnegoBatchRestApi.deleteBatch("71535", true, "b_test");
spnegoBatchRestApi.deleteBatch("71535", "b_test");

// test basic auth
BatchTestServlet.setAuthSchema(BASIC_AUTH);
BatchTestServlet.allowAnonymous(false);
basicBatchRestApi.deleteBatch("71535", true, "b_test");
basicBatchRestApi.deleteBatch("71535", "b_test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ protected void doDelete(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
if (!validAuthHeader(req, resp)) return;

if (req.getPathInfo().matches("/api/v1/batches/\\d+")
&& req.getQueryString().matches("[\\w.]+(=[\\w]*)(&[\\w.]+(=[\\w]*))+$")) {
if (req.getPathInfo().matches("/api/v1/batches/\\d+")) {
resp.setStatus(HttpServletResponse.SC_OK);
} else {
resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- the metadata table ddl

CREATE TABLE metadata(
CREATE TABLE session_metadata(
key_id bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, -- the auto increment key id
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
session_type varchar(128) NOT NULL, -- the session type, SQL or BATCH
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- the metadata table ddl

CREATE TABLE metadata(
CREATE TABLE session_metadata(
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
session_type varchar(128) NOT NULL COMMENT 'the session type, SQL or BATCH',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,40 @@

package org.apache.kyuubi.operation

import java.io.IOException
import java.nio.ByteBuffer
import java.util.{ArrayList => JArrayList, Locale}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import com.codahale.metrics.MetricRegistry
import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.KyuubiBatchSessionImpl
import org.apache.kyuubi.util.ThriftUtils

/**
* The state of batch operation is special. In general, the lifecycle of state is:
*
* / ERROR
* PENDING -> RUNNING -> FINISHED
* \ CANCELED (CLOSED)
*
* We can not change FINISHED/ERROR/CANCELED to CLOSED, which differs from other operations
* whose final status is always CLOSED, so we do not use the CLOSED state in this class.
* To be compatible with killing the application, we combine the semantics of `cancel` and
* `close`, so if the user closes the batch session, the final status is CANCELED.
*/
class BatchJobSubmission(
session: KyuubiBatchSessionImpl,
val batchType: String,
Expand All @@ -56,6 +73,9 @@ class BatchJobSubmission(

private var applicationStatus: Option[Map[String, String]] = None

// Outcome of the kill attempt as a (success, message) pair.
// Starts as (false, "UNKNOWN") and is reassigned inside close().
private var killMessage: KillResponse = (false, "UNKNOWN")
// Read-only accessor for the current kill outcome.
def getKillMessage: KillResponse = killMessage

private val builder: ProcBuilder = {
Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
case Some("SPARK") =>
Expand Down Expand Up @@ -86,35 +106,62 @@ class BatchJobSubmission(
private val applicationCheckInterval =
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)

/**
 * Reports this batch's current operation state and application status to the
 * session manager via `updateBatchMetadata`.
 *
 * The end time passed along is `lastAccessTime` when the operation state is terminal,
 * and `0L` otherwise. A missing application status is reported as an empty map.
 */
private def updateBatchMetadata(): Unit = {
  // only a terminal operation carries a real end time
  val finishedAt = if (isTerminalState(state)) lastAccessTime else 0L
  session.sessionManager.updateBatchMetadata(
    batchId,
    state,
    applicationStatus.getOrElse(Map.empty),
    finishedAt)
}

override def getOperationLog: Option[OperationLog] = Option(_operationLog)

/**
 * Moves the operation to `newState`, unless the current state is CANCELED, in which
 * case the call is a no-op.
 *
 * NOTE(review): the monitor used here is whatever object `state` references when the
 * method is entered. If `setState` rebinds `state` to a different value, later callers
 * would lock a different object - confirm this gives the intended mutual exclusion
 * with `close()`, which also uses `state.synchronized`.
 */
private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
  state match {
    case CANCELED => // already canceled: keep the state as it is
    case _ => setState(newState)
  }
}

override protected def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(_operationLog)
setHasResultSet(true)
setState(OperationState.PENDING)
setStateIfNotCanceled(OperationState.PENDING)
}

override protected def afterRun(): Unit = {
OperationLog.removeCurrentOperationLog()
session.sessionManager.updateBatchMetadata(
batchId,
getStatus.state,
applicationStatus.getOrElse(Map.empty),
getStatus.lastModified)
}

override protected def runInternal(): Unit = {
val asyncOperation: Runnable = () => {
setState(OperationState.RUNNING)
setStateIfNotCanceled(OperationState.RUNNING)
try {
submitBatchJob()
setState(OperationState.FINISHED)
} catch onError()
setStateIfNotCanceled(OperationState.FINISHED)
} catch {
onError()
} finally {
updateBatchMetadata()
}
}

try {
val opHandle = session.sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(opHandle)
} catch onError("submitting batch job submission operation in background, request rejected")
} catch {
onError("submitting batch job submission operation in background, request rejected")
} finally {
if (isTerminalState(state)) {
updateBatchMetadata()
}
}
}

private def applicationFailed(applicationStatus: Option[Map[String, String]]): Boolean = {
Expand All @@ -130,10 +177,7 @@ class BatchJobSubmission(
applicationStatus = currentApplicationState
while (!applicationFailed(applicationStatus) && process.isAlive) {
if (!appStatusFirstUpdated && applicationStatus.isDefined) {
session.sessionManager.updateBatchMetadata(
batchId,
getStatus.state,
applicationStatus.get)
updateBatchMetadata()
appStatusFirstUpdated = true
}
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -189,12 +233,44 @@ class BatchJobSubmission(
}.getOrElse(ThriftUtils.EMPTY_ROW_SET)
}

override def close(): Unit = {
override def close(): Unit = state.synchronized {
if (!isClosedOrCanceled) {
if (builder != null) {
try {
getOperationLog.foreach(_.close())
} catch {
case e: IOException => error(e.getMessage, e)
}

MetricsSystem.tracing(_.decCount(
MetricRegistry.name(OPERATION_OPEN, statement.toLowerCase(Locale.getDefault))))

// fast fail
if (isTerminalState(state)) {
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
builder.close()
return
}

try {
killMessage = killBatchApplication()
builder.close()
} finally {
if (killMessage._1 && !isTerminalState(state)) {
// kill success and we can change state safely
// note that, the batch operation state should never be closed
setState(OperationState.CANCELED)
updateBatchMetadata()
} else if (killMessage._1) {
// we can not change state safely
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
} else if (!isTerminalState(state)) {
// failed to kill, the kill message is enough
}
}
}
super.close()
}

/**
 * Not supported for a batch job submission: always throws IllegalStateException.
 * As the exception message says, callers should use close() instead.
 */
override def cancel(): Unit = {
throw new IllegalStateException("Use close instead.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.kyuubi.server.api.v1

import java.util.Locale
import javax.ws.rs._
import javax.ws.rs.core.{MediaType, Response}
import javax.ws.rs.core.MediaType

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
Expand Down Expand Up @@ -149,14 +149,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description = "close a batch session")
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[CloseBatchResponse]))),
description = "close and cancel a batch session")
@DELETE
@Path("{batchId}")
def closeBatchSession(
@PathParam("batchId") batchId: String,
@QueryParam("killApp") killApp: Boolean,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = {
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): CloseBatchResponse = {
var session: KyuubiBatchSessionImpl = null
try {
val sessionHandle = sessionManager.getBatchSessionHandle(batchId, REST_BATCH_PROTOCOL)
Expand All @@ -183,14 +183,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
s"$userName is not allowed to close the session belong to ${session.user}")
}

if (killApp) {
val killResponse = session.batchJobSubmissionOp.killBatchApplication()
sessionManager.closeSession(session.handle)
Response.ok().entity(killResponse).build()
} else {
sessionManager.closeSession(session.handle)
Response.ok().build()
}
sessionManager.closeSession(session.handle)
val (success, msg) = session.batchJobSubmissionOp.getKillMessage
new CloseBatchResponse(success, msg)
}
}

Expand Down
Loading

0 comments on commit bb98aa7

Please sign in to comment.