diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 922af72604027..663a48a4061ca 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Map => JMap}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -42,7 +43,8 @@ private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
     statement: String,
     confOverlay: JMap[String, String],
-    runInBackground: Boolean = true)
+    runInBackground: Boolean = true,
+    queryTimeout: Long)
   extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
   with SparkOperation
   with Logging {
@@ -198,6 +200,21 @@ private[hive] class SparkExecuteStatementOperation(
       parentSession.getUsername)
     setHasResultSet(true) // avoid no resultset for async run
 
+    if (queryTimeout > 0) {
+      // The scheduler is created per statement; shut it down once the timeout task has run,
+      // otherwise its non-daemon worker thread would be leaked for every timed query.
+      val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
+      timeoutExecutor.schedule(new Runnable {
+        override def run(): Unit = {
+          try {
+            timeoutCancel()
+          } finally {
+            timeoutExecutor.shutdown()
+          }
+        }
+      }, queryTimeout, TimeUnit.SECONDS)
+    }
+
     if (!runInBackground) {
       execute()
     } else {
@@ -302,6 +319,21 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
+  /**
+   * Callback for the scheduled timeout task. If the operation has not yet reached a terminal
+   * state, marks it TIMEDOUT, runs cleanup() and reports the statement as canceled.
+   */
+  private def timeoutCancel(): Unit = {
+    synchronized {
+      if (!getStatus.getState.isTerminal) {
+        setState(OperationState.TIMEDOUT)
+        cleanup()
+        logInfo(s"Timeout and Cancel query with $statementId ")
+        HiveThriftServer2.eventManager.onStatementCanceled(statementId)
+      }
+    }
+  }
+
   override protected def cleanup(): Unit = {
     if (runInBackground) {
       val
backgroundHandle = getBackgroundHandle()
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index bc9c13eb0d4f8..ba42eefed2a22 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -44,14 +44,15 @@ private[thriftserver] class SparkSQLOperationManager()
       parentSession: HiveSession,
       statement: String,
       confOverlay: JMap[String, String],
-      async: Boolean): ExecuteStatementOperation = synchronized {
+      async: Boolean,
+      queryTimeout: Long): ExecuteStatementOperation = synchronized {
     val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
     require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
       s" initialized or had already closed.")
     val conf = sqlContext.sessionState.conf
     val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
     val operation = new SparkExecuteStatementOperation(
-      sqlContext, parentSession, statement, confOverlay, runInBackground)
+      sqlContext, parentSession, statement, confOverlay, runInBackground, queryTimeout)
     handleToOperation.put(operation.getHandle, operation)
     logDebug(s"Created Operation for $statement with session=$parentSession, " +
       s"runInBackground=$runInBackground")
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 3fd46dc82f03f..655a24a06b0e5 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -874,6 +874,35 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       assert(rs.getString(1) === expected.toString)
     }
   }
+
+  test("SPARK-26533: Support query auto timeout cancel on thriftserver") {
+    withJdbcStatement() { statement =>
+      if (HiveUtils.isHive23) {
+        statement.setQueryTimeout(1)
+        val e = intercept[SQLException] {
+          statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)")
+        }.getMessage
+        assert(e.contains("Query timed out after"))
+
+        statement.setQueryTimeout(0)
+        val rs1 = statement.executeQuery(
+          "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
+        rs1.next()
+        assert(rs1.getString(1) == "test")
+
+        statement.setQueryTimeout(-1)
+        val rs2 = statement.executeQuery(
+          "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
+        rs2.next()
+        assert(rs2.getString(1) == "test")
+      } else {
+        val e = intercept[SQLException] {
+          statement.setQueryTimeout(1)
+        }.getMessage
+        assert(e.contains("Method not supported"))
+      }
+    }
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
index 4c2f29e0bf394..34e34ed584aab 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
@@ -108,7 +108,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark
       signal: Semaphore,
       finalState: OperationState)
     extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement,
-      new util.HashMap, false) {
+      new util.HashMap, false, 0) {
override def cleanup(): Unit = { super.cleanup() diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java index 1165180118413..884e336fbbe3b 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/OperationState.java @@ -32,7 +32,8 @@ public enum OperationState { CLOSED(TOperationState.CLOSED_STATE, true), ERROR(TOperationState.ERROR_STATE, true), UNKNOWN(TOperationState.UKNOWN_STATE, false), - PENDING(TOperationState.PENDING_STATE, false); + PENDING(TOperationState.PENDING_STATE, false), + TIMEDOUT(TOperationState.CANCELED_STATE, true); //do not want to change TOperationState in hive 1.2 private final TOperationState tOperationState; private final boolean terminal; diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java index 92c340a29c107..1d756d22a639c 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -87,7 +87,7 @@ private void initOperationLogCapture(String loggingMode) { } public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map confOverlay, boolean runAsync) + String statement, Map confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java 
b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index e3fb54d9f47e9..aa25f36b1cf1f 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -464,7 +464,7 @@ private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync) - throws HiveSQLException { - ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation - .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0); + String statement, Map confOverlay, boolean runAsync, long queryTimeout) + throws HiveSQLException { + ExecuteStatementOperation executeStatementOperation = + ExecuteStatementOperation.newExecuteStatementOperation(parentSession, statement, + confOverlay, runAsync, queryTimeout); addOperation(executeStatementOperation); return executeStatementOperation; } - public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map confOverlay, boolean runAsync, long queryTimeout) - throws HiveSQLException { - return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); - } - public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) { GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession); addOperation(operation);