Skip to content

Commit

Permalink
[KYUUBI #3214] Plan only mode should unset when mode value is incorrect
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

close #3214

### manual tests
when we set `kyuubi.operation.plan.only.mode` an incorrect value,unset plan only mode
![image](https://user-images.githubusercontent.com/18713676/183860516-b555cc2d-d5fb-4782-b8e3-e300fb877728.png)
so we can set a correct value

![image](https://user-images.githubusercontent.com/18713676/183860130-a662267a-7a77-430b-80ef-d2a49346b5e9.png)

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3216 from lsm1/fix/plan_only_mode.

Closes #3214

bb159ec [senmiaoliu] fix ut
f23dea1 [LSM] Merge branch 'apache:master' into fix/plan_only_mode
5204745 [senmiaoliu] implement in OperationModes
65898f4 [senmiaoliu] fix style, added ut
a1df52f [senmiaoliu] unset kyuubi.operation.plan.only.mode if mode is invalid

Lead-authored-by: senmiaoliu <senmiaoliu@trip.com>
Co-authored-by: LSM <senmiaoliu@trip.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
lsm1 authored and pan3793 committed Aug 17, 2022
1 parent 3cf5a20 commit 92f3a53
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 10 deletions.
Expand Up @@ -18,7 +18,6 @@
package org.apache.kyuubi.engine.flink.operation

import java.util
import java.util.Locale

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -54,14 +53,15 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
return catalogDatabaseOperation
}
}
val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
val mode = OperationModes(flinkSession.sessionContext.getConfigMap.getOrDefault(
OPERATION_PLAN_ONLY_MODE.key,
operationModeDefault)
operationModeDefault))
flinkSession.sessionContext.set(OPERATION_PLAN_ONLY_MODE.key, mode.toString)
val resultMaxRows =
flinkSession.normalizedConf.getOrElse(
ENGINE_FLINK_MAX_ROWS.key,
resultMaxRowsDefault.toString).toInt
val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
val op = mode match {
case NONE =>
// FLINK-24427 seals calcite classes which required to access in async mode, considering
// there is no much benefit in async mode, here we just ignore `runAsync` and always run
Expand Down
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.OPERATION_PLAN_ONLY_EXCLUDES
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator}
Expand Down Expand Up @@ -77,6 +78,9 @@ class PlanOnlyStatement(
case EXECUTION =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toString())))
case UNKNOWN =>
throw KyuubiSQLException(s"The operation mode $mode" +
" doesn't support in Spark SQL engine.")
}
}
}
Expand Down
Expand Up @@ -68,8 +68,10 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
val operation =
OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match {
case OperationLanguages.SQL =>
val mode = spark.conf.get(OPERATION_PLAN_ONLY_MODE.key, operationModeDefault)
OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
val mode =
OperationModes(spark.conf.get(OPERATION_PLAN_ONLY_MODE.key, operationModeDefault))
spark.conf.set(OPERATION_PLAN_ONLY_MODE.key, mode.toString)
mode match {
case NONE =>
val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
.map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
Expand Down
Expand Up @@ -1612,9 +1612,23 @@ object KyuubiConf {
.stringConf
.createOptional

object OperationModes extends Enumeration {
object OperationModes extends Enumeration with Logging {
type OperationMode = Value
val PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION, NONE = Value
val PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION, NONE, UNKNOWN = Value

def apply(mode: String): OperationMode = {
mode.toUpperCase(Locale.ROOT) match {
case "PARSE" => PARSE
case "ANALYZE" => ANALYZE
case "OPTIMIZE" => OPTIMIZE
case "PHYSICAL" => PHYSICAL
case "EXECUTION" => EXECUTION
case "NONE" => NONE
case other =>
warn(s"Unsupported operation mode: $mode, using UNKNOWN instead")
UNKNOWN
}
}
}

val OPERATION_PLAN_ONLY_MODE: ConfigEntry[String] =
Expand Down
Expand Up @@ -479,7 +479,7 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
assert(set.getString("plan") startsWith "Create")
val set0 = statement.executeQuery(setkey)
assert(set0.next())
assert(set0.getString(2) === "optimize")
assert(set0.getString(2) === "OPTIMIZE")
val e1 = intercept[SQLException](statement.executeQuery(dql))
assert(e1.getMessage.contains("Table or view not found"))
statement.execute("SET kyuubi.operation.plan.only.mode=analyze")
Expand Down
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import java.sql.Statement
import java.sql.{SQLException, Statement}

import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.config.KyuubiConf
Expand Down Expand Up @@ -114,6 +114,20 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
}
}

test("kyuubi #3214: Plan only mode with an incorrect value") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> "parse"))(Map.empty) {
withJdbcStatement() { statement =>
statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=parser")
val e = intercept[SQLException](statement.executeQuery("select 1"))
assert(e.getMessage.contains("The operation mode UNKNOWN doesn't support"))
statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=parse")
val result = statement.executeQuery("select 1")
assert(result.next())
assert(result.getString(1).contains("Project [unresolvedalias(1, None)]"))
}
}
}

private def getOperationPlanWithStatement(statement: Statement): String = {
val resultSet = statement.executeQuery("select 1 where true")
assert(resultSet.next())
Expand Down

0 comments on commit 92f3a53

Please sign in to comment.