Skip to content

Commit f629992

Browse files
iodoneulysses-you
authored andcommitted
[KYUUBI #2927] Fix the thread in ScheduleThreadExecutorPool can't be shutdown immediately
fix #2927 ### _Why are the changes needed?_ ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2928 from iodone/dev-2. Closes #2927 a5cb190 [odone] [KYUUBI #2927] fixed Authored-by: odone <odone.zhang@gmail.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent 125730a commit f629992

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
5454
protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
5555
if (queryTimeout > 0) {
5656
val timeoutExecutor =
57-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
57+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
5858
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
5959
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
6060
statementTimeoutCleaner = Some(timeoutExecutor)

kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@ import org.apache.kyuubi.{KyuubiException, Logging}
2626

2727
object ThreadUtils extends Logging {
2828

29-
def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
29+
def newDaemonSingleThreadScheduledExecutor(
30+
threadName: String,
31+
executeExistingDelayedTasksAfterShutdown: Boolean = true): ScheduledExecutorService = {
3032
val threadFactory = new NamedThreadFactory(threadName, daemon = true)
3133
val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
3234
executor.setRemoveOnCancelPolicy(true)
3335
executor
36+
.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeExistingDelayedTasksAfterShutdown)
37+
executor
3438
}
3539

3640
def newDaemonQueuedThreadPool(

kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.kyuubi.KyuubiFunSuite
2323

2424
class ThreadUtilsSuite extends KyuubiFunSuite {
2525

26-
test("New daemon single thread scheduled executor") {
26+
test("New daemon single thread scheduled executor for shutdown") {
2727
val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest")
2828
@volatile var threadName = ""
2929
service.submit(new Runnable {
@@ -35,4 +35,30 @@ class ThreadUtilsSuite extends KyuubiFunSuite {
3535
service.awaitTermination(10, TimeUnit.SECONDS)
3636
assert(threadName startsWith "ThreadUtilsTest")
3737
}
38+
39+
test("New daemon single thread scheduled executor for shutdownNow") {
40+
val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest")
41+
@volatile var threadName = ""
42+
service.submit(new Runnable {
43+
override def run(): Unit = {
44+
threadName = Thread.currentThread().getName
45+
}
46+
})
47+
service.shutdownNow()
48+
service.awaitTermination(10, TimeUnit.SECONDS)
49+
assert(threadName startsWith "")
50+
}
51+
52+
test("New daemon single thread scheduled executor for cancel delayed tasks") {
53+
val service = ThreadUtils.newDaemonSingleThreadScheduledExecutor("ThreadUtilsTest", false)
54+
@volatile var threadName = ""
55+
service.submit(new Runnable {
56+
override def run(): Unit = {
57+
threadName = Thread.currentThread().getName
58+
}
59+
})
60+
service.shutdown()
61+
service.awaitTermination(10, TimeUnit.SECONDS)
62+
assert(threadName startsWith "")
63+
}
3864
}

0 commit comments

Comments
 (0)