[SPARK-26533][SQL][test-hadoop2.7] Support query auto timeout cancel on thriftserver #28991

Closed · wants to merge 8 commits

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Map => JMap}
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -42,7 +43,8 @@ private[hive] class SparkExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    runInBackground: Boolean = true)
    runInBackground: Boolean = true,
    queryTimeout: Long)
  extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
  with SparkOperation
  with Logging {
@@ -198,6 +200,12 @@ private[hive] class SparkExecuteStatementOperation(
      parentSession.getUsername)
    setHasResultSet(true) // avoid no resultset for async run

    if (queryTimeout > 0) {
      Executors.newSingleThreadScheduledExecutor.schedule(new Runnable {
        override def run(): Unit = timeoutCancel()
      }, queryTimeout, TimeUnit.SECONDS)
    }

    if (!runInBackground) {
      execute()
    } else {
@@ -302,6 +310,17 @@ private[hive] class SparkExecuteStatementOperation(
    }
  }

  private def timeoutCancel(): Unit = {
    synchronized {
      if (!getStatus.getState.isTerminal) {
        setState(OperationState.TIMEDOUT)
Member: nit: logInfo is located just before HiveThriftServer2.eventManager.onXXX?

        cleanup()
        setState(OperationState.TIMEDOUT)
        logInfo(s"Timeout and Cancel query with $statementId ")
        HiveThriftServer2.eventManager.onStatementCanceled(statementId)

        logInfo(s"Close statement with $statementId")
        HiveThriftServer2.eventManager.onOperationClosed(statementId)

Contributor (author): thanks, fixed

Contributor: Please setState before cleanup. It's an open bug, see #28912

Member: > Please setState before cleanup. It's an open bug, see #28912

Yea, nice suggestion! Might be better to add tests for that case, too.

Contributor (author): > Please setState before cleanup. It's an open bug, see #28912

Fixed, thanks for your review

        cleanup()
        logInfo(s"Timeout and Cancel query with $statementId ")
        HiveThriftServer2.eventManager.onStatementCanceled(statementId)
      }
    }
  }

  override protected def cleanup(): Unit = {
    if (runInBackground) {
      val backgroundHandle = getBackgroundHandle()
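The mechanism added above is small: when a positive queryTimeout is set, a one-shot task is scheduled that cancels the operation unless it has already reached a terminal state. A minimal, self-contained sketch of that pattern follows; OperationStub and its members are hypothetical stand-ins, not Spark classes, and the real operation additionally records the TIMEDOUT state, cleans up the background job, and notifies HiveThriftServer2.eventManager, as the diff shows.

import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical stand-in for an operation that can be cancelled on timeout (not a Spark class).
class OperationStub {
  @volatile private var terminal = false

  def markFinished(): Unit = terminal = true

  // Mirrors the approach above: only positive timeouts schedule a one-shot cancel,
  // and the cancel is a no-op once the operation has already reached a terminal state.
  def scheduleTimeout(queryTimeoutSeconds: Long): Unit = {
    if (queryTimeoutSeconds > 0) {
      Executors.newSingleThreadScheduledExecutor().schedule(new Runnable {
        override def run(): Unit = OperationStub.this.synchronized {
          if (!terminal) {
            terminal = true
            println(s"query timed out after $queryTimeoutSeconds seconds and was cancelled")
          }
        }
      }, queryTimeoutSeconds, TimeUnit.SECONDS)
    }
  }
}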
@@ -44,14 +44,15 @@ private[thriftserver] class SparkSQLOperationManager()
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      async: Boolean): ExecuteStatementOperation = synchronized {
      async: Boolean,
      queryTimeout: Long): ExecuteStatementOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      s" initialized or had already closed.")
    val conf = sqlContext.sessionState.conf
    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
    val operation = new SparkExecuteStatementOperation(
      sqlContext, parentSession, statement, confOverlay, runInBackground)
      sqlContext, parentSession, statement, confOverlay, runInBackground, queryTimeout)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created Operation for $statement with session=$parentSession, " +
      s"runInBackground=$runInBackground")
@@ -874,6 +874,35 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
      assert(rs.getString(1) === expected.toString)
    }
  }

test("SPARK-26533: Support query auto timeout cancel on thriftserver") {
Member: Could you check whether it can throw an exception (when hive-1.2 is used internally) by using HiveUtils.builtinHiveVersion or the other related variables?

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125039/testReport/org.apache.spark.sql.hive.thriftserver/HiveThriftBinaryServerSuite/SPARK_26533__Support_query_auto_timeout_cancel_on_thriftserver/

Error Message:
java.sql.SQLException: Method not supported

Stacktrace:
sbt.ForkMain$ForkError: java.sql.SQLException: Method not supported
	at org.apache.hive.jdbc.HiveStatement.setQueryTimeout(HiveStatement.java:739)
	at org.apache.spark.sql.hive.thriftserver.HiveThriftBinaryServerSuite.$anonfun$new$97(HiveThriftServer2Suites.scala:880)
	at ...

Contributor (author): fixed, thanks

    withJdbcStatement() { statement =>
      if (HiveUtils.isHive23) {
        statement.setQueryTimeout(1)
        val e = intercept[SQLException] {
          statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)")
        }.getMessage
        assert(e.contains("Query timed out after"))

        statement.setQueryTimeout(0)
        val rs1 = statement.executeQuery(
          "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
        rs1.next()
        assert(rs1.getString(1) == "test")

        statement.setQueryTimeout(-1)
        val rs2 = statement.executeQuery(
          "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
        rs2.next()
        assert(rs2.getString(1) == "test")
      } else {
        val e = intercept[SQLException] {
          statement.setQueryTimeout(1)
        }.getMessage
        assert(e.contains("Method not supported"))
      }
    }
  }
}

class SingleSessionSuite extends HiveThriftJdbcTest {
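As a usage illustration (not part of the PR): with a Hive 2.3 JDBC driver on the classpath, a client opts into the timeout through java.sql.Statement#setQueryTimeout, and the thriftserver cancels the query once the limit is hit. The connection URL and credentials below are placeholders; this is a sketch under those assumptions, not a tested client.

import java.sql.{DriverManager, SQLException}

object QueryTimeoutClientSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder URL and credentials; point these at your own Thrift server.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()
    try {
      // Seconds; the test above also exercises 0 and -1, which leave the query uncancelled.
      stmt.setQueryTimeout(1)
      stmt.execute("select java_method('java.lang.Thread', 'sleep', 3000L)")
    } catch {
      case e: SQLException =>
        // With this patch the server cancels the statement and the driver surfaces
        // a "Query timed out after ..." SQLException, as asserted in the test above.
        println(e.getMessage)
    } finally {
      stmt.close()
      conn.close()
    }
  }
}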
@@ -108,7 +108,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark
      signal: Semaphore,
      finalState: OperationState)
    extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement,
      new util.HashMap, false, 0) {

    override def cleanup(): Unit = {
      super.cleanup()
@@ -32,7 +32,8 @@ public enum OperationState {
  CLOSED(TOperationState.CLOSED_STATE, true),
  ERROR(TOperationState.ERROR_STATE, true),
  UNKNOWN(TOperationState.UKNOWN_STATE, false),
  PENDING(TOperationState.PENDING_STATE, false);
  PENDING(TOperationState.PENDING_STATE, false),
  TIMEDOUT(TOperationState.CANCELED_STATE, true); // do not want to change TOperationState in hive 1.2
Member: What does "// do not want to change TOperationState in hive 1.2" mean?

Contributor (author): The TOperationState class was generated by Thrift, so we should not change it directly.

Contributor: This will not work and will never be used with hive-1.2 anyway, because it's not in HIVE_CLI_SERVICE_PROTOCOL_V8. But adding the state here does not hurt; it allows it to compile with hive-1.2.

Member: Yes. Only hive-2.3 supports timeout.


  private final TOperationState tOperationState;
  private final boolean terminal;
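To make the point from the thread concrete: TIMEDOUT is a new server-side state, but on the wire it reuses CANCELED_STATE, so clients that only understand the thrift-generated codes, including hive-1.2 ones, still decode the response as a cancellation. A toy model of that mapping (not Hive code):

// Toy wire-level codes, standing in for the thrift-generated TOperationState values.
object WireState extends Enumeration {
  val RUNNING, FINISHED, CANCELED, ERROR = Value
}

// Server-side states: TimedOut is distinct internally but shares Canceled's wire code,
// mirroring how the PR avoids touching the generated thrift enum.
sealed abstract class ServerState(val wire: WireState.Value, val terminal: Boolean)
case object Running  extends ServerState(WireState.RUNNING, terminal = false)
case object Finished extends ServerState(WireState.FINISHED, terminal = true)
case object Canceled extends ServerState(WireState.CANCELED, terminal = true)
case object TimedOut extends ServerState(WireState.CANCELED, terminal = true)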
@@ -87,7 +87,7 @@ private void initOperationLogCapture(String loggingMode) {
  }

  public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
      String statement, Map<String, String> confOverlay, boolean runAsync)
      String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
Contributor: Should we change these at all, since queryTimeout is supported only in hive-2.3?

Member: This update is needed to add the queryTimeout param in SparkSQLOperationManager#newExecuteStatementOperation:
https://github.com/apache/spark/pull/28991/files#diff-9d2cd65aaeae992250b5f40d8c289287R48
Looked around the related code and I think the current one is the simplest. If you have a better idea or another suggestion, please let me know in #29933.

      throws HiveSQLException {
    ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
        .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
@@ -464,7 +464,7 @@ private OperationHandle executeStatementInternal(String statement, Map<String, S

    OperationManager operationManager = getOperationManager();
    ExecuteStatementOperation operation = operationManager
        .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
        .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync, 0);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
@@ -86,20 +86,15 @@ private void initOperationLogCapture(String loggingMode) {
  }

  public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
Contributor: Why do you need to replace the previous methods here?

Member: I think this update is just to pass queryTimeout into newExecuteStatementOperation.

      String statement, Map<String, String> confOverlay, boolean runAsync)
          throws HiveSQLException {
    ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
        .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0);
      String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
      throws HiveSQLException {
    ExecuteStatementOperation executeStatementOperation =
        ExecuteStatementOperation.newExecuteStatementOperation(parentSession, statement,
            confOverlay, runAsync, queryTimeout);
    addOperation(executeStatementOperation);
    return executeStatementOperation;
  }

  public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
      String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
      throws HiveSQLException {
    return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
  }

  public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
    GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession);
    addOperation(operation);
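Finally, a rough sketch of how the new queryTimeout parameter is threaded through the layers this PR touches: the session layer hands it to the operation manager, which passes it to the Spark operation that schedules the cancel. The stub names below are hypothetical; the real classes are HiveSessionImpl, SparkSQLOperationManager and SparkExecuteStatementOperation, and per the review thread the hive-1.2 paths pass 0 (timeout disabled) because the older protocol has no timeout field.

// Hypothetical stubs modelling the call chain; not the real Spark or Hive classes.
final case class OperationSketch(statement: String, runInBackground: Boolean, queryTimeout: Long)

class OperationManagerSketch {
  // After this PR, queryTimeout is an explicit parameter and is handed on to the operation.
  def newExecuteStatementOperation(
      statement: String,
      runAsync: Boolean,
      queryTimeout: Long): OperationSketch =
    OperationSketch(statement, runInBackground = runAsync, queryTimeout = queryTimeout)
}

class SessionSketch(manager: OperationManagerSketch) {
  // A timeout of 0 means "no timeout"; hive-1.2 sessions always pass 0,
  // while hive-2.3 sessions forward the value set by the JDBC client.
  def executeStatement(statement: String, runAsync: Boolean, clientTimeout: Long = 0L): OperationSketch =
    manager.newExecuteStatementOperation(statement, runAsync, clientTimeout)
}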