Skip to content

Commit 9518724

Browse files
SteNicholasulysses-you
authored andcommitted
[KYUUBI #2054] [KYUUBI-1819] Support closing Flink SQL engine process
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Support closing Flink SQL engine process. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2054 from SteNicholas/KYUUBI-1819. Closes #2054 992bd0b [SteNicholas] [KYUUBI-1819] Support closing Flink SQL engine process 6956712 [SteNicholas] [KYUUBI-1819] Support closing Flink SQL engine process Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent b1e949d commit 9518724

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
package org.apache.kyuubi.engine.flink
1919

2020
import java.io.{File, FilenameFilter}
21+
import java.lang.ProcessBuilder.Redirect
2122
import java.net.URI
2223
import java.nio.file.{Files, Paths}
2324

25+
import scala.collection.JavaConverters._
26+
2427
import com.google.common.annotations.VisibleForTesting
2528

2629
import org.apache.kyuubi._
@@ -99,6 +102,29 @@ class FlinkProcessBuilder(
99102

100103
override protected def commands: Array[String] = Array(executable)
101104

105+
override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String = {
106+
"Job ID: .*".r.findFirstIn(line) match {
107+
case Some(jobIdLine) =>
108+
val jobId = jobIdLine.split("Job ID: ")(1).trim
109+
env.get("FLINK_HOME") match {
110+
case Some(flinkHome) =>
111+
val pb = new ProcessBuilder("/bin/sh", s"$flinkHome/bin/flink", "stop", jobId)
112+
pb.environment()
113+
.putAll(childProcEnv.asJava)
114+
pb.redirectError(Redirect.appendTo(engineLog))
115+
pb.redirectOutput(Redirect.appendTo(engineLog))
116+
val process = pb.start()
117+
process.waitFor() match {
118+
case id if id != 0 => s"Failed to kill Application $jobId, please kill it manually. "
119+
case _ => s"Killed Application $jobId successfully. "
120+
}
121+
case None =>
122+
s"FLINK_HOME is not set! Failed to kill Application $jobId, please kill it manually."
123+
}
124+
case None => ""
125+
}
126+
}
127+
102128
override def toString: String = commands.map {
103129
case arg if arg.startsWith("--") => s"\\\n\t$arg"
104130
case arg => arg

kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,25 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
2828
val commands = builder.toString.split(' ')
2929
assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
3030
}
31+
32+
test("kill application") {
33+
val processBuilder = new FakeFlinkProcessBuilder(conf) {
34+
override protected def env: Map[String, String] = Map("FLINK_HOME" -> "")
35+
}
36+
val exit1 = processBuilder.killApplication(
37+
"""
38+
|[INFO] SQL update statement has been successfully submitted to the cluster:
39+
|Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
40+
|""".stripMargin)
41+
assert(exit1.contains("6b1af540c0c0bb3fcfcad50ac037c862")
42+
&& !exit1.contains("FLINK_HOME is not set!"))
43+
44+
val exit2 = processBuilder.killApplication("unknow")
45+
assert(exit2.equals(""))
46+
}
47+
}
48+
49+
class FakeFlinkProcessBuilder(config: KyuubiConf)
50+
extends FlinkProcessBuilder("fake", config) {
51+
override protected def commands: Array[String] = Array("ls")
3152
}

0 commit comments

Comments
 (0)