From b465c69b21a3ac810bc34e4f95dc0b3a3d93281c Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 24 Jul 2019 12:24:32 +0800 Subject: [PATCH] [FLINK-13391][table-planner-blink] blink should not invoke deprecated getReturnType of TableSource --- .../physical/PhysicalTableSourceScan.scala | 26 ++++++++++++++++++- .../batch/BatchExecTableSourceScan.scala | 13 ---------- .../stream/StreamExecTableSourceScan.scala | 13 ---------- .../flink/table/util/testTableSources.scala | 3 ++- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/PhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/PhysicalTableSourceScan.scala index 6b6675e29753da..a7f102cdf8c50b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/PhysicalTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/PhysicalTableSourceScan.scala @@ -18,9 +18,15 @@ package org.apache.flink.table.plan.nodes.physical +import org.apache.flink.api.common.io.InputFormat +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.dag.Transformation +import org.apache.flink.core.io.InputSplit +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable} -import org.apache.flink.table.sources.TableSource +import org.apache.flink.table.sources.{InputFormatTableSource, StreamTableSource, TableSource} +import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelWriter @@ -38,6 +44,9 @@ abstract class PhysicalTableSourceScan( relOptTable: FlinkRelOptTable) extends TableScan(cluster, traitSet, relOptTable) { + // cache table source transformation. + protected var sourceTransform: Transformation[_] = _ + protected val tableSourceTable: TableSourceTable[_] = relOptTable.unwrap(classOf[TableSourceTable[_]]) @@ -52,4 +61,19 @@ abstract class PhysicalTableSourceScan( super.explainTerms(pw).item("fields", getRowType.getFieldNames.asScala.mkString(", ")) } + def getSourceTransformation( + streamEnv: StreamExecutionEnvironment): Transformation[_] = { + if (sourceTransform == null) { + sourceTransform = tableSource match { + case source: InputFormatTableSource[_] => + val resultType = fromDataTypeToLegacyInfo(source.getProducedDataType) + .asInstanceOf[TypeInformation[Any]] + streamEnv.createInput( + source.getInputFormat.asInstanceOf[InputFormat[Any, _ <: InputSplit]], + resultType).getTransformation + case source: StreamTableSource[_] => source.getDataStream(streamEnv).getTransformation + } + } + sourceTransform + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala index 8b058c495355bc..1ddf8701a20d81 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.api.dag.Transformation import org.apache.flink.runtime.operators.DamBehavior -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.TableException import org.apache.flink.table.codegen.CodeGeneratorContext import org.apache.flink.table.dataformat.BaseRow @@ -53,9 +52,6 @@ class BatchExecTableSourceScan( with BatchPhysicalRel with BatchExecNode[BaseRow]{ - // cache table source transformation. - private var sourceTransform: Transformation[_] = _ - override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new BatchExecTableSourceScan(cluster, traitSet, relOptTable) } @@ -83,15 +79,6 @@ class BatchExecTableSourceScan( replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode]) } - def getSourceTransformation( - streamEnv: StreamExecutionEnvironment): Transformation[_] = { - if (sourceTransform == null) { - sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]]. - getDataStream(streamEnv).getTransformation - } - sourceTransform - } - override protected def translateToPlanInternal( planner: BatchPlanner): Transformation[BaseRow] = { val config = planner.getTableConfig diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala index 27ff6a0870a59b..97ac2ab011c343 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.plan.nodes.physical.stream import org.apache.flink.api.dag.Transformation import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.{DataTypes, TableException} @@ -60,9 +59,6 @@ class StreamExecTableSourceScan( with StreamPhysicalRel with StreamExecNode[BaseRow] { - // cache table source transformation. - private var sourceTransform: Transformation[_] = _ - override def producesUpdates: Boolean = false override def needsUpdatesAsRetraction(input: RelNode): Boolean = false @@ -95,15 +91,6 @@ class StreamExecTableSourceScan( replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode]) } - def getSourceTransformation( - streamEnv: StreamExecutionEnvironment): Transformation[_] = { - if (sourceTransform == null) { - sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]]. - getDataStream(streamEnv).getTransformation - } - sourceTransform - } - override protected def translateToPlanInternal( planner: StreamPlanner): Transformation[BaseRow] = { val config = planner.getTableConfig diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala index dde0803544540e..ee7a6adaa011a0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala @@ -613,7 +613,8 @@ class TestInputFormatTableSource[T]( new CollectionInputFormat[T](values.asJava, returnType.createSerializer(new ExecutionConfig)) } - override def getReturnType: TypeInformation[T] = returnType + override def getReturnType: TypeInformation[T] = + throw new RuntimeException("Should not invoke this deprecated method.") override def getProducedDataType: DataType = fromLegacyInfoToDataType(returnType)