Skip to content

Commit 97c3835

Browse files
committed
[KYUUBI #1490] Introduce the basic framework for running scala
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Introduce the basic framework for running scala, see #1490 for the detail ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate ``` Beeline version 1.5.0-SNAPSHOT by Apache Kyuubi (Incubating) 0: jdbc:hive2://10.242.189.214:10009/> spark.version; 2021-12-03 13:47:07.556 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version 2021-12-03 13:47:07.560 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: PENDING_STATE -> RUNNING_STATE, statement: spark.version 2021-12-03 13:47:07.558 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version 2021-12-03 13:47:07.559 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: PENDING_STATE -> RUNNING_STATE, statement: spark.version 2021-12-03 13:47:07.560 INFO operation.ExecuteStatement: Spark application name: kyuubi_USER_SPARK_SQL_kent_default_61cff9fb-7035-4435-b509-80c1730876ed application ID: 
local-1638510289918 application web UI: http://10.242.189.214:65027 master: local[*] deploy mode: client version: 3.1.2 Start time: 2021-12-03T13:44:49.313 User: kent 2021-12-03 13:47:07.562 INFO scheduler.DAGScheduler: Asked to cancel job group 321dc15d-68d0-4f91-9216-1e08f09842df 2021-12-03 13:47:07.565 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: RUNNING_STATE -> ERROR_STATE, statement: spark.version, time taken: 0.006 seconds 2021-12-03 13:47:07.565 ERROR operation.ExecuteStatement: Error operating EXECUTE_STATEMENT: org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == spark.version ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98) at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == spark.version ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at 
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2021-12-03 13:47:07.569 INFO operation.ExecuteStatement: Query[08b8b6da-d434-4296-b613-2027e3518441] in ERROR_STATE 2021-12-03 13:47:07.569 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: RUNNING_STATE -> ERROR_STATE, statement: spark.version, time taken: 0.009 seconds Error: Error operating EXECUTE_STATEMENT: org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == spark.version ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255) at 
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) (state=,code=0) 0: jdbc:hive2://10.242.189.214:10009/> set kyuubi.operation.language=scala; 2021-12-03 13:47:11.982 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: INITIALIZED_STATE -> PENDING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.985 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: PENDING_STATE -> RUNNING_STATE, 
statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.983 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: INITIALIZED_STATE -> PENDING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.984 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: PENDING_STATE -> RUNNING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.985 INFO operation.ExecuteStatement: Spark application name: kyuubi_USER_SPARK_SQL_kent_default_61cff9fb-7035-4435-b509-80c1730876ed application ID: local-1638510289918 application web UI: http://10.242.189.214:65027 master: local[*] deploy mode: client version: 3.1.2 Start time: 2021-12-03T13:44:49.313 User: kent 2021-12-03 13:47:11.995 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: RUNNING_STATE -> RUNNING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.995 INFO operation.ExecuteStatement: Execute in full collect mode 2021-12-03 13:47:12.006 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: RUNNING_STATE -> FINISHED_STATE, statement: set kyuubi.operation.language=scala, time taken: 0.022 seconds 2021-12-03 13:47:12.007 INFO operation.ExecuteStatement: Query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b] in FINISHED_STATE 2021-12-03 13:47:12.007 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: RUNNING_STATE -> FINISHED_STATE, statement: set kyuubi.operation.language=scala, time taken: 0.022 seconds +----------------------------+--------+ | key | value | +----------------------------+--------+ | kyuubi.operation.language | scala | +----------------------------+--------+ 1 row selected (0.052 seconds) 0: jdbc:hive2://10.242.189.214:10009/> spark.version; 2021-12-03 13:47:13.685 INFO operation.ExecuteStatement: Processing kent's 
query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version 2021-12-03 13:47:15.541 INFO operation.ExecuteStatement: Processing kent's query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: PENDING_STATE -> RUNNING_STATE, statement: spark.version 2021-12-03 13:47:15.543 INFO operation.ExecuteStatement: Query[178ff72d-b870-44d4-99d8-08a0fa0e8efa] in FINISHED_STATE 2021-12-03 13:47:15.544 INFO operation.ExecuteStatement: Processing kent's query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: RUNNING_STATE -> FINISHED_STATE, statement: spark.version, time taken: 0.003 seconds +-----------------------+ | output | +-----------------------+ | res0: String = 3.1.2 | +-----------------------+ 1 row selected (1.871 seconds) 0: jdbc:hive2://10.242.189.214:10009/> spark.sql("select current_date()") . . . . . . . . . . . . . . . . . . .> ; 2021-12-03 13:47:36.512 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.sql("select current_date()") 2021-12-03 13:47:36.689 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: PENDING_STATE -> RUNNING_STATE, statement: spark.sql("select current_date()") 2021-12-03 13:47:36.692 INFO operation.ExecuteStatement: Query[602c2d88-7e8a-4175-9b53-e34adfff007a] in FINISHED_STATE 2021-12-03 13:47:36.692 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: RUNNING_STATE -> FINISHED_STATE, statement: spark.sql("select current_date()"), time taken: 0.003 seconds +----------------------------------------------------+ | output | +----------------------------------------------------+ | res1: org.apache.spark.sql.DataFrame = [current_date(): date] | +----------------------------------------------------+ 1 row selected (0.187 seconds) 0: jdbc:hive2://10.242.189.214:10009/> results += spark.range(1, 5, 2, 3); Error: Error operating 
EXECUTE_STATEMENT: org.apache.kyuubi.KyuubiSQLException: Interpret error: results += spark.range(1, 5, 2, 3) <console>:26: error: type mismatch; found : org.apache.spark.sql.Dataset[Long] required: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] results += spark.range(1, 5, 2, 3) ^ at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69) at org.apache.kyuubi.engine.spark.operation.ExecuteScala.runInternal(ExecuteScala.scala:70) at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:130) at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:93) at org.apache.kyuubi.engine.spark.session.SparkSessionImpl.runOperation(SparkSessionImpl.scala:62) at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:121) at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:75) at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:118) at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:61) at org.apache.kyuubi.service.ThriftBinaryFrontendService.ExecuteStatement(ThriftBinaryFrontendService.scala:265) at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557) at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) (state=,code=0) 
0: jdbc:hive2://10.242.189.214:10009/> results += spark.range(1, 5, 2, 3).toDF; 2021-12-03 13:48:30.327 INFO operation.ExecuteStatement: Processing kent's query[daccd163-978a-4d66-af85-af466086d1db]: INITIALIZED_STATE -> PENDING_STATE, statement: results += spark.range(1, 5, 2, 3).toDF 2021-12-03 13:48:31.700 INFO operation.ExecuteStatement: Processing kent's query[daccd163-978a-4d66-af85-af466086d1db]: PENDING_STATE -> RUNNING_STATE, statement: results += spark.range(1, 5, 2, 3).toDF 2021-12-03 13:48:31.702 INFO operation.ExecuteStatement: Query[daccd163-978a-4d66-af85-af466086d1db] in FINISHED_STATE 2021-12-03 13:48:31.702 INFO operation.ExecuteStatement: Processing kent's query[daccd163-978a-4d66-af85-af466086d1db]: RUNNING_STATE -> FINISHED_STATE, statement: results += spark.range(1, 5, 2, 3).toDF, time taken: 0.003 seconds +-----+ | id | +-----+ | 1 | | 3 | +-----+ 2 rows selected (1.387 seconds) ``` ** Session level isolated ** ![image](https://user-images.githubusercontent.com/8326978/144553902-fc390c30-06de-453b-af3d-cf8def577aa9.png) - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1491 from yaooqinn/scala. Closes #1490 af4d0a1 [Kent Yao] Merge branch 'master' into scala 2ebdf6a [Kent Yao] provided scala dep d58b2bf [Kent Yao] [KYUUBI #1490] Introduce the basic framework for running scala d256bfd [Kent Yao] [KYUUBI #1490] Introduce the basic framework for running scala 5209043 [Kent Yao] init Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org>
1 parent a876678 commit 97c3835

File tree

22 files changed

+451
-115
lines changed

22 files changed

+451
-115
lines changed

docs/deployment/settings.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ Key | Default | Meaning | Type | Since
282282
--- | --- | --- | --- | ---
283283
kyuubi\.operation\.idle<br>\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT3H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Operation will be closed when it's not accessed for this duration of time</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
284284
kyuubi\.operation<br>\.interrupt\.on\.cancel|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.2.0</div>
285+
kyuubi\.operation<br>\.language|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SQL</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Choose a programming language for the following inputs <ul><li>SQL: (Default) Run all following statements as SQL queries.</li> <li>SCALA: Run all following inputs as Scala code.</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
285286
kyuubi\.operation\.log<br>\.dir\.root|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>server_operation_logs</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Root directory for query operation log at server-side.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
286287
kyuubi\.operation\.plan<br>\.only\.mode|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
287288
kyuubi\.operation<br>\.query\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for query executions at server-side. It takes effect together with the client-side timeout (`java.sql.Statement.setQueryTimeout`); a running query will be cancelled automatically when it times out. It's off by default, which means only the client side takes full control of whether the query should time out or not. If set, the client-side timeout is capped at this value. To cancel queries right away without waiting for tasks to finish, consider enabling kyuubi.operation.interrupt.on.cancel as well.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>

externals/kyuubi-spark-sql-engine/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,24 @@
5050
<scope>provided</scope>
5151
</dependency>
5252

53+
<dependency>
54+
<groupId>org.apache.spark</groupId>
55+
<artifactId>spark-repl_${scala.binary.version}</artifactId>
56+
<scope>provided</scope>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>org.scala-lang</groupId>
61+
<artifactId>scala-compiler</artifactId>
62+
<scope>provided</scope>
63+
</dependency>
64+
65+
<dependency>
66+
<groupId>org.scala-lang</groupId>
67+
<artifactId>scala-reflect</artifactId>
68+
<scope>provided</scope>
69+
</dependency>
70+
5371
<dependency>
5472
<groupId>org.apache.hadoop</groupId>
5573
<artifactId>hadoop-client-api</artifactId>

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
2424

2525
import org.apache.spark.{ui, SparkConf}
2626
import org.apache.spark.kyuubi.SparkSQLEngineListener
27+
import org.apache.spark.repl.Main
2728
import org.apache.spark.sql.SparkSession
2829

2930
import org.apache.kyuubi.{KyuubiException, Logging}
@@ -81,6 +82,9 @@ object SparkSQLEngine extends Logging {
8182
sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true")
8283
sparkConf.setIfMissing("spark.master", "local")
8384
sparkConf.setIfMissing("spark.ui.port", "0")
85+
// register the repl's output dir with the file server.
86+
// see also `spark.repl.classdir`
87+
sparkConf.set("spark.repl.class.outputDir", Main.outputDir.getAbsolutePath)
8488
sparkConf.setIfMissing(
8589
"spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads",
8690
"20")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.spark.operation
19+
20+
import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success}
21+
22+
import org.apache.spark.sql.Row
23+
import org.apache.spark.sql.types.StructType
24+
25+
import org.apache.kyuubi.KyuubiSQLException
26+
import org.apache.kyuubi.engine.spark.ArrayFetchIterator
27+
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
28+
import org.apache.kyuubi.operation.OperationType
29+
import org.apache.kyuubi.session.Session
30+
31+
/**
32+
* Executes a Scala code snippet, with or without common Spark APIs. Only synchronous execution is
33+
* supported, as an operation may be [[Incomplete]] and wait for others to make it a [[Success]].
34+
*
35+
* [[KyuubiSparkILoop.results]] is exposed to users in the repl; the first
36+
* [[org.apache.spark.sql.DataFrame]] queued in it is sent to the client side as the result.
37+
*
38+
* @param session parent session
39+
* @param repl Scala Interpreter
40+
* @param statement a scala code snippet
41+
*/
42+
class ExecuteScala(
43+
session: Session,
44+
repl: KyuubiSparkILoop,
45+
override val statement: String)
46+
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) {
47+
48+
// Schema reported to the client: the schema of `result` when it is set and non-empty, otherwise
// a single string column named "output", matching the one-value rows that `runInternal` builds
// from `repl.getOutput`.
override protected def resultSchema: StructType = {
49+
if (result == null || result.schema.isEmpty) {
50+
new StructType().add("output", "string")
51+
} else {
52+
result.schema
53+
}
54+
}
55+
56+
// Interprets `statement` in the repl and assigns `iter` with the rows to return.
override protected def runInternal(): Unit = {
57+
try {
58+
// Group the Spark jobs started from this thread under the statement id; cleared in `finally`.
spark.sparkContext.setJobGroup(statementId, statement)
59+
// NOTE(review): the previous context class loader is not restored afterwards - confirm that
// leaving it set on this thread is intended.
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
60+
repl.interpret(statement) match {
61+
case Success =>
62+
iter =
63+
// Prefer a result the user queued in `repl.results`. Only the first entry is removed here.
// NOTE(review): any further queued entries stay in `repl.results` - confirm this is intended.
if (repl.results.nonEmpty) {
64+
result = repl.results.remove(0)
65+
new ArrayFetchIterator[Row](result.collect())
66+
} else {
67+
// Nothing queued: return whatever `repl.getOutput` yields as a single one-column row.
new ArrayFetchIterator[Row](Array(Row(repl.getOutput)))
68+
}
69+
case Error =>
70+
throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}")
71+
case Incomplete =>
72+
throw KyuubiSQLException(s"Incomplete code:\n$statement")
73+
}
74+
} catch {
75+
// NOTE(review): `onError` is defined outside this view; presumably it supplies the exception
// handler for this operation, with cancellation requested - confirm.
onError(cancel = true)
76+
} finally {
77+
spark.sparkContext.clearJobGroup()
78+
}
79+
}
80+
}

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
2323

2424
import org.apache.spark.kyuubi.SQLOperationListener
2525
import org.apache.spark.sql.Row
26-
import org.apache.spark.sql.SparkSession
2726
import org.apache.spark.sql.types._
2827

2928
import org.apache.kyuubi.{KyuubiSQLException, Logging}
@@ -37,13 +36,12 @@ import org.apache.kyuubi.session.Session
3736
import org.apache.kyuubi.util.ThreadUtils
3837

3938
class ExecuteStatement(
40-
spark: SparkSession,
4139
session: Session,
4240
override protected val statement: String,
4341
override val shouldRunAsync: Boolean,
4442
queryTimeout: Long,
4543
incrementalCollect: Boolean)
46-
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
44+
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
4745

4846
import org.apache.kyuubi.KyuubiSparkUtils._
4947

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.kyuubi.engine.spark.operation
1919

20-
import org.apache.spark.sql.SparkSession
2120
import org.apache.spark.sql.types.StructType
2221

2322
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -26,8 +25,8 @@ import org.apache.kyuubi.operation.OperationType
2625
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
2726
import org.apache.kyuubi.session.Session
2827

29-
class GetCatalogs(spark: SparkSession, session: Session)
30-
extends SparkOperation(spark, OperationType.GET_CATALOGS, session) {
28+
class GetCatalogs(session: Session)
29+
extends SparkOperation(OperationType.GET_CATALOGS, session) {
3130

3231
override protected def resultSchema: StructType = {
3332
new StructType()

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.kyuubi.engine.spark.operation
1919

20-
import org.apache.spark.sql.SparkSession
2120
import org.apache.spark.sql.types._
2221

2322
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -27,13 +26,12 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
2726
import org.apache.kyuubi.session.Session
2827

2928
class GetColumns(
30-
spark: SparkSession,
3129
session: Session,
3230
catalogName: String,
3331
schemaName: String,
3432
tableName: String,
3533
columnName: String)
36-
extends SparkOperation(spark, OperationType.GET_COLUMNS, session) {
34+
extends SparkOperation(OperationType.GET_COLUMNS, session) {
3735

3836
override def statement: String = {
3937
super.statement +

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
1919

2020
import java.sql.DatabaseMetaData
2121

22-
import org.apache.spark.sql.{Row, SparkSession}
22+
import org.apache.spark.sql.Row
2323
import org.apache.spark.sql.types.StructType
2424

2525
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -28,12 +28,11 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
2828
import org.apache.kyuubi.session.Session
2929

3030
class GetFunctions(
31-
spark: SparkSession,
3231
session: Session,
3332
catalogName: String,
3433
schemaName: String,
3534
functionName: String)
36-
extends SparkOperation(spark, OperationType.GET_FUNCTIONS, session) {
35+
extends SparkOperation(OperationType.GET_FUNCTIONS, session) {
3736

3837
override def statement: String = {
3938
super.statement +

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.kyuubi.engine.spark.operation
1919

20-
import org.apache.spark.sql.SparkSession
2120
import org.apache.spark.sql.types.StructType
2221

2322
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -26,8 +25,8 @@ import org.apache.kyuubi.operation.OperationType
2625
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
2726
import org.apache.kyuubi.session.Session
2827

29-
class GetSchemas(spark: SparkSession, session: Session, catalogName: String, schema: String)
30-
extends SparkOperation(spark, OperationType.GET_SCHEMAS, session) {
28+
class GetSchemas(session: Session, catalogName: String, schema: String)
29+
extends SparkOperation(OperationType.GET_SCHEMAS, session) {
3130

3231
override def statement: String = {
3332
super.statement + s" [catalog : $catalogName, schemaPattern : $schema]"

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.kyuubi.engine.spark.operation
1919

20-
import org.apache.spark.sql.{Row, SparkSession}
20+
import org.apache.spark.sql.Row
2121
import org.apache.spark.sql.types.StructType
2222

2323
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -26,8 +26,8 @@ import org.apache.kyuubi.operation.OperationType
2626
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
2727
import org.apache.kyuubi.session.Session
2828

29-
class GetTableTypes(spark: SparkSession, session: Session)
30-
extends SparkOperation(spark, OperationType.GET_TABLE_TYPES, session) {
29+
class GetTableTypes(session: Session)
30+
extends SparkOperation(OperationType.GET_TABLE_TYPES, session) {
3131
override protected def resultSchema: StructType = {
3232
new StructType()
3333
.add(TABLE_TYPE, "string", nullable = true, "Table type name.")

0 commit comments

Comments
 (0)