Skip to content

Commit

Permalink
FlinkProcessBuilder prioritizes user configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
wForget committed Apr 28, 2024
1 parent 5cbbdc3 commit ece91cc
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ object KyuubiApplicationManager {
}

private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = {
val originalTag = conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY).map(_ + ",").getOrElse("")
val originalTag = conf.getOption(s"flink.${FlinkProcessBuilder.YARN_TAG_KEY}")
.orElse(conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY))
.map(_ + ",").getOrElse("")
val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
conf.set(FlinkProcessBuilder.YARN_TAG_KEY, newTag)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,27 @@ class FlinkProcessBuilder(
flinkExtraJars += s"$hiveConfFile"
}

val customFlinkConf = conf.getAllWithPrefix("flink", "")
// add custom yarn.ship-files
flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY)
val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY)
.orElse(conf.getOption(APP_KEY))
buffer += "-t"
buffer += "yarn-application"
buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}"
buffer += s"-Dyarn.application.name=${yarnAppName.get}"
buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."

hiveConfDirOpt.foreach { _ =>
buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=."
}

val customFlinkConf = conf.getAllWithPrefix("flink", "")
customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
customFlinkConf.filter {
c =>
!Seq("app.name", YARN_SHIP_FILES_KEY, YARN_APPLICATION_NAME_KEY, YARN_TAG_KEY).contains(
c._1)
}.foreach { case (k, v) =>
buffer += s"-D$k=$v"
}

Expand Down Expand Up @@ -215,6 +223,9 @@ object FlinkProcessBuilder {
final val FLINK_EXEC_FILE = "flink"
final val APP_KEY = "flink.app.name"
final val YARN_TAG_KEY = "yarn.tags"
final val YARN_SHIP_FILES_KEY = "yarn.ship-files"
final val YARN_APPLICATION_NAME_KEY = "yarn.application.name"

final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,35 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
}
matchActualAndExpectedApplicationMode(builder)
}

test("user configuration takes priority") {
val customShipFiles = "testFile1.jar;testFile2.jar"
val customAppName = "testAppName"
val customYarnTags = "testTag1,testTag2"
val builderConf = applicationModeConf
builderConf.set("flink.yarn.ship-files", customShipFiles)
builderConf.set("flink.yarn.application.name", customAppName)
builderConf.set("flink.yarn.tags", customYarnTags)
val builder = new FlinkProcessBuilder("test", true, builderConf) {
override def env: Map[String, String] = envWithAllHadoop
}
val actualCommands = builder.toString
// scalastyle:off line.size.limit
val expectedCommands =
escapePaths(
s""".*flink run-application \\\\
|\\t-t yarn-application \\\\
|\\t-Dyarn.ship-files=.*flink-sql-client.*jar;.*flink-sql-gateway.*jar;.*test-udf.jar;.*hive-site.xml;$customShipFiles \\\\
|\\t-Dyarn.application.name=$customAppName \\\\
|\\t-Dyarn.tags=$customYarnTags,KYUUBI \\\\
|\\t-Dcontainerized.master.env.FLINK_CONF_DIR=. \\\\
|\\t-Dcontainerized.master.env.HIVE_CONF_DIR=. \\\\
|\\t-Dexecution.target=yarn-application \\\\
|\\t-c org.apache.kyuubi.engine.flink.FlinkSQLEngine .*kyuubi-flink-sql-engine_.*jar""".stripMargin +
"(?: \\\\\\n\\t--conf \\S+=\\S+)+")
// scalastyle:on line.size.limit
val regex = new Regex(expectedCommands)
val matcher = regex.pattern.matcher(actualCommands)
assert(matcher.matches())
}
}

0 comments on commit ece91cc

Please sign in to comment.