From 7bd26239dd9a7c41f09fbe070baa70b19278c51f Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Thu, 22 Dec 2016 21:26:34 +0000 Subject: [PATCH 1/7] [FLINK-5280] Update TableSource to support nested data --- .../connectors/kafka/KafkaTableSource.java | 16 +-- .../table/api/BatchTableEnvironment.scala | 2 +- .../table/api/StreamTableEnvironment.scala | 2 +- .../flink/table/api/TableEnvironment.scala | 104 +++++++++++++----- .../nodes/dataset/BatchTableSourceScan.scala | 6 +- .../datastream/StreamTableSourceScan.scala | 6 +- ...hProjectIntoBatchTableSourceScanRule.scala | 2 +- ...ProjectIntoStreamTableSourceScanRule.scala | 2 +- .../flink/table/plan/schema/FlinkTable.scala | 6 +- .../table/plan/schema/TableSourceTable.scala | 4 +- .../flink/table/sources/CsvTableSource.scala | 7 -- .../flink/table/sources/TableSource.scala | 27 +++-- 12 files changed, 115 insertions(+), 69 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 9a9c85d80e5ab..b82fa89b255a6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.sources.TableSource$class; import org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.sources.StreamTableSource; @@ -111,24 +112,17 @@ public DataStream getDataStream(StreamExecutionEnvironment env) { return kafkaSource; } - @Override - public int getNumberOfFields() { - return fieldNames.length; - } - - 
@Override public String[] getFieldsNames() { - return fieldNames; + return TableSource$class.getFieldsNames(this); } - @Override - public TypeInformation[] getFieldTypes() { - return fieldTypes; + public int[] getFieldsIndices() { + return TableSource$class.getFieldsIndices(this); } @Override public TypeInformation getReturnType() { - return new RowTypeInfo(fieldTypes); + return new RowTypeInfo(fieldTypes, fieldNames); } /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 59cad80357383..4b9936d637607 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -298,7 +298,7 @@ abstract class BatchTableEnvironment( * @return The [[DataSet]] that corresponds to the translated [[Table]]. 
*/ protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = { - validateType(tpe) + TableEnvironment.validateType(tpe) logicalPlan match { case node: DataSetRel => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 4e43001ea3be4..c08b502512d15 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -308,7 +308,7 @@ abstract class StreamTableEnvironment( protected def translate[A] (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = { - validateType(tpe) + TableEnvironment.validateType(tpe) logicalPlan match { case node: DataStreamRel => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 769008f60298d..30848d9b6ee19 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -31,6 +31,7 @@ import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.sql.util.ChainedSqlOperatorTable import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo @@ -317,16 +318,6 @@ abstract class TableEnvironment(val 
config: TableConfig) { frameworkConfig } - protected def validateType(typeInfo: TypeInformation[_]): Unit = { - val clazz = typeInfo.getTypeClass - if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || - !Modifier.isPublic(clazz.getModifiers) || - clazz.getCanonicalName == null) { - throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + - s"static and globally accessible.") - } - } - /** * Returns field names and field positions for a given [[TypeInformation]]. * @@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: TableConfig) { * @return A tuple of two arrays holding the field names and corresponding field positions. */ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): - (Array[String], Array[Int]) = - { - validateType(inputType) - - val fieldNames: Array[String] = inputType match { - case t: TupleTypeInfo[A] => t.getFieldNames - case c: CaseClassTypeInfo[A] => c.getFieldNames - case p: PojoTypeInfo[A] => p.getFieldNames - case r: RowTypeInfo => r.getFieldNames - case tpe => - throw new TableException(s"Type $tpe lacks explicit field naming") - } - val fieldIndexes = fieldNames.indices.toArray - - if (fieldNames.contains("*")) { - throw new TableException("Field name can not be '*'.") - } - - (fieldNames, fieldIndexes) + (Array[String], Array[Int]) = { + TableEnvironment.getFieldInfo(inputType) } /** @@ -374,7 +348,7 @@ abstract class TableEnvironment(val config: TableConfig) { inputType: TypeInformation[A], exprs: Array[Expression]): (Array[String], Array[Int]) = { - validateType(inputType) + TableEnvironment.validateType(inputType) val indexedNames: Array[(Int, String)] = inputType match { case a: AtomicType[A] => @@ -535,4 +509,74 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + + /** + * Returns field names and field positions for a given [[TypeInformation]]. 
+ * + * Field names are automatically extracted for + * [[org.apache.flink.api.common.typeutils.CompositeType]]. + * The method fails if inputType is not a + * [[org.apache.flink.api.common.typeutils.CompositeType]]. + * + * @param inputType The TypeInformation extract the field names and positions from. + * @tparam A The type of the TypeInformation. + * @return A tuple of two arrays holding the field names and corresponding field positions. + */ + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { + validateType(inputType) + + val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case r: RowTypeInfo => r.getFieldNames + case tpe => + throw new TableException(s"Type $tpe lacks explicit field naming") + } + val fieldIndexes = fieldNames.indices.toArray + + if (fieldNames.contains("*")) { + throw new TableException("Field name can not be '*'.") + } + + (fieldNames, fieldIndexes) + } + + def validateType(typeInfo: TypeInformation[_]): Unit = { + val clazz = typeInfo.getTypeClass + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + + s"static and globally accessible.") + } + } + + /** + * Returns field types for a given [[TypeInformation]]. + * + * Field types are automatically extracted for + * [[org.apache.flink.api.common.typeutils.CompositeType]]. + * The method fails if inputType is not a + * [[org.apache.flink.api.common.typeutils.CompositeType]]. + * + * @param inputType The TypeInformation to extract field types from. + * @return an holding the field types. 
+ */ + def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = { + validateType(inputType) + + inputType match { + case t: TupleTypeInfo[_] => getTypes(t) + case c: CaseClassTypeInfo[_] => getTypes(c) + case p: PojoTypeInfo[_] => getTypes(p) + case r: RowTypeInfo => getTypes(r) + case tpe => + throw new TableException(s"Type $tpe lacks explicit field naming") + } + } + + private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = { + 0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index 09cb180aa3ecb..8a85802fbbc1e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.table.api.BatchTableEnvironment +import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.sources.BatchTableSource @@ -38,7 +38,9 @@ class BatchTableSourceScan( override def deriveRowType() = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes) + flinkTypeFactory.buildRowDataType( + tableSource.getFieldsNames, + TableEnvironment.getFieldTypes(tableSource.getReturnType)) } override def computeSelfCost (planner: 
RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 702b6ebe62cd6..d098546b9ae99 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment} /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. 
*/ class StreamTableSourceScan( @@ -38,7 +38,9 @@ class StreamTableSourceScan( override def deriveRowType() = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes) + flinkTypeFactory.buildRowDataType( + tableSource.getFieldsNames, + TableEnvironment.getFieldTypes(tableSource.getReturnType)) } override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala index 7adec48849fc9..f9cf8fe0fb63e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala @@ -47,7 +47,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule( val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram) // if no fields can be projected, we keep the original plan. 
- if (scan.tableSource.getNumberOfFields != usedFields.length) { + if (scan.tableSource.getFieldsNames.length != usedFields.length) { val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] val newTableSource = originTableSource.projectFields(usedFields) val newScan = new BatchTableSourceScan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala index 654fb8fb0e914..6a920144f4d3b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala @@ -48,7 +48,7 @@ class PushProjectIntoStreamTableSourceScanRule extends RelOptRule( val usedFields = extractRefInputFields(calc.calcProgram) // if no fields can be projected, we keep the original plan - if (scan.tableSource.getNumberOfFields != usedFields.length) { + if (scan.tableSource.getFieldsNames.length != usedFields.length) { val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] val newTableSource = originTableSource.projectFields(usedFields) val newScan = new StreamTableSourceScan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala index 8bb5c8117a159..2593e87e1dc7e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.api.TableException import 
org.apache.flink.table.calcite.FlinkTypeFactory abstract class FlinkTable[T]( - val typeInfo: TypeInformation[T], + val typeInfo: TypeInformation[_], val fieldIndexes: Array[Int], val fieldNames: Array[String]) extends AbstractTable { @@ -44,14 +44,14 @@ abstract class FlinkTable[T]( val fieldTypes: Array[TypeInformation[_]] = typeInfo match { - case cType: CompositeType[T] => + case cType: CompositeType[_] => if (fieldNames.length != cType.getArity) { throw new TableException( s"Arity of type (" + cType.getFieldNames.deep + ") " + "not equal to number of field names " + fieldNames.deep + ".") } fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) - case aType: AtomicType[T] => + case aType: AtomicType[_] => if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { throw new TableException( "Non-composite input type may have only a single field and its index must be 0.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index 0f55daf8efa5b..1fa8cf8fc9c16 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -25,6 +25,6 @@ import org.apache.flink.types.Row /** Table which defines an external table via a [[TableSource]] */ class TableSourceTable(val tableSource: TableSource[_]) extends FlinkTable[Row]( - typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*), - fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray, + typeInfo = tableSource.getReturnType, + fieldIndexes = tableSource.getFieldsIndices, fieldNames = tableSource.getFieldsNames) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala index 20e8bb96c3eb3..14e31677a1dd8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala @@ -86,16 +86,9 @@ class CsvTableSource( override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { execEnv.createInput(createCsvInput(), returnType) } - - /** Returns the types of the table fields. */ - override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes - /** Returns the names of the table fields. */ override def getFieldsNames: Array[String] = fieldNames - /** Returns the number of fields of the table. */ - override def getNumberOfFields: Int = fieldNames.length - /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */ override def getReturnType: RowTypeInfo = returnType diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala index 9d4ba6861cd99..f8439d1bee7e0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala @@ -19,21 +19,32 @@ package org.apache.flink.table.sources import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableEnvironment -/** Defines an external table by providing schema information, i.e., field names and types. +/** Defines an external table by providing schema information and used to produce a + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]]. + * Schema information consists of a data type, field names, and corresponding indices of + * these names in the data type. 
+ * + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case + * field names and field indices are deducted from the returned type. + * + * In case if custom field names are required one need to implement both + * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]]. * * @tparam T The return type of the [[TableSource]]. */ trait TableSource[T] { - /** Returns the number of fields of the table. */ - def getNumberOfFields: Int - /** Returns the names of the table fields. */ - def getFieldsNames: Array[String] - - /** Returns the types of the table fields. */ - def getFieldTypes: Array[TypeInformation[_]] + def getFieldsNames: Array[String] = { + TableEnvironment.getFieldInfo(getReturnType)._1 + } + + /** Returns the indices of the table fields. */ + def getFieldsIndices: Array[Int] = { + getFieldsNames.indices.toArray + } /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ def getReturnType: TypeInformation[T] From f98f6d773bc20f58ba5b668479da546c7d6d179e Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Sat, 24 Dec 2016 03:48:22 +0000 Subject: [PATCH 2/7] [FLINK-5280] Fix according to review --- .../connectors/kafka/KafkaTableSource.java | 4 +- .../flink/table/api/TableEnvironment.scala | 44 ++++++++++--------- .../utils/UserDefinedFunctionUtils.scala | 22 ++-------- .../dataSet/BatchTableSourceScanRule.scala | 6 +-- .../StreamTableSourceScanRule.scala | 6 +-- .../flink/table/plan/schema/FlinkTable.scala | 2 +- .../table/plan/schema/TableSourceTable.scala | 8 ++-- .../flink/table/sources/TableSource.scala | 12 ++--- .../flink/table/TableEnvironmentTest.scala | 7 ++- 9 files changed, 51 insertions(+), 60 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 
b82fa89b255a6..f0f6f823fa736 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -116,8 +116,8 @@ public String[] getFieldsNames() { return TableSource$class.getFieldsNames(this); } - public int[] getFieldsIndices() { - return TableSource$class.getFieldsIndices(this); + public int[] getFieldsIndexes() { + return TableSource$class.getFieldsIndexes(this); } @Override diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 30848d9b6ee19..3bc0f0bfc2dc6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -43,6 +43,7 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.codegen.ExpressionReducer import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory @@ -332,7 +333,7 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { - TableEnvironment.getFieldInfo(inputType) + (TableEnvironment.getFieldNames(inputType), 
TableEnvironment.getFieldIndexes(inputType)) } /** @@ -522,26 +523,29 @@ object TableEnvironment { * @tparam A The type of the TypeInformation. * @return A tuple of two arrays holding the field names and corresponding field positions. */ - def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { + def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = { validateType(inputType) val fieldNames: Array[String] = inputType match { - case t: TupleTypeInfo[A] => t.getFieldNames - case c: CaseClassTypeInfo[A] => c.getFieldNames - case p: PojoTypeInfo[A] => p.getFieldNames - case r: RowTypeInfo => r.getFieldNames + case t: CompositeType[_] => t.getFieldNames + case a: AtomicType[_] => Array("f0") case tpe => - throw new TableException(s"Type $tpe lacks explicit field naming") + throw new TableException(s"Currently only CompositeType and AtomicType are supported. " + + s"Type $tpe lacks explicit field naming") } - val fieldIndexes = fieldNames.indices.toArray if (fieldNames.contains("*")) { throw new TableException("Field name can not be '*'.") } - (fieldNames, fieldIndexes) + fieldNames } + /** + * Validate if class represented by the typeInfo is static and globally accessible + * @param typeInfo type to check + * @throws TableException if type does not meet these criteria + */ def validateType(typeInfo: TypeInformation[_]): Unit = { val clazz = typeInfo.getTypeClass if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || @@ -552,6 +556,10 @@ object TableEnvironment { } } + def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = { + getFieldNames(inputType).indices.toArray + } + /** * Returns field types for a given [[TypeInformation]]. 
* @@ -566,17 +574,13 @@ object TableEnvironment { def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = { validateType(inputType) - inputType match { - case t: TupleTypeInfo[_] => getTypes(t) - case c: CaseClassTypeInfo[_] => getTypes(c) - case p: PojoTypeInfo[_] => getTypes(p) - case r: RowTypeInfo => getTypes(r) - case tpe => - throw new TableException(s"Type $tpe lacks explicit field naming") + getFieldNames(inputType).map { i => + inputType match { + case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]] + case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]] + case tpe => + throw new TableException(s"Currently only CompositeType and AtomicType are supported.") + } } } - - private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = { - 0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray - } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index aa3fab075baac..cc43fd0d29e49 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.api.{TableException, ValidationException} +import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException} import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction} import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl import 
org.apache.flink.util.InstantiationUtil @@ -268,23 +268,9 @@ object UserDefinedFunctionUtils { def getFieldInfo(inputType: TypeInformation[_]) : (Array[String], Array[Int], Array[TypeInformation[_]]) = { - val fieldNames: Array[String] = inputType match { - case t: CompositeType[_] => t.getFieldNames - case a: AtomicType[_] => Array("f0") - case tpe => - throw new TableException(s"Currently only CompositeType and AtomicType are supported. " + - s"Type $tpe lacks explicit field naming") - } - val fieldIndexes = fieldNames.indices.toArray - val fieldTypes: Array[TypeInformation[_]] = fieldNames.map { i => - inputType match { - case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]] - case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]] - case tpe => - throw new TableException(s"Currently only CompositeType and AtomicType are supported.") - } - } - (fieldNames, fieldIndexes, fieldTypes) + (TableEnvironment.getFieldNames(inputType), + TableEnvironment.getFieldIndexes(inputType), + TableEnvironment.getFieldTypes(inputType)) } /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala index d6995850827ea..d9f5bf8b68cb0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala @@ -39,9 +39,9 @@ class BatchTableSourceScanRule /** Rule must only match if TableScan targets a [[BatchTableSource]] */ override def matches(call: RelOptRuleCall): Boolean = { val scan: TableScan = call.rel(0).asInstanceOf[TableScan] - val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) + val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable[_]]) dataSetTable match { - 
case tst: TableSourceTable => + case tst: TableSourceTable[_] => tst.tableSource match { case _: BatchTableSource[_] => true @@ -57,7 +57,7 @@ class BatchTableSourceScanRule val scan: TableScan = rel.asInstanceOf[TableScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource + val tableSource = scan.getTable.unwrap(classOf[TableSourceTable[_]]).tableSource .asInstanceOf[BatchTableSource[_]] new BatchTableSourceScan( rel.getCluster, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala index 296c86be54137..c6c7c59ad2af7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala @@ -40,9 +40,9 @@ class StreamTableSourceScanRule /** Rule must only match if TableScan targets a [[StreamTableSource]] */ override def matches(call: RelOptRuleCall): Boolean = { val scan: TableScan = call.rel(0).asInstanceOf[TableScan] - val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) + val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable[_]]) dataSetTable match { - case tst: TableSourceTable => + case tst: TableSourceTable[_] => tst.tableSource match { case _: StreamTableSource[_] => true @@ -59,7 +59,7 @@ class StreamTableSourceScanRule val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) // The original registered table source - val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable]) + val table = scan.getTable.unwrap(classOf[TableSourceTable[_]]) val tableSource: StreamTableSource[_] = 
table.tableSource.asInstanceOf[StreamTableSource[_]] new StreamTableSourceScan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala index 2593e87e1dc7e..3660de82a0293 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory abstract class FlinkTable[T]( - val typeInfo: TypeInformation[_], + val typeInfo: TypeInformation[T], val fieldIndexes: Array[Int], val fieldNames: Array[String]) extends AbstractTable { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index 1fa8cf8fc9c16..f2f51cf98204d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -18,13 +18,11 @@ package org.apache.flink.table.plan.schema -import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.sources.TableSource -import org.apache.flink.types.Row /** Table which defines an external table via a [[TableSource]] */ -class TableSourceTable(val tableSource: TableSource[_]) - extends FlinkTable[Row]( +class TableSourceTable[T](val tableSource: TableSource[T]) + extends FlinkTable[T]( typeInfo = tableSource.getReturnType, - fieldIndexes = tableSource.getFieldsIndices, + fieldIndexes = tableSource.getFieldsIndexes, fieldNames = tableSource.getFieldsNames) diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala index f8439d1bee7e0..863476d23ddbf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala @@ -26,11 +26,11 @@ import org.apache.flink.table.api.TableEnvironment * Schema information consists of a data type, field names, and corresponding indices of * these names in the data type. * - * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case - * field names and field indices are deducted from the returned type. + * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case + * field names and field indices are derived from the returned type. * * In case if custom field names are required one need to implement both - * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]]. + * [[TableSource#getFieldsNames]] and [[TableSource#getFieldsIndices]]. * * @tparam T The return type of the [[TableSource]]. */ @@ -38,12 +38,12 @@ trait TableSource[T] { /** Returns the names of the table fields. */ def getFieldsNames: Array[String] = { - TableEnvironment.getFieldInfo(getReturnType)._1 + TableEnvironment.getFieldNames(getReturnType) } /** Returns the indices of the table fields. */ - def getFieldsIndices: Array[Int] = { - getFieldsNames.indices.toArray + def getFieldsIndexes: Array[Int] = { + TableEnvironment.getFieldIndexes(getReturnType) } /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. 
*/ diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index b90de97dfcfa8..f91aee91be52c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -68,9 +68,12 @@ class TableEnvironmentTest { fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) } - @Test(expected = classOf[TableException]) + @Test def testGetFieldInfoAtomic(): Unit = { - tEnv.getFieldInfo(atomicType) + val fieldInfo = tEnv.getFieldInfo(atomicType) + + fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1)) + fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1)) } @Test From 5ab0016ff3a313799be82f82d0d253224f8ba045 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Thu, 29 Dec 2016 21:22:29 +0000 Subject: [PATCH 3/7] [FLINK-5280] Fix according to review --- .../connectors/kafka/KafkaTableSource.java | 12 +--- .../flink/table/api/TableEnvironment.scala | 54 +++++++++++------ .../flink/table/codegen/CodeGenerator.scala | 2 +- .../flink/table/plan/nodes/FlinkRel.scala | 1 + .../flink/table/plan/schema/FlinkTable.scala | 2 +- .../table/plan/schema/TableSourceTable.scala | 2 +- .../AbstractBatchStreamTableSource.scala | 30 ++++++++++ .../sources/AbstractBatchTableSource.scala | 30 ++++++++++ .../sources/AbstractStreamTableSource.scala | 30 ++++++++++ .../table/sources/AbstractTableSource.scala | 39 ++++++++++++ .../sources/BatchStreamTableSource.scala | 27 +++++++++ .../flink/table/sources/CsvTableSource.scala | 3 +- .../flink/table/sources/TableSource.scala | 8 +-- .../api/java/batch/TableSourceITCase.java | 31 ++++++++-- .../api/scala/batch/TableSourceITCase.scala | 19 ++++++ .../flink/table/utils/CommonTestData.scala | 59 ++++++++++++++++++- 
.../api/scala/io/CsvInputFormatTest.scala | 2 - 17 files changed, 301 insertions(+), 50 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchTableSource.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractStreamTableSource.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchStreamTableSource.scala diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index f0f6f823fa736..fceb7f0f7c700 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.sources.TableSource$class; +import org.apache.flink.table.sources.AbstractStreamTableSource; import org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.sources.StreamTableSource; @@ -38,7 +38,7 @@ *

The version-specific Kafka consumers need to extend this class and * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. */ -public abstract class KafkaTableSource implements StreamTableSource { +public abstract class KafkaTableSource extends AbstractStreamTableSource { /** The Kafka topic to consume. */ private final String topic; @@ -112,14 +112,6 @@ public DataStream getDataStream(StreamExecutionEnvironment env) { return kafkaSource; } - public String[] getFieldsNames() { - return TableSource$class.getFieldsNames(this); - } - - public int[] getFieldsIndexes() { - return TableSource$class.getFieldsIndexes(this); - } - @Override public TypeInformation getReturnType() { return new RowTypeInfo(fieldTypes, fieldNames); diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 3bc0f0bfc2dc6..6d5538280d487 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -18,8 +18,8 @@ package org.apache.flink.table.api -import _root_.java.util.concurrent.atomic.AtomicInteger import _root_.java.lang.reflect.Modifier +import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.config.Lex import org.apache.calcite.plan.RelOptPlanner @@ -32,7 +32,7 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} import org.apache.flink.api.java.{ExecutionEnvironment => 
JavaBatchExecEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} @@ -43,7 +43,6 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.codegen.ExpressionReducer import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} -import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory @@ -323,9 +322,11 @@ abstract class TableEnvironment(val config: TableConfig) { * Returns field names and field positions for a given [[TypeInformation]]. * * Field names are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]]. + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]]. + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. * * @param inputType The TypeInformation extract the field names and positions from. * @tparam A The type of the TypeInformation. @@ -512,16 +513,18 @@ object TableEnvironment { } /** - * Returns field names and field positions for a given [[TypeInformation]]. + * Returns field names for a given [[TypeInformation]]. * * Field names are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]]. 
+ * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]]. + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. * - * @param inputType The TypeInformation extract the field names and positions from. + * @param inputType The TypeInformation extract the field names. * @tparam A The type of the TypeInformation. - * @return A tuple of two arrays holding the field names and corresponding field positions. + * @return A an array holding the field names */ def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = { validateType(inputType) @@ -556,6 +559,19 @@ object TableEnvironment { } } + /** + * Returns field indexes for a given [[TypeInformation]]. + * + * Field indexes are automatically extracted for + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. + * The method fails if inputType is not a + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. + * + * @param inputType The TypeInformation extract the field positions from. + * @return A an array holding the field positions + */ def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = { getFieldNames(inputType).indices.toArray } @@ -564,9 +580,11 @@ object TableEnvironment { * Returns field types for a given [[TypeInformation]]. * * Field types are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]]. + * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]]. 
+ * [[org.apache.flink.api.common.typeutils.CompositeType]] + * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. * * @param inputType The TypeInformation to extract field types from. * @return an holding the field types. @@ -574,13 +592,11 @@ object TableEnvironment { def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = { validateType(inputType) - getFieldNames(inputType).map { i => - inputType match { - case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]] - case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]] - case tpe => - throw new TableException(s"Currently only CompositeType and AtomicType are supported.") - } + inputType match { + case t: CompositeType[_] => 0.until(t.getArity).map(t.getTypeAt(_)).toArray + case a: AtomicType[_] => Array(a.asInstanceOf[TypeInformation[_]]) + case tpe => + throw new TableException(s"Currently only CompositeType and AtomicType are supported.") } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 13fe4c374cf3b..ea7431f4310b1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeIn import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.{TableConfig, TableEnvironment} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils._ import 
org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala index 835f31698e4f0..73a5dc57a2525 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala @@ -120,6 +120,7 @@ trait FlinkRel { case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8 case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4 case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12 + case SqlTypeName.ROW => s case _ => throw TableException(s"Unsupported data type encountered: $t") } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala index 3660de82a0293..971f54f8cbde6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala @@ -22,7 +22,7 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.schema.impl.AbstractTable import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.{TableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory abstract class FlinkTable[T]( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index f2f51cf98204d..21444f1a24d84 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -24,5 +24,5 @@ import org.apache.flink.table.sources.TableSource class TableSourceTable[T](val tableSource: TableSource[T]) extends FlinkTable[T]( typeInfo = tableSource.getReturnType, - fieldIndexes = tableSource.getFieldsIndexes, + fieldIndexes = tableSource.getFieldsIndices, fieldNames = tableSource.getFieldsNames) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala new file mode 100644 index 0000000000000..8406bd630285f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources + +/** + * Partial implementation of the [[BatchStreamTableSource]] trait. + * + * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. + */ +abstract class AbstractBatchStreamTableSource[T] + extends AbstractTableSource[T] + with BatchStreamTableSource[T] { + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchTableSource.scala new file mode 100644 index 0000000000000..3f8aeb3f8bdbd --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchTableSource.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +/** + * Partial implementation of the [[BatchTableSource]] trait. + * + * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. 
+ */ +abstract class AbstractBatchTableSource[T] + extends AbstractTableSource[T] + with BatchTableSource[T] { + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractStreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractStreamTableSource.scala new file mode 100644 index 0000000000000..97446633eb9c9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractStreamTableSource.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +/** + * Partial implementation of the [[StreamTableSource]] trait. + * + * @tparam T Type of the [[org.apache.flink.streaming.api.datastream.DataStream]] created by this [[TableSource]]. 
+ */ +abstract class AbstractStreamTableSource[T] + extends AbstractTableSource[T] + with StreamTableSource[T] { + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala new file mode 100644 index 0000000000000..29912efb89364 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import org.apache.flink.table.api.TableEnvironment + +/** + * Partial implementation of the [[TableSource]] trait. + * + * @tparam T The return type of the [[TableSource]]. + */ +abstract class AbstractTableSource[T] extends TableSource[T] { + + /** Returns the names of the table fields. */ + override def getFieldsNames: Array[String] = { + TableEnvironment.getFieldNames(getReturnType) + } + + /** Returns the indices of the table fields. 
*/ + override def getFieldsIndices: Array[Int] = { + TableEnvironment.getFieldIndexes(getReturnType) + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchStreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchStreamTableSource.scala new file mode 100644 index 0000000000000..b84f4ded6bd47 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchStreamTableSource.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +/** Defines an external batch table and an external stream table and provides access to its data. + * + * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. 
+ */ +trait BatchStreamTableSource[T] extends BatchTableSource[T] with StreamTableSource[T] { + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala index 14e31677a1dd8..3f5b23e5ca42e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala @@ -53,8 +53,7 @@ class CsvTableSource( ignoreFirstLine: Boolean = false, ignoreComments: String = null, lenient: Boolean = false) - extends BatchTableSource[Row] - with StreamTableSource[Row] + extends AbstractBatchStreamTableSource[Row] with ProjectableTableSource[Row] { /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala index 863476d23ddbf..566bb138e003a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala @@ -37,14 +37,10 @@ import org.apache.flink.table.api.TableEnvironment trait TableSource[T] { /** Returns the names of the table fields. */ - def getFieldsNames: Array[String] = { - TableEnvironment.getFieldNames(getReturnType) - } + def getFieldsNames: Array[String] /** Returns the indices of the table fields. */ - def getFieldsIndexes: Array[Int] = { - TableEnvironment.getFieldIndexes(getReturnType) - } + def getFieldsIndices: Array[Int] /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. 
*/ def getReturnType: TypeInformation[T] diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java index e5777f28a18fc..8e384e321531e 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java @@ -18,17 +18,15 @@ package org.apache.flink.table.api.java.batch; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.table.utils.CommonTestData; -import org.apache.flink.types.Row; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase; import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.table.utils.CommonTestData; +import org.apache.flink.types.Row; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -92,4 +90,25 @@ public void testBatchTableSourceSQL() throws Exception { compareResultAsText(results, expected); } + @Test + public void testNestedBatchTableSourceSQL() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + BatchTableSource nestedTable = CommonTestData.getNestedTableSource(); + + tableEnv.registerTableSource("NestedPersons", 
nestedTable); + + Table result = tableEnv + .sql("SELECT NestedPersons.firstName, NestedPersons.lastName," + + "NestedPersons.address.street, NestedPersons.address.city AS city " + + "FROM NestedPersons " + + "WHERE NestedPersons.address.city LIKE 'Dublin'"); + + DataSet resultSet = tableEnv.toDataSet(result, Row.class); + List results = resultSet.collect(); + + String expected = "Bob,Taylor,Pearse Street,Dublin"; + + compareResultAsText(results, expected); + } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala index a9218ac72cf04..f5ab352eee301 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala @@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.utils.CommonTestData import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.types.Row import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -86,4 +87,22 @@ class TableSourceITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testNestedBatchTableSourceSQL(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + val nestedTable = CommonTestData.getNestedTableSource + + tableEnv.registerTableSource("NestedPersons", nestedTable) + + val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," + + "NestedPersons.address.street, NestedPersons.address.city AS city " + + "FROM NestedPersons " + + "WHERE NestedPersons.address.city LIKE 'Dublin'").collect() + + val 
expected = "Bob,Taylor,Pearse Street,Dublin" + + TestBaseUtils.compareResultAsText(result.asJava, expected) + } + } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala index 349b36999dc72..440c2b716a3c7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala @@ -19,9 +19,16 @@ package org.apache.flink.table.utils import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.util -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.table.sources.CsvTableSource +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.table.sources.{AbstractBatchTableSource, BatchTableSource, CsvTableSource, TableSource} +import org.apache.flink.api.scala._ object CommonTestData { @@ -60,4 +67,52 @@ object CommonTestData { ignoreComments = "%" ) } + + def getNestedTableSource: BatchTableSource[Person] = { + new AbstractBatchTableSource[Person] { + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = { + val executionEnvironment = ExecutionEnvironment.getExecutionEnvironment + executionEnvironment.fromCollection( + util.Arrays.asList( + new Person("Mike", "Smith", new Address("5th Ave", "New-York")), + new Person("Sally", "Miller", new Address("Potsdamer Platz", "Berlin")), + new Person("Bob", "Taylor", new Address("Pearse Street", "Dublin"))), + getReturnType + ) + } + + /** 
Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ + override def getReturnType: TypeInformation[Person] = new PojoTypeInfo[Person]( + classOf[Person], + util.Arrays.asList( + new PojoField(classOf[Person].getDeclaredField("firstName"), + BasicTypeInfo.STRING_TYPE_INFO), + new PojoField(classOf[Person].getDeclaredField("lastName"), + BasicTypeInfo.STRING_TYPE_INFO), + new PojoField(classOf[Person].getDeclaredField("address"), + new PojoTypeInfo[Address]( + classOf[Address], + util.Arrays.asList( + new PojoField(classOf[Address].getDeclaredField("street"), + BasicTypeInfo.STRING_TYPE_INFO), + new PojoField(classOf[Address].getDeclaredField("city"), + BasicTypeInfo.STRING_TYPE_INFO) + ) + )) + ) + ) + } + } + + class Person(var firstName: String, var lastName: String, var address: Address) { + def this() { + this(null, null, null) + } + } + + class Address(var street: String, var city: String) { + def this() { + this(null, null) + } + } } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala index 539a2578dba30..925ee7849f946 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala @@ -19,8 +19,6 @@ package org.apache.flink.api.scala.io import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.io.PojoCsvInputFormat import org.apache.flink.api.java.io.TupleCsvInputFormat import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO From e67208a9c01e45101a7a386796eff9178ae7f962 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Thu, 29 Dec 2016 23:21:46 +0000 Subject: [PATCH 4/7] [FLINK-5280] Update estimateRowSize to 
process ROW type --- .../org/apache/flink/table/plan/nodes/FlinkRel.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala index 73a5dc57a2525..57803428e96dc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala @@ -18,7 +18,9 @@ package org.apache.flink.table.plan.nodes -import org.apache.calcite.rel.`type`.RelDataType +import java.util + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.calcite.rex._ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.flink.api.common.functions.MapFunction @@ -103,9 +105,11 @@ trait FlinkRel { } + private[flink] def estimateRowSize(rowType: RelDataType): Double = { + val fieldList = rowType.getFieldList - rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) => + fieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) => t match { case SqlTypeName.TINYINT => s + 1 case SqlTypeName.SMALLINT => s + 2 @@ -120,7 +124,7 @@ trait FlinkRel { case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8 case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4 case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12 - case SqlTypeName.ROW => s + case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(0).getType()).asInstanceOf[Int] case _ => throw TableException(s"Unsupported data type encountered: $t") } } From 4031d40489efa9f61acf81a2602309d7f58772f8 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Sun, 8 Jan 2017 22:04:40 +0000 Subject: [PATCH 5/7] [FLINK-5280] Fix according to review --- .../flink/table/api/TableEnvironment.scala | 21 
----------------- .../flink/table/codegen/CodeGenerator.scala | 4 ++-- .../flink/table/plan/nodes/FlinkRel.scala | 6 ++--- .../nodes/dataset/BatchTableSourceScan.scala | 4 ++-- .../datastream/StreamTableSourceScan.scala | 4 ++-- ...hProjectIntoBatchTableSourceScanRule.scala | 2 +- ...ProjectIntoStreamTableSourceScanRule.scala | 2 +- .../table/plan/schema/TableSourceTable.scala | 4 ++-- .../table/sources/AbstractTableSource.scala | 4 ++-- .../flink/table/sources/CsvTableSource.scala | 2 +- .../flink/table/sources/TableSource.scala | 4 ++-- .../api/java/batch/TableSourceITCase.java | 22 ------------------ .../flink/table/utils/CommonTestData.scala | 23 ++----------------- 13 files changed, 20 insertions(+), 82 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 6d5538280d487..28b41604c5250 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -515,13 +515,6 @@ object TableEnvironment { /** * Returns field names for a given [[TypeInformation]]. * - * Field names are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]] - * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. - * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]] - * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. - * * @param inputType The TypeInformation extract the field names. * @tparam A The type of the TypeInformation. * @return A an array holding the field names @@ -562,13 +555,6 @@ object TableEnvironment { /** * Returns field indexes for a given [[TypeInformation]]. 
* - * Field indexes are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]] - * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. - * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]] - * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. - * * @param inputType The TypeInformation extract the field positions from. * @return A an array holding the field positions */ @@ -579,13 +565,6 @@ object TableEnvironment { /** * Returns field types for a given [[TypeInformation]]. * - * Field types are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]] - * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. - * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]] - * or [[org.apache.flink.api.common.typeinfo.AtomicType]]. - * * @param inputType The TypeInformation to extract field types from. * @return an holding the field types. 
*/ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index ea7431f4310b1..7c68e2af20044 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeIn import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.table.api.{TableConfig, TableEnvironment} +import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils._ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} @@ -39,8 +39,8 @@ import org.apache.flink.table.codegen.Indenter.toISC import org.apache.flink.table.codegen.calls.FunctionGenerator import org.apache.flink.table.codegen.calls.ScalarOperators._ import org.apache.flink.table.functions.UserDefinedFunction -import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.typeutils.TypeConverter import scala.collection.JavaConversions._ import scala.collection.mutable diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala index 57803428e96dc..9b844beb43aca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala @@ -109,8 +109,8 @@ trait FlinkRel { private[flink] def estimateRowSize(rowType: RelDataType): Double = { val fieldList = rowType.getFieldList - fieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) => - t match { + fieldList.map(_.getType.getSqlTypeName).zipWithIndex.foldLeft(0) { (s, t) => + t._1 match { case SqlTypeName.TINYINT => s + 1 case SqlTypeName.SMALLINT => s + 2 case SqlTypeName.INTEGER => s + 4 @@ -124,7 +124,7 @@ trait FlinkRel { case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8 case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4 case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12 - case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(0).getType()).asInstanceOf[Int] + case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(t._2).getType()).asInstanceOf[Int] case _ => throw TableException(s"Unsupported data type encountered: $t") } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index 8a85802fbbc1e..bc933701e34f8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -39,7 +39,7 @@ class BatchTableSourceScan( override def deriveRowType() = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] flinkTypeFactory.buildRowDataType( - tableSource.getFieldsNames, + tableSource.getFieldNames, TableEnvironment.getFieldTypes(tableSource.getReturnType)) } @@ -59,7 +59,7 @@ class BatchTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) - 
.item("fields", tableSource.getFieldsNames.mkString(", ")) + .item("fields", tableSource.getFieldNames.mkString(", ")) } override def translateToPlan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index d098546b9ae99..4e76ac0dcb13b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -39,7 +39,7 @@ class StreamTableSourceScan( override def deriveRowType() = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] flinkTypeFactory.buildRowDataType( - tableSource.getFieldsNames, + tableSource.getFieldNames, TableEnvironment.getFieldTypes(tableSource.getReturnType)) } @@ -59,7 +59,7 @@ class StreamTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) - .item("fields", tableSource.getFieldsNames.mkString(", ")) + .item("fields", tableSource.getFieldNames.mkString(", ")) } override def translateToPlan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala index f9cf8fe0fb63e..f0fedaf8a1633 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala @@ -47,7 +47,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule( val usedFields: Array[Int] = 
extractRefInputFields(calc.calcProgram) // if no fields can be projected, we keep the original plan. - if (scan.tableSource.getFieldsNames.length != usedFields.length) { + if (scan.tableSource.getFieldNames.length != usedFields.length) { val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] val newTableSource = originTableSource.projectFields(usedFields) val newScan = new BatchTableSourceScan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala index 6a920144f4d3b..05ca048ee3346 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala @@ -48,7 +48,7 @@ class PushProjectIntoStreamTableSourceScanRule extends RelOptRule( val usedFields = extractRefInputFields(calc.calcProgram) // if no fields can be projected, we keep the original plan - if (scan.tableSource.getFieldsNames.length != usedFields.length) { + if (scan.tableSource.getFieldNames.length != usedFields.length) { val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] val newTableSource = originTableSource.projectFields(usedFields) val newScan = new StreamTableSourceScan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index 21444f1a24d84..e139b3d9be98d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -24,5 +24,5 @@ import org.apache.flink.table.sources.TableSource class TableSourceTable[T](val tableSource: TableSource[T]) extends FlinkTable[T]( typeInfo = tableSource.getReturnType, - fieldIndexes = tableSource.getFieldsIndices, - fieldNames = tableSource.getFieldsNames) + fieldIndexes = tableSource.getFieldIndices, + fieldNames = tableSource.getFieldNames) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala index 29912efb89364..10f60ef39e525 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala @@ -28,12 +28,12 @@ import org.apache.flink.table.api.TableEnvironment abstract class AbstractTableSource[T] extends TableSource[T] { /** Returns the names of the table fields. */ - override def getFieldsNames: Array[String] = { + override def getFieldNames: Array[String] = { TableEnvironment.getFieldNames(getReturnType) } /** Returns the indices of the table fields. 
*/ - override def getFieldsIndices: Array[Int] = { + override def getFieldIndices: Array[Int] = { TableEnvironment.getFieldIndexes(getReturnType) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala index 3f5b23e5ca42e..06139b1324563 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala @@ -86,7 +86,7 @@ class CsvTableSource( execEnv.createInput(createCsvInput(), returnType) } /** Returns the names of the table fields. */ - override def getFieldsNames: Array[String] = fieldNames + override def getFieldNames: Array[String] = fieldNames /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */ override def getReturnType: RowTypeInfo = returnType diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala index 566bb138e003a..cb34bb72eb29f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala @@ -37,10 +37,10 @@ import org.apache.flink.table.api.TableEnvironment trait TableSource[T] { /** Returns the names of the table fields. */ - def getFieldsNames: Array[String] + def getFieldNames: Array[String] /** Returns the indices of the table fields. */ - def getFieldsIndices: Array[Int] + def getFieldIndices: Array[Int] /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. 
*/ def getReturnType: TypeInformation[T] diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java index 8e384e321531e..d67725e3607d3 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java @@ -89,26 +89,4 @@ public void testBatchTableSourceSQL() throws Exception { compareResultAsText(results, expected); } - - @Test - public void testNestedBatchTableSourceSQL() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - BatchTableSource nestedTable = CommonTestData.getNestedTableSource(); - - tableEnv.registerTableSource("NestedPersons", nestedTable); - - Table result = tableEnv - .sql("SELECT NestedPersons.firstName, NestedPersons.lastName," + - "NestedPersons.address.street, NestedPersons.address.city AS city " + - "FROM NestedPersons " + - "WHERE NestedPersons.address.city LIKE 'Dublin'"); - - DataSet resultSet = tableEnv.toDataSet(result, Row.class); - List results = resultSet.collect(); - - String expected = "Bob,Taylor,Pearse Street,Dublin"; - - compareResultAsText(results, expected); - } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala index 440c2b716a3c7..71bcb0162108b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala @@ -24,7 +24,7 @@ import java.util import org.apache.flink.api.common.ExecutionConfig 
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo} +import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtractor} import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.table.sources.{AbstractBatchTableSource, BatchTableSource, CsvTableSource, TableSource} @@ -81,26 +81,7 @@ object CommonTestData { ) } - /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ - override def getReturnType: TypeInformation[Person] = new PojoTypeInfo[Person]( - classOf[Person], - util.Arrays.asList( - new PojoField(classOf[Person].getDeclaredField("firstName"), - BasicTypeInfo.STRING_TYPE_INFO), - new PojoField(classOf[Person].getDeclaredField("lastName"), - BasicTypeInfo.STRING_TYPE_INFO), - new PojoField(classOf[Person].getDeclaredField("address"), - new PojoTypeInfo[Address]( - classOf[Address], - util.Arrays.asList( - new PojoField(classOf[Address].getDeclaredField("street"), - BasicTypeInfo.STRING_TYPE_INFO), - new PojoField(classOf[Address].getDeclaredField("city"), - BasicTypeInfo.STRING_TYPE_INFO) - ) - )) - ) - ) + override def getReturnType: TypeInformation[Person] = TypeExtractor.getForClass(classOf[Person]) } } From 030bbfce8fa52ad6df87f6a191119df5d9c8acdb Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Mon, 9 Jan 2017 09:22:08 +0000 Subject: [PATCH 6/7] [FLINK-5280] Fix TableSource class hierarchy --- .../connectors/kafka/KafkaTableSource.java | 7 ++-- .../flink/table/api/TableEnvironment.scala | 35 ++++++++++++++--- .../utils/UserDefinedFunctionUtils.scala | 2 +- .../nodes/dataset/BatchTableSourceScan.scala | 4 +- .../datastream/StreamTableSourceScan.scala | 4 +- ...hProjectIntoBatchTableSourceScanRule.scala | 3 +- ...ProjectIntoStreamTableSourceScanRule.scala | 3 +- 
.../table/plan/schema/TableSourceTable.scala | 5 ++- .../sources/AbstractBatchTableSource.scala | 30 -------------- .../sources/AbstractStreamTableSource.scala | 30 -------------- .../table/sources/AbstractTableSource.scala | 39 ------------------- .../sources/BatchStreamTableSource.scala | 27 ------------- .../flink/table/sources/CsvTableSource.scala | 9 ++++- ...leSource.scala => DefinedFieldNames.scala} | 15 ++++--- .../flink/table/sources/TableSource.scala | 10 +---- .../flink/table/utils/CommonTestData.scala | 8 ++-- 16 files changed, 69 insertions(+), 162 deletions(-) delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchTableSource.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractStreamTableSource.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchStreamTableSource.scala rename flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/{AbstractBatchStreamTableSource.scala => DefinedFieldNames.scala} (71%) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index fceb7f0f7c700..dd32bdda24828 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -19,13 +19,12 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.sources.AbstractStreamTableSource; -import 
org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import java.util.Properties; @@ -38,7 +37,7 @@ *

The version-specific Kafka consumers need to extend this class and * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. */ -public abstract class KafkaTableSource extends AbstractStreamTableSource { +public abstract class KafkaTableSource implements StreamTableSource { /** The Kafka topic to consume. */ private final String topic; diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 28b41604c5250..2e3c2663cd7db 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -48,6 +48,7 @@ import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.schema.RelTable import org.apache.flink.table.sinks.TableSink +import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} import org.apache.flink.table.validate.FunctionCatalog import _root_.scala.collection.JavaConverters._ @@ -334,7 +335,7 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { - (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndexes(inputType)) + (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType)) } /** @@ -517,7 +518,7 @@ object TableEnvironment { * * @param inputType The TypeInformation extract the field names. * @tparam A The type of the TypeInformation. 
- * @return A an array holding the field names + * @return An array holding the field names */ def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = { validateType(inputType) @@ -556,9 +557,9 @@ object TableEnvironment { * Returns field indexes for a given [[TypeInformation]]. * * @param inputType The TypeInformation extract the field positions from. - * @return A an array holding the field positions + * @return An array holding the field positions */ - def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = { + def getFieldIndices(inputType: TypeInformation[_]): Array[Int] = { getFieldNames(inputType).indices.toArray } @@ -566,7 +567,7 @@ object TableEnvironment { * Returns field types for a given [[TypeInformation]]. * * @param inputType The TypeInformation to extract field types from. - * @return an holding the field types. + * @return An array holding the field types. */ def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = { validateType(inputType) @@ -578,4 +579,28 @@ object TableEnvironment { throw new TableException(s"Currently only CompositeType and AtomicType are supported.") } } + + /** + * Returns field names for a given [[TableSource]]. + * + * @param tableSource The TableSource to extract field names from. + * @tparam A The type of the TableSource. + * @return An array holding the field names. + */ + def getFieldNames[A](tableSource: TableSource[A]): Array[String] = tableSource match { + case d: DefinedFieldNames => d.getFieldNames + case _ => TableEnvironment.getFieldNames(tableSource.getReturnType) + } + + /** + * Returns field indices for a given [[TableSource]]. + * + * @param tableSource The TableSource to extract field indices from. + * @tparam A The type of the TableSource. + * @return An array holding the field indices. 
+ */ + def getFieldIndices[A](tableSource: TableSource[A]): Array[Int] = tableSource match { + case d: DefinedFieldNames => d.getFieldIndices + case _ => TableEnvironment.getFieldIndices(tableSource.getReturnType) + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index cc43fd0d29e49..fa4668d9da132 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -269,7 +269,7 @@ object UserDefinedFunctionUtils { : (Array[String], Array[Int], Array[TypeInformation[_]]) = { (TableEnvironment.getFieldNames(inputType), - TableEnvironment.getFieldIndexes(inputType), + TableEnvironment.getFieldIndices(inputType), TableEnvironment.getFieldTypes(inputType)) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index bc933701e34f8..73dddc6bacce3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -39,7 +39,7 @@ class BatchTableSourceScan( override def deriveRowType() = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] flinkTypeFactory.buildRowDataType( - tableSource.getFieldNames, + TableEnvironment.getFieldNames(tableSource), TableEnvironment.getFieldTypes(tableSource.getReturnType)) } @@ -59,7 +59,7 @@ class BatchTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { 
super.explainTerms(pw) - .item("fields", tableSource.getFieldNames.mkString(", ")) + .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) } override def translateToPlan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 4e76ac0dcb13b..7550593442c91 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -39,7 +39,7 @@ class StreamTableSourceScan( override def deriveRowType() = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] flinkTypeFactory.buildRowDataType( - tableSource.getFieldNames, + TableEnvironment.getFieldNames(tableSource), TableEnvironment.getFieldTypes(tableSource.getReturnType)) } @@ -59,7 +59,7 @@ class StreamTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) - .item("fields", tableSource.getFieldNames.mkString(", ")) + .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) } override def translateToPlan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala index f0fedaf8a1633..70639b784b3ed 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala @@ -20,6 +20,7 @@ package 
org.apache.flink.table.plan.rules.dataSet import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._ import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource} @@ -47,7 +48,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule( val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram) // if no fields can be projected, we keep the original plan. - if (scan.tableSource.getFieldNames.length != usedFields.length) { + if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) { val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] val newTableSource = originTableSource.projectFields(usedFields) val newScan = new BatchTableSourceScan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala index 05ca048ee3346..a6d4b821838dc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.plan.rules.datastream import org.apache.calcite.plan.RelOptRule._ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} import 
org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._ import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource} @@ -48,7 +49,7 @@ class PushProjectIntoStreamTableSourceScanRule extends RelOptRule( val usedFields = extractRefInputFields(calc.calcProgram) // if no fields can be projected, we keep the original plan - if (scan.tableSource.getFieldNames.length != usedFields.length) { + if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) { val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] val newTableSource = originTableSource.projectFields(usedFields) val newScan = new StreamTableSourceScan( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index e139b3d9be98d..4f82f5ef1ed67 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -18,11 +18,12 @@ package org.apache.flink.table.plan.schema +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.sources.TableSource /** Table which defines an external table via a [[TableSource]] */ class TableSourceTable[T](val tableSource: TableSource[T]) extends FlinkTable[T]( typeInfo = tableSource.getReturnType, - fieldIndexes = tableSource.getFieldIndices, - fieldNames = tableSource.getFieldNames) + fieldIndexes = TableEnvironment.getFieldIndices(tableSource), + fieldNames = TableEnvironment.getFieldNames(tableSource)) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchTableSource.scala deleted file mode 100644 index 
3f8aeb3f8bdbd..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchTableSource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources - -/** - * Partial implementation of the [[BatchTableSource]] trait. - * - * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. - */ -abstract class AbstractBatchTableSource[T] - extends AbstractTableSource[T] - with BatchTableSource[T] { - -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractStreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractStreamTableSource.scala deleted file mode 100644 index 97446633eb9c9..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractStreamTableSource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources - -/** - * Partial implementation of the [[StreamTableSource]] trait. - * - * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. - */ -abstract class AbstractStreamTableSource[T] - extends AbstractTableSource[T] - with StreamTableSource[T] { - -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala deleted file mode 100644 index 10f60ef39e525..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractTableSource.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources - -import org.apache.flink.table.api.TableEnvironment - -/** - * Partial implementation of the [[AbstractTableSource]] trait. - * - * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. - */ -abstract class AbstractTableSource[T] extends TableSource[T] { - - /** Returns the names of the table fields. */ - override def getFieldNames: Array[String] = { - TableEnvironment.getFieldNames(getReturnType) - } - - /** Returns the indices of the table fields. */ - override def getFieldIndices: Array[Int] = { - TableEnvironment.getFieldIndexes(getReturnType) - } -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchStreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchStreamTableSource.scala deleted file mode 100644 index b84f4ded6bd47..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchStreamTableSource.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources - -/** Defines an external batch table and an external stream table and provides access to its data. - * - * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. - */ -trait BatchStreamTableSource[T] extends BatchTableSource[T] with StreamTableSource[T] { - -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala index 06139b1324563..73318a6537a65 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.{TableEnvironment, TableException} /** * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a @@ -53,7 +53,9 @@ class CsvTableSource( ignoreFirstLine: Boolean = false, ignoreComments: String = null, lenient: Boolean = false) - extends AbstractBatchStreamTableSource[Row] + extends BatchTableSource[Row] + with StreamTableSource[Row] + with DefinedFieldNames with ProjectableTableSource[Row] { /** @@ -88,6 +90,9 @@ class 
CsvTableSource( /** Returns the names of the table fields. */ override def getFieldNames: Array[String] = fieldNames + /** Returns the names of the table fields. */ + override def getFieldIndices = TableEnvironment.getFieldIndices(getReturnType) + /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */ override def getReturnType: RowTypeInfo = returnType diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala similarity index 71% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala index 8406bd630285f..bead3e9f75965 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala @@ -19,12 +19,17 @@ package org.apache.flink.table.sources /** - * Partial implementation of the [[BatchStreamTableSource]] trait. + * Trait that defines custom field names and their indices in the underlying + * data type. * - * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]]. + * Should be extended together with the [[TableSource]] trait. */ -abstract class AbstractBatchStreamTableSource[T] - extends AbstractTableSource[T] - with BatchStreamTableSource[T] { +trait DefinedFieldNames { + + /** Returns the names of the table fields. */ + def getFieldNames: Array[String] + + /** Returns the indices of the table fields.
*/ + def getFieldIndices: Array[Int] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala index cb34bb72eb29f..a3eb03d267b31 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala @@ -29,19 +29,13 @@ import org.apache.flink.table.api.TableEnvironment * To define a TableSource one need to implement [[TableSource#getReturnType]]. In this case * field names and field indices are derived from the returned type. * - * In case if custom field names are required one need to implement both - * [[TableSource#getFieldsNames]] and [[TableSource#getFieldsIndices]]. + * If custom field names are required, one needs to additionally implement + * the [[DefinedFieldNames]] trait. * * @tparam T The return type of the [[TableSource]]. */ trait TableSource[T] { - /** Returns the names of the table fields. */ - def getFieldNames: Array[String] - - /** Returns the indices of the table fields. */ - def getFieldIndices: Array[Int] - /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
*/ def getReturnType: TypeInformation[T] diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala index 71bcb0162108b..6e4859b2182f0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtractor} import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.table.sources.{AbstractBatchTableSource, BatchTableSource, CsvTableSource, TableSource} +import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource, TableSource} import org.apache.flink.api.scala._ object CommonTestData { @@ -69,7 +69,7 @@ object CommonTestData { } def getNestedTableSource: BatchTableSource[Person] = { - new AbstractBatchTableSource[Person] { + new BatchTableSource[Person] { override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = { val executionEnvironment = ExecutionEnvironment.getExecutionEnvironment executionEnvironment.fromCollection( @@ -81,7 +81,9 @@ object CommonTestData { ) } - override def getReturnType: TypeInformation[Person] = TypeExtractor.getForClass(classOf[Person]) + override def getReturnType: TypeInformation[Person] = { + TypeExtractor.getForClass(classOf[Person]) + } } } From ba8c571977b6ad30aafefec449c4170035df5d51 Mon Sep 17 00:00:00 2001 From: Ivan Mushketyk Date: Tue, 10 Jan 2017 19:46:35 +0000 Subject: [PATCH 7/7] [FLINK-5360] Fix according to review --- .../org/apache/flink/table/sources/CsvTableSource.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala index 73318a6537a65..4f96d7bacde31 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala @@ -55,7 +55,6 @@ class CsvTableSource( lenient: Boolean = false) extends BatchTableSource[Row] with StreamTableSource[Row] - with DefinedFieldNames with ProjectableTableSource[Row] { /** @@ -74,7 +73,7 @@ class CsvTableSource( throw TableException("Number of field names and field types must be equal.") } - private val returnType = new RowTypeInfo(fieldTypes: _*) + private val returnType = new RowTypeInfo(fieldTypes, fieldNames) private var selectedFields: Array[Int] = fieldTypes.indices.toArray @@ -87,11 +86,6 @@ class CsvTableSource( override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { execEnv.createInput(createCsvInput(), returnType) } - /** Returns the names of the table fields. */ - override def getFieldNames: Array[String] = fieldNames - - /** Returns the names of the table fields. */ - override def getFieldIndices = TableEnvironment.getFieldIndices(getReturnType) /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */ override def getReturnType: RowTypeInfo = returnType