[KYUUBI #6377] Fix isCommand check and set min rows threshold for saveToFile

# 🔍 Description
## Issue References 🔗

This pull request fixes #
I found that, with saveToFile enabled and the default min size threshold, even when I run a simple `set` command, the result is still saved to a file.
<img width="1718" alt="image" src="https://github.com/apache/kyuubi/assets/6757692/5bcc0da1-201a-453a-8568-d1bfadd7adef">

I think we need to skip these kinds of queries.
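For illustration, a minimal, hypothetical sketch of the idea (not the patched code; `looksLikeCommand` is a made-up helper name): a plain `set` is executed as a command, so its executed plan is a command wrapper rather than a real query plan, and such statements can be detected and skipped.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper mirroring the idea behind the fix: command statements
// surface as CommandResultExec / ExecutedCommandExec in the executed plan.
def looksLikeCommand(df: DataFrame): Boolean = {
  val planClass = df.queryExecution.executedPlan.getClass.getName
  planClass == "org.apache.spark.sql.execution.CommandResultExec" ||
    planClass == "org.apache.spark.sql.execution.command.ExecutedCommandExec"
}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
looksLikeCommand(spark.sql("set"))                       // true, no need to spill to file
looksLikeCommand(spark.sql("select * from values (1)"))  // false, a regular query
```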

## Describe Your Solution 🔧

Make `isCommandExec` check the executed plan itself (its class name) instead of comparing `executedPlan.nodeName` against fully qualified class names, so that command statements such as `set` or `show tables` are no longer written to file. Also introduce `kyuubi.operation.result.saveToFile.minRows` (default 10000), so that results whose limited or estimated row count stays below the threshold are returned directly instead of being saved to file.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

Even a trivial `set` statement has its result written to the save-to-file directory: the existing command check compares `executedPlan.nodeName` against fully qualified class names and does not match, and there is no row-count threshold.

#### Behavior With This Pull Request 🎉

Command statements and results whose limited or estimated row count is below `kyuubi.operation.result.saveToFile.minRows` are returned directly without being written to file.

#### Related Unit Tests

- `SparkDatasetHelperSuite`: new `isCommandExec` test covering `set`, `explain set`, `show tables`, and a regular `select`.
---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6377 from turboFei/check_is_DQL.

Closes #6377

da9c2a9 [Wang, Fei] ut
04e20db [Wang, Fei] conf
8f20ed8 [Wang, Fei] refine the check
f558dcc [Wang, Fei] ut
c813403 [Wang, Fei] DQL

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
turboFei committed May 9, 2024
1 parent 12c5568 commit 3439ea0
Showing 5 changed files with 50 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/configuration/settings.md
@@ -420,6 +420,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. Set this value to 0 to disable the max rows limit. | int | 1.6.0 |
| kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The directory where Spark query results are saved; it should be publicly accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query's result directory is deleted when the query finishes. | string | 1.9.0 |
| kyuubi.operation.result.saveToFile.enabled | false | The switch for Spark query result save to file. | boolean | 1.9.0 |
| kyuubi.operation.result.saveToFile.minRows | 10000 | The minimum row count threshold for saving Spark query results to file, default value is 10000. | long | 1.9.1 |
| kyuubi.operation.result.saveToFile.minSize | 209715200 | The minimum size threshold for saving Spark query results to file, default value is 200 MB. We use Spark's `EstimationUtils#getSizePerRow` to estimate the output size of the execution plan. | long | 1.9.0 |
| kyuubi.operation.scheduler.pool | &lt;undefined&gt; | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
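For context, a hypothetical `kyuubi-defaults.conf` snippet combining these switches might look like the following (the values are just the documented defaults, not recommendations):

```properties
# Spill large results to ORC files instead of keeping them on the driver.
kyuubi.operation.result.saveToFile.enabled=true
# The directory must be readable and writable by every Spark engine.
kyuubi.operation.result.saveToFile.dir=/tmp/kyuubi/tmp_kyuubi_result
# Skip the file path for results below the row threshold ...
kyuubi.operation.result.saveToFile.minRows=10000
# ... or below roughly 200 MB of estimated output size.
kyuubi.operation.result.saveToFile.minSize=209715200
```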
@@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._

 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -172,10 +172,12 @@ class ExecuteStatement(
         })
       } else {
         val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
-        val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+        val resultSaveSizeThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+        val resultSaveRowsThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, spark)
         if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
             resultMaxRows,
-            resultSaveThreshold,
+            resultSaveSizeThreshold,
+            resultSaveRowsThreshold,
             result)) {
           saveFileName =
             Some(
@@ -246,22 +246,33 @@ object SparkDatasetHelper extends Logging {
     case _ => None
   }

-  def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
-    if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
+  def shouldSaveResultToFs(
+      resultMaxRows: Int,
+      minSize: Long,
+      minRows: Long,
+      result: DataFrame): Boolean = {
+    if (isCommandExec(result) ||
+      (resultMaxRows > 0 && resultMaxRows < minRows) ||
+      result.queryExecution.optimizedPlan.stats.rowCount.getOrElse(
+        BigInt(Long.MaxValue)) < minRows) {
       return false
     }
-    val finalLimit = optimizedPlanLimit(result.queryExecution) match {
-      case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
-      case Some(limit) => limit
-      case None => resultMaxRows
+    val finalLimit: Option[Long] = optimizedPlanLimit(result.queryExecution) match {
+      case Some(limit) if resultMaxRows > 0 => Some(math.min(limit, resultMaxRows))
+      case Some(limit) => Some(limit)
+      case None if resultMaxRows > 0 => Some(resultMaxRows)
+      case _ => None
     }
-    lazy val stats = if (finalLimit > 0) {
-      finalLimit * EstimationUtils.getSizePerRow(
-        result.queryExecution.executedPlan.output)
-    } else {
-      result.queryExecution.optimizedPlan.stats.sizeInBytes
+    if (finalLimit.exists(_ < minRows)) {
+      return false
     }
-    lazy val colSize =
+    val sizeInBytes = result.queryExecution.optimizedPlan.stats.sizeInBytes
+    val stats = finalLimit.map { limit =>
+      val estimateSize =
+        limit * EstimationUtils.getSizePerRow(result.queryExecution.executedPlan.output)
+      estimateSize min sizeInBytes
+    }.getOrElse(sizeInBytes)
+    val colSize =
       if (result == null || result.schema.isEmpty) {
         0
       } else {
@@ -270,7 +281,8 @@
     minSize > 0 && colSize > 0 && stats >= minSize
   }

-  private def isCommandExec(nodeName: String): Boolean = {
+  def isCommandExec(result: DataFrame): Boolean = {
+    val nodeName = result.queryExecution.executedPlan.getClass.getName
     nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
       nodeName == "org.apache.spark.sql.execution.CommandResultExec"
   }
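As a rough, hypothetical walk-through of the new gating logic (a sketch, not part of the patch; it assumes a local SparkSession and the Kyuubi Spark engine classes on the classpath, and uses made-up threshold values):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.kyuubi.SparkDatasetHelper

val spark = SparkSession.builder().master("local[1]").getOrCreate()
// shouldSaveResultToFs(resultMaxRows, minSize, minRows, result)
val df = spark.sql("select * from values (1), (2), (3) as t(id)")

// Commands such as `set` are rejected up front by the isCommandExec check.
SparkDatasetHelper.shouldSaveResultToFs(0, 1, 1, spark.sql("set"))  // false

// If the effective max rows is already below minRows, spilling is pointless.
SparkDatasetHelper.shouldSaveResultToFs(5, 1, 10, df)               // false

// With permissive thresholds a regular query stays eligible, as long as the
// estimated output size still reaches minSize (1 byte here, so expected true).
SparkDatasetHelper.shouldSaveResultToFs(0, 1, 1, df)                // expected true
```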
@@ -42,4 +42,15 @@ class SparkDatasetHelperSuite extends WithSparkSQLEngine {
spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
}
}

test("isCommandExec") {
var query = "set"
assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
query = "explain set"
assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
query = "show tables"
assert(SparkDatasetHelper.isCommandExec(spark.sql(query)))
query = "select * from VALUES(1),(2),(3),(4) AS t(id)"
assert(!SparkDatasetHelper.isCommandExec(spark.sql(query)))
}
}
@@ -2066,6 +2066,14 @@ object KyuubiConf {
.checkValue(_ > 0, "must be positive value")
.createWithDefault(200 * 1024 * 1024)

val OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS: ConfigEntry[Long] =
buildConf("kyuubi.operation.result.saveToFile.minRows")
.doc("The minRows of Spark result save to file, default value is 10000.")
.version("1.9.1")
.longConf
.checkValue(_ > 0, "must be positive value")
.createWithDefault(10000)

val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] =
buildConf("kyuubi.operation.incremental.collect")
.internal
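A small sketch of how the new entry behaves at the configuration level (illustrative only; it assumes `kyuubi-common` is on the classpath and that `KyuubiConf(false)` constructs a conf without loading system defaults):

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS

val conf = KyuubiConf(false)
// Defaults to 10000 rows.
assert(conf.get(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS) == 10000L)
// Can be raised per deployment or per session, but must stay positive
// (a non-positive value is rejected by the checkValue guard above).
conf.set(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, 50000L)
assert(conf.get(OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS) == 50000L)
```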
