New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-26533][SQL][test-hadoop2.7] Support query auto timeout cancel on thriftserver #28991
Conversation
@wangyum please help to review |
|
||
test("SPARK-26533: Support query auto timeout cancel on thriftserver") { | ||
withJdbcStatement() { statement => | ||
statement.setQueryTimeout(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you test the cases 0
and -1
? They mean no limit?
/**
...
* @param seconds the new query timeout limit in seconds; zero means
* there is no limit
* @exception SQLException if a database access error occurs,
* this method is called on a closed <code>Statement</code>
* or the condition {@code seconds >= 0} is not satisfied
* @see #getQueryTimeout
*/
void setQueryTimeout(int seconds) throws SQLException;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, fixed
@@ -204,6 +205,13 @@ private[hive] class SparkExecuteStatementOperation( | |||
parentSession.getUsername) | |||
setHasResultSet(true) // avoid no resultset for async run | |||
|
|||
if(queryTimeout > 0) { | |||
Executors.newSingleThreadScheduledExecutor | |||
.schedule(new Runnable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit format:
Executors.newSingleThreadScheduledExecutor.schedule(new Runnable {
override def run(): Unit = timeoutCancel()
}, queryTimeout, TimeUnit.SECONDS)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, fixed
if (!getStatus.getState.isTerminal) { | ||
logInfo(s"Timeout and Cancel query with $statementId ") | ||
cleanup() | ||
setState(OperationState.TIMEDOUT) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: logInfo
is located just before HiveThriftServer2.eventManager.onXXX
?
cleanup()
setState(OperationState.TIMEDOUT)
logInfo(s"Timeout and Cancel query with $statementId ")
HiveThriftServer2.eventManager.onStatementCanceled(statementId)
Lines 51 to 52 in 42f01e3
logInfo(s"Close statement with $statementId") | |
HiveThriftServer2.eventManager.onOperationClosed(statementId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please setState before cleanup. It's an open bug, see #28912
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please setState before cleanup. It's an open bug, see #28912
Yea, nice suggestion! Might be better to add tests for that case, too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please setState before cleanup. It's an open bug, see #28912
Fixed, thanks for your review
@@ -349,6 +357,17 @@ private[hive] class SparkExecuteStatementOperation( | |||
} | |||
} | |||
|
|||
def timeoutCancel(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we inline this method in the line 211?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am not sure this method could be inline, because this method should be synchronized on SparkExecuteStatementOperation object
def timeoutCancel(): Unit = { synchronized {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. btw, def timeoutCancel()
-> private def timeoutCancel()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. btw,
def timeoutCancel()
->private def timeoutCancel()
?
Fixed, thanks!
@@ -32,7 +32,8 @@ | |||
CLOSED(TOperationState.CLOSED_STATE, true), | |||
ERROR(TOperationState.ERROR_STATE, true), | |||
UNKNOWN(TOperationState.UKNOWN_STATE, false), | |||
PENDING(TOperationState.PENDING_STATE, false); | |||
PENDING(TOperationState.PENDING_STATE, false), | |||
TIMEDOUT(TOperationState.CANCELED_STATE, true); //do not want to change TOperationState in hive 1.2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does //do not want to change TOperationState in hive 1.2
means?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The class of TOperationState was generated by thift, so we should not change it directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not work and never be used with hive-1.2 anyway, because it's not in HIVE_CLI_SERVICE_PROTOCOL_V8.
But adding the state here does not hurt, it allows it to compile with hive-1.2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Only hive-2.3
support timeout.
ok to test. |
ok to test |
Test build #124982 has finished for PR 28991 at commit
|
retest this please |
Test build #125039 has finished for PR 28991 at commit
|
@@ -874,6 +874,22 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { | |||
assert(rs.getString(1) === expected.toString) | |||
} | |||
} | |||
|
|||
test("SPARK-26533: Support query auto timeout cancel on thriftserver") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you check if it can throw an exception (when hive1.2 used internally) by using HiveUtils.builtinHiveVersion
or the other related variables?
Error Message
java.sql.SQLException: Method not supported
Stacktrace
sbt.ForkMain$ForkError: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveStatement.setQueryTimeout(HiveStatement.java:739)
at org.apache.spark.sql.hive.thriftserver.HiveThriftBinaryServerSuite.$anonfun$new$97(HiveThriftServer2Suites.scala:880)
at
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed, thanks
assert(e.contains("Query timed out after")) | ||
|
||
statement.setQueryTimeout(0) | ||
statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you change query to: SELECT 'test1', java_method('java.lang.Thread', 'sleep', 3000L);
and assert the result to make the test more robust.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed, thanks
Test build #125063 has finished for PR 28991 at commit
|
Test build #125060 has finished for PR 28991 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a config: spark.sql.thriftServer.queryTimeout
?
The thriftserver supports the global config for timeout? This PR itself proposes per-statement config for timeout thoguh. |
Yes. Hive 2.3 support the global config for timeout: https://issues.apache.org/jira/browse/HIVE-13760. |
Test build #125077 has finished for PR 28991 at commit
|
Yes. This will be useful for user to manage queries with SLA. @maropu @juliuszsompolski What do you think? |
Yea, looks okay to add it. But, it might better to make anotehr PR for the support. |
Looks okay to add it to me as well. |
ok, I will make another PR to support global config for timeout |
Could you resolve the conflict? |
23b43f9
to
3a07fc5
Compare
@maropu can you take a look, I have resolved the conflict, thanks |
Test build #127204 has finished for PR 28991 at commit
|
@leoluan2009 Could you fix the build failure? |
3a07fc5
to
4ca75ba
Compare
Test build #127300 has finished for PR 28991 at commit
|
Test build #127303 has finished for PR 28991 at commit
|
@maropu all build are ok now, can you take a look again? thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks okay cc: @dongjoon-hyun @cloud-fan
@@ -86,20 +86,15 @@ private void initOperationLogCapture(String loggingMode) { | |||
} | |||
|
|||
public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need to replace the previous methods here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this update is just to pass queryTimeout
into newExecuteStatementOperation
.
@@ -87,7 +87,7 @@ private void initOperationLogCapture(String loggingMode) { | |||
} | |||
|
|||
public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, | |||
String statement, Map<String, String> confOverlay, boolean runAsync) | |||
String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we change these at all since queryTimeout
is supported only in hive-2.3?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This update is needed to add the queryTimeout
param in SparkSQLOperationManager#newExecuteStatementOperation
:
https://github.com/apache/spark/pull/28991/files#diff-9d2cd65aaeae992250b5f40d8c289287R48
Looked around the related code and I think the current one is the simplest. If you have better idea or another suggestion, please let me know in #29933.
Made a pass. I would reduce the unnecessary changes in hive-1.2 and avoid collapsing the two |
@leoluan2009 Are you still here? Could you address the review comments above? |
retest this please |
Test build #127805 has finished for PR 28991 at commit
|
kindly ping @leoluan2009 |
Support query auto cancelling when running too long on thriftserver.
What changes were proposed in this pull request?
Why are the changes needed?
For some cases,we use thriftserver as long-running applications.
Some times we want all the query need not to run more than given time.
In these cases,we can enable auto cancel for time-consumed query.Which can let us release resources for other queries to run.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added UT