Skip to content

Commit 03edbce

Browse files
hddongyaooqinn
authored andcommitted
[KYUUBI #1886] Add GetSchemas for trino engine
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Add GetSchemas for trino engine ### _How was this patch tested?_ - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1902 from hddong/add-trino-get-schemas. Closes #1886 d117f71 [hongdongdong] [KYUUBI #1886] Add GetSchemas for trino engine Authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent 7f10a23 commit 03edbce

File tree

3 files changed

+133
-2
lines changed

3 files changed

+133
-2
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.trino.operation
19+
20+
import scala.collection.mutable.ArrayBuffer
21+
22+
import org.apache.commons.lang3.StringUtils
23+
24+
import org.apache.kyuubi.engine.trino.TrinoStatement
25+
import org.apache.kyuubi.operation.IterableFetchIterator
26+
import org.apache.kyuubi.operation.OperationType
27+
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CATALOG
28+
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
29+
import org.apache.kyuubi.session.Session
30+
31+
class GetSchemas(session: Session, catalogName: String, schemaPattern: String)
32+
extends TrinoOperation(OperationType.GET_SCHEMAS, session) {
33+
34+
private val SEARCH_STRING_ESCAPE: String = "\\"
35+
36+
override protected def runInternal(): Unit = {
37+
val query = new StringBuilder("SELECT TABLE_SCHEM, TABLE_CATALOG FROM system.jdbc.schemas")
38+
39+
val filters = ArrayBuffer[String]()
40+
if (StringUtils.isNotEmpty(catalogName)) {
41+
filters += s"$TABLE_CATALOG = '$catalogName'"
42+
}
43+
if (StringUtils.isNotEmpty(schemaPattern)) {
44+
filters += s"$TABLE_SCHEM LIKE '$schemaPattern' ESCAPE '$SEARCH_STRING_ESCAPE'"
45+
}
46+
47+
if (filters.nonEmpty) {
48+
query.append(" WHERE ")
49+
query.append(filters.mkString(" AND "))
50+
}
51+
52+
try {
53+
val trinoStatement = TrinoStatement(
54+
trinoContext,
55+
session.sessionManager.getConf,
56+
query.toString)
57+
schema = trinoStatement.getColumns
58+
val resultSet = trinoStatement.execute()
59+
iter = new IterableFetchIterator(resultSet)
60+
} catch onError()
61+
}
62+
}

externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
4747
override def newGetSchemasOperation(
4848
session: Session,
4949
catalog: String,
50-
schema: String): Operation = null
50+
schema: String): Operation = {
51+
val op = new GetSchemas(session, catalog, schema)
52+
addOperation(op)
53+
}
5154

5255
override def newGetTablesOperation(
5356
session: Session,

externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationSuite.scala

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.hive.service.rpc.thrift.TStatusCode
3434
import org.apache.kyuubi.config.KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG
3535
import org.apache.kyuubi.engine.trino.WithTrinoEngine
3636
import org.apache.kyuubi.operation.HiveJDBCTestHelper
37-
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.{TABLE_CAT, TABLE_TYPE}
37+
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
3838

3939
class TrinoOperationSuite extends WithTrinoEngine with HiveJDBCTestHelper {
4040
override def withKyuubiConf: Map[String, String] = Map(
@@ -71,6 +71,72 @@ class TrinoOperationSuite extends WithTrinoEngine with HiveJDBCTestHelper {
7171
}
7272
}
7373

74+
test("trino - get schemas") {
75+
case class SchemaWithCatalog(catalog: String, schema: String)
76+
77+
withJdbcStatement() { statement =>
78+
statement.execute("CREATE SCHEMA IF NOT EXISTS memory.test_escape_1")
79+
statement.execute("CREATE SCHEMA IF NOT EXISTS memory.test2escape_1")
80+
statement.execute("CREATE SCHEMA IF NOT EXISTS memory.test_escape11")
81+
82+
val meta = statement.getConnection.getMetaData
83+
val resultSetBuffer = ArrayBuffer[SchemaWithCatalog]()
84+
85+
val schemas1 = meta.getSchemas(null, null)
86+
while (schemas1.next()) {
87+
resultSetBuffer +=
88+
SchemaWithCatalog(schemas1.getString(TABLE_CATALOG), schemas1.getString(TABLE_SCHEM))
89+
}
90+
assert(resultSetBuffer.contains(SchemaWithCatalog("memory", "information_schema")))
91+
assert(resultSetBuffer.contains(SchemaWithCatalog("system", "information_schema")))
92+
93+
val schemas2 = meta.getSchemas("memory", null)
94+
resultSetBuffer.clear()
95+
while (schemas2.next()) {
96+
resultSetBuffer +=
97+
SchemaWithCatalog(schemas2.getString(TABLE_CATALOG), schemas2.getString(TABLE_SCHEM))
98+
}
99+
assert(resultSetBuffer.contains(SchemaWithCatalog("memory", "default")))
100+
assert(resultSetBuffer.contains(SchemaWithCatalog("memory", "information_schema")))
101+
assert(!resultSetBuffer.exists(f => f.catalog == "system"))
102+
103+
val schemas3 = meta.getSchemas(null, "sf_")
104+
resultSetBuffer.clear()
105+
while (schemas3.next()) {
106+
resultSetBuffer +=
107+
SchemaWithCatalog(schemas3.getString(TABLE_CATALOG), schemas3.getString(TABLE_SCHEM))
108+
}
109+
assert(resultSetBuffer.contains(SchemaWithCatalog("tpcds", "sf1")))
110+
assert(!resultSetBuffer.contains(SchemaWithCatalog("tpcds", "sf10")))
111+
112+
val schemas4 = meta.getSchemas(null, "sf%")
113+
resultSetBuffer.clear()
114+
while (schemas4.next()) {
115+
resultSetBuffer +=
116+
SchemaWithCatalog(schemas4.getString(TABLE_CATALOG), schemas4.getString(TABLE_SCHEM))
117+
}
118+
assert(resultSetBuffer.contains(SchemaWithCatalog("tpcds", "sf1")))
119+
assert(resultSetBuffer.contains(SchemaWithCatalog("tpcds", "sf10")))
120+
assert(resultSetBuffer.contains(SchemaWithCatalog("tpcds", "sf100")))
121+
assert(resultSetBuffer.contains(SchemaWithCatalog("tpcds", "sf1000")))
122+
123+
// test escape the second '_'
124+
val schemas5 = meta.getSchemas("memory", "test_escape\\_1")
125+
resultSetBuffer.clear()
126+
while (schemas5.next()) {
127+
resultSetBuffer +=
128+
SchemaWithCatalog(schemas5.getString(TABLE_CATALOG), schemas5.getString(TABLE_SCHEM))
129+
}
130+
assert(resultSetBuffer.contains(SchemaWithCatalog("memory", "test_escape_1")))
131+
assert(resultSetBuffer.contains(SchemaWithCatalog("memory", "test2escape_1")))
132+
assert(!resultSetBuffer.contains(SchemaWithCatalog("memory", "test_escape11")))
133+
134+
statement.execute("DROP SCHEMA memory.test_escape_1")
135+
statement.execute("DROP SCHEMA memory.test2escape_1")
136+
statement.execute("DROP SCHEMA memory.test_escape11")
137+
}
138+
}
139+
74140
test("execute statement - select decimal") {
75141
withJdbcStatement() { statement =>
76142
val resultSet = statement.executeQuery("SELECT DECIMAL '1.2' as col1, DECIMAL '1.23' AS col2")

0 commit comments

Comments
 (0)