
Commit 3b04d99

jiaoqingbo authored and yaooqinn committed
[KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable
### _Why are the changes needed?_

Fix #2346.

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2418 from jiaoqingbo/kyuubi2346.

Closes #2346

eda5d38 [jiaoqingbo] after code review
720fede [jiaoqingbo] execute source config.sh and fix ut failed and delete debug log
1b53a9b [jiaoqingbo] Merge branch 'master' into kyuubi2346
0c7eb5e [jiaoqingbo] add hadoop classpath to UT
ff2bb14 [jiaoqingbo] Merge branch 'master' into kyuubi2346
c9e8019 [jiaoqingbo] change ut
308ae5f [jiaoqingbo] code review
f3eb068 [jiaoqingbo] fix ut failed
4e6d168 [jiaoqingbo] spotless apply
9eae557 [jiaoqingbo] fix ut failed
bc00e69 [jiaoqingbo] delete flink-sql-engine.sh
d6b87b9 [jiaoqingbo] delete childProcEnv
3aa8738 [jiaoqingbo] [KYUUBI #2346] [Improvement] Simplify FlinkProcessBuilder with java executable

Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
1 parent c969433 commit 3b04d99
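For context, this patch drops the `flink-sql-engine.sh` launcher and has `FlinkProcessBuilder` assemble a `bash -c` command around the plain `java` executable. A rough sketch of the rendered command shape follows; every concrete path, version, and user below is an illustrative assumption, not a value from this patch:

```bash
# Sketch only: paths, versions, and the session user are illustrative assumptions.
bash -c "source /opt/flink/bin/config.sh && /usr/lib/jvm/java8/bin/java \
  -Dkyuubi.session.user=bob \
  -Dflink.parallelism.default=6 \
  -cp /opt/kyuubi/externals/engines/flink/kyuubi-flink-sql-engine_2.12-1.6.0-SNAPSHOT.jar:\
/opt/flink/opt/flink-sql-client_2.12-1.14.4.jar:/opt/flink/lib/*:/opt/flink/conf:\
/etc/hadoop/conf:/opt/hadoop/share/hadoop/client/* \
  org.apache.kyuubi.engine.flink.FlinkSQLEngine"
```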

File tree

5 files changed (+166 -112 lines)


build/dist

Lines changed: 1 addition & 4 deletions
@@ -214,7 +214,6 @@ mkdir -p "$DISTDIR/pid"
 mkdir -p "$DISTDIR/logs"
 mkdir -p "$DISTDIR/work"
 mkdir -p "$DISTDIR/externals/engines/flink"
-mkdir -p "$DISTDIR/externals/engines/flink/lib"
 mkdir -p "$DISTDIR/externals/engines/spark"
 mkdir -p "$DISTDIR/externals/engines/trino"
 mkdir -p "$DISTDIR/externals/engines/hive"
@@ -245,9 +244,7 @@ done
 cd -
 
 # Copy flink engines
-cp -r "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/bin/" "$DISTDIR/externals/engines/flink/bin/"
-chmod a+x "$DISTDIR/externals/engines/flink/bin/flink-sql-engine.sh"
-cp "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/target/kyuubi-flink-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/flink/lib"
+cp "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/target/kyuubi-flink-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/flink"
 
 # Copy spark engines
 cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/spark"

externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh

Lines changed: 0 additions & 69 deletions
This file was deleted.

integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala

Lines changed: 18 additions & 1 deletion
@@ -17,16 +17,33 @@
 
 package org.apache.kyuubi.it.flink.operation
 
+import java.io.File
+import java.nio.file.Paths
+
+import org.apache.kyuubi.{HADOOP_COMPILE_VERSION, SCALA_COMPILE_VERSION, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX}
 import org.apache.kyuubi.it.flink.WithKyuubiServerAndFlinkMiniCluster
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
 
 class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster with HiveJDBCTestHelper {
+  val kyuubiHome: String = Utils.getCodeSourceLocation(getClass).split("integration-tests")(0)
+  val hadoopClasspath: String = Paths.get(
+    kyuubiHome,
+    "externals",
+    "kyuubi-flink-sql-engine",
+    "target",
+    s"scala-$SCALA_COMPILE_VERSION",
+    "jars").toAbsolutePath.toString
   override val conf: KyuubiConf = KyuubiConf()
     .set(ENGINE_TYPE, "FLINK_SQL")
     .set("flink.parallelism.default", "6")
+    .set(
+      s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH",
+      s"$hadoopClasspath${File.separator}" +
+        s"hadoop-client-api-$HADOOP_COMPILE_VERSION.jar${File.pathSeparator}" +
+        s"$hadoopClasspath${File.separator}hadoop-client-runtime-$HADOOP_COMPILE_VERSION.jar")
 
   override protected def jdbcUrl: String = getJdbcUrl
 
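Outside of this integration test, the same requirement applies to a real deployment: the new builder refuses to start without HADOOP_CLASSPATH. A hedged example of satisfying it before launching Kyuubi (the `hadoop classpath` helper assumes a local Hadoop installation; the jar paths and version are illustrative):

```bash
# Option 1 (assumes a local Hadoop installation on the server host):
export HADOOP_CLASSPATH=$(hadoop classpath)
# Option 2, mirroring what this suite does with standalone hadoop-client jars
# (paths and version are illustrative assumptions):
export HADOOP_CLASSPATH=/path/to/hadoop-client-api-3.3.1.jar:/path/to/hadoop-client-runtime-3.3.1.jar
```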

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala

Lines changed: 67 additions & 33 deletions
@@ -19,13 +19,17 @@ package org.apache.kyuubi.engine.flink
 
 import java.io.{File, FilenameFilter}
 import java.nio.file.Paths
+import java.util.LinkedHashSet
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.annotations.VisibleForTesting
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.ProcBuilder
-import org.apache.kyuubi.engine.flink.FlinkProcessBuilder.FLINK_ENGINE_BINARY_FILE
 import org.apache.kyuubi.operation.log.OperationLog
 
 /**
@@ -37,43 +41,75 @@ class FlinkProcessBuilder(
     val extraEngineLog: Option[OperationLog] = None)
   extends ProcBuilder with Logging {
 
-  override protected def executable: String = {
-    val flinkEngineHomeOpt = env.get("FLINK_ENGINE_HOME").orElse {
-      val cwd = Utils.getCodeSourceLocation(getClass)
-        .split("kyuubi-server")
-      assert(cwd.length > 1)
-      Option(
-        Paths.get(cwd.head)
-          .resolve("externals")
-          .resolve("kyuubi-flink-sql-engine")
-          .toFile)
-        .map(_.getAbsolutePath)
-    }
-
-    flinkEngineHomeOpt.map { dir =>
-      Paths.get(dir, "bin", FLINK_ENGINE_BINARY_FILE).toAbsolutePath.toFile.getCanonicalPath
-    } getOrElse {
-      throw KyuubiSQLException("FLINK_ENGINE_HOME is not set! " +
-        "For more detail information on installing and configuring Flink, please visit " +
-        "https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
-    }
-  }
-
   override protected def module: String = "kyuubi-flink-sql-engine"
 
   override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
 
   override protected def childProcEnv: Map[String, String] = conf.getEnvs +
     ("FLINK_HOME" -> FLINK_HOME) +
     ("FLINK_CONF_DIR" -> s"$FLINK_HOME/conf") +
-    ("FLINK_SQL_ENGINE_JAR" -> mainResource.get) +
-    ("FLINK_SQL_ENGINE_DYNAMIC_ARGS" ->
-      conf.getAll.filter { case (k, _) =>
-        k.startsWith("kyuubi.") || k.startsWith("flink.") ||
-          k.startsWith("hadoop.") || k.startsWith("yarn.")
-      }.map { case (k, v) => s"-D$k=$v" }.mkString(" "))
-
-  override protected def commands: Array[String] = Array(executable)
+    ("_FLINK_HOME_DETERMINED" -> s"1")
+
+  override protected def commands: Array[String] = {
+    val buffer = new ArrayBuffer[String]()
+    buffer += s"bash"
+    buffer += s"-c"
+    val commandStr = new StringBuilder()
+
+    commandStr.append(s"source $FLINK_HOME${File.separator}bin" +
+      s"${File.separator}config.sh && $executable")
+
+    // TODO: How shall we deal with proxyUser,
+    // user.name
+    // kyuubi.session.user
+    // or just leave it, because we can handle it at operation layer
+    commandStr.append(s" -D$KYUUBI_SESSION_USER_KEY=$proxyUser ")
+
+    // TODO: add Kyuubi.engineEnv.FLINK_ENGINE_MEMORY or kyuubi.engine.flink.memory to configure
+    // -Xmx5g
+    // java options
+    val confStr = conf.getAll.filter { case (k, _) =>
+      k.startsWith("kyuubi.") || k.startsWith("flink.") ||
+        k.startsWith("hadoop.") || k.startsWith("yarn.")
+    }.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+    commandStr.append(confStr)
+
+    commandStr.append(" -cp ")
+    val classpathEntries = new LinkedHashSet[String]
+    // flink engine runtime jar
+    mainResource.foreach(classpathEntries.add)
+    // flink sql client jar
+    val flinkSqlClientPath = Paths.get(FLINK_HOME)
+      .resolve("opt")
+      .toFile
+      .listFiles(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.toLowerCase.startsWith("flink-sql-client")
+        }
+      }).head.getAbsolutePath
+    classpathEntries.add(flinkSqlClientPath)
+
+    // jars from flink lib
+    classpathEntries.add(s"$FLINK_HOME${File.separator}lib${File.separator}*")
+
+    // classpath contains flink configurations, default to flink.home/conf
+    classpathEntries.add(env.getOrElse("FLINK_CONF_DIR", s"$FLINK_HOME${File.separator}conf"))
+    // classpath contains hadoop configurations
+    env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
+    env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
+    env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
+    val hadoopClasspath = env.get("HADOOP_CLASSPATH")
+    if (hadoopClasspath.isEmpty) {
+      throw KyuubiSQLException("HADOOP_CLASSPATH is not set! " +
+        "For more detail information on configuring HADOOP_CLASSPATH" +
+        "https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
+    }
+    classpathEntries.add(hadoopClasspath.get)
+    commandStr.append(classpathEntries.asScala.mkString(File.pathSeparator))
+    commandStr.append(s" $mainClass")
+    buffer += commandStr.toString()
+    buffer.toArray
+  }
 
   @VisibleForTesting
   def FLINK_HOME: String = {
@@ -112,6 +148,4 @@
 object FlinkProcessBuilder {
   final val APP_KEY = "yarn.application.name"
   final val TAG_KEY = "yarn.tags"
-
-  final private val FLINK_ENGINE_BINARY_FILE = "flink-sql-engine.sh"
 }
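In short, the builder now needs a usable Flink and Hadoop environment at launch time rather than a packaged launcher script. A minimal sketch of the environment variables it reads, with all values being illustrative assumptions:

```bash
export FLINK_HOME=/opt/flink                 # must provide bin/config.sh, lib/, and opt/flink-sql-client*.jar
export HADOOP_CLASSPATH=$(hadoop classpath)  # required: the builder throws KyuubiSQLException if unset
export FLINK_CONF_DIR=/etc/flink/conf        # optional: defaults to $FLINK_HOME/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf      # optional: appended to the classpath when set
export YARN_CONF_DIR=/etc/hadoop/conf        # optional: same treatment
export HBASE_CONF_DIR=/etc/hbase/conf        # optional: same treatment
```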

kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala

Lines changed: 80 additions & 5 deletions
@@ -17,15 +17,90 @@
 
 package org.apache.kyuubi.engine.flink
 
-import org.apache.kyuubi.KyuubiFunSuite
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.ListMap
+
+import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiFunSuite, KyuubiSQLException, SCALA_COMPILE_VERSION}
 import org.apache.kyuubi.config.KyuubiConf
 
 class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private def conf = KyuubiConf().set("kyuubi.on", "off")
+  private def envDefault: ListMap[String, String] = ListMap(
+    "JAVA_HOME" -> s"${File.separator}jdk1.8.0_181")
+  private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
+    ("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
+    ("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
+    ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
+  private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
+    ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+  private def confStr: String = {
+    conf.getAll.filter { case (k, _) =>
+      k.startsWith("kyuubi.") || k.startsWith("flink.") ||
+        k.startsWith("hadoop.") || k.startsWith("yarn.")
+    }.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+  }
+  private def compareActualAndExpected(builder: FlinkProcessBuilder) = {
+    val actualCommands = builder.toString
+    val classpathStr: String = constructClasspathStr(builder)
+    val expectedCommands = s"bash -c source ${builder.FLINK_HOME}" +
+      s"${File.separator}bin${File.separator}config.sh && $javaPath " +
+      s"-Dkyuubi.session.user=vinoyang $confStr" +
+      s" -cp $classpathStr $mainClassStr"
+    info(s"\n\n actualCommands $actualCommands")
+    info(s"\n\n expectedCommands $expectedCommands")
+    assert(actualCommands.equals(expectedCommands))
+  }
+
+  private def constructClasspathStr(builder: FlinkProcessBuilder) = {
+    val classpathEntries = new java.util.LinkedHashSet[String]
+    builder.mainResource.foreach(classpathEntries.add)
+    val flinkSqlClientJarPath = s"${builder.FLINK_HOME}$flinkSqlClientJarPathSuffix"
+    val flinkLibPath = s"${builder.FLINK_HOME}$flinkLibPathSuffix"
+    val flinkConfPath = s"${builder.FLINK_HOME}$flinkConfPathSuffix"
+    classpathEntries.add(flinkSqlClientJarPath)
+    classpathEntries.add(flinkLibPath)
+    classpathEntries.add(flinkConfPath)
+    val envMethod = classOf[FlinkProcessBuilder].getDeclaredMethod("env")
+    envMethod.setAccessible(true)
+    val envMap = envMethod.invoke(builder).asInstanceOf[Map[String, String]]
+    envMap.foreach { case (k, v) =>
+      if (!k.equals("JAVA_HOME")) {
+        classpathEntries.add(v)
+      }
+    }
+    val classpathStr = classpathEntries.asScala.mkString(File.pathSeparator)
+    classpathStr
+  }
+
+  private val javaPath = s"${envDefault("JAVA_HOME")}${File.separator}bin${File.separator}java"
+  private val flinkSqlClientJarPathSuffix = s"${File.separator}opt${File.separator}" +
+    s"flink-sql-client_$SCALA_COMPILE_VERSION-$FLINK_COMPILE_VERSION.jar"
+  private val flinkLibPathSuffix = s"${File.separator}lib${File.separator}*"
+  private val flinkConfPathSuffix = s"${File.separator}conf"
+  private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
+
+  test("all hadoop related environment variables are configured") {
+    val builder = new FlinkProcessBuilder("vinoyang", conf) {
+      override protected def env: Map[String, String] = envWithAllHadoop
+
+    }
+    compareActualAndExpected(builder)
+  }
+
+  test("all hadoop related environment variables are configured except HADOOP_CLASSPATH") {
+    val builder = new FlinkProcessBuilder("vinoyang", conf) {
+      override def env: Map[String, String] = envWithoutHadoopCLASSPATH
+    }
+    assertThrows[KyuubiSQLException](builder.toString)
+  }
 
-  test("flink engine process builder") {
-    val builder = new FlinkProcessBuilder("vinoyang", conf)
-    val commands = builder.toString.split(' ')
-    assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
+  test("only HADOOP_CLASSPATH environment variables are configured") {
+    val builder = new FlinkProcessBuilder("vinoyang", conf) {
+      override def env: Map[String, String] = envDefault +
+        ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+    }
+    compareActualAndExpected(builder)
   }
 }
