Skip to content

Commit cad0bcd

Browse files
committed
[KYUUBI #2500][FOLLOWUP] Resolve flink conf at engine side
### _Why are the changes needed?_ removes hacky method calling from `flink-sql-client` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2572 from yaooqinn/2500. Closes #2500 5531145 [Kent Yao] yarn test cb7e08f [Kent Yao] wip e696bdf [Kent Yao] wip 318b762 [Kent Yao] wip Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 9d60495 commit cad0bcd

File tree

3 files changed

+16
-10
lines changed

3 files changed

+16
-10
lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package org.apache.kyuubi.engine.flink
1919

2020
import java.io.File
2121
import java.net.URL
22+
import java.nio.file.Paths
2223
import java.time.Instant
2324
import java.util.concurrent.CountDownLatch
2425

2526
import scala.collection.JavaConverters._
2627
import scala.collection.mutable.ListBuffer
2728

28-
import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI, GenericCLI}
29+
import org.apache.flink.client.cli.{DefaultCLI, GenericCLI}
2930
import org.apache.flink.configuration.{Configuration, DeploymentOptions, GlobalConfiguration}
3031
import org.apache.flink.table.client.SqlClientException
3132
import org.apache.flink.table.client.gateway.context.DefaultContext
@@ -74,7 +75,17 @@ object FlinkSQLEngine extends Logging {
7475

7576
try {
7677
Utils.fromCommandLineArgs(args, kyuubiConf)
77-
val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv
78+
val flinkConfDir = sys.env.getOrElse(
79+
"FLINK_CONF_DIR", {
80+
val flinkHome = sys.env.getOrElse(
81+
"FLINK_HOME", {
82+
// detect the FLINK_HOME by flink-core*.jar location if unset
83+
val jarLoc =
84+
classOf[GlobalConfiguration].getProtectionDomain.getCodeSource.getLocation
85+
new File(jarLoc.toURI).getParentFile.getParent
86+
})
87+
Paths.get(flinkHome, "conf").toString
88+
})
7889
val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir)
7990
val flinkConfFromArgs =
8091
kyuubiConf.getAll.filterKeys(_.startsWith("flink."))
@@ -106,7 +117,7 @@ object FlinkSQLEngine extends Logging {
106117
val engineContext = new DefaultContext(
107118
dependencies.asJava,
108119
flinkConf,
109-
List[CustomCommandLine](new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava)
120+
Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava)
110121

111122
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
112123

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,6 @@ trait ProcBuilder {
103103

104104
protected def env: Map[String, String] = conf.getEnvs
105105

106-
protected def childProcEnv: Map[String, String] = env
107-
108106
protected val extraEngineLog: Option[OperationLog]
109107

110108
protected val workingDir: Path = {
@@ -137,7 +135,7 @@ trait ProcBuilder {
137135
val pb = new ProcessBuilder(commands: _*)
138136

139137
val envs = pb.environment()
140-
envs.putAll(childProcEnv.asJava)
138+
envs.putAll(env.asJava)
141139
pb.directory(workingDir.toFile)
142140
pb.redirectError(engineLog)
143141
pb.redirectOutput(engineLog)

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,14 @@ class FlinkProcessBuilder(
3737
override val proxyUser: String,
3838
override val conf: KyuubiConf,
3939
val extraEngineLog: Option[OperationLog] = None)
40-
extends ProcBuilder with Logging {
40+
extends ProcBuilder {
4141

4242
private val flinkHome: String = getEngineHome(shortName)
4343

4444
override protected def module: String = "kyuubi-flink-sql-engine"
4545

4646
override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
4747

48-
override protected def childProcEnv: Map[String, String] = conf.getEnvs +
49-
("FLINK_CONF_DIR" -> env.getOrElse("FLINK_CONF_DIR", s"$flinkHome${File.separator}conf"))
50-
5148
override protected def commands: Array[String] = {
5249
val buffer = new ArrayBuffer[String]()
5350
buffer += executable

0 commit comments

Comments
 (0)