From 75fe12fdca03d400665dcebd80885877f1d28dfd Mon Sep 17 00:00:00 2001 From: Luan Date: Fri, 3 Jul 2020 14:30:25 +0800 Subject: [PATCH 1/8] [SPARK-26533][SQL] Support query auto timeout cancel on thriftserver --- .../SparkExecuteStatementOperation.scala | 22 ++++++++++++++++++- .../server/SparkSQLOperationManager.scala | 5 +++-- .../HiveThriftServer2Suites.scala | 10 +++++++++ .../service/cli/thrift/TOperationState.java | 5 ++++- .../hive/service/cli/OperationState.java | 3 ++- .../cli/operation/OperationManager.java | 2 +- .../service/cli/session/HiveSessionImpl.java | 2 +- .../cli/operation/OperationManager.java | 15 +++++-------- 8 files changed, 47 insertions(+), 17 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 922af72604027..91afb3e4b2777 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.util.{Arrays, Map => JMap} +import java.util.concurrent.{Executors, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -42,7 +43,8 @@ private[hive] class SparkExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], - runInBackground: Boolean = true) + runInBackground: Boolean = true, + queryTimeout: Long) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with SparkOperation with Logging { @@ -198,6 +200,13 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getUsername) setHasResultSet(true) // avoid no resultset for async run + if(queryTimeout > 0) { + Executors.newSingleThreadScheduledExecutor + .schedule(new Runnable { + override def run(): Unit = timeoutCancel() + }, queryTimeout, TimeUnit.SECONDS) + } + if (!runInBackground) { execute() } else { @@ -302,6 +311,17 @@ private[hive] class SparkExecuteStatementOperation( } } + def timeoutCancel(): Unit = { + synchronized { + if (!getStatus.getState.isTerminal) { + logInfo(s"Timeout and Cancel query with $statementId ") + cleanup() + setState(OperationState.TIMEDOUT) + HiveThriftServer2.eventManager.onStatementCanceled(statementId) + } + } + } + override protected def cleanup(): Unit = { if (runInBackground) { val backgroundHandle = getBackgroundHandle() diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index bc9c13eb0d4f8..ba42eefed2a22 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -44,14 +44,15 @@ private[thriftserver] class SparkSQLOperationManager() parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], - async: Boolean): ExecuteStatementOperation = synchronized { + async: Boolean, + queryTimeout: Long): ExecuteStatementOperation = synchronized { val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + s" initialized or had already closed.") val conf = sqlContext.sessionState.conf val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation( - sqlContext, parentSession, statement, confOverlay, runInBackground) + sqlContext, parentSession, statement, confOverlay, runInBackground, queryTimeout) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 3fd46dc82f03f..b35b13b6c0b2d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -874,6 +874,16 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { assert(rs.getString(1) === expected.toString) } } + + test("SPARK-26533: Support query auto timeout cancel on thriftserver") { + withJdbcStatement() { statement => + statement.setQueryTimeout(1) + val e = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 50000L)") + }.getMessage + assert(e.contains("Query timed out after")) + } + } } class SingleSessionSuite extends HiveThriftJdbcTest { diff --git a/sql/hive-thriftserver/v1.2/src/gen/java/org/apache/hive/service/cli/thrift/TOperationState.java b/sql/hive-thriftserver/v1.2/src/gen/java/org/apache/hive/service/cli/thrift/TOperationState.java index 219866223a6b0..efbb412c794d0 100644 --- a/sql/hive-thriftserver/v1.2/src/gen/java/org/apache/hive/service/cli/thrift/TOperationState.java +++ b/sql/hive-thriftserver/v1.2/src/gen/java/org/apache/hive/service/cli/thrift/TOperationState.java @@ -19,7 +19,8 @@ public enum TOperationState implements org.apache.thrift.TEnum { CLOSED_STATE(4), ERROR_STATE(5), UKNOWN_STATE(6), - PENDING_STATE(7); + PENDING_STATE(7), + TIMEDOUT_STATE(8); private final int value; @@ -56,6 +57,8 @@ public static TOperationState findByValue(int value) { return UKNOWN_STATE; case 7: return PENDING_STATE; + case 8: + return TIMEDOUT_STATE; default: return null; } diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java index 1165180118413..e6da0138b5114 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java @@ -32,7 +32,8 @@ public enum OperationState { CLOSED(TOperationState.CLOSED_STATE, true), ERROR(TOperationState.ERROR_STATE, true), UNKNOWN(TOperationState.UKNOWN_STATE, false), - PENDING(TOperationState.PENDING_STATE, false); + PENDING(TOperationState.PENDING_STATE, false), + TIMEDOUT(TOperationState.TIMEDOUT_STATE, true); private final TOperationState tOperationState; private final boolean terminal; diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java index 92c340a29c107..1d756d22a639c 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -87,7 +87,7 @@ private void initOperationLogCapture(String loggingMode) { } public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map confOverlay, boolean runAsync) + String statement, Map confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index e3fb54d9f47e9..aa25f36b1cf1f 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -464,7 +464,7 @@ private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync) - throws HiveSQLException { - ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation - .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0); + String statement, Map confOverlay, boolean runAsync, long queryTimeout) + throws HiveSQLException { + ExecuteStatementOperation executeStatementOperation = + ExecuteStatementOperation.newExecuteStatementOperation(parentSession, statement, + confOverlay, runAsync, queryTimeout); addOperation(executeStatementOperation); return executeStatementOperation; } - public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map confOverlay, boolean runAsync, long queryTimeout) - throws HiveSQLException { - return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); - } - public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) { GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession); addOperation(operation); From 5dc76c634acce514765c1337904800e59cbaafda Mon Sep 17 00:00:00 2001 From: Luan Date: Fri, 3 Jul 2020 14:57:45 +0800 Subject: [PATCH 2/8] do not change TOperationState in hive 1.2 --- .../org/apache/hive/service/cli/thrift/TOperationState.java | 5 +---- .../java/org/apache/hive/service/cli/OperationState.java | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/v1.2/src/gen/java/org/apache/hive/service/cli/thrift/TOperationState.java b/sql/hive-thriftserver/v1.2/src/gen/java/org/apache/hive/service/cli/thrift/TOperationState.java index efbb412c794d0..219866223a6b0 100644 --- a/sql/hive-thriftserver/v1.2/src/gen/java/org/apache/hive/service/cli/thrift/TOperationState.java +++ b/sql/hive-thriftserver/v1.2/src/gen/java/org/apache/hive/service/cli/thrift/TOperationState.java @@ -19,8 +19,7 @@ public enum TOperationState implements org.apache.thrift.TEnum { CLOSED_STATE(4), ERROR_STATE(5), UKNOWN_STATE(6), - PENDING_STATE(7), - TIMEDOUT_STATE(8); + PENDING_STATE(7); private final int value; @@ -57,8 +56,6 @@ public static TOperationState findByValue(int value) { return UKNOWN_STATE; case 7: return PENDING_STATE; - case 8: - return TIMEDOUT_STATE; default: return null; } diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java index e6da0138b5114..884e336fbbe3b 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java @@ -33,7 +33,7 @@ public enum OperationState { ERROR(TOperationState.ERROR_STATE, true), UNKNOWN(TOperationState.UKNOWN_STATE, false), PENDING(TOperationState.PENDING_STATE, false), - TIMEDOUT(TOperationState.TIMEDOUT_STATE, true); + TIMEDOUT(TOperationState.CANCELED_STATE, true); //do not want to change TOperationState in hive 1.2 private final TOperationState tOperationState; private final boolean terminal; From 7e7ea7e421725356e372bae101a09ba805180068 Mon Sep 17 00:00:00 2001 From: Luan Date: Fri, 3 Jul 2020 17:14:31 +0800 Subject: [PATCH 3/8] fix comment --- .../thriftserver/SparkExecuteStatementOperation.scala | 5 ++--- .../sql/hive/thriftserver/HiveThriftServer2Suites.scala | 8 +++++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 91afb3e4b2777..f8c7b382c41f1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -201,8 +201,7 @@ private[hive] class SparkExecuteStatementOperation( setHasResultSet(true) // avoid no resultset for async run if(queryTimeout > 0) { - Executors.newSingleThreadScheduledExecutor - .schedule(new Runnable { + Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { override def run(): Unit = timeoutCancel() }, queryTimeout, TimeUnit.SECONDS) } @@ -314,9 +313,9 @@ private[hive] class SparkExecuteStatementOperation( def timeoutCancel(): Unit = { synchronized { if (!getStatus.getState.isTerminal) { - logInfo(s"Timeout and Cancel query with $statementId ") cleanup() setState(OperationState.TIMEDOUT) + logInfo(s"Timeout and Cancel query with $statementId ") HiveThriftServer2.eventManager.onStatementCanceled(statementId) } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index b35b13b6c0b2d..6bc27b800cc90 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -879,9 +879,15 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { withJdbcStatement() { statement => statement.setQueryTimeout(1) val e = intercept[SQLException] { - statement.execute("select java_method('java.lang.Thread', 'sleep', 50000L)") + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") }.getMessage assert(e.contains("Query timed out after")) + + statement.setQueryTimeout(0) + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + + statement.setQueryTimeout(-1) + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") } } } From e352fcd1b181a5e6a9c63fecca631dda84f2324d Mon Sep 17 00:00:00 2001 From: Luan Date: Fri, 3 Jul 2020 17:39:41 +0800 Subject: [PATCH 4/8] fix code style --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index f8c7b382c41f1..e5a066a47ed9e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -200,7 +200,7 @@ private[hive] class SparkExecuteStatementOperation( parentSession.getUsername) setHasResultSet(true) // avoid no resultset for async run - if(queryTimeout > 0) { + if (queryTimeout > 0) { Executors.newSingleThreadScheduledExecutor.schedule(new Runnable { override def run(): Unit = timeoutCancel() }, queryTimeout, TimeUnit.SECONDS) From e0d97c5e2a84ebcb665371426767d5bc209d7454 Mon Sep 17 00:00:00 2001 From: Luan Date: Sat, 4 Jul 2020 19:52:15 +0800 Subject: [PATCH 5/8] fix comment --- .../hive/thriftserver/SparkExecuteStatementOperation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index e5a066a47ed9e..663a48a4061ca 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -310,11 +310,11 @@ private[hive] class SparkExecuteStatementOperation( } } - def timeoutCancel(): Unit = { + private def timeoutCancel(): Unit = { synchronized { if (!getStatus.getState.isTerminal) { - cleanup() setState(OperationState.TIMEDOUT) + cleanup() logInfo(s"Timeout and Cancel query with $statementId ") HiveThriftServer2.eventManager.onStatementCanceled(statementId) } From 217249d41931443582f8dc32d0a95bec94788f9f Mon Sep 17 00:00:00 2001 From: Luan Date: Mon, 6 Jul 2020 18:11:27 +0800 Subject: [PATCH 6/8] fix comment --- .../HiveThriftServer2Suites.scala | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 6bc27b800cc90..028ffdbebe8d6 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -877,17 +877,24 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { test("SPARK-26533: Support query auto timeout cancel on thriftserver") { withJdbcStatement() { statement => - statement.setQueryTimeout(1) - val e = intercept[SQLException] { - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") - }.getMessage - assert(e.contains("Query timed out after")) + if (HiveUtils.isHive23) { + statement.setQueryTimeout(1) + val e = intercept[SQLException] { + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + }.getMessage + assert(e.contains("Query timed out after")) - statement.setQueryTimeout(0) - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + statement.setQueryTimeout(0) + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") - statement.setQueryTimeout(-1) - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + statement.setQueryTimeout(-1) + statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + } else { + val e = intercept[SQLException] { + statement.setQueryTimeout(1) + }.getMessage + assert(e.contains("Method not supported")) + } } } } From 4ca75baea102fb1c2e7467ed1093d4c0520a5245 Mon Sep 17 00:00:00 2001 From: Luan Date: Mon, 6 Jul 2020 18:23:59 +0800 Subject: [PATCH 7/8] fix comment --- .../hive/thriftserver/HiveThriftServer2Suites.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 028ffdbebe8d6..655a24a06b0e5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -885,10 +885,16 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { assert(e.contains("Query timed out after")) statement.setQueryTimeout(0) - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + val rs1 = statement.executeQuery( + "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") + rs1.next() + assert(rs1.getString(1) == "test") statement.setQueryTimeout(-1) - statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") + val rs2 = statement.executeQuery( + "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)") + rs2.next() + assert(rs2.getString(1) == "test") } else { val e = intercept[SQLException] { statement.setQueryTimeout(1) From da76e1a7ba25ca96495cbb2ce56377983ee9f74f Mon Sep 17 00:00:00 2001 From: Luan Date: Tue, 11 Aug 2020 10:47:47 +0800 Subject: [PATCH 8/8] fix build failure --- .../hive/thriftserver/SparkExecuteStatementOperationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala index 4c2f29e0bf394..34e34ed584aab 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala @@ -108,7 +108,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark signal: Semaphore, finalState: OperationState) extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement, - new util.HashMap, false) { + new util.HashMap, false, 0) { override def cleanup(): Unit = { super.cleanup()