Skip to content

Commit

Permalink
[KYUUBI #5315][FLINK] Propagate HIVE_CONF_DIR on launching Flink engine
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

When a user creates a Hive catalog in the Flink SQL Client as follows:

```
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'
)
```
We should propagate the `hive-conf-dir` value and pass `HIVE_CONF_DIR` as an environment variable to the AppMaster.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No.

Closes #5316 from hadoopkandy/KYUUBI-5315.

Closes #5315

3f12acb [kandy01.wang] [KYUUBI #5315] [Improvement] Propagate HIVE_CONF_DIR on launching Flink engine

Authored-by: kandy01.wang <kandy01.wang@vipshop.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
hadoopkandy authored and pan3793 committed Sep 21, 2023
1 parent 167e6c1 commit ffebc64
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,27 @@ class FlinkProcessBuilder(
val userJars = conf.get(ENGINE_FLINK_APPLICATION_JARS)
userJars.foreach(jars => flinkExtraJars ++= jars.split(","))

val hiveConfDirOpt = env.get("HIVE_CONF_DIR")
hiveConfDirOpt.foreach { hiveConfDir =>
val hiveConfFile = Paths.get(hiveConfDir).resolve("hive-site.xml")
if (!Files.exists(hiveConfFile)) {
throw new KyuubiException(s"The file $hiveConfFile does not exists. " +
s"Please put hive-site.xml when HIVE_CONF_DIR env $hiveConfDir is configured.")
}
flinkExtraJars += s"$hiveConfFile"
}

buffer += "-t"
buffer += "yarn-application"
buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}"
buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."

hiveConfDirOpt.foreach { _ =>
buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=."
}

val customFlinkConf = conf.getAllWithPrefix("flink", "")
customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
buffer += s"-D$k=$v"
Expand Down Expand Up @@ -166,6 +180,7 @@ class FlinkProcessBuilder(
env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
env.get("HIVE_CONF_DIR").foreach(classpathEntries.add)
val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
hadoopCp.foreach(classpathEntries.add)
val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,18 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "usrlib")).toFile
private val tempUdfJar =
Files.createFile(Paths.get(tempUsrLib.toPath.toString, "test-udf.jar"))
private val tempHiveDir =
Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "hive-conf")).toFile
Files.createFile(Paths.get(tempHiveDir.toPath.toString, "hive-site.xml"))

private def envDefault: ListMap[String, String] = ListMap(
"JAVA_HOME" -> s"${File.separator}jdk",
"FLINK_HOME" -> s"${tempFlinkHome.toPath}")
private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf") +
("HIVE_CONF_DIR" -> s"$tempHiveDir")
private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
(FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
private def confStr: String = {
Expand All @@ -89,10 +93,12 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
val expectedCommands =
escapePaths(s"${builder.flinkExecutable} run-application ") +
s"-t yarn-application " +
s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar " +
s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar" +
s";.*\\/hive-site\\.xml " +
s"-Dyarn\\.application\\.name=kyuubi_.* " +
s"-Dyarn\\.tags=KYUUBI " +
s"-Dcontainerized\\.master\\.env\\.FLINK_CONF_DIR=\\. " +
s"-Dcontainerized\\.master\\.env\\.HIVE_CONF_DIR=\\. " +
s"-Dexecution.target=yarn-application " +
s"-c org\\.apache\\.kyuubi\\.engine\\.flink\\.FlinkSQLEngine " +
s".*kyuubi-flink-sql-engine_.*jar" +
Expand Down Expand Up @@ -151,9 +157,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
matchActualAndExpectedSessionMode(builder)
}

test("application mode - default env") {
test("application mode - all hadoop related environment variables are configured") {
val builder = new FlinkProcessBuilder("paullam", applicationModeConf) {
override def env: Map[String, String] = envDefault
override def env: Map[String, String] = envWithAllHadoop
}
matchActualAndExpectedApplicationMode(builder)
}
Expand Down

0 comments on commit ffebc64

Please sign in to comment.