Skip to content

Commit

Permalink
[KYUUBI #1756] Clean up code of FlinkOperation
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

Minor code cleanup.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1756 from pan3793/cleanup.

Closes #1756

319a985 [Cheng Pan] nit
57804ea [Cheng Pan] nit
1f8db1e [Cheng Pan] Clean up code of FlinkOperation

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
pan3793 committed Jan 16, 2022
1 parent 783fc16 commit 84839e0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 46 deletions.
Expand Up @@ -143,4 +143,7 @@ abstract class FlinkOperation(
}
}

implicit class RichOptional[A](val optional: java.util.Optional[A]) {
def asScala: Option[A] = if (optional.isPresent) Some(optional.get) else None
}
}
Expand Up @@ -30,12 +30,8 @@ class GetCatalogs(session: Session)
override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
val catalogs: java.util.List[String] =
tableEnv.listCatalogs.toList.asJava
resultSet = OperationUtil.stringListToResultSet(
catalogs,
TABLE_CAT)
val catalogs = tableEnv.listCatalogs.toList.asJava
resultSet = OperationUtil.stringListToResultSet(catalogs, TABLE_CAT)
} catch onError()
}

}
Expand Up @@ -28,8 +28,7 @@ class GetTableTypes(session: Session)
extends FlinkOperation(OperationType.GET_TABLE_TYPES, session) {

override protected def runInternal(): Unit = {
resultSet = OperationUtil.stringListToResultSet(
Constants.SUPPORTED_TABLE_TYPES.toList.asJava,
TABLE_TYPE)
val tableTypes = Constants.SUPPORTED_TABLE_TYPES.toList.asJava
resultSet = OperationUtil.stringListToResultSet(tableTypes, TABLE_TYPE)
}
}
Expand Up @@ -17,9 +17,10 @@

package org.apache.kyuubi.engine.flink.operation

import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, seqAsJavaListConverter}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.catalog.ObjectIdentifier

import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil}
Expand All @@ -38,49 +39,33 @@ class GetTables(
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment

var catalogName = catalog
if (catalog == null || catalog.isEmpty) {
catalogName = tableEnv.getCurrentCatalog
}

val schemaPattern = toJavaRegex(schema).r.pattern
val tableNamePattern = toJavaRegex(tableName).r.pattern
val catalogName = if (StringUtils.isEmpty(catalog)) tableEnv.getCurrentCatalog else catalog

var tables = List[String]()
val schemaPattern = toJavaRegex(schema).r
val tableNamePattern = toJavaRegex(tableName).r

val optional = tableEnv.getCatalog(catalogName)
if (optional.isPresent) {
val currCatalog = optional.get()
tables = currCatalog.listDatabases().asScala
.filter(database =>
schemaPattern.matcher(database).matches())
.flatMap { database =>
currCatalog.listTables(database).asScala
.filter(identifier =>
tableNamePattern.matcher(identifier).matches())
.filter(identifier => {
// only table or view
if (!tableTypes.contains(Constants.TABLE_TYPE) || !tableTypes.contains(
Constants.VIEW_TYPE)) {
// try to get table kind
Try(currCatalog.getTable(ObjectIdentifier.of(
catalogName,
database,
identifier).toObjectPath)) match {
case Success(table) => tableTypes.contains(table.getTableKind.name())
val tables = tableEnv.getCatalog(catalogName).asScala.toSeq.flatMap { flinkCatalog =>
flinkCatalog.listDatabases().asScala
.filter { _schema => schemaPattern.pattern.matcher(_schema).matches() }
.flatMap { _schema =>
flinkCatalog.listTables(_schema).asScala
.filter { _table => tableNamePattern.pattern.matcher(_table).matches() }
.filter { _table =>
// skip check type of every table if request all types
if (Set(Constants.TABLE_TYPE, Constants.VIEW_TYPE) subsetOf tableTypes) {
true
} else {
val objPath = ObjectIdentifier.of(catalogName, _schema, _table).toObjectPath
Try(flinkCatalog.getTable(objPath)) match {
case Success(table) => tableTypes.contains(table.getTableKind.name)
case Failure(_) => false
}
} else {
true
}
})
}.toList
}
}
}

resultSet = OperationUtil.stringListToResultSet(
tables.asJava,
Constants.SHOW_TABLES_RESULT)

resultSet = OperationUtil.stringListToResultSet(tables.asJava, Constants.SHOW_TABLES_RESULT)
} catch onError()
}
}

0 comments on commit 84839e0

Please sign in to comment.