From d0718ff512a931cf0bb024df516a36215f486df5 Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Tue, 29 Nov 2016 12:26:02 +0800 Subject: [PATCH 1/5] modify constructor of BatchScan, BatchTableSourceScan, DataSetScan Test Plan: junit Reviewers: kete.yangkt Differential Revision: http://phabricator.taobao.net/D6601 --- .../table/plan/nodes/dataset/BatchScan.scala | 5 +- .../nodes/dataset/BatchTableSourceScan.scala | 48 ++++++++++++++----- .../plan/nodes/dataset/DataSetScan.scala | 6 ++- .../dataSet/BatchTableSourceScanRule.scala | 3 +- 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala index 15b2081544fc8..4533975d87e32 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala @@ -35,13 +35,10 @@ import scala.collection.JavaConverters._ abstract class BatchScan( cluster: RelOptCluster, traitSet: RelTraitSet, - table: RelOptTable, - rowRelDataType: RelDataType) + table: RelOptTable) extends TableScan(cluster, traitSet, table) with DataSetRel { - override def deriveRowType() = rowRelDataType - override def toString: String = { s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala index 10d95344d7316..cfe1995bc1972 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -23,37 +23,59 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory} import org.apache.flink.api.table.plan.schema.TableSourceTable import org.apache.flink.api.table.sources.BatchTableSource /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */ class BatchTableSourceScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable, - rowType: RelDataType) - extends BatchScan(cluster, traitSet, table, rowType) { + cluster: RelOptCluster, + traitSet: RelTraitSet, + table: RelOptTable, + tableSource: Option[BatchTableSource[_]] = None) + extends BatchScan(cluster, traitSet, table) { + + override def deriveRowType() = { + tableSource match { + case Some(ts) => + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + val builder = flinkTypeFactory.builder + ts.getFieldsNames + .zip(ts.getFieldTypes) + .foreach { f => + builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) + } + builder.build + case None => getTable.getRowType + } + } - val tableSourceTable = getTable.unwrap(classOf[TableSourceTable]) - val tableSource = tableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]] override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new BatchTableSourceScan( cluster, traitSet, getTable, - getRowType + tableSource ) } override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { val config = tableEnv.getConfig - val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - convertToExpectedType(inputDataSet, tableSourceTable, expectedType, config) + tableSource match { + case Some(ts) => + val inputDs = ts.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] + convertToExpectedType(inputDs, new TableSourceTable(ts), expectedType, config) + case None => + val originTableSourceTable = getTable.unwrap(classOf[TableSourceTable]) + val originTableSource = originTableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]] + val originDs = originTableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] + convertToExpectedType(originDs, originTableSourceTable, expectedType, config) + + } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala index 3c34bc3839a87..b7831368c8052 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala @@ -35,11 +35,13 @@ class DataSetScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowType: RelDataType) - extends BatchScan(cluster, traitSet, table, rowType) { + rowRelDataType: RelDataType) + extends BatchScan(cluster, traitSet, table) { val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]]) + override def deriveRowType() = rowRelDataType + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetScan( cluster, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala index 1a0d2a1e670c8..5ada2b739a949 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala @@ -60,8 +60,7 @@ class BatchTableSourceScanRule new BatchTableSourceScan( rel.getCluster, traitSet, - scan.getTable, - rel.getRowType + scan.getTable ) } } From 068b2ec7df9207bf246305da3dd744fcd339844a Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Wed, 30 Nov 2016 09:21:42 +0800 Subject: [PATCH 2/5] modify code style and extract common method --- .../table/plan/nodes/dataset/BatchScan.scala | 1 - .../nodes/dataset/BatchTableSourceScan.scala | 22 ++++----- .../api/table/plan/schema/FlinkTable.scala | 10 ++-- .../api/table/typeutils/RowTypeBuilder.scala | 47 +++++++++++++++++++ 4 files changed, 60 insertions(+), 20 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeBuilder.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala index 4533975d87e32..a6de2378548ff 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala @@ -19,7 +19,6 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.plan._ -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.flink.api.common.typeinfo.TypeInformation diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala index cfe1995bc1972..b504fd47ac0b4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -20,32 +20,26 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory} import org.apache.flink.api.table.plan.schema.TableSourceTable import org.apache.flink.api.table.sources.BatchTableSource +import org.apache.flink.api.table.typeutils.RowTypeBuilder /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */ class BatchTableSourceScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable, - tableSource: Option[BatchTableSource[_]] = None) + cluster: RelOptCluster, + traitSet: RelTraitSet, + table: RelOptTable, + tableSource: Option[BatchTableSource[_]] = None) extends BatchScan(cluster, traitSet, table) { override def deriveRowType() = { tableSource match { case Some(ts) => val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - val builder = flinkTypeFactory.builder - ts.getFieldsNames - .zip(ts.getFieldTypes) - .foreach { f => - builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) - } - builder.build + RowTypeBuilder.build(flinkTypeFactory, ts.getFieldsNames, ts.getFieldTypes) case None => getTable.getRowType } } @@ -78,4 +72,8 @@ class BatchTableSourceScan( } } + + def actualTableSource() = { + tableSource.getOrElse(getTable.unwrap(classOf[TableSourceTable]).tableSource.asInstanceOf[BatchTableSource[_]]) + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala index d95b513a540d4..500ff4eba952c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala @@ -22,6 +22,7 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.schema.impl.AbstractTable import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.typeutils.RowTypeBuilder import org.apache.flink.api.table.{FlinkTypeFactory, TableException} abstract class FlinkTable[T]( @@ -60,13 +61,8 @@ abstract class FlinkTable[T]( override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - val builder = flinkTypeFactory.builder - fieldNames - .zip(fieldTypes) - .foreach { f => - builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) - } - builder.build + RowTypeBuilder.build(flinkTypeFactory, fieldNames, fieldTypes) + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeBuilder.scala new file mode 100644 index 0000000000000..ac29a1d3ff531 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeBuilder.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.FlinkTypeFactory +import org.apache.calcite.rel.`type`.RelDataType + + +object RowTypeBuilder { + + /** + * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory + * + * @param flinkTypeFactory + * Flink specific type factory that represents the interface between Flink's [[TypeInformation]] + * and Calcite's [[RelDataType]] + * @param fieldNames field names + * @param fieldTypes field types, every element is Flink's [[TypeInformation]] + * @return a struct type with the input fieldNames and input fieldTypes + */ + def build(flinkTypeFactory: FlinkTypeFactory, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): RelDataType = { + val builder = flinkTypeFactory.builder + fieldNames + .zip(fieldTypes) + .foreach { f => + builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) + } + builder.build + } +} From fe0ac06e7a76663100a76ec91c8873de8def548b Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Wed, 30 Nov 2016 11:21:36 +0800 Subject: [PATCH 3/5] let rule decide which tableSource to create a BatchTableScan --- .../nodes/dataset/BatchTableSourceScan.scala | 27 +++++-------------- .../dataSet/BatchTableSourceScanRule.scala | 5 ++-- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala index b504fd47ac0b4..e894a9d88fd61 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -32,16 +32,13 @@ class BatchTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - tableSource: Option[BatchTableSource[_]] = None) + tableSource: BatchTableSource[_]) extends BatchScan(cluster, traitSet, table) { override def deriveRowType() = { - tableSource match { - case Some(ts) => - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - RowTypeBuilder.build(flinkTypeFactory, ts.getFieldsNames, ts.getFieldTypes) - case None => getTable.getRowType - } + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + RowTypeBuilder.build(flinkTypeFactory, tableSource.getFieldsNames, tableSource.getFieldTypes) + } @@ -59,21 +56,11 @@ class BatchTableSourceScan( expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { val config = tableEnv.getConfig - - tableSource match { - case Some(ts) => - val inputDs = ts.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - convertToExpectedType(inputDs, new TableSourceTable(ts), expectedType, config) - case None => - val originTableSourceTable = getTable.unwrap(classOf[TableSourceTable]) - val originTableSource = originTableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]] - val originDs = originTableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - convertToExpectedType(originDs, originTableSourceTable, expectedType, config) - - } + val inputDs = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] + convertToExpectedType(inputDs, new TableSourceTable(tableSource), expectedType, config) } def actualTableSource() = { - tableSource.getOrElse(getTable.unwrap(classOf[TableSourceTable]).tableSource.asInstanceOf[BatchTableSource[_]]) + tableSource } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala index 5ada2b739a949..f75906f9cf544 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala @@ -56,11 +56,12 @@ class BatchTableSourceScanRule def convert(rel: RelNode): RelNode = { val scan: TableScan = rel.asInstanceOf[TableScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - + val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource.asInstanceOf[BatchTableSource[_]] new BatchTableSourceScan( rel.getCluster, traitSet, - scan.getTable + scan.getTable, + tableSource ) } } From 1e4bebaaf2d0e23dc561d42feceecb4537336b57 Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Thu, 1 Dec 2016 11:45:34 +0800 Subject: [PATCH 4/5] Decouple BatchTableSourceScan with TableSourceTable --- .../flink/api/table/FlinkTypeFactory.scala | 17 +++++++ .../nodes/dataset/BatchTableSourceScan.scala | 18 +++---- .../dataSet/BatchTableSourceScanRule.scala | 1 + .../api/table/plan/schema/FlinkTable.scala | 4 +- .../api/table/typeutils/RowTypeBuilder.scala | 47 ------------------- 5 files changed, 25 insertions(+), 62 deletions(-) delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeBuilder.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala index ee71ce9dd7842..02c9db05a3b66 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala @@ -72,6 +72,23 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp } } + /** + * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory + * + * @param fieldNames field names + * @param fieldTypes field types, every element is Flink's [[TypeInformation]] + * @return a struct type with the input fieldNames and input fieldTypes + */ + def buildRowDataType(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): RelDataType = { + val rowDataTypeBuilder = builder + fieldNames + .zip(fieldTypes) + .foreach { f => + rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true) + } + rowDataTypeBuilder.build + } + override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = { // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue // always set those to default value diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala index e894a9d88fd61..14da86296e816 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -25,23 +25,20 @@ import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory} import org.apache.flink.api.table.plan.schema.TableSourceTable import org.apache.flink.api.table.sources.BatchTableSource -import org.apache.flink.api.table.typeutils.RowTypeBuilder /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */ class BatchTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - tableSource: BatchTableSource[_]) + val tableSource: BatchTableSource[_]) extends BatchScan(cluster, traitSet, table) { override def deriveRowType() = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - RowTypeBuilder.build(flinkTypeFactory, tableSource.getFieldsNames, tableSource.getFieldTypes) - + flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes) } - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new BatchTableSourceScan( cluster, @@ -52,15 +49,12 @@ class BatchTableSourceScan( } override def translateToPlan( - tableEnv: BatchTableEnvironment, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { val config = tableEnv.getConfig - val inputDs = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - convertToExpectedType(inputDs, new TableSourceTable(tableSource), expectedType, config) - } + val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - def actualTableSource() = { - tableSource + convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala index f75906f9cf544..44d09a2414be6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala @@ -56,6 +56,7 @@ class BatchTableSourceScanRule def convert(rel: RelNode): RelNode = { val scan: TableScan = rel.asInstanceOf[TableScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource.asInstanceOf[BatchTableSource[_]] new BatchTableSourceScan( rel.getCluster, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala index 500ff4eba952c..84d6d7ead2fbc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala @@ -22,7 +22,6 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.schema.impl.AbstractTable import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.typeutils.RowTypeBuilder import org.apache.flink.api.table.{FlinkTypeFactory, TableException} abstract class FlinkTable[T]( @@ -61,8 +60,7 @@ abstract class FlinkTable[T]( override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - RowTypeBuilder.build(flinkTypeFactory, fieldNames, fieldTypes) - + flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeBuilder.scala deleted file mode 100644 index ac29a1d3ff531..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeBuilder.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.FlinkTypeFactory -import org.apache.calcite.rel.`type`.RelDataType - - -object RowTypeBuilder { - - /** - * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory - * - * @param flinkTypeFactory - * Flink specific type factory that represents the interface between Flink's [[TypeInformation]] - * and Calcite's [[RelDataType]] - * @param fieldNames field names - * @param fieldTypes field types, every element is Flink's [[TypeInformation]] - * @return a struct type with the input fieldNames and input fieldTypes - */ - def build(flinkTypeFactory: FlinkTypeFactory, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): RelDataType = { - val builder = flinkTypeFactory.builder - fieldNames - .zip(fieldTypes) - .foreach { f => - builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) - } - builder.build - } -} From 7d558b723a3241a63b1ecdcfb92f15b7676f71bc Mon Sep 17 00:00:00 2001 From: beyond1920 Date: Thu, 1 Dec 2016 15:36:42 +0800 Subject: [PATCH 5/5] make long length shorter to pass the flink code style check --- .../scala/org/apache/flink/api/table/FlinkTypeFactory.scala | 5 ++++- .../table/plan/rules/dataSet/BatchTableSourceScanRule.scala | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala index 02c9db05a3b66..12dace4922c8f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala @@ -79,7 +79,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp * @param fieldTypes field types, every element is Flink's [[TypeInformation]] * @return a struct type with the input fieldNames and input fieldTypes */ - def buildRowDataType(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): RelDataType = { + def buildRowDataType( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]) + : RelDataType = { val rowDataTypeBuilder = builder fieldNames .zip(fieldTypes) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala index 44d09a2414be6..8e3d8bb159696 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala @@ -57,7 +57,8 @@ class BatchTableSourceScanRule val scan: TableScan = rel.asInstanceOf[TableScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource.asInstanceOf[BatchTableSource[_]] + val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource + .asInstanceOf[BatchTableSource[_]] new BatchTableSourceScan( rel.getCluster, traitSet,