Skip to content

Commit

Permalink
[FLINK-13391][table-planner-blink] blink should not invoke deprecated…
Browse files Browse the repository at this point in the history
… getReturnType of TableSource
  • Loading branch information
JingsongLi committed Jul 24, 2019
1 parent a7e8203 commit b465c69
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
Expand Up @@ -18,9 +18,15 @@

package org.apache.flink.table.plan.nodes.physical

import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.core.io.InputSplit
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.sources.{InputFormatTableSource, StreamTableSource, TableSource}
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelWriter
Expand All @@ -38,6 +44,9 @@ abstract class PhysicalTableSourceScan(
relOptTable: FlinkRelOptTable)
extends TableScan(cluster, traitSet, relOptTable) {

// cache table source transformation.
protected var sourceTransform: Transformation[_] = _

protected val tableSourceTable: TableSourceTable[_] =
relOptTable.unwrap(classOf[TableSourceTable[_]])

Expand All @@ -52,4 +61,19 @@ abstract class PhysicalTableSourceScan(
super.explainTerms(pw).item("fields", getRowType.getFieldNames.asScala.mkString(", "))
}

def getSourceTransformation(
streamEnv: StreamExecutionEnvironment): Transformation[_] = {
if (sourceTransform == null) {
sourceTransform = tableSource match {
case source: InputFormatTableSource[_] =>
val resultType = fromDataTypeToLegacyInfo(source.getProducedDataType)
.asInstanceOf[TypeInformation[Any]]
streamEnv.createInput(
source.getInputFormat.asInstanceOf[InputFormat[Any, _ <: InputSplit]],
resultType).getTransformation
case source: StreamTableSource[_] => source.getDataStream(streamEnv).getTransformation
}
}
sourceTransform
}
}
Expand Up @@ -20,7 +20,6 @@ package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.api.dag.Transformation
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableException
import org.apache.flink.table.codegen.CodeGeneratorContext
import org.apache.flink.table.dataformat.BaseRow
Expand Down Expand Up @@ -53,9 +52,6 @@ class BatchExecTableSourceScan(
with BatchPhysicalRel
with BatchExecNode[BaseRow]{

// cache table source transformation.
private var sourceTransform: Transformation[_] = _

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new BatchExecTableSourceScan(cluster, traitSet, relOptTable)
}
Expand Down Expand Up @@ -83,15 +79,6 @@ class BatchExecTableSourceScan(
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}

def getSourceTransformation(
streamEnv: StreamExecutionEnvironment): Transformation[_] = {
if (sourceTransform == null) {
sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]].
getDataStream(streamEnv).getTransformation
}
sourceTransform
}

override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
Expand Down
Expand Up @@ -20,7 +20,6 @@ package org.apache.flink.table.plan.nodes.physical.stream

import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{DataTypes, TableException}
Expand Down Expand Up @@ -60,9 +59,6 @@ class StreamExecTableSourceScan(
with StreamPhysicalRel
with StreamExecNode[BaseRow] {

// cache table source transformation.
private var sourceTransform: Transformation[_] = _

override def producesUpdates: Boolean = false

override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
Expand Down Expand Up @@ -95,15 +91,6 @@ class StreamExecTableSourceScan(
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}

def getSourceTransformation(
streamEnv: StreamExecutionEnvironment): Transformation[_] = {
if (sourceTransform == null) {
sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]].
getDataStream(streamEnv).getTransformation
}
sourceTransform
}

override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
Expand Down
Expand Up @@ -613,7 +613,8 @@ class TestInputFormatTableSource[T](
new CollectionInputFormat[T](values.asJava, returnType.createSerializer(new ExecutionConfig))
}

override def getReturnType: TypeInformation[T] = returnType
override def getReturnType: TypeInformation[T] =
throw new RuntimeException("Should not invoke this deprecated method.")

override def getProducedDataType: DataType = fromLegacyInfoToDataType(returnType)

Expand Down

0 comments on commit b465c69

Please sign in to comment.