Commit 9b50230

cxzl25 authored and pan3793 committed
[KYUUBI #2721] Implement dedicated set/get catalog/database operators
### _Why are the changes needed?_

Close #2721.

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2728 from cxzl25/KYUUBI-2721.

Closes #2721

6705164 [sychen] use placeholders and confOverlay
2acf0a4 [sychen] unused import
8bc40b9 [sychen] fix UT
3495b28 [sychen] fix jdbc
e55bb2e [sychen] style
ff9078a [sychen] fix UT
a0247c7 [sychen] Merge branch 'master' into KYUUBI-2721
f62461c [sychen] use placeholders
698fbbd [sychen] set catalog/schema empty resultset
45caf55 [sychen] fix import
7f13ad8 [sychen] fix server spark catalog UT
4982ec7 [sychen] hive engine 1.8
9877038 [sychen] add UT
21e5787 [sychen] set/get catalog/database operators

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent dbe315e commit 9b50230
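In practice this means a plain JDBC client can switch the current catalog/database through `Connection#setCatalog`/`setSchema` and read them back with `getCatalog`/`getSchema`, as the new tests below do. A minimal standalone sketch of that round trip in Scala, assuming a reachable Kyuubi endpoint at `localhost:10009` and pre-existing `cat_a`/`db_a` objects (the endpoint, credentials, and names are illustrative only, not part of this commit):

```scala
import java.sql.DriverManager

// Requires the Hive/Kyuubi JDBC driver on the classpath; endpoint, credentials,
// and the cat_a/db_a names are illustrative, not part of this commit.
object CatalogDatabaseRoundTrip {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/", "user", "")
    try {
      // Served by the new GetCurrentCatalog / SetCurrentCatalog operations.
      println(s"catalog before: ${conn.getCatalog}")
      conn.setCatalog("cat_a")
      println(s"catalog after:  ${conn.getCatalog}")

      // Served by the new GetCurrentDatabase / SetCurrentDatabase operations.
      println(s"database before: ${conn.getSchema}")
      conn.setSchema("db_a")
      println(s"database after:  ${conn.getSchema}")
    } finally {
      conn.close()
    }
  }
}
```

Behaviour still differs by engine: the Hive engine, which has no catalog concept, answers `getCatalog` with an empty string (see its `GetCurrentCatalog` below), while the Flink engine resolves both calls against its `TableEnvironment`.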

File tree: 38 files changed (+1178 −13 lines)

docs/deployment/settings.md

Lines changed: 1 addition & 0 deletions
@@ -223,6 +223,7 @@ Key | Default | Meaning | Type | Since
 kyuubi.engine.jdbc.connection.user | &lt;undefined&gt; | The user is used for connecting to server | string | 1.6.0
 kyuubi.engine.jdbc.driver.class | &lt;undefined&gt; | The driver class for jdbc engine connection | string | 1.6.0
 kyuubi.engine.jdbc.type | &lt;undefined&gt; | The short name of jdbc type | string | 1.6.0
+kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, the engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0
 kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0
 kyuubi.engine.pool.name | engine-pool | The name of engine pool. | string | 1.5.0
 kyuubi.engine.pool.size | -1 | The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0
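The new `kyuubi.engine.operation.convert.catalog.database.enabled` entry defaults to `true`; the Hive engine reads it from the engine-level conf, while the Flink engine also honours a per-session value from the session config map (see the operation managers below). A rough sketch of that resolution order, assuming the `KyuubiConf(loadSysDefault)` constructor and typed `get` accessor behave as they do elsewhere in the code base (`ConvertFlagResolution` itself is illustrative, not part of the commit):

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED

object ConvertFlagResolution {
  def main(args: Array[String]): Unit = {
    // Engine-level default, the value HiveOperationManager consults directly.
    val conf = KyuubiConf(false)
    assert(conf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)) // defaults to true

    // The Flink manager additionally honours a per-session value from the session
    // config map, falling back to the engine default; sketched here with a plain Map.
    val sessionConfig = Map(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key -> "false")
    val convert = sessionConfig
      .getOrElse(
        ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key,
        conf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED).toString)
      .toBoolean
    assert(!convert) // this session has opted out of the conversion
  }
}
```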

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala

Lines changed: 31 additions & 0 deletions
@@ -36,13 +36,24 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage

   private lazy val resultMaxRowsDefault = getConf.get(ENGINE_FLINK_MAX_ROWS)

+  private lazy val operationConvertCatalogDatabaseDefault =
+    getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)
+
   override def newExecuteStatementOperation(
       session: Session,
       statement: String,
       confOverlay: Map[String, String],
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
     val flinkSession = session.asInstanceOf[FlinkSessionImpl]
+    if (flinkSession.sessionContext.getConfigMap.getOrDefault(
+        ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key,
+        operationConvertCatalogDatabaseDefault.toString).toBoolean) {
+      val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
+      if (catalogDatabaseOperation != null) {
+        return catalogDatabaseOperation
+      }
+    }
     val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
       OPERATION_PLAN_ONLY_MODE.key,
       operationModeDefault)
@@ -59,6 +70,26 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
     addOperation(op)
   }

+  override def newSetCurrentCatalogOperation(session: Session, catalog: String): Operation = {
+    val op = new SetCurrentCatalog(session, catalog)
+    addOperation(op)
+  }
+
+  override def newGetCurrentCatalogOperation(session: Session): Operation = {
+    val op = new GetCurrentCatalog(session)
+    addOperation(op)
+  }
+
+  override def newSetCurrentDatabaseOperation(session: Session, database: String): Operation = {
+    val op = new SetCurrentDatabase(session, database)
+    addOperation(op)
+  }
+
+  override def newGetCurrentDatabaseOperation(session: Session): Operation = {
+    val op = new GetCurrentDatabase(session)
+    addOperation(op)
+  }
+
   override def newGetTypeInfoOperation(session: Session): Operation = {
     val op = new GetTypeInfo(session)
     addOperation(op)
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala

Lines changed: 35 additions & 0 deletions (new file)

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.kyuubi.engine.flink.result.ResultSetUtil
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
+import org.apache.kyuubi.session.Session
+
+class GetCurrentCatalog(session: Session)
+  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      val catalog = tableEnv.getCurrentCatalog
+      resultSet = ResultSetUtil.stringListToResultSet(List(catalog), TABLE_CAT)
+    } catch onError()
+  }
+}
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala

Lines changed: 35 additions & 0 deletions (new file)

@@ -0,0 +1,35 @@
(Apache License header omitted; identical to GetCurrentCatalog.scala above)
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.kyuubi.engine.flink.result.ResultSetUtil
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
+import org.apache.kyuubi.session.Session
+
+class GetCurrentDatabase(session: Session)
+  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      val database = tableEnv.getCurrentDatabase
+      resultSet = ResultSetUtil.stringListToResultSet(List(database), TABLE_SCHEM)
+    } catch onError()
+  }
+}
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala

Lines changed: 33 additions & 0 deletions (new file)

@@ -0,0 +1,33 @@
(Apache License header omitted; identical to GetCurrentCatalog.scala above)
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class SetCurrentCatalog(session: Session, catalog: String)
+  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      tableEnv.useCatalog(catalog)
+      setHasResultSet(false)
+    } catch onError()
+  }
+}
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala

Lines changed: 33 additions & 0 deletions (new file)

@@ -0,0 +1,33 @@
(Apache License header omitted; identical to GetCurrentCatalog.scala above)
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class SetCurrentDatabase(session: Session, database: String)
+  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      tableEnv.useDatabase(database)
+      setHasResultSet(false)
+    } catch onError()
+  }
+}

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 35 additions & 3 deletions
@@ -26,7 +26,7 @@ import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsRe
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._

-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiConf.OperationModes.NONE
 import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
 import org.apache.kyuubi.engine.flink.result.Constants
@@ -37,7 +37,7 @@ import org.apache.kyuubi.service.ServiceState._

 class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   override def withKyuubiConf: Map[String, String] =
-    Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NONE.toString)
+    Map(OPERATION_PLAN_ONLY_MODE.key -> NONE.toString)

   override protected def jdbcUrl: String =
     s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
@@ -665,6 +665,22 @@
     })
   }

+  test("execute statement - set/get catalog") {
+    withSessionConf()(
+      Map(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key -> "true"))(
+      Map.empty) {
+      withJdbcStatement() { statement =>
+        statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
+        val catalog = statement.getConnection.getCatalog
+        assert(catalog == "default_catalog")
+        statement.getConnection.setCatalog("cat_a")
+        val changedCatalog = statement.getConnection.getCatalog
+        assert(changedCatalog == "cat_a")
+        assert(statement.execute("drop catalog cat_a"))
+      }
+    }
+  }
+
   test("execute statement - create/alter/drop database") {
     // TODO: validate table results after FLINK-25558 is resolved
     withJdbcStatement()({ statement =>
@@ -674,6 +690,22 @@
     })
   }

+  test("execute statement - set/get database") {
+    withSessionConf()(
+      Map(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key -> "true"))(
+      Map.empty) {
+      withJdbcStatement()({ statement =>
+        statement.executeQuery("create database db_a")
+        val schema = statement.getConnection.getSchema
+        assert(schema == "default_database")
+        statement.getConnection.setSchema("db_a")
+        val changedSchema = statement.getConnection.getSchema
+        assert(changedSchema == "db_a")
+        assert(statement.execute("drop database db_a"))
+      })
+    }
+  }
+
   test("execute statement - create/alter/drop table") {
     // TODO: validate table results after FLINK-25558 is resolved
     withJdbcStatement()({ statement =>
@@ -777,7 +809,7 @@
   }

   test("ensure result max rows") {
-    withSessionConf()(Map(KyuubiConf.ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
+    withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
       withJdbcStatement() { statement =>
         statement.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')")
         val resultSet = statement.executeQuery(s"select a from tbl_src")
externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetCurrentCatalog.scala

Lines changed: 37 additions & 0 deletions (new file)

@@ -0,0 +1,37 @@
(Apache License header omitted; identical to the new files above)
+package org.apache.kyuubi.engine.hive.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.hive.service.cli.operation.Operation
+
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class GetCurrentCatalog(session: Session)
+  extends HiveOperation(OperationType.EXECUTE_STATEMENT, session) {
+  // Hive does not support catalog
+  override val internalHiveOperation: Operation =
+    delegatedOperationManager.newExecuteStatementOperation(
+      hive,
+      "SELECT '' AS TABLE_CAT",
+      Map.empty[String, String].asJava,
+      false,
+      0)
+}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.hive.operation
19+
20+
import scala.collection.JavaConverters._
21+
22+
import org.apache.hive.service.cli.operation.Operation
23+
24+
import org.apache.kyuubi.operation.OperationType
25+
import org.apache.kyuubi.session.Session
26+
27+
class GetCurrentDatabase(session: Session)
28+
extends HiveOperation(OperationType.EXECUTE_STATEMENT, session) {
29+
30+
override val internalHiveOperation: Operation =
31+
delegatedOperationManager.newExecuteStatementOperation(
32+
hive,
33+
"SELECT current_database()",
34+
Map.empty[String, String].asJava,
35+
false,
36+
0)
37+
}

externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala

Lines changed: 27 additions & 0 deletions
@@ -22,6 +22,7 @@ import java.util.List
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.service.rpc.thrift.TRowSet

+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.session.Session
@@ -36,10 +37,36 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
       confOverlay: Map[String, String],
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
+    if (session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)) {
+      val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
+      if (catalogDatabaseOperation != null) {
+        return catalogDatabaseOperation
+      }
+    }
     val operation = new ExecuteStatement(session, statement, confOverlay, runAsync, queryTimeout)
     addOperation(operation)
   }

+  override def newSetCurrentCatalogOperation(session: Session, catalog: String): Operation = {
+    val op = new SetCurrentCatalog(session, catalog)
+    addOperation(op)
+  }
+
+  override def newGetCurrentCatalogOperation(session: Session): Operation = {
+    val op = new GetCurrentCatalog(session)
+    addOperation(op)
+  }
+
+  override def newSetCurrentDatabaseOperation(session: Session, database: String): Operation = {
+    val op = new SetCurrentDatabase(session, database)
+    addOperation(op)
+  }
+
+  override def newGetCurrentDatabaseOperation(session: Session): Operation = {
+    val op = new GetCurrentDatabase(session)
+    addOperation(op)
+  }
+
   override def newGetTypeInfoOperation(session: Session): Operation = {
     val operation = new GetTypeInfo(session)
     addOperation(operation)
