From 17df0844d06c5104f45735cf35dc45c0549253b3 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 29 Apr 2024 11:05:22 +0800 Subject: [PATCH] fix test and add flink constant --- .../org/apache/kyuubi/engine/KyuubiApplicationManager.scala | 3 ++- .../apache/kyuubi/engine/flink/FlinkProcessBuilder.scala | 6 +++--- .../kyuubi/engine/flink/FlinkProcessBuilderSuite.scala | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index 2e3d3c17ea0..7cf68cefa88 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -115,7 +115,8 @@ object KyuubiApplicationManager { } private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = { - val originalTag = conf.getOption(s"flink.${FlinkProcessBuilder.YARN_TAG_KEY}") + val originalTag = conf + .getOption(s"${FlinkProcessBuilder.FLINK}.${FlinkProcessBuilder.YARN_TAG_KEY}") .orElse(conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY)) .map(_ + ",").getOrElse("") val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index cf6c0c862f4..5cbc7bd5203 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -52,7 +52,7 @@ class FlinkProcessBuilder( val flinkHome: String = getEngineHome(shortName) val flinkExecutable: String = { - Paths.get(flinkHome, "bin", FLINK_EXEC_FILE).toFile.getCanonicalPath + Paths.get(flinkHome, "bin", FLINK).toFile.getCanonicalPath } // flink.execution.target are required in Kyuubi conf currently @@ -115,7 +115,7 @@ class FlinkProcessBuilder( flinkExtraJars += s"$hiveConfFile" } - val customFlinkConf = conf.getAllWithPrefix("flink", "") + val customFlinkConf = conf.getAllWithPrefix(FLINK, "") // add custom yarn.ship-files flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY) val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY) @@ -220,7 +220,7 @@ class FlinkProcessBuilder( } object FlinkProcessBuilder { - final val FLINK_EXEC_FILE = "flink" + final val FLINK = "flink" final val APP_KEY = "flink.app.name" final val YARN_TAG_KEY = "yarn.tags" final val YARN_SHIP_FILES_KEY = "yarn.ship-files" diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 575430468b5..952f71c087b 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -183,9 +183,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { // scalastyle:off line.size.limit val expectedCommands = escapePaths( - s""".*flink run-application \\\\ + s"""${builder.flinkExecutable} run-application \\\\ |\\t-t yarn-application \\\\ - |\\t-Dyarn.ship-files=.*flink-sql-client.*jar;.*flink-sql-gateway.*jar;.*test-udf.jar;.*hive-site.xml;$customShipFiles \\\\ + |\\t-Dyarn.ship-files=.*flink-sql-client.*jar;.*flink-sql-gateway.*jar;$tempUdfJar;.*hive-site.xml;$customShipFiles \\\\ |\\t-Dyarn.application.name=$customAppName \\\\ |\\t-Dyarn.tags=$customYarnTags,KYUUBI \\\\ |\\t-Dcontainerized.master.env.FLINK_CONF_DIR=. \\\\