Skip to content

Commit b47e1cd

Browse files
link3280yaooqinn
authored andcommitted
[KYUUBI #1825] Generate appName for Flink applications
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Generate Flink application names for per-job clusters and application-mode clusters with respect to the pattern in Spark engine. This is a sub-task of KPIP-2 #1322. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1827 from link3280/KYUUBI-1825. Closes #1825 c6670f9 [Paul Lin] [KYUUBI #1825] Remove `null` execution target case branch 1c093d6 [Paul Lin] [KYUUBI #1825] Set app name only if absent ee2845f [Paul Lin] [KYUUBI #1825] Add the default branch for pattern matching 3d9e092 [Paul Lin] [KYUUBI #1825] Generate appName for Flink applications Authored-by: Paul Lin <paullin3280@gmail.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 8bd7b5a commit b47e1cd

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717

1818
package org.apache.kyuubi.engine.flink
1919

20+
import java.time.Instant
2021
import java.util.concurrent.CountDownLatch
2122

2223
import scala.collection.JavaConverters._
2324

2425
import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI}
25-
import org.apache.flink.configuration.GlobalConfiguration
26+
import org.apache.flink.configuration.{DeploymentOptions, GlobalConfiguration}
2627
import org.apache.flink.table.client.gateway.context.DefaultContext
2728

2829
import org.apache.kyuubi.Logging
29-
import org.apache.kyuubi.Utils.{addShutdownHook, FLINK_ENGINE_SHUTDOWN_PRIORITY}
30+
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
3031
import org.apache.kyuubi.config.KyuubiConf
3132
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
3233
import org.apache.kyuubi.service.Serverable
@@ -62,6 +63,8 @@ object FlinkSQLEngine extends Logging {
6263
val kyuubiConf: KyuubiConf = KyuubiConf()
6364
var currentEngine: Option[FlinkSQLEngine] = None
6465

66+
private val user = currentUser
67+
6568
private val countDownLatch = new CountDownLatch(1)
6669

6770
def main(args: Array[String]): Unit = {
@@ -72,6 +75,24 @@ object FlinkSQLEngine extends Logging {
7275
try {
7376
val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv
7477
val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir)
78+
79+
val executionTarget = flinkConf.getString(DeploymentOptions.TARGET)
80+
// set cluster name for per-job and application mode
81+
executionTarget match {
82+
case "yarn-per-job" | "yarn-application" =>
83+
if (!flinkConf.containsKey("yarn.application.name")) {
84+
val appName = s"kyuubi_${user}_flink_${Instant.now}"
85+
flinkConf.setString("yarn.application.name", appName)
86+
}
87+
case "kubernetes-application" =>
88+
if (!flinkConf.containsKey("kubernetes.cluster-id")) {
89+
val appName = s"kyuubi-${user}-flink-${Instant.now}"
90+
flinkConf.setString("kubernetes.cluster-id", appName)
91+
}
92+
case other =>
93+
debug(s"Skip generating app name for execution target $other")
94+
}
95+
7596
val engineContext = new DefaultContext(
7697
List.empty.asJava,
7798
flinkConf,

0 commit comments

Comments
 (0)