Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
}

/**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
*
* @param fieldNames field names
* @param fieldTypes field types, every element is Flink's [[TypeInformation]]
* @return a struct type with the input fieldNames and input fieldTypes
*/
def buildRowDataType(
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]])
: RelDataType = {
val rowDataTypeBuilder = builder
fieldNames
.zip(fieldTypes)
.foreach { f =>
rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true)
}
rowDataTypeBuilder.build
}

override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
// it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
// always set those to default value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.api.table.plan.nodes.dataset

import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.api.common.typeinfo.TypeInformation
Expand All @@ -35,13 +34,10 @@ import scala.collection.JavaConverters._
abstract class BatchScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowRelDataType: RelDataType)
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
with DataSetRel {

override def deriveRowType() = rowRelDataType

override def toString: String = {
s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ package org.apache.flink.api.table.plan.nodes.dataset

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.BatchTableEnvironment
import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
import org.apache.flink.api.table.plan.schema.TableSourceTable
import org.apache.flink.api.table.sources.BatchTableSource

Expand All @@ -32,18 +31,20 @@ class BatchTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowType: RelDataType)
extends BatchScan(cluster, traitSet, table, rowType) {
val tableSource: BatchTableSource[_])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to declare this field as val ? It seems that it is only used in this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wuchong , Thanks for your review. For later use, the projection pushdown rule needs to get the actual tablesource of the BatchTableSourceScan, then do something on it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get you !

extends BatchScan(cluster, traitSet, table) {

val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
val tableSource = tableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]]
override def deriveRowType() = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
}

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new BatchTableSourceScan(
cluster,
traitSet,
getTable,
getRowType
tableSource
)
}

Expand All @@ -54,6 +55,6 @@ class BatchTableSourceScan(
val config = tableEnv.getConfig
val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]

convertToExpectedType(inputDataSet, tableSourceTable, expectedType, config)
convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ class DataSetScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowType: RelDataType)
extends BatchScan(cluster, traitSet, table, rowType) {
rowRelDataType: RelDataType)
extends BatchScan(cluster, traitSet, table) {

val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])

override def deriveRowType() = rowRelDataType

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetScan(
cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ class BatchTableSourceScanRule
val scan: TableScan = rel.asInstanceOf[TableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)

val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource
.asInstanceOf[BatchTableSource[_]]
new BatchTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable,
rel.getRowType
tableSource
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ abstract class FlinkTable[T](

override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
val builder = flinkTypeFactory.builder
fieldNames
.zip(fieldTypes)
.foreach { f =>
builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
}
builder.build
flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
}

}