From 9814ec01ed0f8031f055bea63e01d713f56945a2 Mon Sep 17 00:00:00 2001 From: Zhuoluo Yang Date: Tue, 14 Mar 2017 16:44:02 +0800 Subject: [PATCH 1/6] [FLINK-6039] [core] Row of TableFunction should support flexible number of fields --- .../main/java/org/apache/flink/types/Row.java | 5 +- .../java/org/apache/flink/types/RowTest.java | 10 ++++ .../DataSetUserDefinedFunctionITCase.scala | 47 +++++++++++++++ .../utils/UserDefinedTableFunctions.scala | 60 +++++++++++++++++++ 4 files changed, 120 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java index 0b2120f93db35..e2f2e39b10772 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Row.java +++ b/flink-core/src/main/java/org/apache/flink/types/Row.java @@ -66,10 +66,11 @@ public int getArity() { * Gets the field at the specified position. * @param pos The position of the field, 0-based. * @return The field at the specified position. - * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. + * Return null if the position is equal to, or larger than the number of fields. + * @throws IndexOutOfBoundsException Thrown, if the position is negative. */ public Object getField(int pos) { - return fields[pos]; + return pos >= fields.length ? null : fields[pos]; } /** diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java index 13a4d6afd4020..5181516e8422f 100644 --- a/flink-core/src/test/java/org/apache/flink/types/RowTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java @@ -21,6 +21,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class RowTest { @Test @@ -46,4 +47,13 @@ public void testRowOf() { row2.setField(4, true); assertEquals(row1, row2); } + + @Test + public void testOutOfBound() { + Row row = new Row(2); + assertNull(row.getField(0)); + row.setField(0, 0); + row.setField(1, 1); + assertNull(row.getField(2)); + } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala index 20bbf8b3aa47f..dc4eb12d7fd0e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala @@ -116,6 +116,53 @@ class DataSetUserDefinedFunctionITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testIncompleteColumns(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + val in = testData(env).toTable(tableEnv).as('a, 'b, 'c) + val func4 = new TableFunc4 + + val result = in + .join(func4('c) as ('name, 'lenone, 'lentwo)) + .select('c, 'name, 'lenone, 'lentwo) + .toDataSet[Row] + + val results = result.collect() + val expected = "Jack#22,Jack,null,null\n" + "Jack#22,22,null,null\n" + + "John#19,John,null,null\n" + "John#19,19,null,null\n" + "Anna#44,Anna,null,null\n" + + "Anna#44,44,null,null\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + + val func5 = new TableFunc5 + + val result1 = in + .join(func5('c) as ('name, 'lenone, 'lentwo)) + .select('c, 'name, 'lenone, 'lentwo) + .toDataSet[Row] + + val results1 = result1.collect() + TestBaseUtils.compareResultAsText(results1.asJava, expected) + } + + @Test + def testOverflowColumns(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + val in = testData(env).toTable(tableEnv).as('a, 'b, 'c) + val func6 = new TableFunc6 + + val result = in + .join(func6('c) as ('name, 'len)) + .select('c, 'name, 'len) + .toDataSet[Row] + + val results = result.collect() + val expected = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" + + "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + @Test def testHierarchyType(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala index d0ffade253c95..a8fc690353d95 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala @@ -109,6 +109,66 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[ } } +// Test for incomplete row +class TableFunc4 extends TableFunction[Row] { + def eval(str: String): Unit = { + if (str.contains("#")) { + str.split("#").foreach({ s => + val row = new Row(3) + row.setField(0, s) // And we only set values for one column + collect(row) + }) + } + } + + override def getResultType: TypeInformation[Row] = { + new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO) + } +} + +// Test for incomplete row +class TableFunc5 extends TableFunction[Row] { + def eval(str: String): Unit = { + if (str.contains("#")) { + str.split("#").foreach({ s => + val row = new Row(1) // ResultType is three columns, we have only one here + row.setField(0, s) + collect(row) + }) + } + } + + override def getResultType: TypeInformation[Row] = { + new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO) + } +} + +// Test for overflow row +class TableFunc6 extends TableFunction[Row] { + def eval(str: String): Unit = { + if (str.contains("#")) { + str.split("#").foreach({ s => + val row = new Row(5) // ResultType is two columns, we have five columns here + row.setField(0, s) + row.setField(1, s.length) + row.setField(2, s.length) + row.setField(3, s.length) + row.setField(4, s.length) + collect(row) + }) + } + } + + override def getResultType: TypeInformation[Row] = { + new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO) + } +} + class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] { def eval(user: String) { if (user.contains("#")) { From 3a32307c5fb48eed4aa34f381e23d21e5e78823a Mon Sep 17 00:00:00 2001 From: Zhuoluo Yang Date: Thu, 16 Mar 2017 14:34:39 +0800 Subject: [PATCH 2/6] [FLINK-6039] [core] Revert some modifications --- flink-core/src/main/java/org/apache/flink/types/Row.java | 5 ++--- .../org/apache/flink/table/api/TableEnvironment.scala | 9 ++++----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java index e2f2e39b10772..0b2120f93db35 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Row.java +++ b/flink-core/src/main/java/org/apache/flink/types/Row.java @@ -66,11 +66,10 @@ public int getArity() { * Gets the field at the specified position. * @param pos The position of the field, 0-based. * @return The field at the specified position. - * Return null if the position is equal to, or larger than the number of fields. - * @throws IndexOutOfBoundsException Thrown, if the position is negative. + * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. */ public Object getField(int pos) { - return pos >= fields.length ? null : fields[pos]; + return fields[pos]; } /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 4a3632040f9bd..7372cdbef46b9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -271,16 +271,15 @@ abstract class TableEnvironment(val config: TableConfig) { // check if class could be instantiated checkForInstantiation(function.getClass) + // register in Table API + functionCatalog.registerFunction(name, function.getClass) + + // register in SQL API val typeInfo: TypeInformation[_] = if (function.getResultType != null) { function.getResultType } else { implicitly[TypeInformation[T]] } - - // register in Table API - functionCatalog.registerFunction(name, function.getClass) - - // register in SQL API val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory) functionCatalog.registerSqlFunctions(sqlFunctions) } From babae43320e856863acafd4f96b101b78eba5443 Mon Sep 17 00:00:00 2001 From: Zhuoluo Yang Date: Sat, 18 Mar 2017 00:56:13 +0800 Subject: [PATCH 3/6] [FLINK-6039] [core] Support dynamic schema of TableFunction --- .../flink/table/api/TableEnvironment.scala | 7 +- .../flink/table/functions/TableFunction.scala | 16 +++- .../functions/utils/TableSqlFunction.scala | 19 +++-- .../utils/UserDefinedFunctionUtils.scala | 7 +- .../flink/table/plan/logical/operators.scala | 4 +- .../plan/schema/FlinkTableFunctionImpl.scala | 82 ++++++++++++------- .../table/validate/FunctionCatalog.scala | 8 +- .../utils/UserDefinedTableFunctions.scala | 8 +- 8 files changed, 91 insertions(+), 60 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 7372cdbef46b9..6ab005b9a476c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -275,12 +275,7 @@ abstract class TableEnvironment(val config: TableConfig) { functionCatalog.registerFunction(name, function.getClass) // register in SQL API - val typeInfo: TypeInformation[_] = if (function.getResultType != null) { - function.getResultType - } else { - implicitly[TypeInformation[T]] - } - val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory) + val sqlFunctions = createTableSqlFunctions(name, function, typeFactory) functionCatalog.registerSqlFunctions(sqlFunctions) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala index d4c5021ed108a..5eeec2729d530 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala @@ -19,8 +19,9 @@ package org.apache.flink.table.functions import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.expressions.{Expression, TableFunctionCall} +import org.apache.flink.table.expressions.{Expression, Literal, TableFunctionCall} import org.apache.flink.util.Collector +import scala.collection.JavaConversions._ /** * Base class for a user-defined table function (UDTF). A user-defined table functions works on @@ -86,10 +87,16 @@ abstract class TableFunction[T] extends UserDefinedFunction { * @return [[Expression]] in form of a [[TableFunctionCall]] */ final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = { - val resultType = if (getResultType == null) { + val arguments = params.map { + case exp: Literal => + exp.value.asInstanceOf[AnyRef] + case _ => + null + } + val resultType = if (getResultType(arguments) == null) { typeInfo } else { - getResultType + getResultType(arguments) } TableFunctionCall(getClass.getSimpleName, this, params, resultType) } @@ -131,8 +138,9 @@ abstract class TableFunction[T] extends UserDefinedFunction { * method. Flink's type extraction facilities can handle basic types or * simple POJOs but might be wrong for more complex, custom, or composite types. * + * @param arguments * @return [[TypeInformation]] of result type or null if Flink should determine the type */ - def getResultType: TypeInformation[T] = null + def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[T] = null } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala index 74f3374445171..d45c724645dde 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala @@ -25,7 +25,6 @@ import org.apache.calcite.sql.`type`._ import org.apache.calcite.sql.parser.SqlParserPos import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction import org.apache.calcite.util.Util -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl @@ -33,13 +32,14 @@ import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl import scala.collection.JavaConverters._ import java.util +import org.apache.flink.table.api.ValidationException + /** * Calcite wrapper for user-defined table functions. */ class TableSqlFunction( name: String, udtf: TableFunction[_], - rowTypeInfo: TypeInformation[_], returnTypeInference: SqlReturnTypeInference, operandTypeInference: SqlOperandTypeInference, operandTypeChecker: SqlOperandTypeChecker, @@ -61,13 +61,21 @@ class TableSqlFunction( /** * Get the type information of the table returned by the table function. */ - def getRowTypeInfo = rowTypeInfo + def getRowTypeInfo = if (null == functionImpl.resultType) { + throw new ValidationException("The Result Type hasn't been generated yet") + } else { + functionImpl.resultType + } /** * Get additional mapping information if the returned table type is a POJO * (POJO types have no deterministic field order). */ - def getPojoFieldMapping = functionImpl.fieldIndexes + def getPojoFieldMapping = if (null == functionImpl.resultType) { + throw new ValidationException("The Result Type hasn't been generated yet") + } else { + functionImpl.fieldIndexes + } } @@ -78,7 +86,6 @@ object TableSqlFunction { * * @param name function name (used by SQL parser) * @param udtf user-defined table function to be called - * @param rowTypeInfo the row type information generated by the table function * @param typeFactory type factory for converting Flink's between Calcite's types * @param functionImpl Calcite table function schema * @return [[TableSqlFunction]] @@ -86,7 +93,6 @@ object TableSqlFunction { def apply( name: String, udtf: TableFunction[_], - rowTypeInfo: TypeInformation[_], typeFactory: FlinkTypeFactory, functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = { @@ -110,7 +116,6 @@ object TableSqlFunction { new TableSqlFunction( name, udtf, - rowTypeInfo, ReturnTypes.CURSOR, InferTypes.explicit(argTypes), typeChecker, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index c1cfe0610ae5e..c28afed7e3d31 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -209,22 +209,19 @@ object UserDefinedFunctionUtils { * * @param name function name * @param tableFunction table function - * @param resultType the type information of returned table * @param typeFactory type factory * @return the TableSqlFunction */ def createTableSqlFunctions( name: String, tableFunction: TableFunction[_], - resultType: TypeInformation[_], typeFactory: FlinkTypeFactory) : Seq[SqlFunction] = { - val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType) val evalMethods = checkAndExtractEvalMethods(tableFunction) evalMethods.map { method => - val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, method) - TableSqlFunction(name, tableFunction, resultType, typeFactory, function) + val function = new FlinkTableFunctionImpl(tableFunction, method) + TableSqlFunction(name, tableFunction, typeFactory, function) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 1b5eafb77ae88..e68a855cf4cc9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -690,13 +690,11 @@ case class LogicalTableFunctionCall( } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { - val fieldIndexes = getFieldInfo(resultType)._2 - val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, evalMethod) + val function = new FlinkTableFunctionImpl(tableFunction, evalMethod) val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] val sqlFunction = TableSqlFunction( tableFunction.functionIdentifier, tableFunction, - resultType, typeFactory, function) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala index 1c058839547b3..74c57e98b75fc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala @@ -25,8 +25,11 @@ import org.apache.calcite.schema.TableFunction import org.apache.calcite.schema.impl.ReflectiveFunctionBase import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} /** * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]]. @@ -34,40 +37,17 @@ import org.apache.flink.table.calcite.FlinkTypeFactory * The main difference is that we override the [[getRowType()]] and [[getElementType()]]. */ class FlinkTableFunctionImpl[T]( - val typeInfo: TypeInformation[T], - val fieldIndexes: Array[Int], - val fieldNames: Array[String], + val tableFunction: FlinkUDTF[T], val evalMethod: Method) extends ReflectiveFunctionBase(evalMethod) with TableFunction { - if (fieldIndexes.length != fieldNames.length) { - throw new TableException( - "Number of field indexes and field names must be equal.") - } - - // check uniqueness of field names - if (fieldNames.length != fieldNames.toSet.size) { - throw new TableException( - "Table field names must be unique.") - } - - val fieldTypes: Array[TypeInformation[_]] = - typeInfo match { - case cType: CompositeType[T] => - if (fieldNames.length != cType.getArity) { - throw new TableException( - s"Arity of type (" + cType.getFieldNames.deep + ") " + - "not equal to number of field names " + fieldNames.deep + ".") - } - fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) - case aType: AtomicType[T] => - if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { - throw new TableException( - "Non-composite input type may have only a single field and its index must be 0.") - } - Array(aType) - } + /** + * Cached resultType, fieldIndexes and fieldNames + */ + var resultType: TypeInformation[T] = _ + var fieldIndexes: Array[Int] = _ + var fieldNames: Array[String] = _ override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]] @@ -75,6 +55,48 @@ class FlinkTableFunctionImpl[T]( arguments: util.List[AnyRef]): RelDataType = { val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] val builder = flinkTypeFactory.builder + implicit val typeInfo: TypeInformation[T] = TypeExtractor.createTypeInfo( + tableFunction, classOf[FlinkUDTF[_]], tableFunction.getClass, 0) + .asInstanceOf[TypeInformation[T]] + + resultType = if (tableFunction.getResultType(arguments) != null) { + tableFunction.getResultType(arguments) + } else { + implicitly[TypeInformation[T]] + } + + val fieldTup = UserDefinedFunctionUtils.getFieldInfo(resultType) + fieldNames = fieldTup._1 + fieldIndexes = fieldTup._2 + + if (fieldIndexes.length != fieldNames.length) { + throw new TableException( + "Number of field indexes and field names must be equal.") + } + + // check uniqueness of field names + if (fieldNames.length != fieldNames.toSet.size) { + throw new TableException( + "Table field names must be unique.") + } + + val fieldTypes: Array[TypeInformation[_]] = + resultType match { + case cType: CompositeType[T] => + if (fieldNames.length != cType.getArity) { + throw new TableException( + s"Arity of type (" + cType.getFieldNames.deep + ") " + + "not equal to number of field names " + fieldNames.deep + ".") + } + fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) + case aType: AtomicType[T] => + if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { + throw new TableException( + "Non-composite input type may have only a single field and its index must be 0.") + } + Array(aType) + } + fieldNames .zip(fieldTypes) .foreach { f => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 3c89ec42b9401..c65ae9602cb07 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -92,8 +92,14 @@ class FunctionCatalog { .find(f => f.getName.equalsIgnoreCase(name) && f.isInstanceOf[TableSqlFunction]) .getOrElse(throw ValidationException(s"Undefined table function: $name")) .asInstanceOf[TableSqlFunction] - val typeInfo = tableSqlFunction.getRowTypeInfo val function = tableSqlFunction.getTableFunction + val arguments = children.map { + case exp: Literal => + exp.value.asInstanceOf[AnyRef] + case _ => + null + } + val typeInfo = function.getResultType(arguments) TableFunctionCall(name, function, children, typeInfo) // general expression call diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala index a8fc690353d95..48940ffeac554 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala @@ -68,7 +68,7 @@ class TableFunc2 extends TableFunction[Row] { } } - override def getResultType: TypeInformation[Row] = { + override def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[Row] = { new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) } @@ -121,7 +121,7 @@ class TableFunc4 extends TableFunction[Row] { } } - override def getResultType: TypeInformation[Row] = { + override def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[Row] = { new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) @@ -140,7 +140,7 @@ class TableFunc5 extends TableFunction[Row] { } } - override def getResultType: TypeInformation[Row] = { + override def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[Row] = { new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) @@ -163,7 +163,7 @@ class TableFunc6 extends TableFunction[Row] { } } - override def getResultType: TypeInformation[Row] = { + override def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[Row] = { new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) } From 34d8b2368970ac6f9b2d118c0d12050b97b35004 Mon Sep 17 00:00:00 2001 From: Zhuoluo Yang Date: Wed, 22 Mar 2017 11:49:49 +0800 Subject: [PATCH 4/6] [FLINK-6039] [core] Fix the dynamic schema of Table API --- .../utils/UserDefinedFunctionUtils.scala | 3 +- .../flink/table/plan/logical/operators.scala | 10 ++- .../plan/schema/FlinkTableFunctionImpl.scala | 64 ++++++++++++------- .../DataSetUserDefinedFunctionITCase.scala | 2 +- 4 files changed, 52 insertions(+), 27 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index c28afed7e3d31..b344d18e14774 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -220,7 +220,8 @@ object UserDefinedFunctionUtils { val evalMethods = checkAndExtractEvalMethods(tableFunction) evalMethods.map { method => - val function = new FlinkTableFunctionImpl(tableFunction, method) + // We don't know the field names without knowing the result type + val function = new FlinkTableFunctionImpl(tableFunction, null, null, method) TableSqlFunction(name, tableFunction, typeFactory, function) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index e68a855cf4cc9..f7d98a8332caa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -690,7 +690,7 @@ case class LogicalTableFunctionCall( } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { - val function = new FlinkTableFunctionImpl(tableFunction, evalMethod) + val function = new FlinkTableFunctionImpl(tableFunction, fieldIndexes, fieldNames, evalMethod) val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] val sqlFunction = TableSqlFunction( tableFunction.functionIdentifier, @@ -698,12 +698,18 @@ case class LogicalTableFunctionCall( typeFactory, function) + val arguments = parameters.map { + case exp: Literal => + exp.value.asInstanceOf[AnyRef] + case _ => + null + }.asJava val scan = LogicalTableFunctionScan.create( relBuilder.peek().getCluster, new util.ArrayList[RelNode](), relBuilder.call(sqlFunction, parameters.map(_.toRexNode(relBuilder)).asJava), function.getElementType(null), - function.getRowType(relBuilder.getTypeFactory, null), + function.getRowType(relBuilder.getTypeFactory, arguments), null) relBuilder.push(scan) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala index 74c57e98b75fc..a73087edf578d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala @@ -35,49 +35,47 @@ import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} * This is heavily inspired by Calcite's [[org.apache.calcite.schema.impl.TableFunctionImpl]]. * We need it in order to create a [[org.apache.flink.table.functions.utils.TableSqlFunction]]. * The main difference is that we override the [[getRowType()]] and [[getElementType()]]. + * + * @param tableFunction The Table Function instance + * @param fieldIndexes The field indexes. If it is null, it will be inferred from + * the [[tableFunction]] + * @param fieldNames The field names. If it is null, it will be inferred from the + * [[tableFunction]] + * @param evalMethod The eval() method of the [[tableFunction]] */ class FlinkTableFunctionImpl[T]( val tableFunction: FlinkUDTF[T], + var fieldIndexes: Array[Int], + var fieldNames: Array[String], val evalMethod: Method) extends ReflectiveFunctionBase(evalMethod) with TableFunction { + checkFields() + /** - * Cached resultType, fieldIndexes and fieldNames + * Cached resultType */ var resultType: TypeInformation[T] = _ - var fieldIndexes: Array[Int] = _ - var fieldNames: Array[String] = _ override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]] override def getRowType(typeFactory: RelDataTypeFactory, arguments: util.List[AnyRef]): RelDataType = { - val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - val builder = flinkTypeFactory.builder - implicit val typeInfo: TypeInformation[T] = TypeExtractor.createTypeInfo( - tableFunction, classOf[FlinkUDTF[_]], tableFunction.getClass, 0) - .asInstanceOf[TypeInformation[T]] + // Get the result type from table function resultType = if (tableFunction.getResultType(arguments) != null) { tableFunction.getResultType(arguments) } else { - implicitly[TypeInformation[T]] - } - - val fieldTup = UserDefinedFunctionUtils.getFieldInfo(resultType) - fieldNames = fieldTup._1 - fieldIndexes = fieldTup._2 - - if (fieldIndexes.length != fieldNames.length) { - throw new TableException( - "Number of field indexes and field names must be equal.") + TypeExtractor.createTypeInfo( + tableFunction, classOf[FlinkUDTF[_]], tableFunction.getClass, 0) + .asInstanceOf[TypeInformation[T]] } - - // check uniqueness of field names - if (fieldNames.length != fieldNames.toSet.size) { - throw new TableException( - "Table field names must be unique.") + if (null == fieldNames || null == fieldIndexes) { + val fieldInfo = UserDefinedFunctionUtils.getFieldInfo(resultType) + fieldNames = fieldInfo._1 + fieldIndexes = fieldInfo._2 + checkFields() } val fieldTypes: Array[TypeInformation[_]] = @@ -97,6 +95,8 @@ class FlinkTableFunctionImpl[T]( Array(aType) } + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + val builder = flinkTypeFactory.builder fieldNames .zip(fieldTypes) .foreach { f => @@ -104,4 +104,22 @@ class FlinkTableFunctionImpl[T]( } builder.build } + + private def checkFields(): Unit = { + + if (null == fieldNames || null == fieldIndexes) { + return + } + + if (fieldIndexes.length != fieldNames.length) { + throw new TableException( + "Number of field indexes and field names must be equal.") + } + + // check uniqueness of field names + if (fieldNames.length != fieldNames.toSet.size) { + throw new TableException( + "Table field names must be unique.") + } + } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala index dc4eb12d7fd0e..67cac0ac9cded 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala @@ -288,7 +288,7 @@ class DataSetUserDefinedFunctionITCase( val func32 = new TableFunc3("TwoConf_") val result = in - .join(func30('c) as('d, 'e)) + .join(func30('c) as ('d, 'e)) .select('c, 'd, 'e) .join(func31('c) as ('f, 'g)) .select('c, 'd, 'e, 'f, 'g) From a9878da17e4c324cbe975b08ba06b4de39f78461 Mon Sep 17 00:00:00 2001 From: Zhuoluo Yang Date: Wed, 22 Mar 2017 16:29:31 +0800 Subject: [PATCH 5/6] [FLINK-6039] [core] Fix build exception --- .../java/org/apache/flink/types/RowTest.java | 9 ------- .../flink/table/api/TableEnvironment.scala | 3 ++- .../utils/UserDefinedFunctionUtils.scala | 7 +++-- .../flink/table/plan/logical/operators.scala | 3 ++- .../plan/schema/FlinkTableFunctionImpl.scala | 27 +++++++++---------- .../table/validate/FunctionCatalog.scala | 1 + .../DataSetUserDefinedFunctionITCase.scala | 10 ------- 7 files changed, 23 insertions(+), 37 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java index 5181516e8422f..4d40d631193df 100644 --- a/flink-core/src/test/java/org/apache/flink/types/RowTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java @@ -47,13 +47,4 @@ public void testRowOf() { row2.setField(4, true); assertEquals(row1, row2); } - - @Test - public void testOutOfBound() { - Row row = new Row(2); - assertNull(row.getField(0)); - row.setField(0, 0); - row.setField(1, 1); - assertNull(row.getField(2)); - } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 6ab005b9a476c..cc754d2dbc74b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -275,7 +275,8 @@ abstract class TableEnvironment(val config: TableConfig) { functionCatalog.registerFunction(name, function.getClass) // register in SQL API - val sqlFunctions = createTableSqlFunctions(name, function, typeFactory) + val implicitTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]] + val sqlFunctions = createTableSqlFunctions(name, function, implicitTypeInfo, typeFactory) functionCatalog.registerSqlFunctions(sqlFunctions) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index b344d18e14774..e6c6179b2cb00 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -209,19 +209,22 @@ object UserDefinedFunctionUtils { * * @param name function name * @param tableFunction table function + * @param implicitResultType the implicit type information of returned table * @param typeFactory type factory * @return the TableSqlFunction */ def createTableSqlFunctions( name: String, tableFunction: TableFunction[_], + implicitResultType: TypeInformation[_], typeFactory: FlinkTypeFactory) : Seq[SqlFunction] = { val evalMethods = checkAndExtractEvalMethods(tableFunction) evalMethods.map { method => - // We don't know the field names without knowing the result type - val function = new FlinkTableFunctionImpl(tableFunction, null, null, method) + // We don't know the field names without knowing the exact result type + val function = new FlinkTableFunctionImpl( + tableFunction, implicitResultType, null, null, method) TableSqlFunction(name, tableFunction, typeFactory, function) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index f7d98a8332caa..cc209646303d2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -690,7 +690,8 @@ case class LogicalTableFunctionCall( } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { - val function = new FlinkTableFunctionImpl(tableFunction, fieldIndexes, fieldNames, evalMethod) + val function = new FlinkTableFunctionImpl( + tableFunction, resultType, fieldIndexes, fieldNames, evalMethod) val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] val sqlFunction = TableSqlFunction( tableFunction.functionIdentifier, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala index a73087edf578d..05c58d21770df 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala @@ -25,7 +25,6 @@ import org.apache.calcite.schema.TableFunction import org.apache.calcite.schema.impl.ReflectiveFunctionBase import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils @@ -37,6 +36,8 @@ import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} * The main difference is that we override the [[getRowType()]] and [[getElementType()]]. * * @param tableFunction The Table Function instance + * @param resultType The result type. If it is null, it will be inferred from the + * [[tableFunction]] * @param fieldIndexes The field indexes. If it is null, it will be inferred from * the [[tableFunction]] * @param fieldNames The field names. If it is null, it will be inferred from the @@ -44,7 +45,8 @@ import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} * @param evalMethod The eval() method of the [[tableFunction]] */ class FlinkTableFunctionImpl[T]( - val tableFunction: FlinkUDTF[T], + val tableFunction: FlinkUDTF[_], + var resultType: TypeInformation[_], var fieldIndexes: Array[Int], var fieldNames: Array[String], val evalMethod: Method) @@ -53,23 +55,15 @@ class FlinkTableFunctionImpl[T]( checkFields() - /** - * Cached resultType - */ - var resultType: TypeInformation[T] = _ - override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]] override def getRowType(typeFactory: RelDataTypeFactory, arguments: util.List[AnyRef]): RelDataType = { - // Get the result type from table function - resultType = if (tableFunction.getResultType(arguments) != null) { - tableFunction.getResultType(arguments) - } else { - TypeExtractor.createTypeInfo( - tableFunction, classOf[FlinkUDTF[_]], tableFunction.getClass, 0) - .asInstanceOf[TypeInformation[T]] + // Get the result type from table function. If it is not null, the resultType may + // already be generated by Table API's apply() method. + if (tableFunction.getResultType(arguments) != null) { + resultType = tableFunction.getResultType(arguments) } if (null == fieldNames || null == fieldIndexes) { val fieldInfo = UserDefinedFunctionUtils.getFieldInfo(resultType) @@ -107,6 +101,11 @@ class FlinkTableFunctionImpl[T]( private def checkFields(): Unit = { + if (null == resultType) { + throw new TableException( + "The result type must be initialized.") + } + if (null == fieldNames || null == fieldIndexes) { return } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index c65ae9602cb07..09df800c36ee8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.validate import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable} import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.ValidationException import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala index 67cac0ac9cded..b71aa7458c041 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala @@ -133,16 +133,6 @@ class DataSetUserDefinedFunctionITCase( "John#19,John,null,null\n" + "John#19,19,null,null\n" + "Anna#44,Anna,null,null\n" + "Anna#44,44,null,null\n" TestBaseUtils.compareResultAsText(results.asJava, expected) - - val func5 = new TableFunc5 - - val result1 = in - .join(func5('c) as ('name, 'lenone, 'lentwo)) - .select('c, 'name, 'lenone, 'lentwo) - .toDataSet[Row] - - val results1 = result1.collect() - TestBaseUtils.compareResultAsText(results1.asJava, expected) } @Test From 93445ae210923d5269afa910ed43db1ff1ee7fc2 Mon Sep 17 00:00:00 2001 From: Zhuoluo Yang Date: Wed, 22 Mar 2017 17:53:28 +0800 Subject: [PATCH 6/6] [FLINK-6039] [core] fix comments and test failure --- .../org/apache/flink/table/functions/TableFunction.scala | 3 ++- .../flink/table/functions/utils/TableSqlFunction.scala | 4 ++-- .../org/apache/flink/table/validate/FunctionCatalog.scala | 7 +++++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala index 5eeec2729d530..b98a360ce685b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala @@ -138,7 +138,8 @@ abstract class TableFunction[T] extends UserDefinedFunction { * method. Flink's type extraction facilities can handle basic types or * simple POJOs but might be wrong for more complex, custom, or composite types. * - * @param arguments + * @param arguments arguments of a function call (only literal arguments + * are passed, nulls for non-literal ones) * @return [[TypeInformation]] of result type or null if Flink should determine the type */ def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[T] = null diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala index d45c724645dde..a20112f25c05e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala @@ -71,8 +71,8 @@ class TableSqlFunction( * Get additional mapping information if the returned table type is a POJO * (POJO types have no deterministic field order). */ - def getPojoFieldMapping = if (null == functionImpl.resultType) { - throw new ValidationException("The Result Type hasn't been generated yet") + def getPojoFieldMapping = if (null == functionImpl.fieldIndexes) { + throw new ValidationException("The Result Indexes hasn't been generated yet") } else { functionImpl.fieldIndexes } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 09df800c36ee8..60bfe42f2a715 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -21,7 +21,6 @@ package org.apache.flink.table.validate import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable} import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.ValidationException import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction} @@ -100,7 +99,11 @@ class FunctionCatalog { case _ => null } - val typeInfo = function.getResultType(arguments) + val typeInfo = if (null != function.getResultType(arguments)) { + function.getResultType(arguments) + } else { + tableSqlFunction.getRowTypeInfo + } TableFunctionCall(name, function, children, typeInfo) // general expression call