[FLINK-11975][table-planner-blink] Support running a simple select from values query (apache#8035)

beyond1920 authored and HuangZhenQiu committed Apr 19, 2019
1 parent 4642970 commit 1e72f65
Showing 39 changed files with 2,012 additions and 22 deletions.
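
The change is about letting a plain SELECT over a VALUES clause run end to end on the blink planner. A minimal sketch of such a query through the Table API follows; the tEnv environment and the sqlQuery entry point are assumed to be available and are not part of this diff.

// Sketch only: tEnv stands for an already-created blink-planner TableEnvironment.
val result: Table = tEnv.sqlQuery(
  "SELECT a, b FROM (VALUES (1, 'Hello'), (2, 'World')) AS T(a, b)")
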
BatchTableEnvironment.scala
@@ -17,9 +17,13 @@
*/
package org.apache.flink.table.api

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.calcite.FlinkRelBuilder
import org.apache.flink.table.plan.nodes.calcite.LogicalSink
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.plan.optimize.{BatchOptimizer, Optimizer}
import org.apache.flink.table.plan.schema.{BatchTableSourceTable, TableSourceSinkTable, TableSourceTable}
@@ -32,6 +36,8 @@ import org.apache.calcite.plan.ConventionTraitDef
import org.apache.calcite.rel.RelCollationTraitDef
import org.apache.calcite.sql.SqlExplainLevel

import _root_.scala.collection.JavaConversions._

/**
* A session for conversion between [[Table]] and [[DataStream]]; its main function is:
*
@@ -79,6 +85,66 @@ class BatchTableEnvironment(
}
}

/**
* Merges the global job parameters and the table config parameters,
* and sets the merged result as the new GlobalJobParameters.
*/
private def mergeParameters(): Unit = {
if (streamEnv != null && streamEnv.getConfig != null) {
val parameters = new Configuration()
if (config != null && config.getConf != null) {
parameters.addAll(config.getConf)
}

if (streamEnv.getConfig.getGlobalJobParameters != null) {
streamEnv.getConfig.getGlobalJobParameters.toMap.foreach {
kv => parameters.setString(kv._1, kv._2)
}
}

streamEnv.getConfig.setGlobalJobParameters(parameters)
}
}
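
Note the ordering above: the table config is copied first and the environment's existing GlobalJobParameters are written on top, so on a key collision the value coming from the StreamExecutionEnvironment wins. A tiny stand-alone sketch of that precedence (the key name is made up for illustration):

import org.apache.flink.configuration.Configuration

val merged = new Configuration()
merged.setString("some.key", "from-table-config") // copied from config.getConf first
merged.setString("some.key", "from-stream-env")   // GlobalJobParameters applied second, overrides
assert(merged.getString("some.key", "unset") == "from-stream-env")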

/**
* Writes a [[Table]] to a [[TableSink]].
*
* Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
* [[TableSink]] to write it.
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
* @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
*/
override private[table] def writeToSink[T](
table: Table,
sink: TableSink[T],
sinkName: String): Unit = {
mergeParameters()

val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
val optimizedPlan = optimize(sinkNode)
val optimizedNodes = translateNodeDag(Seq(optimizedPlan))
require(optimizedNodes.size() == 1)
translateToPlan(optimizedNodes.head)
}
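
A hedged sketch of how this package-private entry point is exercised from within org.apache.flink.table; MyBatchSink is a hypothetical [[TableSink]] implementation used purely for illustration and is not part of this change:

// Hypothetical call site; the sink class is illustrative only.
val values: Table = tEnv.sqlQuery("SELECT * FROM (VALUES (1, 'Hello')) AS T(a, b)")
tEnv.writeToSink(values, new MyBatchSink, "values_sink")
// Internally: LogicalSink.create -> optimize -> translateNodeDag -> translateToPlan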

/**
* Translates a [[BatchExecNode]] plan into a [[StreamTransformation]].
* Converts to target type if necessary.
*
* @param node The plan to translate.
* @return The [[StreamTransformation]] that corresponds to the given node.
*/
private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] = {
node match {
case node: BatchExecNode[_] => node.translateToPlan(this)
case _ =>
throw new TableException("Cannot generate BoundedStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
}

/**
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of the given [[Table]].

StreamTableEnvironment.scala
@@ -18,22 +18,33 @@

package org.apache.flink.table.api

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.calcite.FlinkRelBuilder
import org.apache.flink.table.plan.`trait`.{AccModeTraitDef, FlinkRelDistributionTraitDef, MiniBatchIntervalTraitDef, UpdateAsRetractionTraitDef}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.calcite.LogicalSink
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.plan.optimize.{Optimizer, StreamOptimizer}
import org.apache.flink.table.plan.schema._
import org.apache.flink.table.plan.stats.FlinkStatistic
import org.apache.flink.table.plan.`trait`.{AccModeTraitDef, FlinkRelDistributionTraitDef, MiniBatchIntervalTraitDef, UpdateAsRetractionTraitDef}
import org.apache.flink.table.plan.util.FlinkRelOptUtil
import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

import org.apache.calcite.plan.ConventionTraitDef
import org.apache.calcite.rel.RelCollationTraitDef
import org.apache.calcite.sql.SqlExplainLevel

import _root_.scala.collection.JavaConversions._

/**
* The base class for stream TableEnvironments.
*
@@ -60,6 +71,8 @@ abstract class StreamTableEnvironment(
// the naming pattern for internally registered tables.
private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r

private var isConfigMerged: Boolean = false

override def queryConfig: StreamQueryConfig = new StreamQueryConfig

override protected def getOptimizer: Optimizer = new StreamOptimizer(this)
@@ -91,6 +104,102 @@
AccModeTraitDef.INSTANCE)
)

/**
* Merges the global job parameters and the table config parameters,
* and sets the merged result as the new GlobalJobParameters.
*/
private def mergeParameters(): Unit = {
if (!isConfigMerged && execEnv != null && execEnv.getConfig != null) {
val parameters = new Configuration()
if (config != null && config.getConf != null) {
parameters.addAll(config.getConf)
}

if (execEnv.getConfig.getGlobalJobParameters != null) {
execEnv.getConfig.getGlobalJobParameters.toMap.foreach {
kv => parameters.setString(kv._1, kv._2)
}
}
val isHeapState = Option(execEnv.getStateBackend) match {
case Some(backend) if backend.isInstanceOf[MemoryStateBackend] ||
backend.isInstanceOf[FsStateBackend] => true
case None => true
case _ => false
}
parameters.setBoolean(TableConfigOptions.SQL_EXEC_STATE_BACKEND_ON_HEAP, isHeapState)
execEnv.getConfig.setGlobalJobParameters(parameters)
isConfigMerged = true
}
}
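
Once merged, these values travel with the job as GlobalJobParameters and can be read back in any rich user function, which is why the table config is folded in here rather than kept on the client only. A minimal sketch using the standard runtime-context accessor (the key name is illustrative):

import org.apache.flink.api.common.functions.RichMapFunction

class ReadMergedConfig extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    // mergeParameters() stored a Configuration, whose toMap exposes all merged entries.
    val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.toMap
    value + "|" + params.getOrDefault("some.key", "unset")
  }
}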

/**
* Writes a [[Table]] to a [[TableSink]].
*
* Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
* [[TableSink]] to write it.
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
* @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
*/
override private[table] def writeToSink[T](
table: Table,
sink: TableSink[T],
sinkName: String): Unit = {
val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
translateSink(sinkNode)
}

/**
* Translates a [[Table]] into a [[DataStream]].
*
* The transformation involves optimizing the relational expression tree as defined by
* Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
*
* @param table The root node of the relational expression tree.
* @param updatesAsRetraction Set to true to encode updates as retraction messages.
* @param withChangeFlag Set to true to emit records with change flags.
* @param resultType The [[org.apache.flink.api.common.typeinfo.TypeInformation]] of
* the resulting [[DataStream]].
* @tparam T The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translateToDataStream[T](
table: Table,
updatesAsRetraction: Boolean,
withChangeFlag: Boolean,
resultType: TypeInformation[T]): DataStream[T] = {
val sink = new DataStreamTableSink[T](table, resultType, updatesAsRetraction, withChangeFlag)
val sinkName = createUniqueTableName()
val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
val transformation = translateSink(sinkNode)
new DataStream(execEnv, transformation).asInstanceOf[DataStream[T]]
}
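
translateToDataStream is the protected hook that a concrete environment would wire its public Table-to-DataStream conversions through. A sketch of such a wrapper (not part of this diff, and assuming Row and RowTypeInfo imports are added):

// Possible wrapper in a concrete subclass; illustrative only.
def toAppendRowStream(table: Table): DataStream[Row] =
  translateToDataStream(
    table,
    updatesAsRetraction = false, // append-only result, no retraction encoding
    withChangeFlag = false,      // emit plain records without a change flag
    new RowTypeInfo(table.getSchema.getFieldTypes: _*))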

private def translateSink(sink: LogicalSink): StreamTransformation[_] = {
mergeParameters()

val optimizedPlan = optimize(sink)
val optimizedNodes = translateNodeDag(Seq(optimizedPlan))
require(optimizedNodes.size() == 1)
translateToPlan(optimizedNodes.head)
}

/**
* Translates a [[StreamExecNode]] plan into a [[StreamTransformation]].
*
* @param node The plan to translate.
* @return The [[StreamTransformation]] of type [[BaseRow]].
*/
private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] = {
node match {
case node: StreamExecNode[_] => node.translateToPlan(this)
case _ =>
throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
}

/**
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of the given [[Table]].

TableEnvironment.scala
@@ -18,16 +18,20 @@

package org.apache.flink.table.api

import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.plan.cost.FlinkCostFactory
import org.apache.flink.table.plan.nodes.exec.ExecNode
import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
import org.apache.flink.table.plan.optimize.Optimizer
import org.apache.flink.table.plan.schema.RelTable
import org.apache.flink.table.plan.stats.FlinkStatistic
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.types.Row

@@ -151,6 +155,18 @@ abstract class TableEnvironment(val config: TableConfig) {
/** Returns the specific query [[Optimizer]] that depends on the concrete type of this TableEnvironment. */
protected def getOptimizer: Optimizer

/**
* Writes a [[Table]] to a [[TableSink]].
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
* @tparam T The data type that the [[TableSink]] expects.
*/
private[table] def writeToSink[T](
table: Table,
sink: TableSink[T],
sinkName: String = null): Unit

/**
* Generates the optimized [[RelNode]] dag from the original relational nodes.
*
@@ -171,6 +187,17 @@
*/
private[flink] def optimize(root: RelNode): RelNode = optimize(Seq(root)).head

/**
* Converts a [[org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel]] DAG
* into an [[ExecNode]] DAG so that it can be translated.
*/
@VisibleForTesting
private[flink] def translateNodeDag(rels: Seq[RelNode]): Seq[ExecNode[_, _]] = {
require(rels.nonEmpty && rels.forall(_.isInstanceOf[FlinkPhysicalRel]))
// convert FlinkPhysicalRel DAG to ExecNode DAG
rels.map(_.asInstanceOf[ExecNode[_, _]])
}

/**
* Registers a [[Table]] under a unique name in the TableEnvironment's catalog.
* Registered tables can be referenced in SQL queries.

TableImpl.scala
@@ -18,9 +18,14 @@

package org.apache.flink.table.api

import org.apache.calcite.rel.RelNode
import org.apache.flink.table.calcite.FlinkTypeFactory._
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.TemporalTableFunction
import org.apache.flink.table.`type`.TypeConverters.createInternalTypeInfoFromInternalType

import org.apache.calcite.rel.RelNode

import _root_.scala.collection.JavaConversions._

/**
* The implementation of the [[Table]].
@@ -35,12 +40,22 @@ import org.apache.flink.table.functions.TemporalTableFunction
*/
class TableImpl(val tableEnv: TableEnvironment, relNode: RelNode) extends Table {

private lazy val tableSchema: TableSchema = {
val rowType = relNode.getRowType
val fieldNames = rowType.getFieldList.map(_.getName)
val fieldTypes = rowType.getFieldList map { tp =>
val internalType = toInternalType(tp.getType)
createInternalTypeInfoFromInternalType(internalType)
}
new TableSchema(fieldNames.toArray, fieldTypes.toArray)
}

/**
* Returns the Calcite RelNode that represents this Table.
*/
def getRelNode: RelNode = relNode

override def getSchema: TableSchema = ???
override def getSchema: TableSchema = tableSchema
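
With getSchema now derived from the RelNode row type, the schema of a VALUES query can be inspected directly. A sketch (field names a and b come from the aliases in the query; tEnv and sqlQuery are assumed as before):

val t = tEnv.sqlQuery("SELECT a, b FROM (VALUES (1, 'Hello')) AS T(a, b)")
// getFieldNames reflects relNode.getRowType, so this prints: a, b
println(t.getSchema.getFieldNames.mkString(", "))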

override def printSchema(): Unit = ???
