Skip to content

Commit 010a34d

Browse files
cxzl25 authored and ulysses-you committed
[KYUUBI #2690] Make ProcessBuilder.commands immutable
### _Why are the changes needed?_

close #2690

When `SparkProcessBuilder` is constructed, the commands are generated according to `conf` and are immutable, so the tags will not take effect if they are set later.

- Make the `commands` of several existing `ProcessBuilder` implementations immutable.
- Set tags when building commands.
- Fix Flink tags that may not be set to the right value.

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2691 from cxzl25/KYUUBI-2690.

Closes #2690

863e942 [sychen] make ProcBuilder.commands immutable
341796f [sychen] fix UT
9ca7da9 [sychen] use val commands
5ed10d4 [sychen] use TAG_KEY
f4a830f [sychen] add construct for test
a424b3d [sychen] SparkProcessBuilder commands immutable
70d0d7d [sychen] SparkProcessBuilder commands mutable

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent 5585dd0 commit 010a34d

File tree

10 files changed

+75
-32
lines changed

10 files changed

+75
-32
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -167,21 +167,15 @@ private[kyuubi] class EngineRef(
167167
builder = engineType match {
168168
case SPARK_SQL =>
169169
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
170-
new SparkProcessBuilder(appUser, conf, extraEngineLog)
170+
new SparkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
171171
case FLINK_SQL =>
172172
conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
173-
new FlinkProcessBuilder(appUser, conf, extraEngineLog)
173+
new FlinkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
174174
case TRINO =>
175-
new TrinoProcessBuilder(appUser, conf, extraEngineLog)
175+
new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
176176
case HIVE_SQL =>
177-
new HiveProcessBuilder(appUser, conf, extraEngineLog)
177+
new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
178178
}
179-
// TODO: Better to do this inside ProcBuilder
180-
KyuubiApplicationManager.tagApplication(
181-
engineRefId,
182-
builder.shortName,
183-
builder.clusterManager(),
184-
builder.conf)
185179

186180
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
187181
try {

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import scala.collection.JavaConverters._
2323
import scala.util.control.NonFatal
2424

2525
import org.apache.kyuubi.config.KyuubiConf
26+
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
27+
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
2628
import org.apache.kyuubi.service.AbstractService
2729

2830
class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager") {
@@ -88,20 +90,19 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
8890

8991
object KyuubiApplicationManager {
9092
private def setupSparkYarnTag(tag: String, conf: KyuubiConf): Unit = {
91-
val originalTag = conf.getOption("spark.yarn.tags").map(_ + ",").getOrElse("")
92-
val newTag = s"${originalTag}KYUUBI,$tag"
93-
conf.set("spark.yarn.tags", newTag)
93+
val originalTag = conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("")
94+
val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
95+
conf.set(SparkProcessBuilder.TAG_KEY, newTag)
9496
}
9597

9698
private def setupSparkK8sTag(tag: String, conf: KyuubiConf): Unit = {
9799
conf.set("spark.kubernetes.driver.label.kyuubi_unique_tag", tag)
98100
}
99101

100102
private def setupFlinkK8sTag(tag: String, conf: KyuubiConf): Unit = {
101-
// TODO: yarn.tags or flink.yarn.tags, the mess of flink settings confuses me now.
102-
val originalTag = conf.getOption("yarn.tags").map(_ + ",")
103-
val newTag = s"${originalTag}KYUUBI,$tag"
104-
conf.set("yarn.tags", newTag)
103+
val originalTag = conf.getOption(FlinkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("")
104+
val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
105+
conf.set(FlinkProcessBuilder.TAG_KEY, newTag)
105106
}
106107

107108
/**

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ trait ProcBuilder {
9797

9898
protected def proxyUser: String
9999

100-
protected def commands: Array[String]
100+
protected val commands: Array[String]
101101

102102
def conf: KyuubiConf
103103

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ import java.util.LinkedHashSet
2424
import scala.collection.JavaConverters._
2525
import scala.collection.mutable.ArrayBuffer
2626

27+
import com.google.common.annotations.VisibleForTesting
28+
2729
import org.apache.kyuubi._
2830
import org.apache.kyuubi.config.KyuubiConf
2931
import org.apache.kyuubi.config.KyuubiConf._
3032
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
31-
import org.apache.kyuubi.engine.ProcBuilder
33+
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
3234
import org.apache.kyuubi.operation.log.OperationLog
3335

3436
/**
@@ -37,9 +39,15 @@ import org.apache.kyuubi.operation.log.OperationLog
3739
class FlinkProcessBuilder(
3840
override val proxyUser: String,
3941
override val conf: KyuubiConf,
42+
val engineRefId: String,
4043
val extraEngineLog: Option[OperationLog] = None)
4144
extends ProcBuilder {
4245

46+
@VisibleForTesting
47+
def this(proxyUser: String, conf: KyuubiConf) {
48+
this(proxyUser, conf, "")
49+
}
50+
4351
private val flinkHome: String = getEngineHome(shortName)
4452

4553
private val FLINK_HADOOP_CLASSPATH: String = "FLINK_HADOOP_CLASSPATH"
@@ -48,7 +56,8 @@ class FlinkProcessBuilder(
4856

4957
override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
5058

51-
override protected def commands: Array[String] = {
59+
override protected val commands: Array[String] = {
60+
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
5261
val buffer = new ArrayBuffer[String]()
5362
buffer += executable
5463

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,27 @@ import java.util.LinkedHashSet
2424
import scala.collection.JavaConverters._
2525
import scala.collection.mutable.ArrayBuffer
2626

27+
import com.google.common.annotations.VisibleForTesting
28+
2729
import org.apache.kyuubi._
2830
import org.apache.kyuubi.config.KyuubiConf
2931
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY}
3032
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
31-
import org.apache.kyuubi.engine.ProcBuilder
33+
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
3234
import org.apache.kyuubi.operation.log.OperationLog
3335

3436
class HiveProcessBuilder(
3537
override val proxyUser: String,
3638
override val conf: KyuubiConf,
39+
val engineRefId: String,
3740
val extraEngineLog: Option[OperationLog] = None)
3841
extends ProcBuilder with Logging {
3942

43+
@VisibleForTesting
44+
def this(proxyUser: String, conf: KyuubiConf) {
45+
this(proxyUser, conf, "")
46+
}
47+
4048
private val hiveHome: String = getEngineHome(shortName)
4149

4250
private val HIVE_HADOOP_CLASSPATH: String = "HIVE_HADOOP_CLASSPATH"
@@ -45,7 +53,8 @@ class HiveProcessBuilder(
4553

4654
override protected def mainClass: String = "org.apache.kyuubi.engine.hive.HiveSQLEngine"
4755

48-
override protected def commands: Array[String] = {
56+
override protected val commands: Array[String] = {
57+
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
4958
val buffer = new ArrayBuffer[String]()
5059
buffer += executable
5160

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class SparkBatchProcessBuilder(
3030
batchId: String,
3131
batchRequest: BatchRequest,
3232
override val extraEngineLog: Option[OperationLog])
33-
extends SparkProcessBuilder(proxyUser, conf, extraEngineLog) {
33+
extends SparkProcessBuilder(proxyUser, conf, batchId, extraEngineLog) {
3434
import SparkProcessBuilder._
3535

3636
override def mainClass: String = batchRequest.getClassName

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,28 @@ import java.nio.file.Paths
2222

2323
import scala.collection.mutable.ArrayBuffer
2424

25+
import com.google.common.annotations.VisibleForTesting
2526
import org.apache.hadoop.security.UserGroupInformation
2627

2728
import org.apache.kyuubi._
2829
import org.apache.kyuubi.config.KyuubiConf
29-
import org.apache.kyuubi.engine.ProcBuilder
30+
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
3031
import org.apache.kyuubi.ha.HighAvailabilityConf
3132
import org.apache.kyuubi.ha.client.AuthTypes
3233
import org.apache.kyuubi.operation.log.OperationLog
3334

3435
class SparkProcessBuilder(
3536
override val proxyUser: String,
3637
override val conf: KyuubiConf,
38+
val engineRefId: String,
3739
val extraEngineLog: Option[OperationLog] = None)
3840
extends ProcBuilder with Logging {
3941

42+
@VisibleForTesting
43+
def this(proxyUser: String, conf: KyuubiConf) {
44+
this(proxyUser, conf, "")
45+
}
46+
4047
import SparkProcessBuilder._
4148

4249
private val sparkHome = getEngineHome(shortName)
@@ -48,6 +55,7 @@ class SparkProcessBuilder(
4855
override def mainClass: String = "org.apache.kyuubi.engine.spark.SparkSQLEngine"
4956

5057
override protected val commands: Array[String] = {
58+
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
5159
val buffer = new ArrayBuffer[String]()
5260
buffer += executable
5361
buffer += CLASS

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,33 @@ import java.util.LinkedHashSet
2424
import scala.collection.JavaConverters._
2525
import scala.collection.mutable.ArrayBuffer
2626

27+
import com.google.common.annotations.VisibleForTesting
28+
2729
import org.apache.kyuubi.{Logging, SCALA_COMPILE_VERSION, Utils}
2830
import org.apache.kyuubi.config.KyuubiConf
2931
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, ENGINE_TRINO_CONNECTION_URL, ENGINE_TRINO_EXTRA_CLASSPATH, ENGINE_TRINO_JAVA_OPTIONS, ENGINE_TRINO_MEMORY}
3032
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
31-
import org.apache.kyuubi.engine.ProcBuilder
33+
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
3234
import org.apache.kyuubi.operation.log.OperationLog
3335

3436
class TrinoProcessBuilder(
3537
override val proxyUser: String,
3638
override val conf: KyuubiConf,
37-
val extraEngineLog: Option[OperationLog] = None) extends ProcBuilder with Logging {
39+
val engineRefId: String,
40+
val extraEngineLog: Option[OperationLog] = None)
41+
extends ProcBuilder with Logging {
42+
43+
@VisibleForTesting
44+
def this(proxyUser: String, conf: KyuubiConf) {
45+
this(proxyUser, conf, "")
46+
}
3847

3948
override protected def module: String = "kyuubi-trino-engine"
4049

4150
override protected def mainClass: String = "org.apache.kyuubi.engine.trino.TrinoSqlEngine"
4251

43-
override protected def commands: Array[String] = {
52+
override protected val commands: Array[String] = {
53+
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
4454
require(
4555
conf.get(ENGINE_TRINO_CONNECTION_URL).nonEmpty,
4656
s"Trino server url can not be null! Please set ${ENGINE_TRINO_CONNECTION_URL.key}")

kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
4242
private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
4343
("FLINK_HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
4444
private def confStr: String = {
45-
conf.getAll.map { case (k, v) => s"\\\n\t--conf $k=$v" }.mkString(" ")
45+
conf.clone.set("yarn.tags", "KYUUBI").getAll.map {
46+
case (k, v) => s"\\\n\t--conf $k=$v"
47+
}.mkString(" ")
4648
}
4749
private def compareActualAndExpected(builder: FlinkProcessBuilder) = {
4850
val actualCommands = builder.toString
@@ -53,7 +55,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
5355
s"$confStr"
5456
info(s"\n\n actualCommands $actualCommands")
5557
info(s"\n\n expectedCommands $expectedCommands")
56-
assert(actualCommands.equals(expectedCommands))
58+
assert(actualCommands == expectedCommands)
5759
}
5860

5961
private def constructClasspathStr(builder: FlinkProcessBuilder) = {
@@ -100,10 +102,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
100102
}
101103

102104
test("all hadoop related environment variables are configured except FLINK_HADOOP_CLASSPATH") {
103-
val builder = new FlinkProcessBuilder("vinoyang", conf) {
105+
assertThrows[KyuubiException](new FlinkProcessBuilder("vinoyang", conf) {
104106
override def env: Map[String, String] = envWithoutHadoopCLASSPATH
105-
}
106-
assertThrows[KyuubiException](builder.toString)
107+
})
107108
}
108109

109110
test("only FLINK_HADOOP_CLASSPATH environment variables are configured") {

kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark
2020
import java.io.File
2121
import java.nio.file.{Files, Path, Paths, StandardOpenOption}
2222
import java.time.Duration
23+
import java.util.UUID
2324
import java.util.concurrent.{Executors, TimeUnit}
2425

2526
import org.scalatest.time.SpanSugar._
@@ -260,7 +261,17 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
260261

261262
val b1 = new SparkProcessBuilder("test", conf)
262263
assert(b1.toString.contains(s"--conf spark.files=$testKeytab"))
264+
}
263265

266+
test("SparkProcessBuilder commands immutable") {
267+
val conf = KyuubiConf(false)
268+
val engineRefId = UUID.randomUUID().toString
269+
val pb = new SparkProcessBuilder("", conf, engineRefId)
270+
assert(pb.toString.contains(engineRefId))
271+
val engineRefId2 = UUID.randomUUID().toString
272+
conf.set("spark.yarn.tags", engineRefId2)
273+
assert(!pb.toString.contains(engineRefId2))
274+
assert(pb.toString.contains(engineRefId))
264275
}
265276
}
266277

0 commit comments

Comments
 (0)