Skip to content

Commit bb98aa7

Browse files
ulysses-you authored and turboFei committed
[KYUUBI #2800] Refine batch mode code path
### _Why are the changes needed?_

1. Rename `Metadata` to `SessionMetadata`.
2. Wrap the result of closeBatchSession in `CloseBatchResponse`.
3. Remove `killApp` from closeBatchSession, so every close request kills the app by default.
4. Correct when the metadata is updated.
5. cleanupMetadataByAge should check the state and only clean up terminated metadata.
6. Correct the batch session state; we should never get a CLOSED state.

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2800 from ulysses-you/followup.

Closes #2800

a132439 [ulysses-you] comments
0f1c3aa [ulysses-you] fix
e79ec50 [ulysses-you] fix
e8862f8 [ulysses-you] improve
facad17 [ulysses-you] comments
777f00c [ulysses-you] Merge branch 'master' of https://github.com/apache/incubator-kyuubi into followup
df71d59 [ulysses-you] address comment
28010cb [ulysses-you] docs
dacdc87 [ulysses-you] lock
6516d32 [ulysses-you] cleanup

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
1 parent 2184526 commit bb98aa7

File tree

19 files changed

+234
-110
lines changed

19 files changed

+234
-110
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationState.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ object OperationState extends Enumeration {
3131
val INITIALIZED, PENDING, RUNNING, COMPILED, FINISHED, TIMEOUT, CANCELED, CLOSED, ERROR, UNKNOWN =
3232
Value
3333

34+
val terminalStates: Seq[OperationState] = Seq(FINISHED, TIMEOUT, CANCELED, CLOSED, ERROR)
35+
3436
implicit def toTOperationState(from: OperationState): TOperationState = from match {
3537
case INITIALIZED => INITIALIZED_STATE
3638
case PENDING => PENDING_STATE
@@ -59,8 +61,7 @@ object OperationState extends Enumeration {
5961
}
6062
}
6163

62-
def isTerminal(state: OperationState): Boolean = state match {
63-
case FINISHED | TIMEOUT | CANCELED | CLOSED | ERROR => true
64-
case _ => false
64+
def isTerminal(state: OperationState): Boolean = {
65+
terminalStates.contains(state)
6566
}
6667
}

kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,8 @@ public OperationLog getBatchLocalLog(String batchId, int from, int size) {
6666
return this.getClient().get(path, params, OperationLog.class, client.getAuthHeader());
6767
}
6868

69-
public void deleteBatch(String batchId, boolean killApp, String hs2ProxyUser) {
69+
public void deleteBatch(String batchId, String hs2ProxyUser) {
7070
Map<String, Object> params = new HashMap<>();
71-
params.put("killApp", killApp);
7271
params.put("hive.server2.proxy.user", hs2ProxyUser);
7372

7473
String path = String.format("%s/%s", API_BASE_PATH, batchId);
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.client.api.v1.dto;
19+
20+
import java.util.Objects;
21+
22+
public class CloseBatchResponse {
23+
private boolean success = false;
24+
private String msg = null;
25+
26+
public CloseBatchResponse() {}
27+
28+
public CloseBatchResponse(boolean success, String msg) {
29+
this.success = success;
30+
this.msg = msg;
31+
}
32+
33+
public boolean isSuccess() {
34+
return success;
35+
}
36+
37+
public void setSuccess(boolean success) {
38+
this.success = success;
39+
}
40+
41+
public String getMsg() {
42+
return msg;
43+
}
44+
45+
public void setMsg(String msg) {
46+
this.msg = msg;
47+
}
48+
49+
@Override
50+
public boolean equals(Object o) {
51+
if (this == o) return true;
52+
if (o == null || getClass() != o.getClass()) return false;
53+
CloseBatchResponse that = (CloseBatchResponse) o;
54+
return success == that.success && Objects.equals(msg, that.msg);
55+
}
56+
57+
@Override
58+
public int hashCode() {
59+
return Objects.hash(success, msg);
60+
}
61+
}

kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchRestClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,11 @@ public void getOperationLogTest() {
226226
public void deleteBatchTest() {
227227
// test spnego auth
228228
BatchTestServlet.setAuthSchema(NEGOTIATE_AUTH);
229-
spnegoBatchRestApi.deleteBatch("71535", true, "b_test");
229+
spnegoBatchRestApi.deleteBatch("71535", "b_test");
230230

231231
// test basic auth
232232
BatchTestServlet.setAuthSchema(BASIC_AUTH);
233233
BatchTestServlet.allowAnonymous(false);
234-
basicBatchRestApi.deleteBatch("71535", true, "b_test");
234+
basicBatchRestApi.deleteBatch("71535", "b_test");
235235
}
236236
}

kyuubi-rest-client/src/test/java/org/apache/kyuubi/client/BatchTestServlet.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,7 @@ protected void doDelete(HttpServletRequest req, HttpServletResponse resp)
101101
throws ServletException, IOException {
102102
if (!validAuthHeader(req, resp)) return;
103103

104-
if (req.getPathInfo().matches("/api/v1/batches/\\d+")
105-
&& req.getQueryString().matches("[\\w.]+(=[\\w]*)(&[\\w.]+(=[\\w]*))+$")) {
104+
if (req.getPathInfo().matches("/api/v1/batches/\\d+")) {
106105
resp.setStatus(HttpServletResponse.SC_OK);
107106
} else {
108107
resp.setStatus(HttpServletResponse.SC_NOT_FOUND);

kyuubi-server/src/main/resources/sql/derby/statestore-schema-derby.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
-- the metadata table ddl
22

3-
CREATE TABLE metadata(
3+
CREATE TABLE session_metadata(
44
key_id bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, -- the auto increment key id
55
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
66
session_type varchar(128) NOT NULL, -- the session type, SQL or BATCH

kyuubi-server/src/main/resources/sql/mysql/statestore-schema-mysql.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
-- the metadata table ddl
22

3-
CREATE TABLE metadata(
3+
CREATE TABLE session_metadata(
44
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
55
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
66
session_type varchar(128) NOT NULL COMMENT 'the session type, SQL or BATCH',

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala

Lines changed: 93 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,40 @@
1717

1818
package org.apache.kyuubi.operation
1919

20+
import java.io.IOException
2021
import java.nio.ByteBuffer
2122
import java.util.{ArrayList => JArrayList, Locale}
2223
import java.util.concurrent.TimeUnit
2324

2425
import scala.collection.JavaConverters._
2526

27+
import com.codahale.metrics.MetricRegistry
2628
import org.apache.hive.service.rpc.thrift._
2729

2830
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
2931
import org.apache.kyuubi.config.KyuubiConf
3032
import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, ProcBuilder}
3133
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
34+
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
35+
import org.apache.kyuubi.metrics.MetricsSystem
3236
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
37+
import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState}
3338
import org.apache.kyuubi.operation.log.OperationLog
3439
import org.apache.kyuubi.session.KyuubiBatchSessionImpl
3540
import org.apache.kyuubi.util.ThriftUtils
3641

42+
/**
43+
* The state of batch operation is special. In general, the lifecycle of state is:
44+
*
45+
* / ERROR
46+
* PENDING -> RUNNING -> FINISHED
47+
* \ CANCELED (CLOSED)
48+
*
49+
* We cannot change FINISHED/ERROR/CANCELED to CLOSED, which differs from other operations
50+
* whose final status is always CLOSED, so we do not use the CLOSED state in this class.
51+
* To be compatible with killing the application, we combine the semantics of `cancel` and
52+
* `close`, so if the user closes the batch session, the final status is CANCELED.
53+
*/
3754
class BatchJobSubmission(
3855
session: KyuubiBatchSessionImpl,
3956
val batchType: String,
@@ -56,6 +73,9 @@ class BatchJobSubmission(
5673

5774
private var applicationStatus: Option[Map[String, String]] = None
5875

76+
private var killMessage: KillResponse = (false, "UNKNOWN")
77+
def getKillMessage: KillResponse = killMessage
78+
5979
private val builder: ProcBuilder = {
6080
Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
6181
case Some("SPARK") =>
@@ -86,35 +106,62 @@ class BatchJobSubmission(
86106
private val applicationCheckInterval =
87107
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)
88108

109+
private def updateBatchMetadata(): Unit = {
110+
val endTime =
111+
if (isTerminalState(state)) {
112+
lastAccessTime
113+
} else {
114+
0L
115+
}
116+
session.sessionManager.updateBatchMetadata(
117+
batchId,
118+
state,
119+
applicationStatus.getOrElse(Map.empty),
120+
endTime)
121+
}
122+
89123
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
90124

125+
// once the state is CANCELED it must not be changed to any other state
126+
private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
127+
if (state != CANCELED) {
128+
setState(newState)
129+
}
130+
}
131+
91132
override protected def beforeRun(): Unit = {
92133
OperationLog.setCurrentOperationLog(_operationLog)
93134
setHasResultSet(true)
94-
setState(OperationState.PENDING)
135+
setStateIfNotCanceled(OperationState.PENDING)
95136
}
96137

97138
override protected def afterRun(): Unit = {
98139
OperationLog.removeCurrentOperationLog()
99-
session.sessionManager.updateBatchMetadata(
100-
batchId,
101-
getStatus.state,
102-
applicationStatus.getOrElse(Map.empty),
103-
getStatus.lastModified)
104140
}
105141

106142
override protected def runInternal(): Unit = {
107143
val asyncOperation: Runnable = () => {
108-
setState(OperationState.RUNNING)
144+
setStateIfNotCanceled(OperationState.RUNNING)
109145
try {
110146
submitBatchJob()
111-
setState(OperationState.FINISHED)
112-
} catch onError()
147+
setStateIfNotCanceled(OperationState.FINISHED)
148+
} catch {
149+
onError()
150+
} finally {
151+
updateBatchMetadata()
152+
}
113153
}
154+
114155
try {
115156
val opHandle = session.sessionManager.submitBackgroundOperation(asyncOperation)
116157
setBackgroundHandle(opHandle)
117-
} catch onError("submitting batch job submission operation in background, request rejected")
158+
} catch {
159+
onError("submitting batch job submission operation in background, request rejected")
160+
} finally {
161+
if (isTerminalState(state)) {
162+
updateBatchMetadata()
163+
}
164+
}
118165
}
119166

120167
private def applicationFailed(applicationStatus: Option[Map[String, String]]): Boolean = {
@@ -130,10 +177,7 @@ class BatchJobSubmission(
130177
applicationStatus = currentApplicationState
131178
while (!applicationFailed(applicationStatus) && process.isAlive) {
132179
if (!appStatusFirstUpdated && applicationStatus.isDefined) {
133-
session.sessionManager.updateBatchMetadata(
134-
batchId,
135-
getStatus.state,
136-
applicationStatus.get)
180+
updateBatchMetadata()
137181
appStatusFirstUpdated = true
138182
}
139183
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
@@ -189,12 +233,44 @@ class BatchJobSubmission(
189233
}.getOrElse(ThriftUtils.EMPTY_ROW_SET)
190234
}
191235

192-
override def close(): Unit = {
236+
override def close(): Unit = state.synchronized {
193237
if (!isClosedOrCanceled) {
194-
if (builder != null) {
238+
try {
239+
getOperationLog.foreach(_.close())
240+
} catch {
241+
case e: IOException => error(e.getMessage, e)
242+
}
243+
244+
MetricsSystem.tracing(_.decCount(
245+
MetricRegistry.name(OPERATION_OPEN, statement.toLowerCase(Locale.getDefault))))
246+
247+
// fast fail
248+
if (isTerminalState(state)) {
249+
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
250+
builder.close()
251+
return
252+
}
253+
254+
try {
255+
killMessage = killBatchApplication()
195256
builder.close()
257+
} finally {
258+
if (killMessage._1 && !isTerminalState(state)) {
259+
// kill success and we can change state safely
260+
// note that, the batch operation state should never be closed
261+
setState(OperationState.CANCELED)
262+
updateBatchMetadata()
263+
} else if (killMessage._1) {
264+
// we can not change state safely
265+
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
266+
} else if (!isTerminalState(state)) {
267+
// failed to kill, the kill message is enough
268+
}
196269
}
197270
}
198-
super.close()
271+
}
272+
273+
override def cancel(): Unit = {
274+
throw new IllegalStateException("Use close instead.")
199275
}
200276
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.kyuubi.server.api.v1
1919

2020
import java.util.Locale
2121
import javax.ws.rs._
22-
import javax.ws.rs.core.{MediaType, Response}
22+
import javax.ws.rs.core.MediaType
2323

2424
import scala.collection.JavaConverters._
2525
import scala.util.control.NonFatal
@@ -149,14 +149,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
149149
@ApiResponse(
150150
responseCode = "200",
151151
content = Array(new Content(
152-
mediaType = MediaType.APPLICATION_JSON)),
153-
description = "close a batch session")
152+
mediaType = MediaType.APPLICATION_JSON,
153+
schema = new Schema(implementation = classOf[CloseBatchResponse]))),
154+
description = "close and cancel a batch session")
154155
@DELETE
155156
@Path("{batchId}")
156157
def closeBatchSession(
157158
@PathParam("batchId") batchId: String,
158-
@QueryParam("killApp") killApp: Boolean,
159-
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = {
159+
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): CloseBatchResponse = {
160160
var session: KyuubiBatchSessionImpl = null
161161
try {
162162
val sessionHandle = sessionManager.getBatchSessionHandle(batchId, REST_BATCH_PROTOCOL)
@@ -183,14 +183,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
183183
s"$userName is not allowed to close the session belong to ${session.user}")
184184
}
185185

186-
if (killApp) {
187-
val killResponse = session.batchJobSubmissionOp.killBatchApplication()
188-
sessionManager.closeSession(session.handle)
189-
Response.ok().entity(killResponse).build()
190-
} else {
191-
sessionManager.closeSession(session.handle)
192-
Response.ok().build()
193-
}
186+
sessionManager.closeSession(session.handle)
187+
val (success, msg) = session.batchJobSubmissionOp.getKillMessage
188+
new CloseBatchResponse(success, msg)
194189
}
195190
}
196191

0 commit comments

Comments
 (0)