Skip to content

Commit 27330dd

Browse files
turboFeiulysses-you
authored andcommitted
[KYUUBI #2628][FOLLOWUP] Refine kyuubi-ctl batch commands
### _Why are the changes needed?_ Refine kyuubi-ctl batch commands ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2889 from turboFei/refine_batch_command. Closes #2628 56bb7e3 [Fei Wang] print current state d7be6b7 [Fei Wang] print batch report 3c94fb2 [Fei Wang] set debug for doRequest 7c5fd65 [Fei Wang] fix ut bcd28a7 [Fei Wang] exit if batch job fail 5d73226 [Fei Wang] refactor 4a2bad8 [Fei Wang] refactor the state 9c98219 [Fei Wang] revert --conf c2b7008 [Fei Wang] fix ut 10778df [Fei Wang] print close resp first a453084 [Fei Wang] support --conf Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent 57e3733 commit 27330dd

File tree

7 files changed

+82
-20
lines changed

7 files changed

+82
-20
lines changed

kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/RestClientFactory.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ object RestClientFactory {
4545
map: JMap[String, Object],
4646
conf: KyuubiConf): KyuubiRestClient = {
4747
val version = getApiVersion(map)
48-
val hostUrl = getRestConfig("hostUrl", conf.get(CTL_REST_CLIENT_BASE_URL).get, cliConfig, map)
48+
val hostUrl =
49+
getRestConfig("hostUrl", conf.get(CTL_REST_CLIENT_BASE_URL).orNull, cliConfig, map)
4950
val authSchema =
5051
getRestConfig("authSchema", conf.get(CTL_REST_CLIENT_AUTH_SCHEMA), cliConfig, map)
5152

@@ -67,7 +68,7 @@ object RestClientFactory {
6768
.build()
6869
case "spnego" =>
6970
val spnegoHost =
70-
getRestConfig("spnegoHost", conf.get(CTL_REST_CLIENT_SPNEGO_HOST).get, cliConfig, map)
71+
getRestConfig("spnegoHost", conf.get(CTL_REST_CLIENT_SPNEGO_HOST).orNull, cliConfig, map)
7172
kyuubiRestClient = KyuubiRestClient.builder(hostUrl)
7273
.apiVersion(KyuubiRestClient.ApiVersion.valueOf(version))
7374
.authHeaderMethod(KyuubiRestClient.AuthHeaderMethod.SPNEGO)

kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/delete/DeleteBatchCommand.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,32 @@ package org.apache.kyuubi.ctl.cmd.delete
1818

1919
import org.apache.kyuubi.client.BatchRestApi
2020
import org.apache.kyuubi.client.util.JsonUtil
21-
import org.apache.kyuubi.ctl.CliConfig
21+
import org.apache.kyuubi.ctl.{CliConfig, ControlCliException}
2222
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
2323
import org.apache.kyuubi.ctl.cmd.Command
24+
import org.apache.kyuubi.ctl.util.BatchUtil
2425

2526
class DeleteBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
26-
27-
var result: String = null
28-
2927
def validate(): Unit = {}
3028

3129
def run(): Unit = {
3230
withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
3331
val batchRestApi: BatchRestApi = new BatchRestApi(kyuubiRestClient)
32+
val batchId = normalizedCliConfig.batchOpts.batchId
33+
34+
val result = batchRestApi.deleteBatch(batchId, normalizedCliConfig.batchOpts.hs2ProxyUser)
3435

35-
val result = batchRestApi.deleteBatch(
36-
normalizedCliConfig.batchOpts.batchId,
37-
normalizedCliConfig.batchOpts.hs2ProxyUser)
3836
info(JsonUtil.toJson(result))
37+
38+
if (!result.isSuccess) {
39+
val batch = batchRestApi.getBatchById(batchId)
40+
if (!BatchUtil.isTerminalState(batch.getState)) {
41+
error(s"Failed to delete batch $batchId, its current state is ${batch.getState}")
42+
throw ControlCliException(1)
43+
} else {
44+
warn(s"Batch $batchId is already in terminal state ${batch.getState}.")
45+
}
46+
}
3947
}
4048
}
4149

kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, OperationLog}
2323
import org.apache.kyuubi.ctl.CliConfig
2424
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
2525
import org.apache.kyuubi.ctl.cmd.Command
26+
import org.apache.kyuubi.ctl.util.BatchUtil
2627

2728
class LogBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
2829

@@ -59,8 +60,7 @@ class LogBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
5960
Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
6061

6162
batch = batchRestApi.getBatchById(batchId)
62-
if (log.getLogRowSet.size() == 0 && batch.getState() != "PENDING"
63-
&& batch.getState() != "RUNNING") {
63+
if (log.getLogRowSet.size() == 0 && BatchUtil.isTerminalState(batch.getState)) {
6464
done = true
6565
}
6666
}

kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/submit/SubmitBatchCommand.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ import scala.collection.JavaConverters._
2222

2323
import org.apache.kyuubi.client.BatchRestApi
2424
import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest, OperationLog}
25-
import org.apache.kyuubi.ctl.CliConfig
25+
import org.apache.kyuubi.client.util.JsonUtil
26+
import org.apache.kyuubi.ctl.{CliConfig, ControlCliException}
2627
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
2728
import org.apache.kyuubi.ctl.cmd.Command
28-
import org.apache.kyuubi.ctl.util.{CtlUtils, Validator}
29+
import org.apache.kyuubi.ctl.util.{BatchUtil, CtlUtils, Validator}
2930

3031
class SubmitBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
3132

@@ -65,12 +66,17 @@ class SubmitBatchCommand(cliConfig: CliConfig) extends Command(cliConfig) {
6566
Thread.sleep(DEFAULT_LOG_QUERY_INTERVAL)
6667

6768
batch = batchRestApi.getBatchById(batchId)
68-
if (log.getLogRowSet.size() == 0 && batch.getState() != "PENDING"
69-
&& batch.getState() != "RUNNING") {
69+
if (log.getLogRowSet.size() == 0 && BatchUtil.isTerminalState(batch.getState)) {
7070
done = true
7171
}
7272
}
73+
74+
if (BatchUtil.isTerminalState(batch.getState) && !BatchUtil.isFinishedState(batch.getState)) {
75+
error(s"Batch $batchId failed: ${JsonUtil.toJson(batch)}")
76+
throw ControlCliException(1)
77+
} else {
78+
info(s"Batch report for $batchId: ${JsonUtil.toJson(batch)}")
79+
}
7380
}
7481
}
75-
7682
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.ctl.util
19+
20+
import java.util.Locale
21+
22+
object BatchUtil {
23+
private val PENDING_STATE = "PENDING"
24+
private val RUNNING_STATE = "RUNNING"
25+
private val FINISHED_STATE = "FINISHED"
26+
private val ERROR_STATE = "ERROR"
27+
private val CANCELED_STATE = "CANCELED"
28+
private val terminalBatchStates = Seq(FINISHED_STATE, ERROR_STATE, CANCELED_STATE)
29+
30+
def isPendingState(state: String): Boolean = {
31+
PENDING_STATE.equalsIgnoreCase(state)
32+
}
33+
34+
def isRunningState(state: String): Boolean = {
35+
RUNNING_STATE.equalsIgnoreCase(state)
36+
}
37+
38+
def isFinishedState(state: String): Boolean = {
39+
FINISHED_STATE.equalsIgnoreCase(state)
40+
}
41+
42+
def isTerminalState(state: String): Boolean = {
43+
state != null && terminalBatchStates.contains(state.toUpperCase(Locale.ROOT))
44+
}
45+
}

kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/RestClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ private String doRequest(URI uri, String authHeader, RequestBuilder requestBuild
111111
.setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
112112
.build();
113113

114-
LOG.info("Executing {} request: {}", httpRequest.getMethod(), uri);
114+
LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri);
115115

116116
ResponseHandler<String> responseHandler =
117117
resp -> {
@@ -126,7 +126,7 @@ private String doRequest(URI uri, String authHeader, RequestBuilder requestBuild
126126
};
127127

128128
response = httpclient.execute(httpRequest, responseHandler);
129-
LOG.info("Response: {}", response);
129+
LOG.debug("Response: {}", response);
130130
} catch (ConnectException | ConnectTimeoutException e) {
131131
// net exception can be retried by connecting to other Kyuubi server
132132
throw new KyuubiRetryableException("Api request failed for " + uri.toString(), e);

kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/util/JsonUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,17 @@ public static String toJson(Object object) {
2828
try {
2929
return MAPPER.writeValueAsString(object);
3030
} catch (Exception e) {
31-
throw new KyuubiRestException("Failed to convert object to json", e);
31+
throw new KyuubiRestException(
32+
String.format("Failed to convert object(%s) to json", object), e);
3233
}
3334
}
3435

3536
public static <T> T toObject(String json, Class<T> clazz) {
3637
try {
3738
return MAPPER.readValue(json, clazz);
3839
} catch (Exception e) {
39-
throw new KyuubiRestException("Failed to convert json string to object", e);
40+
throw new KyuubiRestException(
41+
String.format("Failed to convert json string(%s) to %s", json, clazz.getName()), e);
4042
}
4143
}
4244
}

0 commit comments

Comments
 (0)