Commit e54d431

link3280 authored and yaooqinn committed
[KYUUBI #1883] Support max result rows for Flink queries
### _Why are the changes needed?_

Currently, the Flink engine pulls all result rows into memory before returning them to the client. This is problematic for large or infinite result sets. This is a sub-task of KPIP-2 #1322.

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1938 from link3280/feature/FLINK-1883.

Closes #1883

80020ce [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
1b95822 [Paul Lin] [KYUUBI #1883] Avoid allocating too much buffer space
5be7535 [Paul Lin] [KYUUBI #1883] Support max result rows for Flink queries

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
1 parent: cec25d9 · commit: e54d431

6 files changed (+43, −7 lines)

docs/deployment/settings.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -318,6 +318,7 @@ Key | Default | Meaning | Type | Since
 <code>kyuubi.session.conf.restrict.list</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
 <code>kyuubi.session.engine.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The check interval for engine timeout</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.flink.main.resource</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
+<code>kyuubi.session.engine.flink.max.rows</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>1000000</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Max rows of Flink query results. For batch queries, rows that exceeds the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.5.0</div>
 <code>kyuubi.session.engine.idle.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine will self-terminate when it's not accessed for this duration. 0 or negative means not to self-terminate.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.initialize.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the background engine, e.g. SparkSQLEngine.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.launch.async</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When opening kyuubi session, whether to launch backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.4.0</div>
```
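The new entry lands in the session settings table above. As a minimal deployment sketch (the key and its default come from this commit; `kyuubi-defaults.conf` is Kyuubi's standard server-side config file), a lower server-wide cap could look like:

```properties
# Hypothetical kyuubi-defaults.conf snippet: cap Flink query results at 10,000 rows
# instead of the 1,000,000-row default introduced by this commit.
kyuubi.session.engine.flink.max.rows=10000
```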

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala

Lines changed: 10 additions & 5 deletions

```diff
@@ -41,7 +41,8 @@ class ExecuteStatement(
     session: Session,
     override val statement: String,
     override val shouldRunAsync: Boolean,
-    queryTimeout: Long)
+    queryTimeout: Long,
+    resultMaxRows: Int)
   extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
 
   private val operationLog: OperationLog =
@@ -132,12 +133,16 @@ class ExecuteStatement(
       while (loop) {
         Thread.sleep(50) // slow the processing down
 
-        val result = executor.snapshotResult(sessionId, resultId, 2)
+        val pageSize = Math.min(500, resultMaxRows)
+        val result = executor.snapshotResult(sessionId, resultId, pageSize)
         result.getType match {
           case TypedResult.ResultType.PAYLOAD =>
-            rows.clear()
             (1 to result.getPayload).foreach { page =>
-              rows ++= executor.retrieveResultPage(resultId, page).asScala
+              if (rows.size < resultMaxRows) {
+                rows ++= executor.retrieveResultPage(resultId, page).asScala
+              } else {
+                loop = false
+              }
             }
           case TypedResult.ResultType.EOS => loop = false
           case TypedResult.ResultType.EMPTY =>
@@ -147,7 +152,7 @@
       resultSet = ResultSet.builder
         .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .columns(resultDescriptor.getResultSchema.getColumns)
-        .data(rows.toArray[Row])
+        .data(rows.slice(0, resultMaxRows).toArray[Row])
         .build
     } finally {
       if (resultId != null) {
```
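Distilled from the hunks above, the fetch loop now does three things: it accumulates rows across snapshots (the old `rows.clear()` call is gone), it requests at most `min(500, resultMaxRows)` rows per snapshot so no needless buffer space is allocated, and it stops polling once the cap is reached, trimming any overshoot from the final page. A runnable sketch of that pattern, where `Snapshot`, `Payload`, `Eos`, `Empty`, `snapshot`, and `retrievePage` are hypothetical stand-ins for Flink's `TypedResult` and executor API:

```scala
import scala.collection.mutable.ArrayBuffer

sealed trait Snapshot // stand-in for Flink's TypedResult.ResultType
final case class Payload(pages: Int) extends Snapshot
case object Eos extends Snapshot
case object Empty extends Snapshot

def collectBounded[Row](
    resultMaxRows: Int,
    snapshot: Int => Snapshot, // e.g. executor.snapshotResult(sessionId, resultId, _)
    retrievePage: Int => Seq[Row]): Seq[Row] = {
  val rows = ArrayBuffer.empty[Row]
  var loop = true
  while (loop) {
    // Never ask for more rows per snapshot than the cap requires.
    val pageSize = math.min(500, resultMaxRows)
    snapshot(pageSize) match {
      case Payload(pages) =>
        (1 to pages).foreach { page =>
          if (rows.size < resultMaxRows) rows ++= retrievePage(page)
          else loop = false // cap reached: stop polling so the query can be canceled
        }
      case Eos => loop = false // bounded query finished below the cap
      case Empty => // no new data yet; keep polling
    }
  }
  rows.take(resultMaxRows).toSeq // the last page may overshoot; trim to the cap
}
```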

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala

Lines changed: 7 additions & 1 deletion

```diff
@@ -33,6 +33,8 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
 
   private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
 
+  private lazy val resultMaxRowsDefault = getConf.get(ENGINE_FLINK_MAX_ROWS)
+
   override def newExecuteStatementOperation(
       session: Session,
       statement: String,
@@ -43,9 +45,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
     val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
       OPERATION_PLAN_ONLY.key,
       operationModeDefault)
+    val resultMaxRows =
+      flinkSession.normalizedConf.getOrElse(
+        ENGINE_FLINK_MAX_ROWS.key,
+        resultMaxRowsDefault.toString).toInt
     val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
       case NONE =>
-        new ExecuteStatement(session, statement, runAsync, queryTimeout)
+        new ExecuteStatement(session, statement, runAsync, queryTimeout, resultMaxRows)
       case mode =>
         new PlanOnlyStatement(session, statement, mode)
     }
```
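The resolution order here is: a per-session `kyuubi.session.engine.flink.max.rows` set by the client wins, otherwise the server-side default applies. A minimal sketch of that lookup, assuming `normalizedConf` carries the client's session configs (as in the diff) and `serverDefault` is the `KyuubiConf` value:

```scala
// Per-session override falls back to the server default; toInt mirrors the patch,
// so a malformed client value fails fast with a NumberFormatException.
def resolveMaxRows(normalizedConf: Map[String, String], serverDefault: Int): Int =
  normalizedConf
    .getOrElse("kyuubi.session.engine.flink.max.rows", serverDefault.toString)
    .toInt

resolveMaxRows(Map("kyuubi.session.engine.flink.max.rows" -> "200"), 1000000) // 200
resolveMaxRows(Map.empty, 1000000) // 1000000, the server-side default
```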

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 14 additions & 0 deletions

```diff
@@ -761,4 +761,18 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
         .getStringVal.getValues.get(0) === "tmp.hello")
     }
   }
+
+  test("ensure result max rows") {
+    withSessionConf()(Map(KyuubiConf.ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
+      withJdbcStatement() { statement =>
+        statement.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')")
+        val resultSet = statement.executeQuery(s"select a from tbl_src")
+        var rows = 0
+        while (resultSet.next()) {
+          rows += 1
+        }
+        assert(rows === 200)
+      }
+    }
+  }
 }
```
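The `datagen` connector yields an unbounded stream, so this test only terminates because the 200-row session cap kicks in and the streaming query is canceled. Outside the test harness, the same check could be run from a plain JDBC client; a hypothetical sketch, assuming a Kyuubi server at localhost:10009, the Hive JDBC driver on the classpath, and the HiveServer2-style URL form in which Kyuubi configs follow the `?` separator:

```scala
import java.sql.DriverManager

// Hypothetical manual reproduction of the test above; URL form and endpoint are assumptions.
val url = "jdbc:hive2://localhost:10009/default?kyuubi.session.engine.flink.max.rows=200"
val conn = DriverManager.getConnection(url, "anonymous", "")
val stmt = conn.createStatement()
stmt.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')")
val rs = stmt.executeQuery("select a from tbl_src")
var rows = 0
while (rs.next()) rows += 1
println(s"fetched $rows rows") // expected: 200, the configured session cap
conn.close()
```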

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -116,7 +116,8 @@ class LegacyFlinkOperationSuite extends KyuubiFunSuite {
   }
 
   test("execute statement - select column name with dots") {
-    val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
+    val executeStatementOp =
+      new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1, 500)
     val executor = createLocalExecutor
     executor.openSession("test-session")
     executeStatementOp.setExecutor(executor)
```

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 9 additions & 0 deletions

```diff
@@ -594,6 +594,15 @@ object KyuubiConf {
     .stringConf
     .createOptional
 
+  val ENGINE_FLINK_MAX_ROWS: ConfigEntry[Int] =
+    buildConf("session.engine.flink.max.rows")
+      .doc("Max rows of Flink query results. For batch queries, rows that exceeds the limit " +
+        "would be ignored. For streaming queries, the query would be canceled if the limit " +
+        "is reached.")
+      .version("1.5.0")
+      .intConf
+      .createWithDefault(1000000)
+
   val ENGINE_TRINO_MAIN_RESOURCE: OptionalConfigEntry[String] =
     buildConf("session.engine.trino.main.resource")
       .doc("The package used to create Trino engine remote job. If it is undefined," +
```
