Commit cab0762

SteNicholas authored and yaooqinn committed
[KYUUBI #1645] Implement Flink engine GetSchemas operation
### _Why are the changes needed?_

Implement GetSchemas operation.

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1768 from SteNicholas/KYUUBI-1645.

Closes #1645

7ff66db [SteNicholas] [KYUUBI #1645] Implement GetSchemas operation

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Kent Yao <yao@apache.org>
1 parent 36b95d3 commit cab0762
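
For context on what the new operation serves: once a Flink engine is up, a plain JDBC client can call `DatabaseMetaData.getSchemas`, and Kyuubi relays that metadata request to the engine-side `GetSchemas` operation added in this patch. A minimal client-side sketch follows; the connection URL, port, and user are placeholders (not part of this patch) and assume a Kyuubi server on the default port with the Hive JDBC driver on the classpath.

```scala
import java.sql.DriverManager

object GetSchemasClientSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder endpoint and credentials; adjust for a real deployment.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/", "anonymous", "")
    try {
      // null catalog and null schema pattern: list every database the engine exposes.
      val schemas = conn.getMetaData.getSchemas(null, null)
      while (schemas.next()) {
        // TABLE_SCHEM and TABLE_CATALOG are the standard JDBC getSchemas result columns.
        println(s"${schemas.getString("TABLE_CATALOG")}.${schemas.getString("TABLE_SCHEM")}")
      }
    } finally {
      conn.close()
    }
  }
}
```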

File tree

3 files changed: +83 -1 lines changed

externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala

Lines changed: 4 additions & 1 deletion
@@ -47,7 +47,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
   override def newGetSchemasOperation(
       session: Session,
       catalog: String,
-      schema: String): Operation = null
+      schema: String): Operation = {
+    val op = new GetSchemas(session, catalog, schema)
+    addOperation(op)
+  }
 
   override def newGetTablesOperation(
       session: Session,
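
A small point about the manager change above: `newGetSchemasOperation` is declared to return `Operation` and its body ends with `addOperation(op)`, which implies `addOperation` registers the operation and hands it back. Below is a minimal sketch of that create-and-register shape, with simplified stand-in types rather than the real Kyuubi `OperationManager` API.

```scala
object OperationManagerSketch {
  // Simplified stand-ins for illustration only; the real Kyuubi types carry far more state.
  trait Operation
  final class GetSchemasOp(val catalog: String, val schemaPattern: String) extends Operation

  class SketchOperationManager {
    private val operations = scala.collection.mutable.ArrayBuffer.empty[Operation]

    // Register the operation and return it, mirroring how addOperation(op) is used above.
    private def addOperation(op: Operation): Operation = {
      operations += op
      op
    }

    def newGetSchemasOperation(catalog: String, schemaPattern: String): Operation =
      addOperation(new GetSchemasOp(catalog, schemaPattern))
  }

  def main(args: Array[String]): Unit = {
    val manager = new SketchOperationManager
    println(manager.newGetSchemasOperation("default_catalog", "%"))
  }
}
```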
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind, TableEnvironment}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.util.StringUtils.filterPattern
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetSchemas(session: Session, catalogName: String, schema: String)
+  extends FlinkOperation(OperationType.GET_SCHEMAS, session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val schemaPattern = toJavaRegex(schema)
+      val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
+      val schemas = tableEnv.listCatalogs()
+        .filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
+        .flatMap { c =>
+          val catalog = tableEnv.getCatalog(c).get()
+          filterPattern(catalog.listDatabases().asScala, schemaPattern)
+            .map { d => Row.of(d, c) }
+        }
+      resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical(TABLE_SCHEM, DataTypes.STRING()),
+          Column.physical(TABLE_CATALOG, DataTypes.STRING()))
+        .data(schemas)
+        .build
+    } catch {
+      onError()
+    }
+  }
+}
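
The core of `GetSchemas` is a plain Table API traversal: list the catalogs, optionally pin one by name, then list each catalog's databases and keep those matching the schema pattern, emitting one (database, catalog) row per hit. The self-contained sketch below reproduces that traversal against a locally created `TableEnvironment` (the operation above gets it from the session's execution context instead), assuming Flink 1.13+; `likeToRegex` is only a rough stand-in for Kyuubi's `toJavaRegex`/`filterPattern` helpers, mapping the SQL LIKE wildcards `%` and `_` to a Java regex.

```scala
import scala.collection.JavaConverters._

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object ListSchemasSketch {

  // Rough stand-in for Kyuubi's pattern helpers: SQL LIKE wildcards to a Java regex.
  private def likeToRegex(pattern: String): String =
    Option(pattern).filter(_.nonEmpty).getOrElse("%").replace("%", ".*").replace("_", ".")

  def main(args: Array[String]): Unit = {
    val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())

    val catalogName: String = null           // null/empty means "all catalogs", as in the filter above
    val schemaPattern = likeToRegex("def%")  // e.g. databases whose name starts with "def"

    val rows = tableEnv.listCatalogs()
      .filter(c => catalogName == null || catalogName.isEmpty || c == catalogName)
      .flatMap { c =>
        val catalog = tableEnv.getCatalog(c).get()
        catalog.listDatabases().asScala
          .filter(_.matches(schemaPattern))
          .map(d => (d, c))                  // same column order as the operation: schema, then catalog
      }

    rows.foreach { case (d, c) => println(s"$c.$d") }
  }
}
```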

externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala

Lines changed: 22 additions & 0 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
+import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
@@ -27,6 +29,8 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_CAT
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_NAME
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.FUNCTION_SCHEM
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CATALOG
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_TYPE
 import org.apache.kyuubi.service.ServiceState._
 
@@ -58,6 +62,24 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }
 
+  test("get schemas") {
+    withJdbcStatement() { statement =>
+      val metaData = statement.getConnection.getMetaData
+      var resultSet = metaData.getSchemas(null, null)
+      while (resultSet.next()) {
+        assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE)
+        assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
+      }
+      resultSet = metaData.getSchemas(
+        DEFAULT_BUILTIN_CATALOG.split("_").apply(0),
+        DEFAULT_BUILTIN_DATABASE.split("_").apply(0))
+      while (resultSet.next()) {
+        assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE)
+        assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
+      }
+    }
+  }
+
   test("get table types") {
     withJdbcStatement() { statement =>
       val meta = statement.getConnection.getMetaData
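
On the constants driving the new test: Flink's `EnvironmentSettings` exposes `DEFAULT_BUILTIN_CATALOG` and `DEFAULT_BUILTIN_DATABASE`, which are "default_catalog" and "default_database" in the Flink releases the engine builds against, so the second `getSchemas` call passes the prefix "default" as both the catalog name and the schema pattern, and each `while` loop only asserts on whatever rows come back. A tiny sketch (needing only the Flink Table API on the classpath) that just makes those test inputs explicit:

```scala
import org.apache.flink.table.api.EnvironmentSettings.{DEFAULT_BUILTIN_CATALOG, DEFAULT_BUILTIN_DATABASE}

object GetSchemasTestArgsSketch {
  def main(args: Array[String]): Unit = {
    // Expected row for the built-in catalog: ("default_database", "default_catalog").
    println(s"builtin catalog   = $DEFAULT_BUILTIN_CATALOG")
    println(s"builtin database  = $DEFAULT_BUILTIN_DATABASE")

    // Arguments of the second metaData.getSchemas(...) call in the test: both evaluate to "default".
    println(s"catalog argument  = ${DEFAULT_BUILTIN_CATALOG.split("_").apply(0)}")
    println(s"schema pattern    = ${DEFAULT_BUILTIN_DATABASE.split("_").apply(0)}")
  }
}
```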
