Skip to content

Commit

Permalink
[SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager timeo…
Browse files Browse the repository at this point in the history
…utChecker thread properly upon shutdown

### What changes were proposed in this pull request?

This PR ports HIVE-14817 (https://issues.apache.org/jira/browse/HIVE-14817) to the Spark Thrift Server.

### Why are the changes needed?

When HiveServer2 is stopped, the non-daemon timeout-checker thread stays asleep and prevents the server from terminating, as the following thread dump shows:

```
"HiveServer2-Background-Pool: Thread-79" #79 prio=5 os_prio=31 tid=0x00007fde26138800 nid=0x13713 waiting on condition [0x0000700010c32000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.hive.service.cli.session.SessionManager$1.sleepInterval(SessionManager.java:178)
	at org.apache.hive.service.cli.session.SessionManager$1.run(SessionManager.java:156)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

Here is an example to reproduce:
https://github.com/yaooqinn/kyuubi/blob/master/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/spark/SparkSQLEngineApp.scala

It also causes the issues described in HIVE-14817.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?

Passing Jenkins

Closes #28870 from yaooqinn/SPARK-32034.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 9f8e15b)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
yaooqinn authored and dongjoon-hyun committed Jun 21, 2020
1 parent 2f3618c commit c11078b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,20 @@ public synchronized void start() {
}
}

private final Object timeoutCheckerLock = new Object();

private void startTimeoutChecker() {
final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds
Runnable timeoutChecker = new Runnable() {
final Runnable timeoutChecker = new Runnable() {
@Override
public void run() {
for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
sleepFor(interval);
while (!shutdown) {
long current = System.currentTimeMillis();
for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
if (shutdown) {
break;
}
if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
&& (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
SessionHandle handle = session.getSessionHandle();
Expand All @@ -170,24 +176,34 @@ public void run() {
session.closeExpiredOperations();
}
}
sleepFor(interval);
}
}

private void sleepInterval(long interval) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
// ignore
private void sleepFor(long interval) {
synchronized (timeoutCheckerLock) {
try {
timeoutCheckerLock.wait(interval);
} catch (InterruptedException e) {
// Ignore, and break.
}
}
}
};
backgroundOperationPool.execute(timeoutChecker);
}

/**
 * Signals the session timeout checker to stop: sets the shutdown flag, then
 * wakes the checker if it is currently waiting on timeoutCheckerLock inside
 * sleepFor(), so it can re-evaluate its !shutdown loop condition without
 * waiting out the rest of the interval.
 */
private void shutdownTimeoutChecker() {
// Set the flag before notifying so a woken checker sees shutdown == true.
shutdown = true;
synchronized (timeoutCheckerLock) {
// notify() may only be called while holding the monitor of the waited-on object.
// NOTE(review): sleepFor() does not re-check shutdown before calling wait(), so a
// checker that has passed its shutdown check but not yet started waiting would miss
// this notify and sleep for one more interval; confirm that delay is acceptable.
timeoutCheckerLock.notify();
}
}

@Override
public synchronized void stop() {
super.stop();
shutdown = true;
shutdownTimeoutChecker();
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
long timeout = hiveConf.getTimeVar(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,20 @@ public synchronized void start() {
}
}

private final Object timeoutCheckerLock = new Object();

private void startTimeoutChecker() {
final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds
Runnable timeoutChecker = new Runnable() {
final Runnable timeoutChecker = new Runnable() {
@Override
public void run() {
for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
sleepFor(interval);
while (!shutdown) {
long current = System.currentTimeMillis();
for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
if (shutdown) {
break;
}
if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current
&& (!checkOperation || session.getNoOperationTime() > sessionTimeout)) {
SessionHandle handle = session.getSessionHandle();
Expand All @@ -170,24 +176,34 @@ public void run() {
session.closeExpiredOperations();
}
}
sleepFor(interval);
}
}

private void sleepInterval(long interval) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
// ignore
private void sleepFor(long interval) {
synchronized (timeoutCheckerLock) {
try {
timeoutCheckerLock.wait(interval);
} catch (InterruptedException e) {
// Ignore, and break.
}
}
}
};
backgroundOperationPool.execute(timeoutChecker);
}

/**
 * Signals the session timeout checker to stop: sets the shutdown flag, then
 * wakes the checker if it is currently waiting on timeoutCheckerLock inside
 * sleepFor(), so it can re-evaluate its !shutdown loop condition without
 * waiting out the rest of the interval.
 */
private void shutdownTimeoutChecker() {
// Set the flag before notifying so a woken checker sees shutdown == true.
shutdown = true;
synchronized (timeoutCheckerLock) {
// notify() may only be called while holding the monitor of the waited-on object.
// NOTE(review): sleepFor() does not re-check shutdown before calling wait(), so a
// checker that has passed its shutdown check but not yet started waiting would miss
// this notify and sleep for one more interval; confirm that delay is acceptable.
timeoutCheckerLock.notify();
}
}

@Override
public synchronized void stop() {
super.stop();
shutdown = true;
shutdownTimeoutChecker();
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
long timeout = hiveConf.getTimeVar(
Expand Down

0 comments on commit c11078b

Please sign in to comment.