Commit dfd7124

pan3793 authored and ulysses-you committed
[KYUUBI #1988] Tune initialization of vars depending on system-level confs
### _Why are the changes needed?_

`new SparkConf()` and `KyuubiConf()` were evaluated earlier than the UT ran and could not be re-initialized after system properties changed, so conf overriding did not work.

After the change, `sparkConf.setIfMissing("spark.sql.catalogImplementation", defaultCat)` works as expected when we override `spark.sql.catalogImplementation` in a UT, and Hudi uses the `in-memory` catalog to create tables.

![Xnip2022-03-02_19-10-06](https://user-images.githubusercontent.com/26535726/156351805-bb403ee8-0b89-4db2-b91f-d7b2b9957838.png)

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1995 from pan3793/lazy.

Closes #1988

c264a22 [Cheng Pan] Fix
d1a23fb [Cheng Pan] Lazy initialize vars depends on system level confs

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent b99a224 commit dfd7124
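
For context, here is a minimal, self-contained sketch of the pitfall this commit fixes. All names (`Eager`, `Refreshable`, `Demo`) are hypothetical, not Kyuubi code; it only illustrates how a `val` in a Scala object body snapshots system properties when the object is first touched, while a re-runnable setup method picks up later overrides:

```scala
// Hypothetical sketch; Eager, Refreshable, and Demo are illustrative names only.
object Eager {
  // Evaluated once, when Eager is first touched anywhere in the JVM.
  val catalog: String = sys.props.getOrElse("spark.sql.catalogImplementation", "hive")
}

object Refreshable {
  private var _catalog: String = _
  def catalog: String = _catalog

  // Can be re-run after a test changes system properties.
  def setup(): Unit =
    _catalog = sys.props.getOrElse("spark.sql.catalogImplementation", "hive")
}

object Demo extends App {
  println(Eager.catalog)       // "hive": Eager is initialized here, before the override
  sys.props("spark.sql.catalogImplementation") = "in-memory"
  println(Eager.catalog)       // still "hive": the val snapshot is stale
  Refreshable.setup()
  println(Refreshable.catalog) // "in-memory": rebuilt after the override
}
```

In the same spirit, the patch below turns `sparkConf`/`kyuubiConf` into private vars that `setupConf()` rebuilds from the then-current system properties.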

File tree

5 files changed: +22 −40 lines changed


externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

Lines changed: 21 additions & 16 deletions

@@ -71,41 +71,48 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
 
 object SparkSQLEngine extends Logging {
 
-  val sparkConf: SparkConf = new SparkConf()
+  private var _sparkConf: SparkConf = _
 
-  val kyuubiConf: KyuubiConf = KyuubiConf()
+  private var _kyuubiConf: KyuubiConf = _
+
+  def kyuubiConf: KyuubiConf = _kyuubiConf
 
   var currentEngine: Option[SparkSQLEngine] = None
 
-  private val user = currentUser
+  private lazy val user = currentUser
 
   private val countDownLatch = new CountDownLatch(1)
 
+  SignalRegister.registerLogger(logger)
+  setupConf()
+
   def setupConf(): Unit = {
-    val rootDir = sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(sparkConf))
+    _sparkConf = new SparkConf()
+    _kyuubiConf = KyuubiConf()
+    val rootDir = _sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(_sparkConf))
     val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
-    sparkConf.setIfMissing("spark.sql.execution.topKSortFallbackThreshold", "10000")
-    sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true")
-    sparkConf.setIfMissing("spark.master", "local")
-    sparkConf.setIfMissing("spark.ui.port", "0")
+    _sparkConf.setIfMissing("spark.sql.execution.topKSortFallbackThreshold", "10000")
+    _sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true")
+    _sparkConf.setIfMissing("spark.master", "local")
+    _sparkConf.setIfMissing("spark.ui.port", "0")
     // register the repl's output dir with the file server.
     // see also `spark.repl.classdir`
-    sparkConf.set("spark.repl.class.outputDir", outputDir.toFile.getAbsolutePath)
-    sparkConf.setIfMissing(
+    _sparkConf.set("spark.repl.class.outputDir", outputDir.toFile.getAbsolutePath)
+    _sparkConf.setIfMissing(
       "spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads",
       "20")
 
     val appName = s"kyuubi_${user}_spark_${Instant.now}"
-    sparkConf.setIfMissing("spark.app.name", appName)
+    _sparkConf.setIfMissing("spark.app.name", appName)
     val defaultCat = if (KyuubiSparkUtil.hiveClassesArePresent) "hive" else "in-memory"
-    sparkConf.setIfMissing("spark.sql.catalogImplementation", defaultCat)
+    _sparkConf.setIfMissing("spark.sql.catalogImplementation", defaultCat)
 
     kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
     kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)
 
     // Pass kyuubi config from spark with `spark.kyuubi`
     val sparkToKyuubiPrefix = "spark.kyuubi."
-    sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) =>
+    _sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) =>
       kyuubiConf.set(s"kyuubi.$k", v)
     }
 
@@ -117,7 +124,7 @@ object SparkSQLEngine extends Logging {
   }
 
   def createSpark(): SparkSession = {
-    val session = SparkSession.builder.config(sparkConf).getOrCreate
+    val session = SparkSession.builder.config(_sparkConf).getOrCreate
     (kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
       .foreach { sqlStr =>
         session.sparkContext.setJobGroup(
@@ -174,8 +181,6 @@ object SparkSQLEngine extends Logging {
   }
 
   def main(args: Array[String]): Unit = {
-    SignalRegister.registerLogger(logger)
-    setupConf()
     val startedTime = System.currentTimeMillis()
     val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match {
       case Some(t) => t.toLong
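
The diff above also moves `SignalRegister.registerLogger(logger)` and `setupConf()` from `main` into the object body. As a hedged sketch (with an illustrative `MiniEngine`, not real Kyuubi code), this relies on Scala running an object's body statements on first dereference rather than at JVM startup:

```scala
// Illustrative only; MiniEngine is a made-up name, not Kyuubi code.
object MiniEngine {
  private var _conf: Map[String, String] = _
  def conf: Map[String, String] = _conf

  // Object-body statement: runs once, on the first access to MiniEngine.
  setup()

  def setup(): Unit =
    _conf = sys.props.toMap // snapshot system properties at first-touch time
}

object InitOrderDemo extends App {
  sys.props("demo.key") = "set-before-first-touch"
  // MiniEngine's body, including setup(), executes only on this first access:
  println(MiniEngine.conf("demo.key")) // prints "set-before-first-touch"
}
```

So as long as nothing touches `SparkSQLEngine` before a test sets its system properties, `setupConf()` sees the overridden values.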

externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala

Lines changed: 1 addition & 3 deletions

@@ -28,7 +28,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
   protected var engine: SparkSQLEngine = _
   // conf will be loaded until start spark engine
   def withKyuubiConf: Map[String, String]
-  val kyuubiConf: KyuubiConf = SparkSQLEngine.kyuubiConf
+  def kyuubiConf: KyuubiConf = SparkSQLEngine.kyuubiConf
 
   protected var connectionUrl: String = _
 
@@ -61,7 +61,6 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
     System.setProperty("spark.ui.enabled", "false")
     withKyuubiConf.foreach { case (k, v) =>
       System.setProperty(k, v)
-      kyuubiConf.set(k, v)
     }
 
     SparkSession.clearActiveSession()
@@ -82,7 +81,6 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
     // we need to clean up conf since it's the global config in same jvm.
     withKyuubiConf.foreach { case (k, _) =>
       System.clearProperty(k)
-      kyuubiConf.unset(k)
     }
 
     if (engine != null) {
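
The `val kyuubiConf` → `def kyuubiConf` change above matters for the same initialization-order reason. A hedged sketch, reusing the illustrative `MiniEngine` from the previous example (`EngineFixture` is likewise a stand-in, not the real trait):

```scala
// Illustrative only; EngineFixture stands in for WithSparkSQLEngine.
trait EngineFixture {
  // val conf: Map[String, String] = MiniEngine.conf
  //   would dereference MiniEngine while the suite is constructed,
  //   i.e. before the test calls System.setProperty(...)
  def conf: Map[String, String] = MiniEngine.conf // first touch deferred to use site
}
```

With the `def`, suites only need to set or clear system properties; mirroring them into `kyuubiConf` (removed in the hunks above) and the per-suite `afterAll` cleanup (removed in the suites below) are no longer needed.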

externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkDeltaOperationSuite.scala

Lines changed: 0 additions & 7 deletions

@@ -29,11 +29,4 @@ class SparkDeltaOperationSuite extends WithSparkSQLEngine
   override protected def jdbcUrl: String = getJdbcUrl
 
   override def withKyuubiConf: Map[String, String] = extraConfigs
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    for ((k, _) <- withKyuubiConf) {
-      System.clearProperty(k)
-    }
-  }
 }

externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkHudiOperationSuite.scala

Lines changed: 0 additions & 7 deletions

@@ -27,11 +27,4 @@ class SparkHudiOperationSuite extends WithSparkSQLEngine with HudiMetadataTests
   override protected def jdbcUrl: String = getJdbcUrl
 
   override def withKyuubiConf: Map[String, String] = extraConfigs
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    for ((k, _) <- withKyuubiConf) {
-      System.clearProperty(k)
-    }
-  }
 }

externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkIcebergOperationSuite.scala

Lines changed: 0 additions & 7 deletions

@@ -29,11 +29,4 @@ class SparkIcebergOperationSuite extends WithSparkSQLEngine
   override protected def jdbcUrl: String = getJdbcUrl
 
   override def withKyuubiConf: Map[String, String] = extraConfigs
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    for ((k, _) <- withKyuubiConf) {
-      System.clearProperty(k)
-    }
-  }
 }
