
Commit b2e679d

[KYUUBI #1499] Introduce DataFrameHolder for cli result fetching
### _Why are the changes needed?_

Replace the `ArrayBuffer`-based result passing with `DataFrameHolder`.

### _How was this patch tested?_

- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1507 from yaooqinn/1499.
Closes #1499

7fd2b0e [Kent Yao] fix it
6278009 [Kent Yao] root
de3c601 [Kent Yao] fi
358d7a6 [Kent Yao] refine
8373b9b [Kent Yao] refine
ab95f7d [Kent Yao] refine
86d90b8 [Kent Yao] loader
a07117c [Kent Yao] nit
90a7dd4 [Kent Yao] [KYUUBI #1499] Introduce DataFrameHolder for cli result fetching
8d97b51 [Kent Yao] [KYUUBI #1499] Introduce DataFrameHolder for cli result fetching

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
1 parent 6e10eec commit b2e679d

12 files changed (+153, −60 lines)

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala

Lines changed: 3 additions & 0 deletions
@@ -26,6 +26,9 @@ import org.apache.kyuubi.Utils
 
 object KyuubiSparkUtil {
 
+  final val SPARK_SCHEDULER_POOL_KEY = "spark.scheduler.pool"
+  final val SPARK_SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
+
   def globalSparkContext: SparkContext = SparkSession.active.sparkContext
 
   def engineId: String =

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala

Lines changed: 14 additions & 8 deletions
@@ -32,8 +32,8 @@ import org.apache.kyuubi.session.Session
  * Support executing Scala Script with or without common Spark APIs, only support running in sync
  * mode, as an operation may [[Incomplete]] and wait for others to make [[Success]].
  *
- * [[KyuubiSparkILoop.results]] is exposed as a [[org.apache.spark.sql.DataFrame]] to users in repl
- * to transfer result they wanted to client side.
+ * [[KyuubiSparkILoop.result]] is exposed as a [[org.apache.spark.sql.DataFrame]] holder to users
+ * in repl to transfer result they wanted to client side.
  *
  * @param session parent session
  * @param repl Scala Interpreter
@@ -56,20 +56,26 @@ class ExecuteScala(
     }
   }
 
-  override protected def runInternal(): Unit = {
+  override protected def runInternal(): Unit = withLocalProperties {
     try {
       OperationLog.setCurrentOperationLog(operationLog)
-      spark.sparkContext.setJobGroup(statementId, statement)
       Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
+      val legacyOutput = repl.getOutput
+      if (legacyOutput.nonEmpty) {
+        warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")
+      }
       repl.interpretWithRedirectOutError(statement) match {
         case Success =>
-          iter =
-            if (repl.results.nonEmpty) {
-              result = repl.results.remove(0)
+          iter = {
+            result = repl.getResult(statementId)
+            if (result != null) {
               new ArrayFetchIterator[Row](result.collect())
             } else {
+              // TODO (#1498): Maybe we shall pass the output through operation log
+              // but some clients may not support operation log
               new ArrayFetchIterator[Row](Array(Row(repl.getOutput)))
             }
+          }
         case Error =>
           throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}")
         case Incomplete =>
@@ -78,7 +84,7 @@ class ExecuteScala(
     } catch {
       onError(cancel = true)
     } finally {
-      spark.sparkContext.clearJobGroup()
+      repl.clearResult(statementId)
     }
   }
 }
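For context on how this is used from the client side: a Scala statement sent to this operation can stash any DataFrame in the bound `result` holder, and the engine fetches it back for the current statement. A minimal illustrative snippet (the query itself is made up; `spark` and `result` are pre-bound in the Kyuubi repl session):

```scala
// Sent as the statement body to ExecuteScala.
val df = spark.sql("SELECT 1 AS id, 'kyuubi' AS name") // hypothetical query
result.set(df) // stored under the current statement id for the engine to fetch
```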

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala

Lines changed: 2 additions & 32 deletions
@@ -26,8 +26,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkStatementEvent}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationType}
 import org.apache.kyuubi.operation.OperationState.OperationState
@@ -43,15 +42,6 @@
     incrementalCollect: Boolean)
   extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
 
-  import org.apache.kyuubi.KyuubiSparkUtils._
-
-  private val forceCancel =
-    session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
-
-  private val schedulerPool =
-    spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
-      session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
-
   private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
 
   private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
@@ -91,7 +81,7 @@
   private def executeStatement(): Unit = withLocalProperties {
     try {
       setState(OperationState.RUNNING)
-      info(KyuubiSparkUtil.diagnostics)
+      info(diagnostics)
       Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
       // TODO: Make it configurable
       spark.sparkContext.addSparkListener(operationListener)
@@ -143,26 +133,6 @@
     }
   }
 
-  private def withLocalProperties[T](f: => T): T = {
-    try {
-      spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
-      spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
-      spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
-      schedulerPool match {
-        case Some(pool) =>
-          spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, pool)
-        case None =>
-      }
-
-      f
-    } finally {
-      spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
-      spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
-      spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
-      spark.sparkContext.clearJobGroup()
-    }
-  }
-
   private def addTimeoutMonitor(): Unit = {
     if (queryTimeout > 0) {
       val timeoutExecutor =
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala

Lines changed: 31 additions & 1 deletion
@@ -26,6 +26,9 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY}
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SCHEDULER_POOL_KEY
 import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
 import org.apache.kyuubi.engine.spark.schema.RowSet
 import org.apache.kyuubi.engine.spark.schema.SchemaHelper
@@ -72,7 +75,7 @@ abstract class SparkOperation(opType: OperationType, session: Session)
   * @param input the SQL pattern to convert
   * @return the equivalent Java regular expression of the pattern
   */
-  def toJavaRegex(input: String): String = {
+  protected def toJavaRegex(input: String): String = {
    val res =
      if (StringUtils.isEmpty(input) || input == "*") {
        "%"
@@ -85,6 +88,33 @@
       .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".")
   }
 
+  private val forceCancel =
+    session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
+
+  private val schedulerPool =
+    spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
+      session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
+
+  protected def withLocalProperties[T](f: => T): T = {
+    try {
+      spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
+      spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
+      spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
+      schedulerPool match {
+        case Some(pool) =>
+          spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, pool)
+        case None =>
+      }
+
+      f
+    } finally {
+      spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
+      spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
+      spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
+      spark.sparkContext.clearJobGroup()
+    }
+  }
+
 protected def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = {
   // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
   // could be thrown.
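With the helper hoisted into `SparkOperation`, any subclass can tag every Spark job it launches with the statement's job group, user, and id. A sketch of the intended call pattern (the subclass body is illustrative, not from this commit):

```scala
// Illustrative subclass use of the hoisted helper: everything inside the
// block runs with the job group, kyuubi.session.user, kyuubi.statement.id
// and scheduler pool properties set, and they are cleared even on failure.
override protected def runInternal(): Unit = withLocalProperties {
  val rows = spark.sql(statement).collect()
  iter = new ArrayFetchIterator[Row](rows)
}
```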
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala (new file)

Lines changed: 61 additions & 0 deletions

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.repl
+
+import java.util.HashMap
+
+import org.apache.spark.kyuubi.SparkContextHelper
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * Helper class to wrap a [[DataFrame]] and pass its results to clients
+ *
+ * @since 1.5.0
+ */
+class DataFrameHolder(spark: SparkSession) {
+
+  private val results = new HashMap[String, DataFrame]()
+
+  private def currentId: String = {
+    SparkContextHelper.getCurrentStatementId(spark.sparkContext)
+  }
+
+  /**
+   * Set result
+   * @param df a DataFrame to pass results to clients
+   */
+  def set(df: DataFrame): Unit = {
+    results.put(currentId, df)
+  }
+
+  /**
+   * Get result
+   * @param statementId kyuubi statement id
+   */
+  def get(statementId: String): DataFrame = {
+    results.get(statementId)
+  }
+
+  /**
+   * Clear result
+   * @param statementId kyuubi statement id
+   */
+  def unset(statementId: String): Unit = {
+    results.remove(statementId)
+  }
+}
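`set` keys the DataFrame on the statement id read from the calling thread's Spark local properties, so concurrent statements in one session do not clobber each other. A rough sketch of the round trip (the id literal "stmt-42" is made up; in practice `withLocalProperties` sets it):

```scala
// Sketch of the holder's round trip under an illustrative statement id.
spark.sparkContext.setLocalProperty("kyuubi.statement.id", "stmt-42")

val holder = new DataFrameHolder(spark)
holder.set(spark.range(3).toDF("id")) // stored under the current thread's id

val df = holder.get("stmt-42")        // what ExecuteScala reads back
holder.unset("stmt-42")               // what the finally block clears
```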

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala

Lines changed: 15 additions & 10 deletions
@@ -19,22 +19,21 @@ package org.apache.kyuubi.engine.spark.repl
 
 import java.io.{ByteArrayOutputStream, File}
 
-import scala.collection.mutable.ArrayBuffer
 import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.IR
 import scala.tools.nsc.interpreter.JPrintWriter
 
 import org.apache.spark.SparkContext
 import org.apache.spark.repl.{Main, SparkILoop}
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.util.MutableURLClassLoader
 
 private[spark] case class KyuubiSparkILoop private (
     spark: SparkSession,
     output: ByteArrayOutputStream)
   extends SparkILoop(None, new JPrintWriter(output)) {
 
-  // TODO: this is a little hacky
-  val results = new ArrayBuffer[Dataset[Row]]()
+  val result = new DataFrameHolder(spark)
 
   private def initialize(): Unit = {
     settings = new Settings
@@ -51,7 +50,7 @@ private[spark] case class KyuubiSparkILoop private (
     try {
       this.compilerClasspath
       this.ensureClassLoader()
-      var classLoader = Thread.currentThread().getContextClassLoader
+      var classLoader: ClassLoader = Thread.currentThread().getContextClassLoader
       while (classLoader != null) {
         classLoader match {
           case loader: MutableURLClassLoader =>
@@ -66,6 +65,9 @@
           classLoader = classLoader.getParent
         }
       }
+
+      this.addUrlsToClassPath(
+        classOf[DataFrameHolder].getProtectionDomain.getCodeSource.getLocation)
     } finally {
       Thread.currentThread().setContextClassLoader(currentClassLoader)
     }
@@ -86,14 +88,17 @@
 
     // for feeding results to client, e.g. beeline
     this.bind(
-      "results",
-      "scala.collection.mutable.ArrayBuffer[" +
-        "org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]",
-      results)
+      "result",
+      classOf[DataFrameHolder].getCanonicalName,
+      result)
     }
   }
 
-  def interpretWithRedirectOutError(statement: String): scala.tools.nsc.interpreter.IR.Result = {
+  def getResult(statementId: String): DataFrame = result.get(statementId)
+
+  def clearResult(statementId: String): Unit = result.unset(statementId)
+
+  def interpretWithRedirectOutError(statement: String): IR.Result = {
     Console.withOut(output) {
       Console.withErr(output) {
         this.interpret(statement)
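Putting the pieces together, the interpret/fetch/clear cycle the operation runs against the loop looks roughly like this (a sketch, assuming an already-constructed `repl` and a `statementId` set as a local property beforehand):

```scala
import scala.tools.nsc.interpreter.IR

// Sketch of how ExecuteScala drives the loop; `repl` is a KyuubiSparkILoop.
try {
  repl.interpretWithRedirectOutError("result.set(spark.range(3).toDF())") match {
    case IR.Success =>
      val df = repl.getResult(statementId) // null if the user never called result.set
      if (df != null) df.show() else println(repl.getOutput)
    case IR.Error      => println(s"Interpret error: ${repl.getOutput}")
    case IR.Incomplete => println("Statement is incomplete")
  }
} finally {
  repl.clearResult(statementId) // always drop the cached DataFrame
}
```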

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KDFRegistry.scala

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.udf
 
 import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
-import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_SESSION_USER_KEY
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 
 object KDFRegistry {
 
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala

Lines changed: 2 additions & 1 deletion
@@ -23,8 +23,9 @@ import org.apache.spark.scheduler._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 
-import org.apache.kyuubi.KyuubiSparkUtils._
 import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_SQL_EXECUTION_ID_KEY
 import org.apache.kyuubi.operation.Operation
 import org.apache.kyuubi.operation.log.OperationLog
 
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala

Lines changed: 19 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 
 import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
 import org.apache.kyuubi.engine.spark.events.KyuubiSparkEvent
 import org.apache.kyuubi.events.EventLogger
 
@@ -52,6 +53,24 @@ object SparkContextHelper extends Logging {
     }
   }
 
+  /**
+   * Get a local property set in this thread, or null if it is missing. See
+   * `org.apache.spark.SparkContext.setLocalProperty`.
+   */
+  private def getLocalProperty(sc: SparkContext, propertyKey: String): String = {
+    sc.getLocalProperty(propertyKey)
+  }
+
+  /**
+   * Get `KYUUBI_STATEMENT_ID_KEY` set in this thread, or null if it is missing.
+   *
+   * @param sc an active SparkContext
+   * @return the current statementId or null
+   */
+  def getCurrentStatementId(sc: SparkContext): String = {
+    getLocalProperty(sc, KYUUBI_STATEMENT_ID_KEY)
+  }
+
 }
 
 /**
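For reference, `SparkContext.setLocalProperty`/`getLocalProperty` are public Spark APIs scoped to the calling thread; a tiny sketch of the contract `getCurrentStatementId` builds on:

```scala
import org.apache.spark.SparkContext

// Illustration of what getCurrentStatementId returns: the thread's
// "kyuubi.statement.id" local property, or null when it was never set.
def demo(sc: SparkContext): Unit = {
  assert(sc.getLocalProperty("kyuubi.statement.id") == null) // nothing set yet
  sc.setLocalProperty("kyuubi.statement.id", "stmt-1")       // done by withLocalProperties
  assert(sc.getLocalProperty("kyuubi.statement.id") == "stmt-1")
  sc.setLocalProperty("kyuubi.statement.id", null)           // cleared in the finally block
}
```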

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala

Lines changed: 1 addition & 1 deletion
@@ -26,10 +26,10 @@ import scala.annotation.tailrec
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler._
 
-import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.Utils.stringifyException
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
 import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkStatementEvent}
 import org.apache.kyuubi.service.{Serverable, ServiceState}
 
kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSparkUtils.scala renamed to kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala

Lines changed: 2 additions & 4 deletions
@@ -15,11 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi
+package org.apache.kyuubi.config
 
-object KyuubiSparkUtils {
+object KyuubiReservedKeys {
   final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user"
   final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
-  final val SPARK_SCHEDULER_POOL_KEY = "spark.scheduler.pool"
-  final val SPARK_SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
 }
