Skip to content

Commit

Permalink
[KYUUBI #1919] Add more enum values for OperationModes
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Add more enum values for `OperationModes`.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1933 from SteNicholas/KYUUBI-1919.

Closes #1919

adcc7f5 [SteNicholas] [KYUUBI #1919] Add more enum values for OperationModes

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
SteNicholas authored and pan3793 committed Feb 18, 2022
1 parent 1eddd68 commit 6c6238c
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 43 deletions.
2 changes: 1 addition & 1 deletion docs/deployment/settings.md
Expand Up @@ -294,7 +294,7 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.operation.interrupt.on.cancel</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.2.0</div>
<code>kyuubi.operation.language</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SQL</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Choose a programing language for the following inputs <ul><li>SQL: (Default) Run all following statements as SQL queries.</li> <li>SCALA: Run all following input a scala codes</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
<code>kyuubi.operation.log.dir.root</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>server_operation_logs</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Root directory for query operation log at server-side.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.operation.plan.only.mode</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.operation.plan.only.mode</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION only way without executing the query. When it is NONE, the statement will be fully executed</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.operation.query.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
<code>kyuubi.operation.scheduler.pool</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.1.1</div>
<code>kyuubi.operation.status.polling.max.attempts</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>5</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Max attempts for long polling asynchronous running sql query's status on raw transport failures, e.g. TTransportException</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
Expand Down
Expand Up @@ -37,6 +37,7 @@ class PlanOnlyStatement(
extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {

private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
private val lineSeparator: String = System.lineSeparator()
override def getOperationLog: Option[OperationLog] = Option(operationLog)

override protected def runInternal(): Unit = {
Expand All @@ -56,20 +57,20 @@ class PlanOnlyStatement(

private def explainOperation(statement: String): Unit = {
val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
mode match {
case PARSE =>
val sqlPlan = tableEnv.explainSql(statement)
resultSet =
ResultSetUtil.stringListToResultSet(
List(sqlPlan.split(System.lineSeparator()).apply(1)),
"plan")
val explainPlans =
tableEnv.explainSql(statement).split(s"$lineSeparator$lineSeparator")
val operationPlan = mode match {
case PARSE => explainPlans(0).split(s"== Abstract Syntax Tree ==$lineSeparator")(1)
case PHYSICAL =>
explainPlans(1).split(s"== Optimized Physical Plan ==$lineSeparator")(1)
case EXECUTION =>
explainPlans(2).split(s"== Optimized Execution Plan ==$lineSeparator")(1)
case _ =>
throw KyuubiSQLException(
s"""
|The operation mode ${mode.toString} doesn't support in Flink SQL engine.
|Flink only supports the AST and the execution plan of the sql statement.
|Flink engine will support EXECUTION operation plan mode in future.
|""".stripMargin)
throw KyuubiSQLException(s"The operation mode $mode doesn't support in Flink SQL engine.")
}
resultSet =
ResultSetUtil.stringListToResultSet(
List(operationPlan),
"plan")
}
}
Expand Up @@ -37,7 +37,7 @@ class PlanOnlyOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper

test("Plan only operation with system defaults") {
withJdbcStatement() { statement =>
testPlanOnlyStatement(statement)
testPlanOnlyStatementWithParseMode(statement)
}
}

Expand All @@ -46,7 +46,7 @@ class PlanOnlyOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper
withJdbcStatement() { statement =>
val exceptionMsg = intercept[Exception](statement.executeQuery("select 1")).getMessage
assert(exceptionMsg.contains(
s"The operation mode ${ANALYZE.toString} doesn't support in Flink SQL engine."))
s"The operation mode $ANALYZE doesn't support in Flink SQL engine."))
}
}
}
Expand All @@ -55,14 +55,40 @@ class PlanOnlyOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> ANALYZE.toString))(Map.empty) {
withJdbcStatement() { statement =>
statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=parse")
testPlanOnlyStatement(statement)
testPlanOnlyStatementWithParseMode(statement)
}
}
}

private def testPlanOnlyStatement(statement: Statement): Unit = {
test("Plan only operation with PHYSICAL mode") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> PHYSICAL.toString))(Map.empty) {
withJdbcStatement() { statement =>
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("Calc(select=[1 AS EXPR$0])") &&
operationPlan.contains("Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])"))
}
}
}

test("Plan only operation with EXECUTION mode") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> EXECUTION.toString))(Map.empty) {
withJdbcStatement() { statement =>
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("Calc(select=[1 AS EXPR$0])") &&
operationPlan.contains("Values(tuples=[[{ 0 }]])"))
}
}
}

private def testPlanOnlyStatementWithParseMode(statement: Statement): Unit = {
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("LogicalProject(EXPR$0=[1])") &&
operationPlan.contains("LogicalValues(tuples=[[{ 0 }]])"))
}

private def getOperationPlanWithStatement(statement: Statement): String = {
val resultSet = statement.executeQuery("select 1")
assert(resultSet.next())
assert(resultSet.getString(1) === "LogicalProject(EXPR$0=[1])")
resultSet.getString(1)
}
}
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.config.KyuubiConf.OperationModes.{ANALYZE, OperationMode, OPTIMIZE, PARSE}
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
Expand Down Expand Up @@ -57,18 +57,24 @@ class PlanOnlyStatement(
case cmd if isSetOrReset(cmd) =>
result = spark.sql(statement)
iter = new ArrayFetchIterator(result.collect())
case otherPlan => mode match {
case plan => mode match {
case PARSE =>
iter = new IterableFetchIterator(Seq(Row(otherPlan.toString())))
iter = new IterableFetchIterator(Seq(Row(plan.toString())))
case ANALYZE =>
val analyzed = spark.sessionState.analyzer.execute(otherPlan)
val analyzed = spark.sessionState.analyzer.execute(plan)
spark.sessionState.analyzer.checkAnalysis(analyzed)
iter = new IterableFetchIterator(Seq(Row(analyzed.toString())))
case OPTIMIZE =>
val analyzed = spark.sessionState.analyzer.execute(otherPlan)
val analyzed = spark.sessionState.analyzer.execute(plan)
spark.sessionState.analyzer.checkAnalysis(analyzed)
val optimized = spark.sessionState.optimizer.execute(analyzed)
iter = new IterableFetchIterator(Seq(Row(optimized.toString())))
case PHYSICAL =>
val physical = spark.sql(statement).queryExecution.sparkPlan
iter = new IterableFetchIterator(Seq(Row(physical.toString())))
case EXECUTION =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toString())))
}
}
} catch {
Expand Down
Expand Up @@ -1143,13 +1143,14 @@ object KyuubiConf {

object OperationModes extends Enumeration {
type OperationMode = Value
val PARSE, ANALYZE, OPTIMIZE, NONE = Value
val PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION, NONE = Value
}

val OPERATION_PLAN_ONLY: ConfigEntry[String] =
buildConf("operation.plan.only.mode")
.doc("Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without " +
"executing the query. When it is NONE, the statement will be fully executed")
.doc("Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION " +
"only way without executing the query. When it is NONE, the statement will be fully " +
"executed")
.version("1.4.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
Expand Down
Expand Up @@ -17,49 +17,71 @@

package org.apache.kyuubi.operation

import java.sql.Statement

import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OperationModes._

class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {

override protected val conf: KyuubiConf = {
KyuubiConf()
.set(KyuubiConf.ENGINE_SHARE_LEVEL, "user")
.set(KyuubiConf.OPERATION_PLAN_ONLY, "optimize")
.set(KyuubiConf.OPERATION_PLAN_ONLY, OPTIMIZE.toString)
.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "plan-only")
}

override protected def jdbcUrl: String = getJdbcUrl

test("KYUUBI #1059: Plan only operation with system defaults") {
withJdbcStatement() { statement =>
val set = statement.executeQuery("select 1 where true")
assert(set.next())
val res = set.getString(1)
assert(res.startsWith("Project") && !res.contains("Filter"))
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("Project") && !operationPlan.contains("Filter"))
}
}

test("KYUUBI #1059: Plan only operation with session conf") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> "analyze"))(Map.empty) {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> ANALYZE.toString))(Map.empty) {
withJdbcStatement() { statement =>
val set = statement.executeQuery("select 1 where true")
assert(set.next())
val res = set.getString(1)
assert(res.startsWith("Project") && res.contains("Filter"))
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("Project") && operationPlan.contains("Filter"))
}
}
}

test("KYUUBI #1059: Plan only operation with set command") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> "analyze"))(Map.empty) {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> ANALYZE.toString))(Map.empty) {
withJdbcStatement() { statement =>
statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=parse")
val set = statement.executeQuery("select 1 where true")
assert(set.next())
val res = set.getString(1)
assert(res.startsWith("'Project"))
statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=$PARSE")
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("'Project"))
}
}
}

test("KYUUBI #1919: Plan only operation with PHYSICAL mode") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> PHYSICAL.toString))(Map.empty) {
withJdbcStatement() { statement =>
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("Project") && operationPlan.contains("Scan OneRowRelation"))
}
}
}

test("KYUUBI #1919: Plan only operation with EXECUTION mode") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> EXECUTION.toString))(Map.empty) {
withJdbcStatement() { statement =>
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("*(1) Project") &&
operationPlan.contains("*(1) Scan OneRowRelation"))
}
}
}

private def getOperationPlanWithStatement(statement: Statement): String = {
val resultSet = statement.executeQuery("select 1 where true")
assert(resultSet.next())
resultSet.getString(1)
}
}

0 comments on commit 6c6238c

Please sign in to comment.