From 3086dadffeb1ec193874ea0b4dd95a6d9423cf32 Mon Sep 17 00:00:00 2001 From: twalthr Date: Mon, 6 Feb 2017 17:18:08 +0100 Subject: [PATCH 1/3] [FLINK-5662] [table] Rework internal type handling of Table API --- .../table/api/BatchTableEnvironment.scala | 32 ++- .../table/api/StreamTableEnvironment.scala | 37 ++- .../apache/flink/table/api/TableConfig.scala | 24 -- .../flink/table/api/TableEnvironment.scala | 118 ++++++++- .../table/calcite/FlinkTypeFactory.scala | 17 +- .../flink/table/codegen/CodeGenerator.scala | 33 +-- .../table/codegen/ExpressionReducer.scala | 11 +- .../flink/table/codegen/generated.scala | 25 +- .../flink/table/plan/logical/operators.scala | 18 +- ...kAggregate.scala => CommonAggregate.scala} | 2 +- .../{FlinkCalc.scala => CommonCalc.scala} | 110 ++++---- ...kCorrelate.scala => CommonCorrelate.scala} | 34 ++- .../flink/table/plan/nodes/CommonScan.scala | 98 +++++++ .../flink/table/plan/nodes/FlinkRel.scala | 37 --- .../table/plan/nodes/dataset/BatchScan.scala | 61 ++--- .../nodes/dataset/BatchTableSourceScan.scala | 7 +- .../plan/nodes/dataset/DataSetAggregate.scala | 82 ++---- .../plan/nodes/dataset/DataSetCalc.scala | 27 +- .../plan/nodes/dataset/DataSetCorrelate.scala | 12 +- .../plan/nodes/dataset/DataSetIntersect.scala | 57 +---- .../plan/nodes/dataset/DataSetJoin.scala | 32 +-- .../plan/nodes/dataset/DataSetMinus.scala | 57 +---- .../table/plan/nodes/dataset/DataSetRel.scala | 15 +- .../plan/nodes/dataset/DataSetScan.scala | 8 +- .../nodes/dataset/DataSetSingleRowJoin.scala | 48 ++-- .../plan/nodes/dataset/DataSetSort.scala | 49 +--- .../plan/nodes/dataset/DataSetUnion.scala | 24 +- .../plan/nodes/dataset/DataSetValues.scala | 18 +- .../dataset/DataSetWindowAggregate.scala | 68 ++--- .../datastream/DataStreamAggregate.scala | 239 ++++++++---------- .../nodes/datastream/DataStreamCalc.scala | 27 +- .../datastream/DataStreamCorrelate.scala | 14 +- .../plan/nodes/datastream/DataStreamRel.scala | 13 +- .../nodes/datastream/DataStreamScan.scala | 10 +- .../nodes/datastream/DataStreamUnion.scala | 6 +- .../nodes/datastream/DataStreamValues.scala | 23 +- .../plan/nodes/datastream/StreamScan.scala | 84 ++---- .../datastream/StreamTableSourceScan.scala | 12 +- .../runtime/aggregate/AggregateUtil.scala | 120 +++++---- .../flink/table/typeutils/TypeConverter.scala | 156 ------------ .../java/batch/TableEnvironmentITCase.java | 7 +- .../scala/batch/TableEnvironmentITCase.scala | 4 +- .../batch/utils/TableProgramsTestBase.scala | 11 +- .../utils/ExpressionTestBase.scala | 16 +- 44 files changed, 792 insertions(+), 1111 deletions(-) rename flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/{FlinkAggregate.scala => CommonAggregate.scala} (98%) rename flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/{FlinkCalc.scala => CommonCalc.scala} (62%) rename flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/{FlinkCorrelate.scala => CommonCorrelate.scala} (91%) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index dd0487accfc0d..d9fdcba0e0493 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -23,11 +23,12 @@ import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{Programs, RuleSet} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.DiscardingOutputFormat -import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.Expression @@ -135,7 +136,7 @@ abstract class BatchTableEnvironment( private[flink] def explain(table: Table, extended: Boolean): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast) - val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row])) + val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new RowTypeInfo()) dataSet.output(new DiscardingOutputFormat[Row]) val env = dataSet.getExecutionEnvironment val jasonSqlPlan = env.getExecutionPlan @@ -250,28 +251,37 @@ abstract class BatchTableEnvironment( * @return The [[DataSet]] that corresponds to the translated [[Table]]. */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = { - val dataSetPlan = optimize(table.getRelNode) - translate(dataSetPlan) + val relNode = table.getRelNode + val dataSetPlan = optimize(relNode) + translate(dataSetPlan, relNode.getRowType) } /** - * Translates a logical [[RelNode]] into a [[DataSet]]. + * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary. * * @param logicalPlan The root node of the relational expression tree. * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. * @tparam A The type of the resulting [[DataSet]]. * @return The [[DataSet]] that corresponds to the translated [[Table]]. */ - protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = { + protected def translate[A]( + logicalPlan: RelNode, + logicalType: RelDataType) + (implicit tpe: TypeInformation[A]): DataSet[A] = { TableEnvironment.validateType(tpe) logicalPlan match { case node: DataSetRel => - node.translateToPlan( - this, - Some(tpe.asInstanceOf[TypeInformation[Any]]) - ).asInstanceOf[DataSet[A]] - case _ => ??? + val plan = node.translateToPlan(this) + val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion") + conversion match { + case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary + case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe") + } + + case _ => + throw TableException("Cannot generate DataSet due to an invalid logical plan. " + + "This is a bug and should not happen. 
Please file an issue.") } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 81e884d9a741b..f10c467e2292c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -23,10 +23,11 @@ import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{Programs, RuleSet} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.explain.PlanJsonParser @@ -200,11 +201,11 @@ abstract class StreamTableEnvironment( dataStream: DataStream[T], fields: Array[Expression]): Unit = { - val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray) + val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields) val dataStreamTable = new DataStreamTable[T]( dataStream, - fieldIndexes.toArray, - fieldNames.toArray + fieldIndexes, + fieldNames ) registerTableInternal(name, dataStreamTable) } @@ -255,8 +256,9 @@ abstract class StreamTableEnvironment( * @return The [[DataStream]] that corresponds to the translated [[Table]]. */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { - val dataStreamPlan = optimize(table.getRelNode) - translate(dataStreamPlan) + val relNode = table.getRelNode + val dataStreamPlan = optimize(relNode) + translate(dataStreamPlan, relNode.getRowType) } /** @@ -267,18 +269,25 @@ abstract class StreamTableEnvironment( * @tparam A The type of the resulting [[DataStream]]. * @return The [[DataStream]] that corresponds to the translated [[Table]]. */ - protected def translate[A] - (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = { + protected def translate[A]( + logicalPlan: RelNode, + logicalType: RelDataType) + (implicit tpe: TypeInformation[A]): DataStream[A] = { TableEnvironment.validateType(tpe) logicalPlan match { case node: DataStreamRel => - node.translateToPlan( - this, - Some(tpe.asInstanceOf[TypeInformation[Any]]) - ).asInstanceOf[DataStream[A]] - case _ => ??? + val plan = node.translateToPlan(this) + val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion") + conversion match { + case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary + case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe") + } + + case _ => + throw TableException("Cannot generate DataStream due to an invalid logical plan. " + + "This is a bug and should not happen. 
Please file an issue.") } } @@ -291,7 +300,7 @@ abstract class StreamTableEnvironment( def explain(table: Table): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast) - val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row])) + val dataStream = translate[Row](optimizedPlan, ast.getRowType)(new RowTypeInfo()) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala index a8876a85ba2dd..6448657c5111f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala @@ -36,12 +36,6 @@ class TableConfig { */ private var nullCheck: Boolean = true - /** - * Defines if efficient types (such as Tuple types or Atomic types) - * should be used within operators where possible. - */ - private var efficientTypeUsage = false - /** * Defines the configuration of Calcite for Table API and SQL queries. */ @@ -72,24 +66,6 @@ class TableConfig { this.nullCheck = nullCheck } - /** - * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types - * or Atomic types) are used within operators where possible. - * - * NOTE: Currently, this is an experimental feature. - */ - def getEfficientTypeUsage = efficientTypeUsage - - /** - * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types - * or Atomic types) are used within operators where possible. - * - * NOTE: Currently, this is an experimental feature. - */ - def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = { - this.efficientTypeUsage = efficientTypeUsage - } - /** * Returns the current configuration of Calcite for Table API and SQL queries. 
*/ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index bcff1fbf279df..4ffcc528b44a5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -31,6 +31,7 @@ import org.apache.calcite.sql.SqlOperatorTable import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.sql.util.ChainedSqlOperatorTable import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} @@ -39,19 +40,21 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} -import java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} +import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} -import org.apache.flink.table.codegen.ExpressionReducer +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} -import org.apache.flink.table.plan.schema.{RelTable, TableSourceTable} +import org.apache.flink.table.plan.schema.RelTable +import org.apache.flink.table.runtime.MapRunner import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.types.Row import _root_.scala.collection.JavaConverters._ @@ -466,6 +469,113 @@ abstract class TableEnvironment(val config: TableConfig) { (fieldNames.toArray, fieldIndexes.toArray) } + /** + * Creates a final converter that maps the internal row type to external type. 
+ */ + protected def sinkConversion[T]( + physicalRowTypeInfo: TypeInformation[Row], + logicalRowType: RelDataType, + expectedTypeInfo: TypeInformation[T], + functionName: String) + : Option[MapFunction[Row, T]] = { + + // validate that at least the field types of physical and logical type match + // we do that here to make sure that plan translation was correct + val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType) + if (physicalRowTypeInfo != logicalRowTypeInfo) { + throw TableException("The field types of physical and logical row types do not match." + + "This is a bug and should not happen. Please file an issue.") + } + + // expected type is a row, no conversion needed + // TODO this logic will change with FLINK-5429 + if (expectedTypeInfo.getTypeClass == classOf[Row]) { + return None + } + + // convert to type information + val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType => + FlinkTypeFactory.toTypeInfo(relDataType.getType) + } + // field names + val logicalFieldNames = logicalRowType.getFieldNames.asScala + + // validate expected type + if (expectedTypeInfo.getArity != logicalFieldTypes.length) { + throw new TableException("Arity of result does not match expected type.") + } + expectedTypeInfo match { + + // POJO type expected + case pt: PojoTypeInfo[_] => + logicalFieldNames.zip(logicalFieldTypes) foreach { + case (fName, fType) => + val pojoIdx = pt.getFieldIndex(fName) + if (pojoIdx < 0) { + throw new TableException(s"POJO does not define field name: $fName") + } + val expectedTypeInfo = pt.getTypeAt(pojoIdx) + if (fType != expectedTypeInfo) { + throw new TableException(s"Result field does not match expected type. " + + s"Expected: $expectedTypeInfo; Actual: $fType") + } + } + + // Tuple/Case class type expected + case ct: CompositeType[_] => + logicalFieldTypes.zipWithIndex foreach { + case (fieldTypeInfo, i) => + val expectedTypeInfo = ct.getTypeAt(i) + if (fieldTypeInfo != expectedTypeInfo) { + throw new TableException(s"Result field does not match expected type. " + + s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo") + } + } + + // Atomic type expected + case at: AtomicType[_] => + val fieldTypeInfo = logicalFieldTypes.head + if (fieldTypeInfo != at) { + throw new TableException(s"Result field does not match expected type. 
" + + s"Expected: $at; Actual: $fieldTypeInfo") + } + + case _ => + throw new TableException(s"Unsupported result type: $expectedTypeInfo") + } + + // code generate MapFunction + val generator = new CodeGenerator( + config, + false, + physicalRowTypeInfo, + None, + None) + + val conversion = generator.generateConverterResultExpression( + expectedTypeInfo, + logicalFieldNames) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + val genFunction = generator.generateFunction( + functionName, + classOf[MapFunction[Row, T]], + body, + expectedTypeInfo) + + val mapFunction = new MapRunner[Row, T]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + Some(mapFunction) + } + } /** @@ -623,7 +733,7 @@ object TableEnvironment { validateType(inputType) inputType match { - case t: CompositeType[_] => 0.until(t.getArity).map(t.getTypeAt(_)).toArray + case t: CompositeType[_] => 0.until(t.getArity).map(t.getTypeAt).toArray case a: AtomicType[_] => Array(a.asInstanceOf[TypeInformation[_]]) case tpe => throw new TableException(s"Currently only CompositeType and AtomicType are supported.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index f3e2f918f54ca..251be1436349a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -28,7 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo} import org.apache.flink.api.java.typeutils.ValueTypeInfo._ import org.apache.flink.table.api.TableException import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType} @@ -36,8 +36,10 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple import org.apache.flink.table.plan.schema.ArrayRelDataType import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName +import org.apache.flink.types.Row import scala.collection.mutable +import scala.collection.JavaConverters._ /** * Flink specific type factory that represents the interface between Flink's [[TypeInformation]] @@ -167,6 +169,19 @@ object FlinkTypeFactory { throw TableException(s"Type is not supported: $t") } + /** + * Converts a Calcite logical record into a Flink type information. 
+ */ + def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = { + // convert to type information + val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType => + FlinkTypeFactory.toTypeInfo(relDataType.getType) + } + // field names + val logicalFieldNames = logicalRowType.getFieldNames.asScala + new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray) + } + def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match { case BOOLEAN => BOOLEAN_TYPE_INFO case TINYINT => BYTE_TYPE_INFO diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index d49d7a0e93283..c679bd8920f15 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -40,8 +40,8 @@ import org.apache.flink.table.codegen.calls.FunctionGenerator import org.apache.flink.table.codegen.calls.ScalarOperators._ import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.runtime.TableFunctionCollector -import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.mutable @@ -62,8 +62,8 @@ import scala.collection.mutable class CodeGenerator( config: TableConfig, nullableInput: Boolean, - input1: TypeInformation[Any], - input2: Option[TypeInformation[Any]] = None, + input1: TypeInformation[_ <: Any], + input2: Option[TypeInformation[_ <: Any]] = None, input1PojoFieldMapping: Option[Array[Int]] = None, input2PojoFieldMapping: Option[Array[Int]] = None) extends RexVisitor[GeneratedExpression] { @@ -112,7 +112,7 @@ class CodeGenerator( * @param config configuration that determines runtime behavior */ def this(config: TableConfig) = - this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None) + this(config, false, new RowTypeInfo(), None, None) // set of member statements that will be added only once // we use a LinkedHashSet to keep the insertion order @@ -224,15 +224,16 @@ class CodeGenerator( * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or * output record can be accessed via the given term methods. * @param returnType expected return type - * @tparam T Flink Function to be generated. + * @tparam F Flink Function to be generated. + * @tparam T Return type of the Flink Function. * @return instance of GeneratedFunction */ - def generateFunction[T <: Function]( + def generateFunction[F <: Function, T <: Any]( name: String, - clazz: Class[T], + clazz: Class[F], bodyCode: String, - returnType: TypeInformation[Any]) - : GeneratedFunction[T] = { + returnType: TypeInformation[T]) + : GeneratedFunction[F, T] = { val funcName = newName(name) // Janino does not support generics, that's why we need @@ -298,14 +299,14 @@ class CodeGenerator( * valid Java class identifier. * @param records code for creating records * @param returnType expected return type - * @tparam T Flink Function to be generated. + * @tparam T Return type of the Flink Function. 
* @return instance of GeneratedFunction */ - def generateValuesInputFormat[T]( + def generateValuesInputFormat[T <: Row]( name: String, records: Seq[String], - returnType: TypeInformation[Any]) - : GeneratedFunction[GenericInputFormat[T]] = { + returnType: TypeInformation[T]) + : GeneratedInput[GenericInputFormat[T], T] = { val funcName = newName(name) addReusableOutRecord(returnType) @@ -343,7 +344,7 @@ class CodeGenerator( } """.stripMargin - GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode) + GeneratedInput(funcName, returnType, funcCode) } /** @@ -1094,7 +1095,7 @@ class CodeGenerator( // ---------------------------------------------------------------------------------------------- private def generateInputAccess( - inputType: TypeInformation[Any], + inputType: TypeInformation[_ <: Any], inputTerm: String, index: Int, pojoFieldMapping: Option[Array[Int]]) @@ -1122,7 +1123,7 @@ class CodeGenerator( } private def generateNullableInputFieldAccess( - inputType: TypeInformation[Any], + inputType: TypeInformation[_ <: Any], inputTerm: String, index: Int, pojoFieldMapping: Option[Array[Int]]) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala index 94007deb6d566..0f1de214e3182 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala @@ -24,11 +24,10 @@ import org.apache.calcite.plan.RelOptPlanner import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -39,7 +38,7 @@ import scala.collection.JavaConverters._ class ExpressionReducer(config: TableConfig) extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] { - private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE + private val EMPTY_ROW_INFO = new RowTypeInfo() private val EMPTY_ROW = new Row(0) override def reduce( @@ -82,14 +81,14 @@ class ExpressionReducer(config: TableConfig) resultType.getFieldNames, literals) - val generatedFunction = generator.generateFunction[MapFunction[Row, Row]]( + val generatedFunction = generator.generateFunction[MapFunction[Row, Row], Row]( "ExpressionReducer", classOf[MapFunction[Row, Row]], s""" |${result.code} |return ${result.resultTerm}; |""".stripMargin, - resultType.asInstanceOf[TypeInformation[Any]]) + resultType) val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code) val function = clazz.newInstance() diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala index b4c293dedda10..271f686bc51c4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala @@ -18,6 +18,9 @@ package org.apache.flink.table.codegen +import org.apache.flink.api.common.functions +import org.apache.flink.api.common.functions.Function +import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation /** @@ -41,14 +44,32 @@ object GeneratedExpression { } /** - * Describes a generated [[org.apache.flink.api.common.functions.Function]] + * Describes a generated [[functions.Function]] * * @param name class name of the generated Function. * @param returnType the type information of the result type * @param code code of the generated Function. + * @tparam F type of function * @tparam T type of function */ -case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String) +case class GeneratedFunction[F <: Function, T <: Any]( + name: String, + returnType: TypeInformation[T], + code: String) + +/** + * Describes a generated [[InputFormat]]. + * + * @param name class name of the generated input function. + * @param returnType the type information of the result type + * @param code code of the generated Function. + * @tparam F type of function + * @tparam T type of function + */ +case class GeneratedInput[F <: InputFormat[_, _], T <: Any]( + name: String, + returnType: TypeInformation[T], + code: String) /** * Describes a generated [[org.apache.flink.util.Collector]]. diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 3ba0285f96129..20f810a4b184c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -22,14 +22,13 @@ import java.util import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.CorrelationId -import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableFunctionScan} +import org.apache.calcite.rel.core.{CorrelationId, JoinRelType} +import org.apache.calcite.rel.logical.LogicalTableFunctionScan import org.apache.calcite.rex.{RexInputRef, RexNode} import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.table._ import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment, UnresolvedException} import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} import org.apache.flink.table.expressions._ @@ -37,7 +36,6 @@ import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl -import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess} import scala.collection.JavaConverters._ @@ -426,11 +424,18 @@ case class Join( } relBuilder.join( - TypeConverter.flinkJoinTypeToRelType(joinType), + convertJoinType(joinType), condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)), corSet.asJava) } + private def convertJoinType(joinType: JoinType) = 
joinType match { + case JoinType.INNER => JoinRelType.INNER + case JoinType.LEFT_OUTER => JoinRelType.LEFT + case JoinType.RIGHT_OUTER => JoinRelType.RIGHT + case JoinType.FULL_OUTER => JoinRelType.FULL + } + private def ambiguousName: Set[String] = left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet) @@ -481,13 +486,12 @@ case class Join( if (checkIfFilterCondition(x)) { localPredicateFound = true } - case x: BinaryComparison => { + case x: BinaryComparison => if (checkIfFilterCondition(x)) { localPredicateFound = true } else { nonEquiJoinPredicateFound = true } - } case x => failValidation( s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x") } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala similarity index 98% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala index 729059495ff04..3883b1467a468 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import scala.collection.JavaConverters._ -trait FlinkAggregate { +trait CommonAggregate { private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala similarity index 62% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index 5ebd3ee9ee657..3f462589d155f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -23,28 +23,25 @@ import org.apache.calcite.rex.{RexNode, RexProgram} import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} import org.apache.flink.table.runtime.FlatMapRunner -import org.apache.flink.table.typeutils.TypeConverter._ +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -trait FlinkCalc { +trait CommonCalc { private[flink] def functionBody( - generator: CodeGenerator, - inputType: TypeInformation[Any], - rowType: RelDataType, - calcProgram: RexProgram, - config: TableConfig, - expectedType: Option[TypeInformation[Any]]): String = { - - val returnType = determineReturnType( - rowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + generator: CodeGenerator, + inputType: TypeInformation[Row], + rowType: RelDataType, + calcProgram: RexProgram, + config: TableConfig) + : String = { + + val returnType = 
FlinkTypeFactory.toInternalRowTypeInfo(rowType) val condition = calcProgram.getCondition val expandedExpressions = calcProgram.getProjectList.map( @@ -54,59 +51,43 @@ trait FlinkCalc { rowType.getFieldNames, expandedExpressions) - // only projection - if (condition == null) { + // only projection + if (condition == null) { + s""" + |${projection.code} + |${generator.collectorTerm}.collect(${projection.resultTerm}); + |""".stripMargin + } + else { + val filterCondition = generator.generateExpression( + calcProgram.expandLocalRef(calcProgram.getCondition)) + // only filter + if (projection == null) { s""" - |${projection.code} - |${generator.collectorTerm}.collect(${projection.resultTerm}); + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${generator.collectorTerm}.collect(${generator.input1Term}); + |} |""".stripMargin } + // both filter and projection else { - val filterCondition = generator.generateExpression( - calcProgram.expandLocalRef(calcProgram.getCondition)) - // only filter - if (projection == null) { - // conversion - if (inputType != returnType) { - val conversion = generator.generateConverterResultExpression( - returnType, - rowType.getFieldNames) - - s""" - |${filterCondition.code} - |if (${filterCondition.resultTerm}) { - | ${conversion.code} - | ${generator.collectorTerm}.collect(${conversion.resultTerm}); - |} - |""".stripMargin - } - // no conversion - else { - s""" - |${filterCondition.code} - |if (${filterCondition.resultTerm}) { - | ${generator.collectorTerm}.collect(${generator.input1Term}); - |} - |""".stripMargin - } - } - // both filter and projection - else { - s""" - |${filterCondition.code} - |if (${filterCondition.resultTerm}) { - | ${projection.code} - | ${generator.collectorTerm}.collect(${projection.resultTerm}); - |} - |""".stripMargin - } + s""" + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${projection.code} + | ${generator.collectorTerm}.collect(${projection.resultTerm}); + |} + |""".stripMargin } } + } private[flink] def calcMapFunction( - genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = { + genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row]) + : RichFlatMapFunction[Row, Row] = { - new FlatMapRunner[Any, Any]( + new FlatMapRunner[Row, Row]( genFunction.name, genFunction.code, genFunction.returnType) @@ -138,13 +119,12 @@ trait FlinkCalc { proj .map(expression(_, inFields, Some(localExprs))) - .zip(outFields).map { case (e, o) => { - if (e != o) { - e + " AS " + o - } else { - e - } - } + .zip(outFields).map { case (e, o) => + if (e != o) { + e + " AS " + o + } else { + e + } }.mkString(", ") } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala similarity index 91% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index c986602cb6430..61b7ffb96d943 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -22,20 +22,21 @@ import org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.sql.SemiJoinType import org.apache.flink.api.common.functions.FlatMapFunction 
import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction} +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE} +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction} import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector} -import org.apache.flink.table.typeutils.TypeConverter._ -import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.types.Row import scala.collection.JavaConverters._ /** * Join a user-defined table function */ -trait FlinkCorrelate { +trait CommonCorrelate { /** * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table @@ -43,22 +44,17 @@ trait FlinkCorrelate { */ private[flink] def correlateMapFunction( config: TableConfig, - inputTypeInfo: TypeInformation[Any], + inputTypeInfo: TypeInformation[Row], udtfTypeInfo: TypeInformation[Any], rowType: RelDataType, joinType: SemiJoinType, rexCall: RexCall, condition: Option[RexNode], - expectedType: Option[TypeInformation[Any]], pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping ruleDescription: String) - : CorrelateFlatMapRunner[Any, Any] = { + : CorrelateFlatMapRunner[Row, Row] = { - val returnType = determineReturnType( - rowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType) val flatMap = generateFunction( config, @@ -80,7 +76,7 @@ trait FlinkCorrelate { condition, pojoFieldMapping) - new CorrelateFlatMapRunner[Any, Any]( + new CorrelateFlatMapRunner[Row, Row]( flatMap.name, flatMap.code, collector.name, @@ -94,15 +90,15 @@ trait FlinkCorrelate { */ private def generateFunction( config: TableConfig, - inputTypeInfo: TypeInformation[Any], + inputTypeInfo: TypeInformation[Row], udtfTypeInfo: TypeInformation[Any], - returnType: TypeInformation[Any], + returnType: TypeInformation[Row], rowType: RelDataType, joinType: SemiJoinType, rexCall: RexCall, pojoFieldMapping: Option[Array[Int]], ruleDescription: String) - : GeneratedFunction[FlatMapFunction[Any, Any]] = { + : GeneratedFunction[FlatMapFunction[Row, Row], Row] = { val functionGenerator = new CodeGenerator( config, @@ -153,7 +149,7 @@ trait FlinkCorrelate { functionGenerator.generateFunction( ruleDescription, - classOf[FlatMapFunction[Any, Any]], + classOf[FlatMapFunction[Row, Row]], body, returnType) } @@ -163,9 +159,9 @@ trait FlinkCorrelate { */ private[flink] def generateCollector( config: TableConfig, - inputTypeInfo: TypeInformation[Any], + inputTypeInfo: TypeInformation[Row], udtfTypeInfo: TypeInformation[Any], - returnType: TypeInformation[Any], + returnType: TypeInformation[Row], rowType: RelDataType, condition: Option[RexNode], pojoFieldMapping: Option[Array[Int]]) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala new file mode 100644 index 0000000000000..56d22a16f0017 --- /dev/null +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.types.Row + +/** + * Common class for batch and stream scans. + */ +trait CommonScan { + + /** + * We check if the input type is exactly the same as the internal row type. + * A conversion is necessary if types differ or object have to be unboxed + * (i.e. Date, Time, Timestamp need to be converted into their primitive equivalents). + */ + private[flink] def needsConversion( + externalTypeInfo: TypeInformation[Any], + internalTypeInfo: TypeInformation[Row]) + : Boolean = { + + if (externalTypeInfo == internalTypeInfo) { + val rowTypeInfo = externalTypeInfo.asInstanceOf[RowTypeInfo] + var containsBoxedTypes = false + // TODO enable these lines for FLINK-5429 + // for (i <- rowTypeInfo.getArity) { + // val field = rowTypeInfo.getTypeAt(i) + // if (field == SqlTimeTypeInfo.DATE || + // field == SqlTimeTypeInfo.TIME || + // field == SqlTimeTypeInfo.TIMESTAMP) { + // containsBoxedTypes = true + // } + // } + containsBoxedTypes + } else { + true + } + } + + private[flink] def getConversionMapper( + config: TableConfig, + inputType: TypeInformation[Any], + expectedType: TypeInformation[Row], + conversionOperatorName: String, + fieldNames: Seq[String], + inputPojoFieldMapping: Option[Array[Int]] = None) + : MapFunction[Any, Row] = { + + val generator = new CodeGenerator( + config, + false, + inputType, + None, + inputPojoFieldMapping) + val conversion = generator.generateConverterResultExpression(expectedType, fieldNames) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + val genFunction = generator.generateFunction( + conversionOperatorName, + classOf[MapFunction[Row, Row]], + body, + expectedType) + + new MapRunner[Any, Row]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala index a7765d16b4bd5..7ad9bd54a9008 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala @@ -66,43 +66,6 
@@ trait FlinkRel { } } - private[flink] def getConversionMapper( - config: TableConfig, - nullableInput: Boolean, - inputType: TypeInformation[Any], - expectedType: TypeInformation[Any], - conversionOperatorName: String, - fieldNames: Seq[String], - inputPojoFieldMapping: Option[Array[Int]] = None) - : MapFunction[Any, Any] = { - - val generator = new CodeGenerator( - config, - nullableInput, - inputType, - None, - inputPojoFieldMapping) - val conversion = generator.generateConverterResultExpression(expectedType, fieldNames) - - val body = - s""" - |${conversion.code} - |return ${conversion.resultTerm}; - |""".stripMargin - - val genFunction = generator.generateFunction( - conversionOperatorName, - classOf[MapFunction[Any, Any]], - body, - expectedType) - - new MapRunner[Any, Any]( - genFunction.name, - genFunction.code, - genFunction.returnType) - - } - private[flink] def estimateRowSize(rowType: RelDataType): Double = { val fieldList = rowType.getFieldList diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala index 252bb2e2d4dc8..09262a6a9a2e9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala @@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.dataset import org.apache.calcite.plan._ import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.typeutils.PojoTypeInfo import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonScan import org.apache.flink.table.plan.schema.FlinkTable -import org.apache.flink.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -36,6 +36,7 @@ abstract class BatchScan( traitSet: RelTraitSet, table: RelOptTable) extends TableScan(cluster, traitSet, table) + with CommonScan with DataSetRel { override def toString: String = { @@ -48,50 +49,34 @@ abstract class BatchScan( planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) } - protected def convertToExpectedType( + protected def convertToInternalRow( input: DataSet[Any], flinkTable: FlinkTable[_], - expectedType: Option[TypeInformation[Any]], - config: TableConfig): DataSet[Any] = { + config: TableConfig) + : DataSet[Row] = { val inputType = input.getType - expectedType match { + val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) - // special case: - // if efficient type usage is enabled and no expected type is set - // we can simply forward the DataSet to the next operator. 
- // however, we cannot forward PojoTypes as their fields don't have an order - case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] => - input + // conversion + if (needsConversion(inputType, internalType)) { - case _ => - val determinedType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + val mapFunc = getConversionMapper( + config, + inputType, + internalType, + "DataSetSourceConversion", + getRowType.getFieldNames, + Some(flinkTable.fieldIndexes)) - // conversion - if (determinedType != inputType) { + val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - val mapFunc = getConversionMapper( - config, - nullableInput = false, - inputType, - determinedType, - "DataSetSourceConversion", - getRowType.getFieldNames, - Some(flinkTable.fieldIndexes)) - - val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - - input.map(mapFunc).name(opName) - } - // no conversion necessary, forward - else { - input - } + input.map(mapFunc).name(opName) + } + // no conversion necessary, forward + else { + input.asInstanceOf[DataSet[Row]] } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index 73dddc6bacce3..9b8e1ea039a96 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -27,6 +27,7 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.sources.BatchTableSource +import org.apache.flink.types.Row /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. 
*/ class BatchTableSourceScan( @@ -62,13 +63,11 @@ class BatchTableSourceScan( .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val config = tableEnv.getConfig val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config) + convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala index 67715367f2a2d..206e562a95b65 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala @@ -23,19 +23,15 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.plan.nodes.FlinkAggregate +import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.runtime.aggregate.AggregateUtil import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair -import org.apache.flink.table.typeutils.TypeConverter -import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.types.Row -import scala.collection.JavaConverters._ - /** * Flink RelNode which matches along with a LogicalAggregate. 
*/ @@ -49,7 +45,7 @@ class DataSetAggregate( grouping: Array[Int], inGroupingSet: Boolean) extends SingleRel(cluster, traitSet, inputNode) - with FlinkAggregate + with CommonAggregate with DataSetRel { override def deriveRowType(): RelDataType = rowRelDataType @@ -89,9 +85,7 @@ class DataSetAggregate( planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val config = tableEnv.getConfig @@ -109,15 +103,7 @@ class DataSetAggregate( grouping, inGroupingSet) - val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan( - tableEnv, - // tell the input operator that this operator currently only supports Rows as input - Some(TypeConverter.DEFAULT_ROW_TYPE)) - - // get the output types - val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala - .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) - .toArray + val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil) val prepareOpName = s"prepare select: ($aggString)" @@ -125,46 +111,26 @@ class DataSetAggregate( .map(mapFunction) .name(prepareOpName) - val rowTypeInfo = new RowTypeInfo(fieldTypes: _*) - - val result = { - if (groupingKeys.length > 0) { - // grouped aggregation - val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " + - s"select: ($aggString)" - - mappedInput.asInstanceOf[DataSet[Row]] - .groupBy(groupingKeys: _*) - .reduceGroup(groupReduceFunction) - .returns(rowTypeInfo) - .name(aggOpName) - .asInstanceOf[DataSet[Any]] - } - else { - // global aggregation - val aggOpName = s"select:($aggString)" - mappedInput.asInstanceOf[DataSet[Row]] - .reduceGroup(groupReduceFunction) - .returns(rowTypeInfo) - .name(aggOpName) - .asInstanceOf[DataSet[Any]] - } - } + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + + if (groupingKeys.length > 0) { + // grouped aggregation + val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " + + s"select: ($aggString)" - // if the expected type is not a Row, inject a mapper to convert to the expected type - expectedType match { - case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => - val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - result.map(getConversionMapper( - config = config, - nullableInput = false, - inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]], - expectedType = expectedType.get, - conversionOperatorName = "DataSetAggregateConversion", - fieldNames = getRowType.getFieldNames.asScala - )) - .name(mapName) - case _ => result + mappedInput.asInstanceOf[DataSet[Row]] + .groupBy(groupingKeys: _*) + .reduceGroup(groupReduceFunction) + .returns(rowTypeInfo) + .name(aggOpName) + } + else { + // global aggregation + val aggOpName = s"select:($aggString)" + mappedInput.asInstanceOf[DataSet[Row]] + .reduceGroup(groupReduceFunction) + .returns(rowTypeInfo) + .name(aggOpName) } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala index 03178ad1af704..245a0380ac35d 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala @@ -22,15 +22,15 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex._ import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.table.codegen.CodeGenerator -import org.apache.flink.table.plan.nodes.FlinkCalc -import org.apache.flink.table.typeutils.TypeConverter -import TypeConverter._ -import org.apache.calcite.rex._ import org.apache.flink.table.api.BatchTableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.plan.nodes.CommonCalc +import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -46,7 +46,7 @@ class DataSetCalc( private[flink] val calcProgram: RexProgram, // for tests ruleDescription: String) extends SingleRel(cluster, traitSet, input) - with FlinkCalc + with CommonCalc with DataSetRel { override def deriveRowType() = rowRelDataType @@ -99,19 +99,13 @@ class DataSetCalc( } } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val config = tableEnv.getConfig val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) val generator = new CodeGenerator(config, false, inputDS.getType) @@ -120,12 +114,11 @@ class DataSetCalc( inputDS.getType, getRowType, calcProgram, - config, - expectedType) + config) val genFunction = generator.generateFunction( ruleDescription, - classOf[FlatMapFunction[Any, Any]], + classOf[FlatMapFunction[Row, Row]], body, returnType) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala index 5a75e5ded59b0..c18a829587d5f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala @@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.functions.utils.TableSqlFunction -import org.apache.flink.table.plan.nodes.FlinkCorrelate -import org.apache.flink.table.typeutils.TypeConverter._ +import org.apache.flink.table.plan.nodes.CommonCorrelate +import org.apache.flink.types.Row /** * Flink RelNode which matches along with join a user defined table function. 
@@ -45,7 +45,7 @@ class DataSetCorrelate( joinType: SemiJoinType, ruleDescription: String) extends SingleRel(cluster, traitSet, inputNode) - with FlinkCorrelate + with CommonCorrelate with DataSetRel { override def deriveRowType() = relRowType @@ -85,10 +85,7 @@ class DataSetCorrelate( .itemIf("condition", condition.orNull, condition.isDefined) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]) - : DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val config = tableEnv.getConfig @@ -109,7 +106,6 @@ class DataSetCorrelate( joinType, rexCall, condition, - expectedType, Some(pojoFieldMapping), ruleDescription) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala index 332aa8ae36cfb..4497df33d8b4f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala @@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.runtime.IntersectCoGroupFunction -import org.apache.flink.table.typeutils.TypeConverter._ +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -75,55 +74,21 @@ class DataSetIntersect( } } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { - val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val coGroupedDs = leftDataSet.coGroup(rightDataSet) val coGroupOpName = s"intersect: ($intersectSelectionToString)" - val coGroupFunction = new IntersectCoGroupFunction[Any](all) - - val intersectDs = coGroupedDs.where("*").equalTo("*") - .`with`(coGroupFunction).name(coGroupOpName) - - val config = tableEnv.getConfig - val leftType = leftDataSet.getType - - // here we only care about left type information, because we emit records from left DataSet - expectedType match { - case None if config.getEfficientTypeUsage => - intersectDs - - case _ => - val determinedType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - - // conversion - if (determinedType != leftType) { - val mapFunc = getConversionMapper( - config, - false, - leftType, - determinedType, - "DataSetIntersectConversion", - getRowType.getFieldNames) - - val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - - intersectDs.map(mapFunc).name(opName) - } - // no conversion necessary, forward - else { - 
intersectDs - } - } + val coGroupFunction = new IntersectCoGroupFunction[Row](all) + + coGroupedDs + .where("*") + .equalTo("*") + .`with`(coGroupFunction) + .name(coGroupOpName) } private def intersectSelectionToString: String = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala index edb5be269f8a4..e6f8ca4bb82e2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala @@ -23,17 +23,16 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.{JoinInfo, JoinRelType} import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode import org.apache.calcite.util.mapping.IntPair +import org.apache.flink.api.common.functions.FlatJoinFunction import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.{BatchTableEnvironment, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.runtime.FlatJoinRunner -import org.apache.flink.table.typeutils.TypeConverter.determineReturnType -import org.apache.flink.api.common.functions.FlatJoinFunction -import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} -import org.apache.calcite.sql.SqlKind -import org.apache.flink.table.api.{BatchTableEnvironment, TableException} +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -102,17 +101,11 @@ class DataSetJoin( planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val config = tableEnv.getConfig - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) // get the equality keys val leftKeys = ArrayBuffer.empty[Int] @@ -195,19 +188,22 @@ class DataSetJoin( } val genFunction = generator.generateFunction( ruleDescription, - classOf[FlatJoinFunction[Any, Any, Any]], + classOf[FlatJoinFunction[Row, Row, Row]], body, returnType) - val joinFun = new FlatJoinRunner[Any, Any, Any]( + val joinFun = new FlatJoinRunner[Row, Row, Row]( genFunction.name, genFunction.code, genFunction.returnType) val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)" - joinOperator.where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*) - .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]] + joinOperator + .where(leftKeys.toArray: _*) + .equalTo(rightKeys.toArray: _*) + .`with`(joinFun) + .name(joinOpName) } private def joinSelectionToString: String = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala index 
672ff9ccc3ad6..9ba65bfdddb8c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala @@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.runtime.MinusCoGroupFunction -import org.apache.flink.table.typeutils.TypeConverter._ +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -86,55 +85,21 @@ class DataSetMinus( rowCnt } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { - val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val coGroupedDs = leftDataSet.coGroup(rightDataSet) val coGroupOpName = s"minus: ($minusSelectionToString)" - val coGroupFunction = new MinusCoGroupFunction[Any](all) - - val minusDs = coGroupedDs.where("*").equalTo("*") - .`with`(coGroupFunction).name(coGroupOpName) - - val config = tableEnv.getConfig - val leftType = leftDataSet.getType - - // here we only care about left type information, because we emit records from left DataSet - expectedType match { - case None if config.getEfficientTypeUsage => - minusDs - - case _ => - val determinedType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - - // conversion - if (determinedType != leftType) { - val mapFunc = getConversionMapper( - config = config, - nullableInput = false, - inputType = leftType, - expectedType = determinedType, - conversionOperatorName = "DataSetMinusConversion", - fieldNames = getRowType.getFieldNames) - - val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - - minusDs.map(mapFunc).name(opName) - } - // no conversion necessary, forward - else { - minusDs - } - } + val coGroupFunction = new MinusCoGroupFunction[Row](all) + + coGroupedDs + .where("*") + .equalTo("*") + .`with`(coGroupFunction) + .name(coGroupOpName) } private def minusSelectionToString: String = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala index 02138cf57bca1..16a2955ee735b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala @@ -19,26 +19,19 @@ package org.apache.flink.table.plan.nodes.dataset import org.apache.calcite.rel.RelNode +import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment import 
org.apache.flink.table.plan.nodes.FlinkRel -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet +import org.apache.flink.types.Row trait DataSetRel extends RelNode with FlinkRel { /** * Translates the [[DataSetRel]] node into a [[DataSet]] operator. * - * @param tableEnv [[BatchTableEnvironment]] of the translated Table. - * @param expectedType specifies the type the Flink operator should return. The type must - * have the same arity as the result. For instance, if the - * expected type is a RowTypeInfo this method will return a DataSet of - * type Row. If the expected type is Tuple2, the operator will return - * a Tuple2 if possible. Row otherwise. + * @param tableEnv The [[BatchTableEnvironment]] of the translated Table. * @return DataSet of type expectedType or RowTypeInfo */ - def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any] + def translateToPlan(tableEnv: BatchTableEnvironment) : DataSet[Row] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala index 48bbb74ff9fc0..44d2d0016354d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala @@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.dataset import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.plan.schema.DataSetTable +import org.apache.flink.types.Row /** * Flink RelNode which matches along with DataSource. 
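The intersect and minus nodes above now always run on rows: both inputs are co-grouped on all fields and a co-group function decides what to emit. A small, self-contained sketch of those semantics in plain Scala; rows are modelled as Seq[Any] and the grouping on all fields is modelled by grouping on the whole row, whereas the real operators delegate to IntersectCoGroupFunction and MinusCoGroupFunction:

  // Sketch of INTERSECT [ALL] / EXCEPT [ALL] via co-grouping on all fields.
  object SetOpSketch extends App {
    type Row = Seq[Any]

    // For each distinct row, count occurrences on both sides.
    def coGroup(left: Seq[Row], right: Seq[Row]): Map[Row, (Int, Int)] = {
      val l = left.groupBy(identity).map { case (k, v) => k -> v.size }
      val r = right.groupBy(identity).map { case (k, v) => k -> v.size }
      (l.keySet ++ r.keySet).map(k => k -> (l.getOrElse(k, 0), r.getOrElse(k, 0))).toMap
    }

    def intersect(left: Seq[Row], right: Seq[Row], all: Boolean): Seq[Row] =
      coGroup(left, right).toSeq.flatMap { case (row, (lc, rc)) =>
        val n = if (all) math.min(lc, rc) else if (lc > 0 && rc > 0) 1 else 0
        Seq.fill(n)(row)
      }

    def minus(left: Seq[Row], right: Seq[Row], all: Boolean): Seq[Row] =
      coGroup(left, right).toSeq.flatMap { case (row, (lc, rc)) =>
        val n = if (all) math.max(lc - rc, 0) else if (lc > 0 && rc == 0) 1 else 0
        Seq.fill(n)(row)
      }

    val a: Seq[Row] = Seq(Seq(1, "x"), Seq(1, "x"), Seq(2, "y"))
    val b: Seq[Row] = Seq(Seq(1, "x"))
    println(intersect(a, b, all = true))  // one (1, "x")
    println(minus(a, b, all = true))      // one (1, "x") and one (2, "y")
  }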
@@ -51,14 +51,12 @@ class DataSetScan( ) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val config = tableEnv.getConfig val inputDataSet: DataSet[Any] = dataSetTable.dataSet - convertToExpectedType(inputDataSet, dataSetTable, expectedType, config) + convertToInternalRow(inputDataSet, dataSetTable, config) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala index a70b4ab7f4857..b7d1a4bfb60c6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala @@ -26,10 +26,11 @@ import org.apache.calcite.rex.RexNode import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig} +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner} -import org.apache.flink.table.typeutils.TypeConverter.determineReturnType -import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig} +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -87,9 +88,7 @@ class DataSetSingleRowJoin( planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) @@ -100,8 +99,7 @@ class DataSetSingleRowJoin( rightDataSet.getType, leftIsSingle, joinCondition, - broadcastSetName, - expectedType) + broadcastSetName) val (multiRowDataSet, singleRowDataSet) = if (leftIsSingle) { @@ -114,17 +112,16 @@ class DataSetSingleRowJoin( .flatMap(mapSideJoin) .withBroadcastSet(singleRowDataSet, broadcastSetName) .name(getMapOperatorName) - .asInstanceOf[DataSet[Any]] } private def generateMapFunction( config: TableConfig, - inputType1: TypeInformation[Any], - inputType2: TypeInformation[Any], + inputType1: TypeInformation[Row], + inputType2: TypeInformation[Row], firstIsSingle: Boolean, joinCondition: RexNode, - broadcastInputSetName: String, - expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = { + broadcastInputSetName: String) + : FlatMapFunction[Row, Row] = { val codeGenerator = new CodeGenerator( config, @@ -132,11 +129,7 @@ class DataSetSingleRowJoin( inputType1, Some(inputType2)) - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) val conversion = codeGenerator.generateConverterResultExpression( returnType, @@ -144,28 +137,29 @@ class DataSetSingleRowJoin( val condition = 
codeGenerator.generateExpression(joinCondition) - val joinMethodBody = s""" - |${condition.code} - |if (${condition.resultTerm}) { - | ${conversion.code} - | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm}); - |} - |""".stripMargin + val joinMethodBody = + s""" + |${condition.code} + |if (${condition.resultTerm}) { + | ${conversion.code} + | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm}); + |} + |""".stripMargin val genFunction = codeGenerator.generateFunction( ruleDescription, - classOf[FlatJoinFunction[Any, Any, Any]], + classOf[FlatJoinFunction[Row, Row, Row]], joinMethodBody, returnType) if (firstIsSingle) { - new MapJoinRightRunner[Any, Any, Any]( + new MapJoinRightRunner[Row, Row, Row]( genFunction.name, genFunction.code, genFunction.returnType, broadcastInputSetName) } else { - new MapJoinLeftRunner[Any, Any, Any]( + new MapJoinLeftRunner[Row, Row, Row]( genFunction.name, genFunction.code, genFunction.returnType, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala index 4d84730301b76..192237ac14ee7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala @@ -27,11 +27,10 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel} import org.apache.calcite.rex.{RexLiteral, RexNode} import org.apache.flink.api.common.operators.Order -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.{BatchTableEnvironment, TableException} import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction} -import org.apache.flink.table.typeutils.TypeConverter._ +import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -87,10 +86,7 @@ class DataSetSort( } } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]] = None) - : DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { if (fieldCollations.isEmpty) { throw TableException("Limiting the result without sorting is not allowed " + @@ -113,10 +109,10 @@ class DataSetSort( partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2) } - val limitedDs = if (offset == null && fetch == null) { + if (offset == null && fetch == null) { partitionedDs } else { - val countFunction = new CountPartitionFunction[Any] + val countFunction = new CountPartitionFunction[Row] val partitionCountName = s"prepare offset/fetch" @@ -126,7 +122,7 @@ class DataSetSort( val broadcastName = "countPartition" - val limitFunction = new LimitFilterFunction[Any]( + val limitFunction = new LimitFilterFunction[Row]( limitStart, limitEnd, broadcastName) @@ -138,41 +134,6 @@ class DataSetSort( .name(limitName) .withBroadcastSet(partitionCount, broadcastName) } - - val inputType = partitionedDs.getType - expectedType match { - - case None if config.getEfficientTypeUsage => - limitedDs - - case _ => - val determinedType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - - // conversion - if (determinedType != inputType) { - - val mapFunc = 
getConversionMapper( - config = config, - nullableInput = false, - inputType = partitionedDs.getType, - expectedType = determinedType, - conversionOperatorName = "DataSetSortConversion", - fieldNames = getRowType.getFieldNames.asScala - ) - - val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - - limitedDs.map(mapFunc).name(opName) - } - // no conversion necessary, forward - else { - limitedDs - } - } } private def directionToOrder(direction: Direction) = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala index b0c95b5e665a6..a87c6e3a95e72 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala @@ -22,9 +22,9 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -77,24 +77,12 @@ class DataSetUnion( getInputs.foldLeft(0.0)(_ + mq.getRowCount(_)) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - - var leftDataSet: DataSet[Any] = null - var rightDataSet: DataSet[Any] = null - - expectedType match { - case None => - leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - rightDataSet = - right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType)) - case _ => - leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType) - rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType) - } + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { + + val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]] + leftDataSet.union(rightDataSet) } private def unionSelectionToString: String = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala index e0282f2a92ff1..3ebee2c30e51d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala @@ -24,12 +24,12 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Values import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexLiteral -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory import 
org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.runtime.io.ValuesInputFormat -import org.apache.flink.table.typeutils.TypeConverter._ +import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -66,17 +66,11 @@ class DataSetValues( super.explainTerms(pw).item("values", valuesFieldsToString) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val config = tableEnv.getConfig - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) val generator = new CodeGenerator(config) @@ -94,12 +88,12 @@ class DataSetValues( generatedRecords.map(_.code), returnType) - val inputFormat = new ValuesInputFormat[Any]( + val inputFormat = new ValuesInputFormat[Row]( generatedFunction.name, generatedFunction.code, generatedFunction.returnType) - tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]] + tableEnv.execEnv.createInput(inputFormat, returnType) } private def valuesFieldsToString: String = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala index b165afa77aa8a..48de822f739a2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala @@ -23,21 +23,17 @@ import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.common.operators.Order -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.logical._ -import org.apache.flink.table.plan.nodes.FlinkAggregate +import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.types.Row -import scala.collection.JavaConversions._ - /** * Flink RelNode which matches along with a LogicalWindowAggregate. 
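The single-row join above (DataSetSingleRowJoin) broadcasts the single-row side and evaluates the generated join condition in a flat map over the other input. A plain-Scala sketch of that evaluation, with the code-generated condition replaced by a simple predicate and rows modelled as Seq[Any]:

  // Sketch of the single-row broadcast join: the single row is made available to every
  // task (here just a value) and each record of the multi-row side is joined against it
  // whenever the condition holds.
  object SingleRowJoinSketch extends App {
    type Row = Seq[Any]

    def broadcastJoin(
        multiRowSide: Seq[Row],
        singleRow: Row,
        condition: (Row, Row) => Boolean): Seq[Row] =
      multiRowSide.collect {
        case r if condition(r, singleRow) => r ++ singleRow  // emit the concatenated row
      }

    val orders: Seq[Row] = Seq(Seq(1, 30), Seq(2, 70))
    val globalAvg: Row   = Seq(50)

    // e.g. "orders above the (single) global average"
    println(broadcastJoin(orders, globalAvg,
      (o, avg) => o(1).asInstanceOf[Int] > avg.head.asInstanceOf[Int]))
  }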
*/ @@ -52,7 +48,7 @@ class DataSetWindowAggregate( inputType: RelDataType, grouping: Array[Int]) extends SingleRel(cluster, traitSet, inputNode) - with FlinkAggregate + with CommonAggregate with DataSetRel { override def deriveRowType() = rowRelDataType @@ -109,20 +105,15 @@ class DataSetWindowAggregate( planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize) } - override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { val config = tableEnv.getConfig - val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan( - tableEnv, - // tell the input operator that this operator currently only supports Rows as input - Some(TypeConverter.DEFAULT_ROW_TYPE)) + val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) // whether identifiers are matched case-sensitively val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive() - val result = window match { + window match { case EventTimeTumblingGroupWindow(_, _, size) => createEventTimeTumblingWindowDataSet( inputDS, @@ -139,31 +130,14 @@ class DataSetWindowAggregate( "windows in a batch environment must declare a time attribute over which " + "the query is evaluated.") } - - // if the expected type is not a Row, inject a mapper to convert to the expected type - expectedType match { - case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => - val mapName = s"convert: (${getRowType.getFieldNames.toList.mkString(", ")})" - result.map( - getConversionMapper( - config = config, - nullableInput = false, - inputType = resultRowTypeInfo.asInstanceOf[TypeInformation[Any]], - expectedType = expectedType.get, - conversionOperatorName = "DataSetWindowAggregateConversion", - fieldNames = getRowType.getFieldNames - )) - .name(mapName) - case _ => result - } } private def createEventTimeTumblingWindowDataSet( - inputDS: DataSet[Any], + inputDS: DataSet[Row], isTimeWindow: Boolean, isParserCaseSensitive: Boolean) - : DataSet[Any] = { + : DataSet[Row] = { val mapFunction = createDataSetWindowPrepareMapFunction( window, namedAggregates, @@ -182,6 +156,8 @@ class DataSetWindowAggregate( .map(mapFunction) .name(prepareOperatorName) + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType if (isTimeWindow) { // grouped time window aggregation @@ -190,9 +166,8 @@ class DataSetWindowAggregate( mappedInput.asInstanceOf[DataSet[Row]] .groupBy(groupingKeys: _*) .reduceGroup(groupReduceFunction) - .returns(resultRowTypeInfo) + .returns(rowTypeInfo) .name(aggregateOperatorName) - .asInstanceOf[DataSet[Any]] } else { // count window val groupingKeys = grouping.indices.toArray @@ -203,10 +178,8 @@ class DataSetWindowAggregate( // sort on time field, it's the last element in the row .sortGroup(mapReturnType.getArity - 1, Order.ASCENDING) .reduceGroup(groupReduceFunction) - .returns(resultRowTypeInfo) + .returns(rowTypeInfo) .name(aggregateOperatorName) - .asInstanceOf[DataSet[Any]] - } else { // TODO: count tumbling all window on event-time should sort all the data set // on event time before applying the windowing logic. 
@@ -217,11 +190,12 @@ class DataSetWindowAggregate( } private[this] def createEventTimeSessionWindowDataSet( - inputDS: DataSet[Any], - isParserCaseSensitive: Boolean): DataSet[Any] = { + inputDS: DataSet[Row], + isParserCaseSensitive: Boolean) + : DataSet[Row] = { val groupingKeys = grouping.indices.toArray - val rowTypeInfo = resultRowTypeInfo + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) // grouping window if (groupingKeys.length > 0) { @@ -280,7 +254,6 @@ class DataSetWindowAggregate( .reduceGroup(groupReduceFunction) .returns(rowTypeInfo) .name(aggregateOperatorName) - .asInstanceOf[DataSet[Any]] } // do non-incremental aggregation else { @@ -298,7 +271,6 @@ class DataSetWindowAggregate( .reduceGroup(groupReduceFunction) .returns(rowTypeInfo) .name(aggregateOperatorName) - .asInstanceOf[DataSet[Any]] } } // non-grouping window @@ -332,12 +304,4 @@ class DataSetWindowAggregate( s"window: ($window), select: ($aggString)" } } - - private def resultRowTypeInfo: RowTypeInfo = { - // get the output types - val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList - .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) - .toArray - new RowTypeInfo(fieldTypes: _*) - } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala index 6a3d4e32c5bbe..c21d008bbbfbb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala @@ -22,27 +22,23 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.types.Row -import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical._ -import org.apache.flink.table.plan.nodes.FlinkAggregate +import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._ import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter} -import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} -import org.apache.flink.streaming.api.windowing.assigners._ -import 
org.apache.flink.streaming.api.windowing.time.Time -import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} -import org.apache.flink.table.api.StreamTableEnvironment - -import scala.collection.JavaConverters._ +import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} +import org.apache.flink.types.Row class DataStreamAggregate( window: LogicalWindow, @@ -55,7 +51,7 @@ class DataStreamAggregate( inputType: RelDataType, grouping: Array[Int]) extends SingleRel(cluster, traitSet, inputNode) - with FlinkAggregate + with CommonAggregate with DataStreamRel { override def deriveRowType(): RelDataType = rowRelDataType @@ -103,24 +99,12 @@ class DataStreamAggregate( namedProperties)) } - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { - val config = tableEnv.getConfig val groupingKeys = grouping.indices.toArray - val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan( - tableEnv, - // tell the input operator that this operator currently only supports Rows as input - Some(TypeConverter.DEFAULT_ROW_TYPE)) - - // get the output types - val fieldTypes: Array[TypeInformation[_]] = - getRowType.getFieldList.asScala - .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) - .toArray + val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - val rowTypeInfo = new RowTypeInfo(fieldTypes: _*) + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) val aggString = aggregationToString( inputType, @@ -142,121 +126,100 @@ class DataStreamAggregate( val mappedInput = inputDS.map(mapFunction).name(prepareOpName) - val result: DataStream[Any] = { - // check whether all aggregates support partial aggregate - if (AggregateUtil.doAllSupportPartialAggregation( - namedAggregates.map(_.getKey), - inputType, - grouping.length)) { - // do Incremental Aggregation - val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction( + + // check whether all aggregates support partial aggregate + if (AggregateUtil.doAllSupportPartialAggregation( + namedAggregates.map(_.getKey), + inputType, + grouping.length)) { + // do Incremental Aggregation + val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction( + namedAggregates, + inputType, + getRowType, + grouping) + // grouped / keyed aggregation + if (groupingKeys.length > 0) { + val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction( + window, namedAggregates, inputType, - getRowType, - grouping) - // grouped / keyed aggregation - if (groupingKeys.length > 0) { - val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction( - window, - namedAggregates, - inputType, - rowRelDataType, - grouping, - namedProperties) - - val keyedStream = mappedInput.keyBy(groupingKeys: _*) - val windowedStream = - createKeyedWindowedStream(window, keyedStream) - .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]] - - windowedStream - .apply(reduceFunction, windowFunction) - .returns(rowTypeInfo) - .name(keyedAggOpName) - .asInstanceOf[DataStream[Any]] - } - // global / non-keyed aggregation - else { - val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction( - window, - namedAggregates, - inputType, - rowRelDataType, - grouping, - namedProperties) - - val windowedStream = - createNonKeyedWindowedStream(window, mappedInput) - 
.asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] - - windowedStream - .apply(reduceFunction, windowFunction) - .returns(rowTypeInfo) - .name(nonKeyedAggOpName) - .asInstanceOf[DataStream[Any]] - } + rowRelDataType, + grouping, + namedProperties) + + val keyedStream = mappedInput.keyBy(groupingKeys: _*) + val windowedStream = + createKeyedWindowedStream(window, keyedStream) + .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]] + + windowedStream + .reduce(reduceFunction, windowFunction) + .returns(rowTypeInfo) + .name(keyedAggOpName) } + // global / non-keyed aggregation else { - // do non-Incremental Aggregation - // grouped / keyed aggregation - if (groupingKeys.length > 0) { - - val windowFunction = AggregateUtil.createWindowAggregationFunction( - window, - namedAggregates, - inputType, - rowRelDataType, - grouping, - namedProperties) - - val keyedStream = mappedInput.keyBy(groupingKeys: _*) - val windowedStream = - createKeyedWindowedStream(window, keyedStream) - .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]] - - windowedStream - .apply(windowFunction) - .returns(rowTypeInfo) - .name(keyedAggOpName) - .asInstanceOf[DataStream[Any]] - } - // global / non-keyed aggregation - else { - val windowFunction = AggregateUtil.createAllWindowAggregationFunction( - window, - namedAggregates, - inputType, - rowRelDataType, - grouping, - namedProperties) - - val windowedStream = - createNonKeyedWindowedStream(window, mappedInput) - .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] - - windowedStream - .apply(windowFunction) - .returns(rowTypeInfo) - .name(nonKeyedAggOpName) - .asInstanceOf[DataStream[Any]] - } + val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction( + window, + namedAggregates, + inputType, + rowRelDataType, + grouping, + namedProperties) + + val windowedStream = + createNonKeyedWindowedStream(window, mappedInput) + .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] + + windowedStream + .reduce(reduceFunction, windowFunction) + .returns(rowTypeInfo) + .name(nonKeyedAggOpName) } } + else { + // do non-Incremental Aggregation + // grouped / keyed aggregation + if (groupingKeys.length > 0) { + + val windowFunction = AggregateUtil.createWindowAggregationFunction( + window, + namedAggregates, + inputType, + rowRelDataType, + grouping, + namedProperties) + + val keyedStream = mappedInput.keyBy(groupingKeys: _*) + val windowedStream = + createKeyedWindowedStream(window, keyedStream) + .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]] - // if the expected type is not a Row, inject a mapper to convert to the expected type - expectedType match { - case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => - val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - result.map(getConversionMapper( - config = config, - nullableInput = false, - inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]], - expectedType = expectedType.get, - conversionOperatorName = "DataStreamAggregateConversion", - fieldNames = getRowType.getFieldNames.asScala - )) - .name(mapName) - case _ => result + windowedStream + .apply(windowFunction) + .returns(rowTypeInfo) + .name(keyedAggOpName) + } + // global / non-keyed aggregation + else { + val windowFunction = AggregateUtil.createAllWindowAggregationFunction( + window, + namedAggregates, + inputType, + rowRelDataType, + grouping, + namedProperties) + + val windowedStream = + createNonKeyedWindowedStream(window, mappedInput) + 
.asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] + + windowedStream + .apply(windowFunction) + .returns(rowTypeInfo) + .name(nonKeyedAggOpName) + } } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index 43f1fb604ab14..b39ae4a084a9f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -22,13 +22,13 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.calcite.rex.RexProgram -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.codegen.CodeGenerator -import org.apache.flink.table.plan.nodes.FlinkCalc -import org.apache.flink.table.typeutils.TypeConverter._ import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.plan.nodes.CommonCalc +import org.apache.flink.types.Row /** * Flink RelNode which matches along with FlatMapOperator. @@ -42,7 +42,7 @@ class DataStreamCalc( private[flink] val calcProgram: RexProgram, ruleDescription: String) extends SingleRel(cluster, traitSet, input) - with FlinkCalc + with CommonCalc with DataStreamRel { override def deriveRowType() = rowRelDataType @@ -68,20 +68,12 @@ class DataStreamCalc( calcProgram.getCondition != null) } - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { val config = tableEnv.getConfig val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - val generator = new CodeGenerator(config, false, inputDataStream.getType) val body = functionBody( @@ -89,14 +81,13 @@ class DataStreamCalc( inputDataStream.getType, getRowType, calcProgram, - config, - expectedType) + config) val genFunction = generator.generateFunction( ruleDescription, - classOf[FlatMapFunction[Any, Any]], + classOf[FlatMapFunction[Row, Row]], body, - returnType) + FlinkTypeFactory.toInternalRowTypeInfo(getRowType)) val mapFunc = calcMapFunction(genFunction) inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index bd65954210236..dd799e6946f72 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -24,11 +24,11 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import 
org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.sql.SemiJoinType import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.functions.utils.TableSqlFunction -import org.apache.flink.table.plan.nodes.FlinkCorrelate -import org.apache.flink.table.typeutils.TypeConverter._ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.nodes.CommonCorrelate +import org.apache.flink.types.Row /** * Flink RelNode which matches along with join a user defined table function. @@ -44,7 +44,7 @@ class DataStreamCorrelate( joinType: SemiJoinType, ruleDescription: String) extends SingleRel(cluster, traitSet, inputNode) - with FlinkCorrelate + with CommonCorrelate with DataStreamRel { override def deriveRowType() = relRowType @@ -79,10 +79,7 @@ class DataStreamCorrelate( .itemIf("condition", condition.orNull, condition.isDefined) } - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]) - : DataStream[Any] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { val config = tableEnv.getConfig @@ -103,7 +100,6 @@ class DataStreamCorrelate( joinType, rexCall, condition, - expectedType, Some(pojoFieldMapping), ruleDescription) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala index 16427b8d47877..cd985e89c05d2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala @@ -19,10 +19,10 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.rel.RelNode -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.plan.nodes.FlinkRel import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.plan.nodes.FlinkRel +import org.apache.flink.types.Row trait DataStreamRel extends RelNode with FlinkRel { @@ -30,16 +30,9 @@ trait DataStreamRel extends RelNode with FlinkRel { * Translates the FlinkRelNode into a Flink operator. * * @param tableEnv The [[StreamTableEnvironment]] of the translated Table. - * @param expectedType specifies the type the Flink operator should return. The type must - * have the same arity as the result. For instance, if the - * expected type is a RowTypeInfo this method will return a DataSet of - * type Row. If the expected type is Tuple2, the operator will return - * a Tuple2 if possible. Row otherwise. 
* @return DataStream of type expectedType or RowTypeInfo */ - def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any] + def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index 2d5ec0926090b..e8d218e4b19af 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.plan.schema.DataStreamTable import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.plan.schema.DataStreamTable +import org.apache.flink.types.Row /** * Flink RelNode which matches along with DataStreamSource. @@ -51,14 +51,12 @@ class DataStreamScan( ) } - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { val config = tableEnv.getConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream - convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config) + convertToInternalRow(inputDataStream, dataStreamTable, config) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala index beb15d20bf4c4..f676176ea64fd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala @@ -21,9 +21,9 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -60,9 +60,7 @@ class DataStreamUnion( s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" } - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala index f2a3d726ccd88..0ab4a489e2c6d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala @@ -24,12 +24,12 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Values import org.apache.calcite.rex.RexLiteral -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.codegen.CodeGenerator -import org.apache.flink.table.runtime.io.ValuesInputFormat -import org.apache.flink.table.typeutils.TypeConverter._ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.runtime.io.ValuesInputFormat +import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -57,18 +57,11 @@ class DataStreamValues( ) } - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]) - : DataStream[Any] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { val config = tableEnv.getConfig - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) val generator = new CodeGenerator(config) @@ -86,12 +79,12 @@ class DataStreamValues( generatedRecords.map(_.code), returnType) - val inputFormat = new ValuesInputFormat[Any]( + val inputFormat = new ValuesInputFormat[Row]( generatedFunction.name, generatedFunction.code, generatedFunction.returnType) - tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]] + tableEnv.execEnv.createInput(inputFormat, returnType) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala index ddac958b80ac8..56f7f2758f13c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala @@ -19,17 +19,13 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan._ -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.PojoTypeInfo -import org.apache.flink.table.codegen.CodeGenerator -import org.apache.flink.table.plan.schema.FlinkTable -import org.apache.flink.table.runtime.MapRunner -import org.apache.flink.table.typeutils.TypeConverter.determineReturnType import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.CommonScan +import org.apache.flink.table.plan.schema.FlinkTable +import org.apache.flink.types.Row import 
scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -39,69 +35,37 @@ abstract class StreamScan( traitSet: RelTraitSet, table: RelOptTable) extends TableScan(cluster, traitSet, table) + with CommonScan with DataStreamRel { - protected def convertToExpectedType( + protected def convertToInternalRow( input: DataStream[Any], flinkTable: FlinkTable[_], - expectedType: Option[TypeInformation[Any]], - config: TableConfig): DataStream[Any] = { + config: TableConfig) + : DataStream[Row] = { val inputType = input.getType - expectedType match { - - // special case: - // if efficient type usage is enabled and no expected type is set - // we can simply forward the DataSet to the next operator. - // however, we cannot forward PojoTypes as their fields don't have an order - case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] => - input - - case _ => - val determinedType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) + val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) - // conversion - if (determinedType != inputType) { - val generator = new CodeGenerator( - config, - nullableInput = false, - input.getType, - flinkTable.fieldIndexes) + // conversion + if (needsConversion(inputType, internalType)) { - val conversion = generator.generateConverterResultExpression( - determinedType, - getRowType.getFieldNames) + val mapFunc = getConversionMapper( + config, + inputType, + internalType, + "DataStreamSourceConversion", + getRowType.getFieldNames, + Some(flinkTable.fieldIndexes)) - val body = - s""" - |${conversion.code} - |return ${conversion.resultTerm}; - |""".stripMargin + val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - val genFunction = generator.generateFunction( - "DataSetSourceConversion", - classOf[MapFunction[Any, Any]], - body, - determinedType) - - val mapFunc = new MapRunner[Any, Any]( - genFunction.name, - genFunction.code, - genFunction.returnType) - - val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - - input.map(mapFunc).name(opName) - } - // no conversion necessary, forward - else { - input - } + input.map(mapFunc).name(opName) + } + // no conversion necessary, forward + else { + input.asInstanceOf[DataStream[Row]] } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 7550593442c91..73d0291999f51 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan._ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.sources.StreamTableSource -import org.apache.flink.streaming.api.datastream.DataStream -import 
org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.types.Row /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */ class StreamTableSourceScan( @@ -62,15 +62,13 @@ class StreamTableSourceScan( .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) } - override def translateToPlan( - tableEnv: StreamTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { val config = tableEnv.getConfig val inputDataStream: DataStream[Any] = tableSource .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] - convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config) + convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index e89f14f35f31c..034ff9e34447a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -67,9 +67,10 @@ object AggregateUtil { * */ private[flink] def createPrepareMapFunction( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - groupings: Array[Int], - inputType: RelDataType): MapFunction[Any, Row] = { + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + groupings: Array[Int], + inputType: RelDataType) + : MapFunction[Row, Row] = { val (aggFieldIndexes,aggregates) = transformToAggregateFunctions( namedAggregates.map(_.getKey), @@ -83,7 +84,7 @@ object AggregateUtil { aggregates, aggFieldIndexes, groupings, - mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] + mapReturnType) mapFunction } @@ -113,11 +114,12 @@ object AggregateUtil { * NOTE: this function is only used for time based window on batch tables. */ def createDataSetWindowPrepareMapFunction( - window: LogicalWindow, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - groupings: Array[Int], - inputType: RelDataType, - isParserCaseSensitive: Boolean): MapFunction[Any, Row] = { + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + groupings: Array[Int], + inputType: RelDataType, + isParserCaseSensitive: Boolean) + : MapFunction[Row, Row] = { val (aggFieldIndexes, aggregates) = transformToAggregateFunctions( namedAggregates.map(_.getKey), @@ -147,7 +149,7 @@ object AggregateUtil { groupings, timeFieldPos, tumbleTimeWindowSize, - mapReturnType).asInstanceOf[MapFunction[Any, Row]] + mapReturnType) } /** @@ -159,13 +161,14 @@ object AggregateUtil { * NOTE: this function is only used for window on batch tables. 
*/ def createDataSetWindowAggregationGroupReduceFunction( - window: LogicalWindow, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int], - properties: Seq[NamedWindowProperty], - isInputCombined: Boolean = false): RichGroupReduceFunction[Row, Row] = { + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + outputType: RelDataType, + groupings: Array[Int], + properties: Seq[NamedWindowProperty], + isInputCombined: Boolean = false) + : RichGroupReduceFunction[Row, Row] = { val aggregates = transformToAggregateFunctions( namedAggregates.map(_.getKey), @@ -269,10 +272,11 @@ object AggregateUtil { * */ private[flink] def createDataSetWindowAggregationCombineFunction( - window: LogicalWindow, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - groupings: Array[Int]): RichGroupCombineFunction[Row,Row] = { + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + groupings: Array[Int]) + : RichGroupCombineFunction[Row,Row] = { val aggregates = transformToAggregateFunctions( namedAggregates.map(_.getKey), @@ -313,11 +317,12 @@ object AggregateUtil { * */ private[flink] def createAggregateGroupReduceFunction( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int], - inGroupingSet: Boolean): RichGroupReduceFunction[Row, Row] = { + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + outputType: RelDataType, + groupings: Array[Int], + inGroupingSet: Boolean) + : RichGroupReduceFunction[Row, Row] = { val aggregates = transformToAggregateFunctions( namedAggregates.map(_.getKey), @@ -370,10 +375,11 @@ object AggregateUtil { * */ private[flink] def createIncrementalAggregateReduceFunction( - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int]): IncrementalAggregateReduceFunction = { + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + outputType: RelDataType, + groupings: Array[Int]) + : IncrementalAggregateReduceFunction = { val aggregates = transformToAggregateFunctions( namedAggregates.map(_.getKey),inputType,groupings.length)._2 @@ -397,13 +403,13 @@ object AggregateUtil { * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates. 
*/ private[flink] def createAllWindowAggregationFunction( - window: LogicalWindow, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int], - properties: Seq[NamedWindowProperty]) - : AllWindowFunction[Row, Row, DataStreamWindow] = { + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + outputType: RelDataType, + groupings: Array[Int], + properties: Seq[NamedWindowProperty]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { val aggFunction = createAggregateGroupReduceFunction( @@ -427,13 +433,13 @@ object AggregateUtil { * */ private[flink] def createWindowAggregationFunction( - window: LogicalWindow, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int], - properties: Seq[NamedWindowProperty]) - : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + outputType: RelDataType, + groupings: Array[Int], + properties: Seq[NamedWindowProperty]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { val aggFunction = createAggregateGroupReduceFunction( @@ -457,12 +463,13 @@ object AggregateUtil { * window aggregates. */ private[flink] def createAllWindowIncrementalAggregationFunction( - window: LogicalWindow, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int], - properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = { + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + outputType: RelDataType, + groupings: Array[Int], + properties: Seq[NamedWindowProperty]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { val aggregates = transformToAggregateFunctions( namedAggregates.map(_.getKey),inputType,groupings.length)._2 @@ -499,12 +506,13 @@ object AggregateUtil { * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates. */ private[flink] def createWindowIncrementalAggregationFunction( - window: LogicalWindow, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - outputType: RelDataType, - groupings: Array[Int], - properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + outputType: RelDataType, + groupings: Array[Int], + properties: Seq[NamedWindowProperty]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { val aggregates = transformToAggregateFunctions( namedAggregates.map(_.getKey),inputType,groupings.length)._2 diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala deleted file mode 100644 index a2a120b492630..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.typeutils - -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rel.core.JoinRelType._ -import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.types.Row -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} -import org.apache.flink.table.api.TableException -import org.apache.flink.table.calcite.FlinkTypeFactory - -import scala.collection.JavaConversions._ - -object TypeConverter { - - val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]] - - /** - * Determines the return type of Flink operators based on the logical fields, the expected - * physical type and configuration parameters. - * - * For example: - * - No physical type expected, only 3 non-null fields and efficient type usage enabled - * -> return Tuple3 - * - No physical type expected, efficient type usage enabled, but 3 nullable fields - * -> return Row because Tuple does not support null values - * - Physical type expected - * -> check if physical type is compatible and return it - * - * @param logicalRowType logical row information - * @param expectedPhysicalType expected physical type - * @param nullable fields can be nullable - * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types) - * @return suitable return type - */ - def determineReturnType( - logicalRowType: RelDataType, - expectedPhysicalType: Option[TypeInformation[Any]], - nullable: Boolean, - useEfficientTypes: Boolean) - : TypeInformation[Any] = { - // convert to type information - val logicalFieldTypes = logicalRowType.getFieldList map { relDataType => - FlinkTypeFactory.toTypeInfo(relDataType.getType) - } - // field names - val logicalFieldNames = logicalRowType.getFieldNames.toList - - val returnType = expectedPhysicalType match { - // a certain physical type is expected (but not Row) - // check if expected physical type is compatible with logical field type - case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => - if (typeInfo.getArity != logicalFieldTypes.length) { - throw new TableException("Arity of result does not match expected type.") - } - typeInfo match { - - // POJO type expected - case pt: PojoTypeInfo[_] => - logicalFieldNames.zip(logicalFieldTypes) foreach { - case (fName, fType) => - val pojoIdx = pt.getFieldIndex(fName) - if (pojoIdx < 0) { - throw new TableException(s"POJO does not define field name: $fName") - } - val expectedTypeInfo = pt.getTypeAt(pojoIdx) - if (fType != expectedTypeInfo) { - throw new TableException(s"Result field does not match expected type. 
" + - s"Expected: $expectedTypeInfo; Actual: $fType") - } - } - - // Tuple/Case class type expected - case ct: CompositeType[_] => - logicalFieldTypes.zipWithIndex foreach { - case (fieldTypeInfo, i) => - val expectedTypeInfo = ct.getTypeAt(i) - if (fieldTypeInfo != expectedTypeInfo) { - throw new TableException(s"Result field does not match expected type. " + - s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo") - } - } - - // Atomic type expected - case at: AtomicType[_] => - val fieldTypeInfo = logicalFieldTypes.head - if (fieldTypeInfo != at) { - throw new TableException(s"Result field does not match expected type. " + - s"Expected: $at; Actual: $fieldTypeInfo") - } - - case _ => - throw new TableException("Unsupported result type.") - } - typeInfo - - // Row is expected, create the arity for it - case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] => - new RowTypeInfo(logicalFieldTypes: _*) - - // no physical type - // determine type based on logical fields and configuration parameters - case None => - // no need for efficient types -> use Row - // we cannot use efficient types if row arity > tuple arity or nullable - if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) { - new RowTypeInfo(logicalFieldTypes: _*) - } - // use efficient type tuple or atomic type - else { - if (logicalFieldTypes.length == 1) { - logicalFieldTypes.head - } - else { - new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*) - } - } - } - returnType.asInstanceOf[TypeInformation[Any]] - } - - def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match { - case INNER => JoinType.INNER - case LEFT => JoinType.LEFT_OUTER - case RIGHT => JoinType.RIGHT_OUTER - case FULL => JoinType.FULL_OUTER - } - - def flinkJoinTypeToRelType(joinType: JoinType) = joinType match { - case JoinType.INNER => JoinRelType.INNER - case JoinType.LEFT_OUTER => JoinRelType.LEFT - case JoinType.RIGHT_OUTER => JoinRelType.RIGHT - case JoinType.FULL_OUTER => JoinRelType.FULL - } -} diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java index e84c9063a02bb..dece2955fb3a1 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java @@ -56,8 +56,7 @@ public TableEnvironmentITCase(TableConfigMode configMode) { @Parameterized.Parameters(name = "Table config = {0}") public static Collection parameters() { return Arrays.asList(new Object[][] { - { TableProgramsTestBase.DEFAULT() }, - { TableProgramsTestBase.EFFICIENT() } + { TableProgramsTestBase.DEFAULT() } }); } @@ -265,8 +264,8 @@ public void testAsFromTupleToPojo() throws Exception { data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world")); Table table = tableEnv - .fromDataSet(env.fromCollection(data), "a, b, c, d") - .select("a, b, c, d"); + .fromDataSet(env.fromCollection(data), "q, w, e, r") + .select("q as a, w as b, e as c, r as d"); DataSet ds = tableEnv.toDataSet(table, SmallPojo2.class); List results = ds.collect(); diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala index 
2b00cc9b4e528..6cbe83402aae5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala @@ -261,8 +261,8 @@ object TableEnvironmentITCase { @Parameterized.Parameters(name = "Table config = {0}") def parameters(): util.Collection[Array[java.lang.Object]] = { Seq[Array[AnyRef]]( - Array(TableProgramsTestBase.DEFAULT), - Array(TableProgramsTestBase.EFFICIENT)).asJava + Array(TableProgramsTestBase.DEFAULT) + ).asJava } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala index a699068783594..586d71615adc9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala.batch.utils import java.util import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode} +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{NO_NULL, TableConfigMode} import org.apache.flink.test.util.MultipleProgramsTestBase import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.junit.runners.Parameterized @@ -38,8 +38,6 @@ class TableProgramsTestBase( tableConfigMode match { case NO_NULL => conf.setNullCheck(false) - case EFFICIENT => - conf.setEfficientTypeUsage(true) case _ => // keep default } conf @@ -47,11 +45,10 @@ class TableProgramsTestBase( } object TableProgramsTestBase { - case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean) + case class TableConfigMode(nullCheck: Boolean) - val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false) - val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false) - val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true) + val DEFAULT = TableConfigMode(nullCheck = true) + val NO_NULL = TableConfigMode(nullCheck = false) @Parameterized.Parameters(name = "Table config = {0}") def parameters(): util.Collection[Array[java.lang.Object]] = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala index 8555632eea56a..b4327ecd4f7a6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala @@ -95,7 +95,7 @@ abstract class ExpressionTestBase { val generator = new CodeGenerator(config, false, typeInfo) // cast expressions to String - val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR)).toSeq + val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR)) // generate code val resultType = new RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO): _*) @@ -110,16 +110,16 @@ abstract class ExpressionTestBase { |return ${genExpr.resultTerm}; |""".stripMargin - val genFunc = 
generator.generateFunction[MapFunction[Any, String]]( + val genFunc = generator.generateFunction[MapFunction[Any, Row], Row]( "TestFunction", - classOf[MapFunction[Any, String]], + classOf[MapFunction[Any, Row]], bodyCode, - resultType.asInstanceOf[TypeInformation[Any]]) + resultType) // compile and evaluate - val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc) + val clazz = new TestCompiler[MapFunction[Any, Row], Row]().compile(genFunc) val mapper = clazz.newInstance() - val result = mapper.map(testData).asInstanceOf[Row] + val result = mapper.map(testData) // compare testExprs @@ -211,8 +211,8 @@ abstract class ExpressionTestBase { // ---------------------------------------------------------------------------------------------- // TestCompiler that uses current class loader - class TestCompiler[T <: Function] extends Compiler[T] { - def compile(genFunc: GeneratedFunction[T]): Class[T] = + class TestCompiler[F <: Function, T <: Any] extends Compiler[F] { + def compile(genFunc: GeneratedFunction[F, T]): Class[F] = compile(getClass.getClassLoader, genFunc.name, genFunc.code) } } From 2355eedf41959748dc26baafc94df1ddf9ab6223 Mon Sep 17 00:00:00 2001 From: twalthr Date: Mon, 13 Feb 2017 11:50:16 +0100 Subject: [PATCH 2/3] Comments addressed --- .../table/api/BatchTableEnvironment.scala | 2 ++ .../table/api/StreamTableEnvironment.scala | 2 ++ .../flink/table/api/TableEnvironment.scala | 14 ++++++++++-- .../flink/table/plan/nodes/CommonScan.scala | 22 +++---------------- .../table/plan/nodes/dataset/DataSetRel.scala | 2 +- .../plan/nodes/datastream/DataStreamRel.scala | 2 +- 6 files changed, 21 insertions(+), 23 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index d9fdcba0e0493..fefd83908c957 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -260,6 +260,8 @@ abstract class BatchTableEnvironment( * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary. * * @param logicalPlan The root node of the relational expression tree. + * @param logicalType The row type of the result. Since the logicalPlan can lose the + * field naming during optimization we pass the row type separately. * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. * @tparam A The type of the resulting [[DataSet]]. * @return The [[DataSet]] that corresponds to the translated [[Table]]. diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index f10c467e2292c..58c372b15a9b4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -265,6 +265,8 @@ abstract class StreamTableEnvironment( * Translates a logical [[RelNode]] into a [[DataStream]]. * * @param logicalPlan The root node of the relational expression tree. + * @param logicalType The row type of the result. Since the logicalPlan can lose the + * field naming during optimization we pass the row type separately. 
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. * @return The [[DataStream]] that corresponds to the translated [[Table]]. diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 4ffcc528b44a5..cfe1320226d96 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -34,7 +34,7 @@ import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TupleTypeInfoBase} import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} @@ -471,6 +471,12 @@ abstract class TableEnvironment(val config: TableConfig) { /** * Creates a final converter that maps the internal row type to external type. + * + * @param physicalRowTypeInfo the input of the sink + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param expectedTypeInfo the output type of the sink + * @param functionName name of the map function. Does not need to be unique but has to be a + * valid Java class identifier. */ protected def sinkConversion[T]( physicalRowTypeInfo: TypeInformation[Row], logicalRowType: RelDataType, expectedTypeInfo: TypeInformation[T], functionName: String) : Option[MapFunction[Row, T]] = { @@ -522,7 +528,7 @@ } // Tuple/Case class type expected - case ct: CompositeType[_] => + case ct: TupleTypeInfoBase[_] => logicalFieldTypes.zipWithIndex foreach { case (fieldTypeInfo, i) => val expectedTypeInfo = ct.getTypeAt(i) @@ -534,6 +540,10 @@ // Atomic type expected case at: AtomicType[_] => + if (logicalFieldTypes.size != 1) { + throw new TableException(s"Result does not have a single field, " + + s"but ${logicalFieldTypes.size} fields.") + } val fieldTypeInfo = logicalFieldTypes.head if (fieldTypeInfo != at) { throw new TableException(s"Result field does not match expected type. " + diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala index 56d22a16f0017..274b602e8384d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala @@ -33,30 +33,14 @@ trait CommonScan { /** * We check if the input type is exactly the same as the internal row type. - * A conversion is necessary if types differ or object have to be unboxed - * (i.e. Date, Time, Timestamp need to be converted into their primitive equivalents). + * A conversion is necessary if types differ.
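To make the simplified check concrete, here is a minimal sketch of the comparison that decides whether a scan needs the generated conversion mapper. It is illustrative only: the object name and type instances are made up, but the comparison mirrors the needsConversion logic shown below (plain inequality between the source TypeInformation and the internal row type).

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, STRING_TYPE_INFO}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
    import org.apache.flink.types.Row

    object NeedsConversionSketch {
      // Same comparison as the simplified check: convert unless the source
      // already produces the internal row type.
      def needsConversion(external: TypeInformation[_], internal: TypeInformation[Row]): Boolean =
        external != internal

      def main(args: Array[String]): Unit = {
        // Internal row type derived from a logical schema (word: String, count: Int).
        val internal = new RowTypeInfo(STRING_TYPE_INFO, INT_TYPE_INFO)

        // A source that already produces the internal row type is forwarded as-is.
        println(needsConversion(new RowTypeInfo(STRING_TYPE_INFO, INT_TYPE_INFO), internal)) // false

        // A Tuple2<String, Integer> source differs, so a conversion MapFunction is generated.
        val tupleType = new TupleTypeInfo[org.apache.flink.api.java.tuple.Tuple2[String, Integer]](
          STRING_TYPE_INFO, INT_TYPE_INFO)
        println(needsConversion(tupleType, internal)) // true
      }
    }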
*/ private[flink] def needsConversion( externalTypeInfo: TypeInformation[Any], internalTypeInfo: TypeInformation[Row]) : Boolean = { - if (externalTypeInfo == internalTypeInfo) { - val rowTypeInfo = externalTypeInfo.asInstanceOf[RowTypeInfo] - var containsBoxedTypes = false - // TODO enable these lines for FLINK-5429 - // for (i <- rowTypeInfo.getArity) { - // val field = rowTypeInfo.getTypeAt(i) - // if (field == SqlTimeTypeInfo.DATE || - // field == SqlTimeTypeInfo.TIME || - // field == SqlTimeTypeInfo.TIMESTAMP) { - // containsBoxedTypes = true - // } - // } - containsBoxedTypes - } else { - true - } + externalTypeInfo != internalTypeInfo } private[flink] def getConversionMapper( @@ -84,7 +68,7 @@ trait CommonScan { val genFunction = generator.generateFunction( conversionOperatorName, - classOf[MapFunction[Row, Row]], + classOf[MapFunction[Any, Row]], body, expectedType) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala index 16a2955ee735b..980f3ccccbeec 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala @@ -30,7 +30,7 @@ trait DataSetRel extends RelNode with FlinkRel { * Translates the [[DataSetRel]] node into a [[DataSet]] operator. * * @param tableEnv The [[BatchTableEnvironment]] of the translated Table. - * @return DataSet of type expectedType or RowTypeInfo + * @return DataSet of type [[Row]] */ def translateToPlan(tableEnv: BatchTableEnvironment) : DataSet[Row] diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala index cd985e89c05d2..6f208311af0f6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala @@ -30,7 +30,7 @@ trait DataStreamRel extends RelNode with FlinkRel { * Translates the FlinkRelNode into a Flink operator. * * @param tableEnv The [[StreamTableEnvironment]] of the translated Table. 
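Seen from the API surface, the contract documented here means that every DataSetRel/DataStreamRel now produces the internal Row type, and the requested result type only matters for the single conversion at the sink. A rough usage sketch follows, assuming the Scala batch Table API of this version; the class, field, and value names are made up for illustration.

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    // Hypothetical target type; its fields line up with the selected schema below.
    case class WordTotal(word: String, total: Int)

    object SinkConversionSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tableEnv = TableEnvironment.getTableEnvironment(env)

        val words = env.fromElements(("hello", 1), ("hello", 2), ("world", 3))

        val table = tableEnv
          .fromDataSet(words, 'word, 'total)
          .groupBy('word)
          .select('word, 'total.sum as 'total)

        // Requested type Row: the translated plan is already a DataSet[Row], no mapper is appended.
        val rows: DataSet[Row] = tableEnv.toDataSet[Row](table)

        // Requested type is a case class: one generated MapFunction[Row, WordTotal] is appended.
        val totals: DataSet[WordTotal] = tableEnv.toDataSet[WordTotal](table)

        totals.print()
      }
    }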
- * @return DataStream of type expectedType or RowTypeInfo + * @return DataStream of type [[Row]] */ def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row] From 40890abe076c0bd0d4cd493abf75aa75b23ac760 Mon Sep 17 00:00:00 2001 From: twalthr Date: Mon, 13 Feb 2017 16:23:51 +0100 Subject: [PATCH 3/3] Feedback addressed --- .../flink/table/api/TableEnvironment.scala | 71 +++++++++++-------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index cfe1320226d96..be0dc2a83382a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -34,7 +34,7 @@ import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TupleTypeInfoBase} +import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} @@ -413,7 +413,7 @@ abstract class TableEnvironment(val config: TableConfig) { } exprs.map { case UnresolvedFieldReference(name) => (0, name) - case _ => throw new TableException("Field reference expression expected.") + case _ => throw new TableException("Field reference expression requested.") } case t: TupleTypeInfo[A] => exprs.zipWithIndex.map { @@ -474,14 +474,14 @@ abstract class TableEnvironment(val config: TableConfig) { * * @param physicalRowTypeInfo the input of the sink * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) - * @param expectedTypeInfo the output type of the sink + * @param requestedTypeInfo the output type of the sink * @param functionName name of the map function. Does not need to be unique but has to be a * valid Java class identifier. */ protected def sinkConversion[T]( physicalRowTypeInfo: TypeInformation[Row], logicalRowType: RelDataType, - expectedTypeInfo: TypeInformation[T], + requestedTypeInfo: TypeInformation[T], functionName: String) : Option[MapFunction[Row, T]] = { @@ -493,9 +493,9 @@ "This is a bug and should not happen. 
Please file an issue.") } - // expected type is a row, no conversion needed - // TODO this logic will change with FLINK-5429 - if (expectedTypeInfo.getTypeClass == classOf[Row]) { + // requested type is a generic Row, no conversion needed + if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] && + requestedTypeInfo.getTypeClass == classOf[Row]) { return None } @@ -506,13 +506,13 @@ abstract class TableEnvironment(val config: TableConfig) { // field names val logicalFieldNames = logicalRowType.getFieldNames.asScala - // validate expected type - if (expectedTypeInfo.getArity != logicalFieldTypes.length) { - throw new TableException("Arity of result does not match expected type.") + // validate requested type + if (requestedTypeInfo.getArity != logicalFieldTypes.length) { + throw new TableException("Arity of result does not match requested type.") } - expectedTypeInfo match { + requestedTypeInfo match { - // POJO type expected + // POJO type requested case pt: PojoTypeInfo[_] => logicalFieldNames.zip(logicalFieldTypes) foreach { case (fName, fType) => @@ -520,38 +520,49 @@ abstract class TableEnvironment(val config: TableConfig) { if (pojoIdx < 0) { throw new TableException(s"POJO does not define field name: $fName") } - val expectedTypeInfo = pt.getTypeAt(pojoIdx) - if (fType != expectedTypeInfo) { - throw new TableException(s"Result field does not match expected type. " + - s"Expected: $expectedTypeInfo; Actual: $fType") + val requestedTypeInfo = pt.getTypeAt(pojoIdx) + if (fType != requestedTypeInfo) { + throw new TableException(s"Result field does not match requested type. " + + s"requested: $requestedTypeInfo; Actual: $fType") } } - // Tuple/Case class type expected - case ct: TupleTypeInfoBase[_] => + // Tuple/Case class type requested + case tt: TupleTypeInfoBase[_] => logicalFieldTypes.zipWithIndex foreach { case (fieldTypeInfo, i) => - val expectedTypeInfo = ct.getTypeAt(i) - if (fieldTypeInfo != expectedTypeInfo) { - throw new TableException(s"Result field does not match expected type. " + - s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo") + val requestedTypeInfo = tt.getTypeAt(i) + if (fieldTypeInfo != requestedTypeInfo) { + throw new TableException(s"Result field does not match requested type. " + + s"Requested: $requestedTypeInfo; Actual: $fieldTypeInfo") } } - // Atomic type expected + // Row type requested + case rt: RowTypeInfo[_] => + logicalFieldTypes.zipWithIndex foreach { + case (fieldTypeInfo, i) => + val requestedTypeInfo = rt.getTypeAt(i) + if (fieldTypeInfo != requestedTypeInfo) { + throw new TableException(s"Result field does not match requested type. " + + s"Requested: $requestedTypeInfo; Actual: $fieldTypeInfo") + } + } + + // Atomic type requested case at: AtomicType[_] => if (logicalFieldTypes.size != 1) { - throw new TableException(s"Result does not have a single field, " + - s"but ${logicalFieldTypes.size} fields.") + throw new TableException(s"Requested result type is an atomic type but " + + s"result has more or less than a single field.") } val fieldTypeInfo = logicalFieldTypes.head if (fieldTypeInfo != at) { - throw new TableException(s"Result field does not match expected type. " + - s"Expected: $at; Actual: $fieldTypeInfo") + throw new TableException(s"Result field does not match requested type. 
" + + s"Requested: $at; Actual: $fieldTypeInfo") } case _ => - throw new TableException(s"Unsupported result type: $expectedTypeInfo") + throw new TableException(s"Unsupported result type: $requestedTypeInfo") } // code generate MapFunction @@ -563,7 +574,7 @@ abstract class TableEnvironment(val config: TableConfig) { None) val conversion = generator.generateConverterResultExpression( - expectedTypeInfo, + requestedTypeInfo, logicalFieldNames) val body = @@ -576,7 +587,7 @@ abstract class TableEnvironment(val config: TableConfig) { functionName, classOf[MapFunction[Row, T]], body, - expectedTypeInfo) + requestedTypeInfo) val mapFunction = new MapRunner[Row, T]( genFunction.name,