Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ class DataStreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowType: RelDataType)
extends StreamScan(cluster, traitSet, table, rowType) {
rowRelDataType: RelDataType)
extends StreamScan(cluster, traitSet, table) {

val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])

override def deriveRowType() = rowRelDataType

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamScan(
cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ import scala.collection.JavaConverters._
abstract class StreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowRelDataType: RelDataType)
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
with DataStreamRel {

override def deriveRowType() = rowRelDataType

protected def convertToExpectedType(
input: DataStream[Any],
flinkTable: FlinkTable[_],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,31 @@ package org.apache.flink.api.table.plan.nodes.datastream

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.StreamTableEnvironment
import org.apache.flink.api.table.plan.schema.TableSourceTable
import org.apache.flink.api.table.sources.StreamTableSource
import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
import org.apache.flink.streaming.api.datastream.DataStream

/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
class StreamTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowType: RelDataType)
extends StreamScan(cluster, traitSet, table, rowType) {
tableSource: StreamTableSource[_])
extends StreamScan(cluster, traitSet, table) {

val tableSourceTable = table.unwrap(classOf[TableSourceTable])
val tableSource = tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]]
override def deriveRowType() = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
}

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new StreamTableSourceScan(
cluster,
traitSet,
table,
rowType
getTable,
tableSource
)
}

Expand All @@ -55,7 +56,7 @@ class StreamTableSourceScan(
val inputDataStream: DataStream[Any] = tableSource
.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]

convertToExpectedType(inputDataStream, tableSourceTable, expectedType, config)
convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ class StreamTableSourceScanRule
val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)

// The original registered table source
val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]

new StreamTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable,
rel.getRowType
tableSource
)
}
}
Expand Down