Skip to content

Commit e725725

Browse files
turboFeiulysses-you
authored andcommitted
[KYUUBI #2373][SUB-TASK][KPIP-4] Support to recovery batch session on Kyuubi instances restart
### _Why are the changes needed?_ To close #2373 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2790 from turboFei/recovery_batch. Closes #2373 8163219 [Fei Wang] trigger test 6ac6f9b [Fei Wang] async recovery 6d9edf4 [Fei Wang] comments daa6719 [Fei Wang] refactor 0c3a2e1 [Fei Wang] remove waitAppCompletion 04e139e [Fei Wang] comments b5d11d8 [Fei Wang] comment for method name 321dfa9 [Fei Wang] refactor 90be2df [Fei Wang] address comments 1440958 [Fei Wang] batch recovery Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent afb08c7 commit e725725

File tree

11 files changed

+357
-45
lines changed

11 files changed

+357
-45
lines changed

docs/deployment/settings.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,8 @@ kyuubi.server.state.store.jdbc.password||The password for server jdbc state stor
360360
kyuubi.server.state.store.jdbc.url|jdbc:derby:memory:kyuubi_state_store_db;create=true|The jdbc url for server jdbc state store. By defaults, it is a DERBY in-memory database url, and the state information is not shared across kyuubi instances. To enable multiple kyuubi instances high available, please specify a production jdbc url.|string|1.6.0
361361
kyuubi.server.state.store.jdbc.user||The username for server jdbc state store.|string|1.6.0
362362
kyuubi.server.state.store.max.age|PT72H|The maximum age of state info in state store.|duration|1.6.0
363+
kyuubi.server.state.store.sessions.recovery.num.threads|10|The number of threads for sessions recovery from state store.|int|1.6.0
364+
kyuubi.server.state.store.sessions.recovery.per.batch|100|The number of sessions to recover from state store per batch.|int|1.6.0
363365

364366

365367
### Session

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,20 @@ object KyuubiConf {
884884
.timeConf
885885
.createWithDefault(Duration.ofMinutes(30).toMillis)
886886

887+
val SERVER_STATE_STORE_SESSIONS_RECOVERY_PER_BATCH: ConfigEntry[Int] =
888+
buildConf("kyuubi.server.state.store.sessions.recovery.per.batch")
889+
.doc("The number of sessions to recover from state store per batch.")
890+
.version("1.6.0")
891+
.intConf
892+
.createWithDefault(100)
893+
894+
val SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS: ConfigEntry[Int] =
895+
buildConf("kyuubi.server.state.store.sessions.recovery.num.threads")
896+
.doc("The number of threads for sessions recovery from state store.")
897+
.version("1.6.0")
898+
.intConf
899+
.createWithDefault(10)
900+
887901
val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
888902
buildConf("kyuubi.backend.engine.exec.pool.wait.queue.size")
889903
.doc("Size of the wait queue for the operation execution thread pool in SQL engine" +

kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.kyuubi.util
1919

20-
import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
20+
import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
2121

2222
import scala.concurrent.Awaitable
2323
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -53,6 +53,11 @@ object ThreadUtils extends Logging {
5353
executor
5454
}
5555

56+
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
57+
val threadFactory = new NamedThreadFactory(prefix, daemon = true)
58+
Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
59+
}
60+
5661
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
5762
try {
5863
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.nio.file.{Files, Path, Paths}
2424

2525
import scala.collection.JavaConverters._
2626

27+
import com.google.common.annotations.VisibleForTesting
2728
import com.google.common.collect.EvictingQueue
2829
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
2930

@@ -152,6 +153,8 @@ trait ProcBuilder {
152153
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
153154
private var logCaptureThread: Thread = _
154155
private var process: Process = _
156+
@VisibleForTesting
157+
@volatile private[kyuubi] var processLaunched: Boolean = _
155158

156159
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
157160
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -191,6 +194,7 @@ trait ProcBuilder {
191194

192195
final def start: Process = synchronized {
193196
process = processBuilder.start()
197+
processLaunched = true
194198
val reader = Files.newBufferedReader(engineLog.toPath, StandardCharsets.UTF_8)
195199

196200
val redirect: Runnable = { () =>

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,20 @@ import java.util.concurrent.TimeUnit
2525
import scala.collection.JavaConverters._
2626

2727
import com.codahale.metrics.MetricRegistry
28+
import com.google.common.annotations.VisibleForTesting
2829
import org.apache.hive.service.rpc.thrift._
2930

3031
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
3132
import org.apache.kyuubi.config.KyuubiConf
3233
import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, ProcBuilder}
34+
import org.apache.kyuubi.engine.ApplicationOperation._
3335
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
3436
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
3537
import org.apache.kyuubi.metrics.MetricsSystem
3638
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
3739
import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState}
3840
import org.apache.kyuubi.operation.log.OperationLog
41+
import org.apache.kyuubi.server.statestore.api.SessionMetadata
3942
import org.apache.kyuubi.session.KyuubiBatchSessionImpl
4043
import org.apache.kyuubi.util.ThriftUtils
4144

@@ -58,7 +61,8 @@ class BatchJobSubmission(
5861
resource: String,
5962
className: String,
6063
batchConf: Map[String, String],
61-
batchArgs: Seq[String])
64+
batchArgs: Seq[String],
65+
recoveryMetadata: Option[SessionMetadata])
6266
extends KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
6367

6468
override def statement: String = "BATCH_JOB_SUBMISSION"
@@ -76,7 +80,8 @@ class BatchJobSubmission(
7680
private var killMessage: KillResponse = (false, "UNKNOWN")
7781
def getKillMessage: KillResponse = killMessage
7882

79-
private val builder: ProcBuilder = {
83+
@VisibleForTesting
84+
private[kyuubi] val builder: ProcBuilder = {
8085
Option(batchType).map(_.toUpperCase(Locale.ROOT)) match {
8186
case Some("SPARK") =>
8287
new SparkBatchProcessBuilder(
@@ -143,7 +148,22 @@ class BatchJobSubmission(
143148
val asyncOperation: Runnable = () => {
144149
setStateIfNotCanceled(OperationState.RUNNING)
145150
try {
146-
submitBatchJob()
151+
// If it is in recovery mode, only re-submit batch job if previous state is PENDING and
152+
// fail to fetch the status including appId from resource manager. Otherwise, monitor the
153+
// submitted batch application.
154+
recoveryMetadata.map { metadata =>
155+
if (metadata.state == OperationState.PENDING.toString) {
156+
applicationStatus = currentApplicationState
157+
applicationStatus.map(_.get(APP_ID_KEY)).map {
158+
case Some(appId) => monitorBatchJob(appId)
159+
case None => submitAndMonitorBatchJob()
160+
}
161+
} else {
162+
monitorBatchJob(metadata.engineId)
163+
}
164+
}.getOrElse {
165+
submitAndMonitorBatchJob()
166+
}
147167
setStateIfNotCanceled(OperationState.FINISHED)
148168
} catch {
149169
onError()
@@ -169,10 +189,15 @@ class BatchJobSubmission(
169189
s.contains("KILLED") || s.contains("FAILED"))
170190
}
171191

172-
private def submitBatchJob(): Unit = {
192+
private def applicationTerminated(applicationStatus: Option[Map[String, String]]): Boolean = {
193+
applicationStatus.map(_.get(ApplicationOperation.APP_STATE_KEY)).exists(s =>
194+
s.contains("KILLED") || s.contains("FAILED") || s.contains("FINISHED"))
195+
}
196+
197+
private def submitAndMonitorBatchJob(): Unit = {
173198
var appStatusFirstUpdated = false
174199
try {
175-
info(s"Submitting $batchType batch job: $builder")
200+
info(s"Submitting $batchType batch[$batchId] job: $builder")
176201
val process = builder.start
177202
applicationStatus = currentApplicationState
178203
while (!applicationFailed(applicationStatus) && process.isAlive) {
@@ -192,12 +217,46 @@ class BatchJobSubmission(
192217
if (process.exitValue() != 0) {
193218
throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
194219
}
220+
221+
applicationStatus.map(_.get(APP_ID_KEY)).map {
222+
case Some(appId) => monitorBatchJob(appId)
223+
case _ =>
224+
}
195225
}
196226
} finally {
197227
builder.close()
198228
}
199229
}
200230

231+
private def monitorBatchJob(appId: String): Unit = {
232+
info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
233+
if (applicationStatus.isEmpty) {
234+
applicationStatus = currentApplicationState
235+
}
236+
if (applicationStatus.isEmpty) {
237+
info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
238+
} else if (applicationFailed(applicationStatus)) {
239+
throw new RuntimeException(s"$batchType batch[$batchId] job failed:" +
240+
applicationStatus.get.mkString(","))
241+
} else {
242+
// TODO: add limit for max batch job submission lifetime
243+
while (applicationStatus.isDefined && !applicationTerminated(applicationStatus)) {
244+
Thread.sleep(applicationCheckInterval)
245+
val newApplicationStatus = currentApplicationState
246+
if (newApplicationStatus != applicationStatus) {
247+
applicationStatus = newApplicationStatus
248+
info(s"Batch report for $batchId" +
249+
applicationStatus.map(_.mkString("(", ",", ")")).getOrElse("()"))
250+
}
251+
}
252+
253+
if (applicationFailed(applicationStatus)) {
254+
throw new RuntimeException(s"$batchType batch[$batchId] job failed:" +
255+
applicationStatus.get.mkString(","))
256+
}
257+
}
258+
}
259+
201260
def getOperationLogRowSet(
202261
order: FetchOrientation,
203262
from: Int,

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
2626
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
2727
import org.apache.kyuubi.metrics.MetricsSystem
2828
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
29+
import org.apache.kyuubi.server.statestore.api.SessionMetadata
2930
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl, Session}
3031
import org.apache.kyuubi.util.ThriftUtils
3132

@@ -69,15 +70,17 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
6970
resource: String,
7071
className: String,
7172
batchConf: Map[String, String],
72-
batchArgs: Seq[String]): BatchJobSubmission = {
73+
batchArgs: Seq[String],
74+
recoveryMetadata: Option[SessionMetadata]): BatchJobSubmission = {
7375
val operation = new BatchJobSubmission(
7476
session,
7577
batchType,
7678
batchName,
7779
resource,
7880
className,
7981
batchConf,
80-
batchArgs)
82+
batchArgs,
83+
recoveryMetadata)
8184
addOperation(operation)
8285
operation
8386
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,24 @@
1818
package org.apache.kyuubi.server
1919

2020
import java.util.EnumSet
21-
import java.util.concurrent.atomic.AtomicBoolean
21+
import java.util.concurrent.Future
22+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
2223
import javax.servlet.DispatcherType
2324

25+
import com.google.common.annotations.VisibleForTesting
2426
import org.apache.hadoop.conf.Configuration
2527
import org.eclipse.jetty.servlet.FilterHolder
2628

2729
import org.apache.kyuubi.{KyuubiException, Utils}
2830
import org.apache.kyuubi.config.KyuubiConf
29-
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT}
31+
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT, SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS}
3032
import org.apache.kyuubi.server.api.v1.ApiRootResource
3133
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
3234
import org.apache.kyuubi.server.ui.JettyServer
3335
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
3436
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
37+
import org.apache.kyuubi.session.KyuubiSessionManager
38+
import org.apache.kyuubi.util.ThreadUtils
3539

3640
/**
3741
* A frontend service based on RESTful api via HTTP protocol.
@@ -46,6 +50,8 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
4650

4751
private def hadoopConf: Configuration = KyuubiServer.getHadoopConf()
4852

53+
private def sessionManager = be.sessionManager.asInstanceOf[KyuubiSessionManager]
54+
4955
override def initialize(conf: KyuubiConf): Unit = synchronized {
5056
val host = conf.get(FRONTEND_REST_BIND_HOST)
5157
.getOrElse(Utils.findLocalInetAddress.getHostAddress)
@@ -71,10 +77,50 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
7177
server.addRedirectHandler("/docs", "/swagger")
7278
}
7379

80+
@VisibleForTesting
81+
private[kyuubi] def recoverBatchSessions(): Unit = {
82+
val recoveryNumThreads = conf.get(SERVER_STATE_STORE_SESSIONS_RECOVERY_NUM_THREADS)
83+
val batchRecoveryExecutor =
84+
ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, "batch-recovery-executor")
85+
try {
86+
val batchSessionsToRecover = sessionManager.getBatchSessionsToRecover(connectionUrl)
87+
val pendingRecoveryTasksCount = new AtomicInteger(0)
88+
val tasks = batchSessionsToRecover.flatMap { batchSession =>
89+
val batchId = batchSession.batchJobSubmissionOp.batchId
90+
try {
91+
val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
92+
Utils.tryLogNonFatalError(sessionManager.openBatchSession(batchSession)))
93+
Some(task -> batchId)
94+
} catch {
95+
case e: Throwable =>
96+
error(s"Error while submitting batch[$batchId] for recovery", e)
97+
None
98+
}
99+
}
100+
101+
pendingRecoveryTasksCount.addAndGet(tasks.size)
102+
103+
tasks.foreach { case (task, batchId) =>
104+
try {
105+
task.get()
106+
} catch {
107+
case e: Throwable =>
108+
error(s"Error while recovering batch[$batchId]", e)
109+
} finally {
110+
val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
111+
info(s"Batch[$batchId] recovery task terminated, current pending tasks $pendingTasks")
112+
}
113+
}
114+
} finally {
115+
ThreadUtils.shutdown(batchRecoveryExecutor)
116+
}
117+
}
118+
74119
override def start(): Unit = synchronized {
75120
if (!isStarted.get) {
76121
try {
77122
server.start()
123+
recoverBatchSessions()
78124
isStarted.set(true)
79125
info(s"$getName has started at ${server.getServerUri}")
80126
startInternal()

kyuubi-server/src/main/scala/org/apache/kyuubi/server/statestore/SessionStateStore.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,24 @@ class SessionStateStore extends AbstractService("SessionStateStore") {
8383
true).map(buildBatch)
8484
}
8585

86+
def getBatchesRecoveryMetadata(
87+
state: String,
88+
kyuubiInstance: String,
89+
from: Int,
90+
size: Int): Seq[SessionMetadata] = {
91+
_stateStore.getMetadataList(
92+
SessionType.BATCH,
93+
null,
94+
null,
95+
state,
96+
kyuubiInstance,
97+
0,
98+
0,
99+
from,
100+
size,
101+
false)
102+
}
103+
86104
def updateBatchMetadata(
87105
batchId: String,
88106
state: String,

0 commit comments

Comments
 (0)