[KYUUBI #1643][FOLLOWUP] Fix Flink GetFunctions result schema
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Implement the GetFunctions operation for the Flink SQL engine, returning the full JDBC result schema (FUNCTION_CAT, FUNCTION_SCHEM, FUNCTION_NAME, REMARKS, FUNCTION_TYPE, SPECIFIC_NAME).
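
For context, here is a minimal client-side sketch of how the completed schema surfaces through plain JDBC. It is illustrative only: the connection URL is a placeholder for a running Kyuubi server backed by the Flink engine, and the column labels follow `java.sql.DatabaseMetaData.getFunctions`.

```scala
import java.sql.DriverManager

// Placeholder URL; point it at an actual Kyuubi deployment (10009 is the default frontend port).
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default")
try {
  val rs = conn.getMetaData.getFunctions(null, null, "currentTimestamp")
  while (rs.next()) {
    // All six JDBC getFunctions columns are now returned by the Flink engine.
    val row = Seq(
      rs.getString("FUNCTION_CAT"),   // null for built-in (system) functions
      rs.getString("FUNCTION_SCHEM"), // null for built-in functions
      rs.getString("FUNCTION_NAME"),
      rs.getString("REMARKS"),        // currently always null
      rs.getInt("FUNCTION_TYPE"),     // DatabaseMetaData.functionResultUnknown
      rs.getString("SPECIFIC_NAME")   // currently always null
    )
    println(row.mkString(", "))
  }
} finally {
  conn.close()
}
```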

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1773 from SteNicholas/KYUUBI-1643.

Closes #1643

498c7f9 [SteNicholas] [KYUUBI #1643] Implement GetFunctions operation

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
SteNicholas authored and pan3793 committed Jan 18, 2022
1 parent 7ca1665 commit 012fc75
Showing 2 changed files with 38 additions and 19 deletions.
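
At a glance, the change widens each GetFunctions row from three columns to the six defined by `java.sql.DatabaseMetaData.getFunctions`. A rough sketch of the row shape before and after, using Flink's `Row.of` and the same constants that appear in the diff below (the function name is just an example):

```scala
import java.sql.DatabaseMetaData
import org.apache.flink.types.Row

// Before: only FUNCTION_CAT, FUNCTION_SCHEM and FUNCTION_NAME were returned.
val before: Row = Row.of(null, null, "currentTimestamp")

// After: REMARKS, FUNCTION_TYPE and SPECIFIC_NAME are appended, with FUNCTION_TYPE
// reported as functionResultUnknown.
val after: Row = Row.of(
  null,                                                    // FUNCTION_CAT (null for system functions)
  null,                                                    // FUNCTION_SCHEM
  "currentTimestamp",                                      // FUNCTION_NAME
  null,                                                    // REMARKS
  Integer.valueOf(DatabaseMetaData.functionResultUnknown), // FUNCTION_TYPE
  null)                                                    // SPECIFIC_NAME
```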
GetFunctions.scala

@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import java.sql.DatabaseMetaData
+
 import scala.collection.JavaConverters._
 
 import org.apache.commons.lang3.StringUtils
@@ -45,22 +47,35 @@ class GetFunctions(
       val systemFunctions = filterPattern(
         tableEnv.listFunctions().diff(tableEnv.listUserDefinedFunctions()),
         functionPattern)
-        .map { f => Row.of(null, null, f) }
+        .map { f =>
+          Row.of(null, null, f, null, Integer.valueOf(DatabaseMetaData.functionResultUnknown), null)
+        }
       val catalogFunctions = tableEnv.listCatalogs()
         .filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
         .flatMap { c =>
           val catalog = tableEnv.getCatalog(c).get()
           filterPattern(catalog.listDatabases().asScala, schemaPattern)
             .flatMap { d =>
               filterPattern(catalog.listFunctions(d).asScala, functionPattern)
-                .map { f => Row.of(c, d, f) }
+                .map { f =>
+                  Row.of(
+                    c,
+                    d,
+                    f,
+                    null,
+                    Integer.valueOf(DatabaseMetaData.functionResultUnknown),
+                    null)
+                }
             }
         }
       resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .columns(
           Column.physical(FUNCTION_CAT, DataTypes.STRING()),
           Column.physical(FUNCTION_SCHEM, DataTypes.STRING()),
-          Column.physical(FUNCTION_NAME, DataTypes.STRING()))
+          Column.physical(FUNCTION_NAME, DataTypes.STRING()),
+          Column.physical(REMARKS, DataTypes.STRING()),
+          Column.physical(FUNCTION_TYPE, DataTypes.INT()),
+          Column.physical(SPECIFIC_NAME, DataTypes.STRING()))
         .data(systemFunctions ++: catalogFunctions)
         .build
     } catch {

FlinkOperationSuite.scala

@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import java.sql.DatabaseMetaData
+
 import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
 import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
 import org.apache.flink.table.types.logical.LogicalTypeRoot
@@ -330,14 +332,14 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       val metaData = statement.getConnection.getMetaData
       var resultSet = metaData.getSchemas(null, null)
       while (resultSet.next()) {
-        assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE)
+        assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
         assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
       }
       resultSet = metaData.getSchemas(
         DEFAULT_BUILTIN_CATALOG.split("_").apply(0),
         DEFAULT_BUILTIN_DATABASE.split("_").apply(0))
       while (resultSet.next()) {
-        assert(resultSet.getString(TABLE_SCHEM) == DEFAULT_BUILTIN_DATABASE)
+        assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
         assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
       }
     }
@@ -412,26 +414,28 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       val metaData = statement.getConnection.getMetaData
       Seq("currentTimestamp", "currentDate", "currentTime", "localTimestamp", "localTime")
         .foreach { func =>
-          Seq(metaData.getFunctions _).foreach { apiFunc =>
-            val resultSet = apiFunc(null, null, func)
-            while (resultSet.next()) {
-              assert(resultSet.getString(FUNCTION_CAT) == null)
-              assert(resultSet.getString(FUNCTION_SCHEM) === null)
-              assert(resultSet.getString(FUNCTION_NAME) === func)
-            }
-          }
+          val resultSet = metaData.getFunctions(null, null, func)
+          while (resultSet.next()) {
+            assert(resultSet.getString(FUNCTION_CAT) === null)
+            assert(resultSet.getString(FUNCTION_SCHEM) === null)
+            assert(resultSet.getString(FUNCTION_NAME) === func)
+            assert(resultSet.getString(REMARKS) === null)
+            assert(resultSet.getInt(FUNCTION_TYPE) === DatabaseMetaData.functionResultUnknown)
+            assert(resultSet.getString(SPECIFIC_NAME) === null)
+          }
         }
       val expected =
         List("currentTimestamp", "currentDate", "currentTime", "localTimestamp", "localTime")
       Seq("current", "local")
         .foreach { funcPattern =>
-          Seq(metaData.getFunctions _).foreach { apiFunc =>
-            val resultSet = apiFunc(null, null, funcPattern)
-            while (resultSet.next()) {
-              assert(resultSet.getString(FUNCTION_CAT) == null)
-              assert(resultSet.getString(FUNCTION_SCHEM) === null)
-              assert(expected.contains(resultSet.getString(FUNCTION_NAME)))
-            }
-          }
+          val resultSet = metaData.getFunctions(null, null, funcPattern)
+          while (resultSet.next()) {
+            assert(resultSet.getString(FUNCTION_CAT) === null)
+            assert(resultSet.getString(FUNCTION_SCHEM) === null)
+            assert(expected.contains(resultSet.getString(FUNCTION_NAME)))
+            assert(resultSet.getString(REMARKS) === null)
+            assert(resultSet.getInt(FUNCTION_TYPE) === DatabaseMetaData.functionResultUnknown)
+            assert(resultSet.getString(SPECIFIC_NAME) === null)
+          }
         }
     }
