
[KYUUBI #1906] Add GetColumns for trino engine
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Add the GetColumns metadata operation for the Trino engine. Previously, `TrinoOperationManager#newGetColumnsOperation` returned `null`, so a client's `DatabaseMetaData#getColumns` call could not be served; the new operation answers it by querying Trino's `system.jdbc.columns` table.
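
Once this lands, a JDBC client connected through Kyuubi can fetch column metadata from a Trino engine with the standard `DatabaseMetaData` API. A minimal Scala sketch, assuming a Kyuubi server on the default port and the `memory` catalog used by the tests below (the URL and table names are illustrative, not part of this patch):

    import java.sql.DriverManager

    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
    try {
      // Routed to the new GetColumns operation on the Trino engine side.
      val columns = conn.getMetaData.getColumns("memory", "test_schema", "test_column", null)
      while (columns.next()) {
        println(s"${columns.getString("COLUMN_NAME")}: ${columns.getString("TYPE_NAME")}")
      }
    } finally {
      conn.close()
    }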

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1922 from SteNicholas/KYUUBI-1906.

Closes #1906

e7ff09b [SteNicholas] [KYUUBI #1906] Add GetColumns for trino engine

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Kent Yao <yao@apache.org>
SteNicholas authored and yaooqinn committed Feb 17, 2022
1 parent 2501f9a commit 9bcd08f
Showing 3 changed files with 230 additions and 34 deletions.
@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.engine.trino.operation

import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.engine.trino.TrinoStatement
import org.apache.kyuubi.operation.{IterableFetchIterator, OperationType}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.{COLUMN_NAME, TABLE_CAT, TABLE_NAME, TABLE_SCHEM}
import org.apache.kyuubi.session.Session

class GetColumns(
    session: Session,
    catalogName: String,
    schemaName: String,
    tableName: String,
    columnName: String)
  extends TrinoOperation(OperationType.GET_COLUMNS, session) {

  private val SEARCH_STRING_ESCAPE: String = "\\"

  override protected def runInternal(): Unit = {
    val query = new StringBuilder(
      """
        |SELECT TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, DATA_TYPE,
        |TYPE_NAME, COLUMN_SIZE, BUFFER_LENGTH, DECIMAL_DIGITS, NUM_PREC_RADIX,
        |NULLABLE, REMARKS, COLUMN_DEF, SQL_DATA_TYPE, SQL_DATETIME_SUB,
        |CHAR_OCTET_LENGTH, ORDINAL_POSITION, IS_NULLABLE, SCOPE_CATALOG,
        |SCOPE_SCHEMA, SCOPE_TABLE, SOURCE_DATA_TYPE
        |FROM system.jdbc.columns
        |""".stripMargin)

    val filters = ArrayBuffer[String]()
    if (StringUtils.isNotEmpty(catalogName)) {
      filters += s"$TABLE_CAT = '$catalogName'"
    }
    if (StringUtils.isNotEmpty(schemaName)) {
      filters += s"$TABLE_SCHEM LIKE '$schemaName' ESCAPE '$SEARCH_STRING_ESCAPE'"
    }
    if (StringUtils.isNotEmpty(tableName)) {
      filters += s"$TABLE_NAME LIKE '$tableName' ESCAPE '$SEARCH_STRING_ESCAPE'"
    }
    if (StringUtils.isNotEmpty(columnName)) {
      filters += s"$COLUMN_NAME LIKE '$columnName' ESCAPE '$SEARCH_STRING_ESCAPE'"
    }

    if (filters.nonEmpty) {
      query.append(" WHERE ")
      query.append(filters.mkString(" AND "))
    }

    try {
      val trinoStatement =
        TrinoStatement(trinoContext, session.sessionManager.getConf, query.toString)
      schema = trinoStatement.getColumns
      val resultSet = trinoStatement.execute()
      iter = new IterableFetchIterator(resultSet)
    } catch onError()
  }
}
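
For illustration, a call with catalogName = "memory", schemaName = "test_schema", tableName = "%" and columnName = "%" assembles the following query (whitespace aside); note that the filter values are interpolated directly into the SQL text rather than bound as parameters:

    SELECT TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, DATA_TYPE,
    TYPE_NAME, COLUMN_SIZE, BUFFER_LENGTH, DECIMAL_DIGITS, NUM_PREC_RADIX,
    NULLABLE, REMARKS, COLUMN_DEF, SQL_DATA_TYPE, SQL_DATETIME_SUB,
    CHAR_OCTET_LENGTH, ORDINAL_POSITION, IS_NULLABLE, SCOPE_CATALOG,
    SCOPE_SCHEMA, SCOPE_TABLE, SOURCE_DATA_TYPE
    FROM system.jdbc.columns
    WHERE TABLE_CAT = 'memory' AND TABLE_SCHEM LIKE 'test_schema' ESCAPE '\'
      AND TABLE_NAME LIKE '%' ESCAPE '\' AND COLUMN_NAME LIKE '%' ESCAPE '\'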
@@ -82,7 +82,10 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
      catalogName: String,
      schemaName: String,
      tableName: String,
      columnName: String): Operation = {
    val op = new GetColumns(session, catalogName, schemaName, tableName, columnName)
    addOperation(op)
  }

  override def newGetFunctionsOperation(
      session: Session,
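With the manager wired up, engine code can create and launch the operation through the usual Kyuubi lifecycle. A hedged sketch (Operation.run() is assumed from Kyuubi's operation lifecycle and is not shown in this diff; the pattern arguments are illustrative):

    val op = operationManager.newGetColumnsOperation(
      session,
      "memory", // catalog: exact match
      "test\\_schema", // schema LIKE pattern; '\' escapes the '_' wildcard
      "%", // any table
      "%") // any column
    op.run() // assumed entry point that drives runInternal() and submits the query to Trino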
@@ -47,40 +47,42 @@ class TrinoOperationSuite extends WithTrinoEngine with HiveJDBCTestHelper {

  override protected def jdbcUrl: String = getJdbcUrl

  private val standardTypes: Set[String] = Set(
    BIGINT,
    INTEGER,
    SMALLINT,
    TINYINT,
    BOOLEAN,
    DATE,
    DECIMAL,
    REAL,
    DOUBLE,
    HYPER_LOG_LOG,
    QDIGEST,
    P4_HYPER_LOG_LOG,
    INTERVAL_DAY_TO_SECOND,
    INTERVAL_YEAR_TO_MONTH,
    TIMESTAMP,
    TIMESTAMP_WITH_TIME_ZONE,
    TIME,
    TIME_WITH_TIME_ZONE,
    VARBINARY,
    VARCHAR,
    CHAR,
    ROW,
    ARRAY,
    MAP,
    JSON,
    IPADDRESS,
    UUID,
    GEOMETRY,
    SPHERICAL_GEOGRAPHY,
    BING_TILE)

test("trino - get type info") {
withJdbcStatement() { statement =>
val typeInfo = statement.getConnection.getMetaData.getTypeInfo
val types: Set[String] = Set(
BIGINT,
INTEGER,
SMALLINT,
TINYINT,
BOOLEAN,
DATE,
DECIMAL,
REAL,
DOUBLE,
HYPER_LOG_LOG,
QDIGEST,
P4_HYPER_LOG_LOG,
INTERVAL_DAY_TO_SECOND,
INTERVAL_YEAR_TO_MONTH,
TIMESTAMP,
TIMESTAMP_WITH_TIME_ZONE,
TIME,
TIME_WITH_TIME_ZONE,
VARBINARY,
VARCHAR,
CHAR,
ROW,
ARRAY,
MAP,
JSON,
IPADDRESS,
UUID,
GEOMETRY,
SPHERICAL_GEOGRAPHY,
BING_TILE,
val expectedTypes = standardTypes ++ Set(
"color",
"KdbTree",
"CodePoints",
Expand All @@ -98,10 +100,10 @@ class TrinoOperationSuite extends WithTrinoEngine with HiveJDBCTestHelper {
"Classifier")
val typeInfos: Set[String] = Set()
while (typeInfo.next()) {
assert(types.contains(typeInfo.getString(TYPE_NAME)))
assert(expectedTypes.contains(typeInfo.getString(TYPE_NAME)))
typeInfos += typeInfo.getString(TYPE_NAME)
}
assert(types.size === typeInfos.size)
assert(expectedTypes.size === typeInfos.size)
}
}

@@ -441,6 +443,120 @@ class TrinoOperationSuite extends WithTrinoEngine with HiveJDBCTestHelper {
    }
  }

test("trino - get columns") {
case class ColumnWithTableAndCatalogAndSchema(
catalog: String,
schema: String,
tableName: String,
columnName: String,
typeName: String)

withJdbcStatement() { statement =>
val meta = statement.getConnection.getMetaData
val resultSetBuffer = ArrayBuffer[ColumnWithTableAndCatalogAndSchema]()

var columns = meta.getColumns(null, null, null, null)
while (columns.next()) {
resultSetBuffer +=
ColumnWithTableAndCatalogAndSchema(
columns.getString(TABLE_CAT),
columns.getString(TABLE_SCHEM),
columns.getString(TABLE_NAME),
columns.getString(COLUMN_NAME),
columns.getString(TYPE_NAME))
}
assert(resultSetBuffer.contains(ColumnWithTableAndCatalogAndSchema(
"memory",
"information_schema",
"columns",
"table_catalog",
VARCHAR)))
assert(resultSetBuffer.contains(ColumnWithTableAndCatalogAndSchema(
"memory",
"information_schema",
"columns",
"table_schema",
VARCHAR)))
assert(resultSetBuffer.contains(ColumnWithTableAndCatalogAndSchema(
"memory",
"information_schema",
"columns",
"table_name",
VARCHAR)))
assert(resultSetBuffer.contains(ColumnWithTableAndCatalogAndSchema(
"memory",
"information_schema",
"columns",
"column_name",
VARCHAR)))
assert(resultSetBuffer.contains(ColumnWithTableAndCatalogAndSchema(
"memory",
"information_schema",
"columns",
"ordinal_position",
BIGINT)))
assert(resultSetBuffer.contains(ColumnWithTableAndCatalogAndSchema(
"memory",
"information_schema",
"columns",
"column_default",
VARCHAR)))
assert(resultSetBuffer.contains(ColumnWithTableAndCatalogAndSchema(
"memory",
"information_schema",
"columns",
"is_nullable",
VARCHAR)))
assert(resultSetBuffer.contains(ColumnWithTableAndCatalogAndSchema(
"memory",
"information_schema",
"columns",
"data_type",
VARCHAR)))

val columnTypes = standardTypes.map {
case ARRAY => s"$ARRAY($VARCHAR)"
case MAP => s"$MAP($VARCHAR, $VARCHAR)"
case ROW => s"$ROW(c $VARCHAR)"
case QDIGEST => s"$QDIGEST($VARCHAR)"
case columnType => columnType
}
var schema: Seq[String] = Seq()
for (position <- 0 until columnTypes.size) {
schema = schema :+ s"c$position ${columnTypes.toSeq(position)}"
}
statement.execute(s"CREATE SCHEMA IF NOT EXISTS memory.test_schema")
statement.execute(
s"CREATE TABLE IF NOT EXISTS memory.test_schema.test_column(${schema.mkString(",")})")

columns = meta.getColumns("memory", "test_schema", "test_column", null)

var position = 0
while (columns.next()) {
assert(columns.getString(TABLE_CAT) === "memory")
assert(columns.getString(TABLE_SCHEM) === "test_schema")
assert(columns.getString(TABLE_NAME) === "test_column")
assert(columns.getString(COLUMN_NAME) === s"c$position")

val expectType = columnTypes.toSeq(position) match {
case CHAR => s"$CHAR(1)"
case DECIMAL => s"$DECIMAL(38,0)"
case TIME => s"$TIME(3)"
case TIME_WITH_TIME_ZONE => s"$TIME(3) with time zone"
case TIMESTAMP => s"$TIMESTAMP(3)"
case TIMESTAMP_WITH_TIME_ZONE => s"$TIMESTAMP(3) with time zone"
case columnType => columnType
}
assert(columns.getString(TYPE_NAME) === expectType)
position += 1
}
assert(position === columnTypes.size, "all columns should have been verified")

statement.execute("DROP TABLE memory.test_schema.test_column")
statement.execute("DROP SCHEMA memory.test_schema")
}
}

test("execute statement - select decimal") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SELECT DECIMAL '1.2' as col1, DECIMAL '1.23' AS col2")
