Skip to content

Commit b6d5c64

Browse files
committed
[KYUUBI #2353] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl
### _Why are the changes needed?_ To close #2306 and close #2307 In this PR, I implement BatchJobSubmission operation and introduce basic `KyuubiBatchSessionImpl`. TODO: - Normalize/validate the batch request - batch request fields - merge with server pre-defined batch conf ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #2353 from turboFei/KPIP_4_batch_submission_op. Closes #2353 9bc6050 [Fei Wang] remove unused conf ef8e962 [Fei Wang] check application in current thread 8738660 [Fei Wang] dedup code 27f2200 [Fei Wang] use static secret id instead of conf 6794ff7 [Fei Wang] Use Seq instead of java.util.List 2f4f9b1 [Fei Wang] Remove BatchType enumeration 7d38080 [Fei Wang] remove dead code a94a9e6 [Fei Wang] remove jars,files fields on BatchRequest 6021a1e [Fei Wang] add ut for result set 07a939c [Fei Wang] refactor long line a918a49 [Fei Wang] address comments 73229e7 [Fei Wang] set engine max life time bbe3f1f [Fei Wang] unique the application checker thread pool 9643e42 [Fei Wang] refactor 11dd71f [Fei Wang] add KyuubiBatchYarnClusterSuite 1216991 [Fei Wang] add ut for batch session 47da8c1 [Fei Wang] add open batch session api 6dcf60d [Fei Wang] add ut for static batch secret id a212e62 [Fei Wang] [SUB-TASK][KPIP-4] Implement BatchJobSubmission operation and basic KyuubiBatchSessionImpl Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Fei Wang <fwang12@ebay.com>
1 parent 65a272f commit b6d5c64

File tree

11 files changed

+450
-14
lines changed

11 files changed

+450
-14
lines changed

docs/deployment/settings.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ Key | Default | Meaning | Type | Since
154154
<code>kyuubi.backend.server.exec.pool.wait.queue.size</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Size of the wait queue for the operation execution thread pool of Kyuubi server</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
155155

156156

157+
### Batch
158+
159+
Key | Default | Meaning | Type | Since
160+
--- | --- | --- | --- | ---
161+
<code>kyuubi.batch.application.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The interval to check batch job application information.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
162+
163+
157164
### Credentials
158165

159166
Key | Default | Meaning | Type | Since

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,13 @@ object KyuubiConf {
806806
.booleanConf
807807
.createWithDefault(true)
808808

809+
val BATCH_APPLICATION_CHECK_INTERVAL: ConfigEntry[Long] =
810+
buildConf("kyuubi.batch.application.check.interval")
811+
.doc("The interval to check batch job application information.")
812+
.version("1.6.0")
813+
.timeConf
814+
.createWithDefaultString("PT5S")
815+
809816
val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
810817
buildConf("kyuubi.backend.server.exec.pool.size")
811818
.doc("Number of threads in the operation execution thread pool of Kyuubi server")

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ class SparkBatchProcessBuilder(
4747
buffer += mainClass
4848

4949
val batchJobTag = batchRequest.conf.get(TAG_KEY).map(_ + ",").getOrElse("") + batchId
50-
val allConf = batchRequest.conf ++ Map(TAG_KEY -> batchJobTag)
50+
51+
val allConf = batchRequest.conf ++ Map(TAG_KEY -> batchJobTag) ++ sparkAppNameConf()
5152

5253
allConf.foreach { case (k, v) =>
5354
buffer += CONF
@@ -59,15 +60,21 @@ class SparkBatchProcessBuilder(
5960

6061
mainResource.foreach { r => buffer += r }
6162

62-
batchRequest.args.asScala.foreach { arg => buffer += arg }
63+
batchRequest.args.foreach { arg => buffer += arg }
6364

6465
buffer.toArray
6566
}
6667

68+
/**
 * Derive the `spark.app.name` configuration from the batch request.
 *
 * @return a single-entry map binding [[APP_KEY]] to the requested application
 *         name, or an empty map when the request carries no (or an empty) name.
 */
private def sparkAppNameConf(): Map[String, String] = {
  Option(batchRequest.name)
    .filter(_.nonEmpty)
    .fold(Map.empty[String, String])(appName => Map(APP_KEY -> appName))
}
73+
6774
override protected def module: String = "kyuubi-spark-batch-submit"
6875

6976
private[kyuubi] def getApplicationIdAndUrl(): Option[(String, String)] = {
70-
batchRequest.conf.get("spark.master") match {
77+
batchRequest.conf.get(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY)) match {
7178
case Some("yarn") =>
7279
val yarnClient = getYarnClient
7380
val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.kyuubi.engine.spark
1919

20-
import java.io.IOException
20+
import java.io.{File, IOException}
2121
import java.nio.file.Paths
2222

2323
import scala.collection.mutable.ArrayBuffer
@@ -164,11 +164,20 @@ class SparkProcessBuilder(
164164
}
165165

166166
override protected def shortName: String = "spark"
167+
168+
/**
 * Load the properties defined in `spark-defaults.conf`.
 *
 * The file is looked up under `SPARK_CONF_DIR` when that environment variable is
 * set, otherwise under `$SPARK_HOME/conf`. When neither directory is configured,
 * or the file does not exist, `Utils.getPropertiesFromFile` is handed `None` and
 * an empty map is returned.
 */
protected def getSparkDefaultsConf(): Map[String, String] = {
  val confDir = env.get(SPARK_CONF_DIR)
    .orElse(env.get(SPARK_HOME).map(home => s"$home${File.separator}conf"))
  val confFile = confDir
    .map(dir => new File(dir, SPARK_CONF_FILE_NAME))
    .filter(_.exists())
  Utils.getPropertiesFromFile(confFile)
}
167175
}
168176

169177
object SparkProcessBuilder {
170178
final val APP_KEY = "spark.app.name"
171179
final val TAG_KEY = "spark.yarn.tags"
180+
final val MASTER_KEY = "spark.master"
172181

173182
final private[spark] val CONF = "--conf"
174183
final private[spark] val CLASS = "--class"
@@ -178,4 +187,7 @@ object SparkProcessBuilder {
178187
final private[spark] val KEYTAB = "spark.kerberos.keytab"
179188
// Get the appropriate spark-submit file
180189
final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
190+
final private val SPARK_HOME = "SPARK_HOME"
191+
final private val SPARK_CONF_DIR = "SPARK_CONF_DIR"
192+
final private val SPARK_CONF_FILE_NAME = "spark-defaults.conf"
181193
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.operation
19+
20+
import java.nio.ByteBuffer
21+
import java.util.{ArrayList => JArrayList, Locale}
22+
23+
import scala.collection.JavaConverters._
24+
25+
import org.apache.hive.service.rpc.thrift._
26+
27+
import org.apache.kyuubi.KyuubiException
28+
import org.apache.kyuubi.config.KyuubiConf
29+
import org.apache.kyuubi.engine.ProcBuilder
30+
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
31+
import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
32+
import org.apache.kyuubi.operation.log.OperationLog
33+
import org.apache.kyuubi.server.api.v1.BatchRequest
34+
import org.apache.kyuubi.session.KyuubiBatchSessionImpl
35+
import org.apache.kyuubi.util.ThriftUtils
36+
37+
/**
 * The operation that submits a batch job (currently Spark only) on behalf of a
 * [[KyuubiBatchSessionImpl]].
 *
 * The submission runs asynchronously on the session manager's background pool:
 * it launches the builder's process, polls for the application's
 * (id, tracking URL) pair at `kyuubi.batch.application.check.interval`, waits for
 * the process to finish, and exposes a single-row result set containing the
 * ApplicationId and URL once the operation is FINISHED.
 *
 * @param session      the batch session owning this operation
 * @param batchRequest the user-supplied batch submission request
 */
class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchRequest)
  extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {

  // Synthetic statement shown in operation listings; there is no SQL text for a batch job.
  override def statement: String = "BATCH_JOB_SUBMISSION"

  override def shouldRunAsync: Boolean = true

  private lazy val _operationLog = OperationLog.createOperationLog(session, getHandle)

  // Builder of the underlying submission process; assigned in submitBatchJob and
  // closed both there (finally) and in close() — ProcBuilder.close is expected to
  // tolerate being called more than once. TODO confirm.
  private var builder: ProcBuilder = _

  // (applicationId, trackingUrl) once the batch application is located; None until then.
  // @volatile: written by the background submission thread, read by fetch calls.
  @volatile
  private[kyuubi] var appIdAndUrl: Option[(String, String)] = None

  // Whether the single-row result set has already been served to a FETCH_NEXT call.
  private var resultFetched: Boolean = _

  private val applicationCheckInterval =
    session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)

  override def getOperationLog: Option[OperationLog] = Option(_operationLog)

  override protected def beforeRun(): Unit = {
    OperationLog.setCurrentOperationLog(_operationLog)
    // The result set only becomes available after the job finishes; see getNextRowSet.
    setHasResultSet(false)
    setState(OperationState.PENDING)
  }

  override protected def afterRun(): Unit = {
    OperationLog.removeCurrentOperationLog()
  }

  override protected def runInternal(): Unit = {
    val asyncOperation: Runnable = () => {
      setState(OperationState.RUNNING)
      try {
        submitBatchJob()
        setState(OperationState.FINISHED)
      } catch onError()
    }
    try {
      val opHandle = session.sessionManager.submitBackgroundOperation(asyncOperation)
      setBackgroundHandle(opHandle)
    } catch onError("submitting batch job submission operation in background, request rejected")
  }

  /**
   * Build and launch the submission process, poll until the application info is
   * found or the process dies, then wait for the process and fail on a non-zero
   * exit code.
   *
   * @throws UnsupportedOperationException for batch types other than SPARK
   * @throws KyuubiException when the submission process exits with a non-zero code
   */
  private def submitBatchJob(): Unit = {
    builder = Option(batchRequest.batchType).map(_.toUpperCase(Locale.ROOT)) match {
      case Some("SPARK") =>
        new SparkBatchProcessBuilder(
          session.user,
          session.sessionConf,
          session.batchId,
          batchRequest,
          getOperationLog)

      case _ =>
        throw new UnsupportedOperationException(s"Batch type ${batchRequest.batchType} unsupported")
    }

    try {
      info(s"Submitting ${batchRequest.batchType} batch job: $builder")
      val process = builder.start
      // Poll for the application info, but bail out as soon as the submission
      // process dies: otherwise a failed spark-submit (e.g. a bad resource path
      // that never yields an application id) would leave this loop spinning
      // forever and the operation would never reach a terminal state.
      while (appIdAndUrl.isEmpty && process.isAlive) {
        try {
          builder match {
            case sparkBatchProcessBuilder: SparkBatchProcessBuilder =>
              sparkBatchProcessBuilder.getApplicationIdAndUrl() match {
                case Some(appInfo) => appIdAndUrl = Some(appInfo)
                case _ =>
              }

            case _ =>
          }
        } catch {
          // Best-effort polling: log and retry on the next tick rather than failing
          // the whole submission because one status check hiccuped.
          case e: Exception => error(s"Failed to check batch application", e)
        }
        // Only sleep when another poll is actually needed; don't delay completion
        // once the application info has been found.
        if (appIdAndUrl.isEmpty) {
          Thread.sleep(applicationCheckInterval)
        }
      }
      process.waitFor()
      if (process.exitValue() != 0) {
        throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
      }
    } finally {
      builder.close()
    }
  }

  /** Two STRING columns: the application id and its tracking URL. */
  override def getResultSetSchema: TTableSchema = {
    val schema = new TTableSchema()
    Seq("ApplicationId", "URL").zipWithIndex.foreach { case (colName, position) =>
      val tColumnDesc = new TColumnDesc()
      tColumnDesc.setColumnName(colName)
      val tTypeDesc = new TTypeDesc()
      tTypeDesc.addToTypes(TTypeEntry.primitiveEntry(new TPrimitiveTypeEntry(TTypeId.STRING_TYPE)))
      tColumnDesc.setTypeDesc(tTypeDesc)
      tColumnDesc.setPosition(position)
      schema.addToColumns(tColumnDesc)
    }
    schema
  }

  /**
   * Serve the single-row (appId, url) result. Only valid once the operation is
   * FINISHED. FETCH_NEXT returns the row exactly once and an empty row set
   * thereafter; FETCH_PRIOR/FETCH_FIRST always return the row.
   */
  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
    validateDefaultFetchOrientation(order)
    assertState(OperationState.FINISHED)
    setHasResultSet(true)
    order match {
      case FETCH_NEXT => fetchNext()
      case FETCH_PRIOR => resultSet
      case FETCH_FIRST => resultSet
    }
  }

  // Materialized once; empty columns when the application info was never found.
  private lazy val resultSet: TRowSet = {
    val tRow = new TRowSet(0, new JArrayList[TRow](1))
    // Option#toSeq yields zero or one element, so unzip gives parallel 0/1-length lists.
    val (appId, url) = appIdAndUrl.toSeq.unzip

    val tAppIdColumn = TColumn.stringVal(new TStringColumn(
      appId.asJava,
      ByteBuffer.allocate(0)))

    val tUrlColumn = TColumn.stringVal(new TStringColumn(
      url.asJava,
      ByteBuffer.allocate(0)))

    tRow.addToColumns(tAppIdColumn)
    tRow.addToColumns(tUrlColumn)
    tRow
  }

  /** Return the result row on the first call, an empty row set on later calls. */
  private def fetchNext(): TRowSet = {
    if (!resultFetched) {
      resultFetched = true
      resultSet
    } else {
      ThriftUtils.EMPTY_ROW_SET
    }
  }

  override def close(): Unit = {
    if (!isClosedOrCanceled) {
      // Tear down the submission process builder if the job is still being tracked.
      if (builder != null) {
        builder.close()
      }
    }
    super.close()
  }
}

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
2626
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
2727
import org.apache.kyuubi.metrics.MetricsSystem
2828
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
29-
import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
29+
import org.apache.kyuubi.server.api.v1.BatchRequest
30+
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl, Session}
3031
import org.apache.kyuubi.util.ThriftUtils
3132

3233
class KyuubiOperationManager private (name: String) extends OperationManager(name) {
@@ -62,6 +63,14 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
6263
addOperation(operation)
6364
}
6465

66+
/**
 * Create a [[BatchJobSubmission]] operation for the given batch session and
 * register it with this manager.
 *
 * @param session      the batch session the operation belongs to
 * @param batchRequest the batch submission request to execute
 * @return the newly registered operation
 */
def newBatchJobSubmissionOperation(
    session: KyuubiBatchSessionImpl,
    batchRequest: BatchRequest): BatchJobSubmission = {
  val batchJobSubmission = new BatchJobSubmission(session, batchRequest)
  addOperation(batchJobSubmission)
  batchJobSubmission
}
73+
6574
override def newGetTypeInfoOperation(session: Session): Operation = {
6675
val operation = new GetTypeInfo(session)
6776
addOperation(operation)

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,6 @@ case class Field(dataType: String, value: Any)
111111
* @param resource the main resource jar, required.
112112
* @param proxyUser the proxy user, optional.
113113
* @param className the main class name, required.
114-
* @param jars comma-separated list of jars to include, optional.
115-
* @param files comma-separated list of files to include, optional.
116114
* @param name a name of your batch job, optional.
117115
* @param conf arbitrary configuration properties, optional.
118116
* @param args comma-separated list of batch job arguments, optional.
@@ -122,8 +120,6 @@ case class BatchRequest(
122120
resource: String,
123121
proxyUser: String,
124122
className: String,
125-
jars: java.util.List[String],
126-
files: java.util.List[String],
127123
name: String,
128124
conf: Map[String, String],
129-
args: java.util.List[String])
125+
args: Seq[String])

0 commit comments

Comments
 (0)