diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 95838cfa867426..8991aef374634b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -17,9 +17,13 @@ */ package org.apache.flink.table.api +import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.streaming.api.transformations.StreamTransformation import org.apache.flink.table.calcite.FlinkRelBuilder +import org.apache.flink.table.plan.nodes.calcite.LogicalSink +import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode} import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef import org.apache.flink.table.plan.optimize.{BatchOptimizer, Optimizer} import org.apache.flink.table.plan.schema.{BatchTableSourceTable, TableSourceSinkTable, TableSourceTable} @@ -32,6 +36,8 @@ import org.apache.calcite.plan.ConventionTraitDef import org.apache.calcite.rel.RelCollationTraitDef import org.apache.calcite.sql.SqlExplainLevel +import _root_.scala.collection.JavaConversions._ + /** * A session to construct between [[Table]] and [[DataStream]], its main function is: * @@ -79,6 +85,66 @@ class BatchTableEnvironment( } } + /** + * Merge global job parameters and table config parameters, + * and set the merged result to GlobalJobParameters + */ + private def mergeParameters(): Unit = { + if (streamEnv != null && streamEnv.getConfig != null) { + val parameters = new Configuration() + if (config != null && config.getConf != null) { + parameters.addAll(config.getConf) + } + + if 
(streamEnv.getConfig.getGlobalJobParameters != null) { + streamEnv.getConfig.getGlobalJobParameters.toMap.foreach { + kv => parameters.setString(kv._1, kv._2) + } + } + + streamEnv.getConfig.setGlobalJobParameters(parameters) + } + } + + /** + * Writes a [[Table]] to a [[TableSink]]. + * + * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the + * [[TableSink]] to write it. + * + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. + * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. + */ + override private[table] def writeToSink[T]( + table: Table, + sink: TableSink[T], + sinkName: String): Unit = { + mergeParameters() + + val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName) + val optimizedPlan = optimize(sinkNode) + val optimizedNodes = translateNodeDag(Seq(optimizedPlan)) + require(optimizedNodes.size() == 1) + translateToPlan(optimizedNodes.head) + } + + /** + * Translates a [[BatchExecNode]] plan into a [[StreamTransformation]]. + * Converts to target type if necessary. + * + * @param node The plan to translate. + * @return The [[StreamTransformation]] that corresponds to the given node. + */ + private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] = { + node match { + case node: BatchExecNode[_] => node.translateToPlan(this) + case _ => + throw new TableException("Cannot generate BoundedStream due to an invalid logical plan. " + + "This is a bug and should not happen. Please file an issue.") + } + } + /** * Returns the AST of the specified Table API and SQL queries and the execution plan to compute * the result of the given [[Table]]. 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index f285214a405f37..abb82ce214a6e3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -18,15 +18,24 @@ package org.apache.flink.table.api +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.filesystem.FsStateBackend +import org.apache.flink.runtime.state.memory.MemoryStateBackend import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.streaming.api.transformations.StreamTransformation import org.apache.flink.table.calcite.FlinkRelBuilder -import org.apache.flink.table.plan.`trait`.{AccModeTraitDef, FlinkRelDistributionTraitDef, MiniBatchIntervalTraitDef, UpdateAsRetractionTraitDef} +import org.apache.flink.table.dataformat.BaseRow +import org.apache.flink.table.plan.nodes.calcite.LogicalSink +import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode} import org.apache.flink.table.plan.optimize.{Optimizer, StreamOptimizer} import org.apache.flink.table.plan.schema._ import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.plan.`trait`.{AccModeTraitDef, FlinkRelDistributionTraitDef, MiniBatchIntervalTraitDef, UpdateAsRetractionTraitDef} import org.apache.flink.table.plan.util.FlinkRelOptUtil +import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo @@ -34,6 +43,8 @@ import org.apache.calcite.plan.ConventionTraitDef import org.apache.calcite.rel.RelCollationTraitDef import org.apache.calcite.sql.SqlExplainLevel +import _root_.scala.collection.JavaConversions._ + /** * The base class for stream TableEnvironments. * @@ -60,6 +71,8 @@ abstract class StreamTableEnvironment( // the naming pattern for internally registered tables. private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r + private var isConfigMerged: Boolean = false + override def queryConfig: StreamQueryConfig = new StreamQueryConfig override protected def getOptimizer: Optimizer = new StreamOptimizer(this) @@ -91,6 +104,102 @@ abstract class StreamTableEnvironment( AccModeTraitDef.INSTANCE) ) + /** + * Merge global job parameters and table config parameters, + * and set the merged result to GlobalJobParameters + */ + private def mergeParameters(): Unit = { + if (!isConfigMerged && execEnv != null && execEnv.getConfig != null) { + val parameters = new Configuration() + if (config != null && config.getConf != null) { + parameters.addAll(config.getConf) + } + + if (execEnv.getConfig.getGlobalJobParameters != null) { + execEnv.getConfig.getGlobalJobParameters.toMap.foreach { + kv => parameters.setString(kv._1, kv._2) + } + } + val isHeapState = Option(execEnv.getStateBackend) match { + case Some(backend) if backend.isInstanceOf[MemoryStateBackend] || + backend.isInstanceOf[FsStateBackend]=> true + case None => true + case _ => false + } + parameters.setBoolean(TableConfigOptions.SQL_EXEC_STATE_BACKEND_ON_HEAP, isHeapState) + execEnv.getConfig.setGlobalJobParameters(parameters) + isConfigMerged = true + } + } + + /** + * Writes a [[Table]] to a [[TableSink]]. + * + * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the + * [[TableSink]] to write it. + * + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. 
+ * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. + */ + override private[table] def writeToSink[T]( + table: Table, + sink: TableSink[T], + sinkName: String): Unit = { + val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName) + translateSink(sinkNode) + } + + /** + * Translates a [[Table]] into a [[DataStream]]. + * + * The transformation involves optimizing the relational expression tree as defined by + * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. + * + * @param table The root node of the relational expression tree. + * @param updatesAsRetraction Set to true to encode updates as retraction messages. + * @param withChangeFlag Set to true to emit records with change flags. + * @param resultType The [[org.apache.flink.api.common.typeinfo.TypeInformation[_]] of + * the resulting [[DataStream]]. + * @tparam T The type of the resulting [[DataStream]]. + * @return The [[DataStream]] that corresponds to the translated [[Table]]. + */ + protected def translateToDataStream[T]( + table: Table, + updatesAsRetraction: Boolean, + withChangeFlag: Boolean, + resultType: TypeInformation[T]): DataStream[T] = { + val sink = new DataStreamTableSink[T](table, resultType, updatesAsRetraction, withChangeFlag) + val sinkName = createUniqueTableName() + val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName) + val transformation = translateSink(sinkNode) + new DataStream(execEnv, transformation).asInstanceOf[DataStream[T]] + } + + private def translateSink(sink: LogicalSink): StreamTransformation[_] = { + mergeParameters() + + val optimizedPlan = optimize(sink) + val optimizedNodes = translateNodeDag(Seq(optimizedPlan)) + require(optimizedNodes.size() == 1) + translateToPlan(optimizedNodes.head) + } + + /** + * Translates a [[StreamExecNode]] plan into a [[StreamTransformation]]. + * + * @param node The plan to translate. 
+ * @return The [[StreamTransformation]] of type [[BaseRow]]. + */ + private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] = { + node match { + case node: StreamExecNode[_] => node.translateToPlan(this) + case _ => + throw new TableException("Cannot generate DataStream due to an invalid logical plan. " + + "This is a bug and should not happen. Please file an issue.") + } + } + /** * Returns the AST of the specified Table API and SQL queries and the execution plan to compute * the result of the given [[Table]]. diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 4c2bdc43f3ce79..82fe8e49608921 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.api +import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{RowTypeInfo, _} @@ -25,9 +26,12 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.plan.cost.FlinkCostFactory +import org.apache.flink.table.plan.nodes.exec.ExecNode +import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel import org.apache.flink.table.plan.optimize.Optimizer import org.apache.flink.table.plan.schema.RelTable import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sinks.TableSink import 
org.apache.flink.table.sources.TableSource import org.apache.flink.types.Row @@ -151,6 +155,18 @@ abstract class TableEnvironment(val config: TableConfig) { /** Returns specific query [[Optimizer]] depends on the concrete type of this TableEnvironment. */ protected def getOptimizer: Optimizer + /** + * Writes a [[Table]] to a [[TableSink]]. + * + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. + * @tparam T The data type that the [[TableSink]] expects. + */ + private[table] def writeToSink[T]( + table: Table, + sink: TableSink[T], + sinkName: String = null): Unit + /** * Generates the optimized [[RelNode]] dag from the original relational nodes. * @@ -171,6 +187,17 @@ abstract class TableEnvironment(val config: TableConfig) { */ private[flink] def optimize(root: RelNode): RelNode = optimize(Seq(root)).head + /** + * Convert [[org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel]] DAG + * to [[ExecNode]] DAG and translate them. + */ + @VisibleForTesting + private[flink] def translateNodeDag(rels: Seq[RelNode]): Seq[ExecNode[_, _]] = { + require(rels.nonEmpty && rels.forall(_.isInstanceOf[FlinkPhysicalRel])) + // convert FlinkPhysicalRel DAG to ExecNode DAG + rels.map(_.asInstanceOf[ExecNode[_, _]]) + } + /** * Registers a [[Table]] under a unique name in the TableEnvironment's catalog. * Registered tables can be referenced in SQL queries. 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala index a025f454fd0ff7..4d589083699a43 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala @@ -18,9 +18,14 @@ package org.apache.flink.table.api -import org.apache.calcite.rel.RelNode +import org.apache.flink.table.calcite.FlinkTypeFactory._ import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.TemporalTableFunction +import org.apache.flink.table.`type`.TypeConverters.createInternalTypeInfoFromInternalType + +import org.apache.calcite.rel.RelNode + +import _root_.scala.collection.JavaConversions._ /** * The implementation of the [[Table]]. @@ -35,12 +40,22 @@ import org.apache.flink.table.functions.TemporalTableFunction */ class TableImpl(val tableEnv: TableEnvironment, relNode: RelNode) extends Table { + private lazy val tableSchema: TableSchema = { + val rowType = relNode.getRowType + val fieldNames = rowType.getFieldList.map(_.getName) + val fieldTypes = rowType.getFieldList map { tp => + val internalType = toInternalType(tp.getType) + createInternalTypeInfoFromInternalType(internalType) + } + new TableSchema(fieldNames.toArray, fieldTypes.toArray) + } + /** * Returns the Calcite RelNode represent this Table. */ def getRelNode: RelNode = relNode - override def getSchema: TableSchema = ??? + override def getSchema: TableSchema = tableSchema override def printSchema(): Unit = ??? 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala new file mode 100644 index 00000000000000..2e7d81db7e714c --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api + +import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo} +import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.types.Row + +import _root_.java.{lang, math, sql, util} + +import _root_.scala.annotation.varargs + +/** + * This class enumerates all supported types of the Table API & SQL. + */ +object Types { + + /** + * Returns type information for a Table API string or SQL VARCHAR type. + */ + val STRING: TypeInformation[String] = JTypes.STRING + + /** + * Returns type information for a Table API boolean or SQL BOOLEAN type. 
+ */ + val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN + + /** + * Returns type information for a Table API byte or SQL TINYINT type. + */ + val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE + + /** + * Returns type information for a Table API short or SQL SMALLINT type. + */ + val SHORT: TypeInformation[lang.Short] = JTypes.SHORT + + /** + * Returns type information for a Table API integer or SQL INT/INTEGER type. + */ + val INT: TypeInformation[lang.Integer] = JTypes.INT + + /** + * Returns type information for a Table API long or SQL BIGINT type. + */ + val LONG: TypeInformation[lang.Long] = JTypes.LONG + + /** + * Returns type information for a Table API float or SQL FLOAT/REAL type. + */ + val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT + + /** + * Returns type information for a Table API integer or SQL DOUBLE type. + */ + val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE + + /** + * Returns type information for a Table API big decimal or SQL DECIMAL type. + */ + val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC + + /** + * Returns type information for a Table API SQL date or SQL DATE type. + */ + val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE + + /** + * Returns type information for a Table API SQL time or SQL TIME type. + */ + val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME + + /** + * Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type. + */ + val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP + + /** + * Returns type information for a Table API interval of months. + */ + val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS + + /** + * Returns type information for a Table API interval milliseconds. + */ + val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS + + /** + * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types. 
+ * + * A row is a variable-length, null-aware composite type for storing multiple values in a + * deterministic field order. Every field can be null regardless of the field's type. + * The type of row fields cannot be automatically inferred; therefore, it is required to provide + * type information whenever a row is used. + * + *
+ * The schema of rows can have up to `Integer.MAX_VALUE` fields, however, all
+ * row instances must strictly adhere to the schema defined by the type info.
+ *
+ * This method generates type information with fields of the given types; the fields have
+ * the default names (f0, f1, f2 ..).
+ *
+ * @param types The types of the row fields, e.g., Types.STRING, Types.INT
+ */
+  @varargs
+  def ROW(types: TypeInformation[_]*): TypeInformation[Row] =
+    // Forward the Scala varargs sequence unchanged to the Java `Types.ROW` factory.
+    JTypes.ROW(types: _*)
+
+ /**
+ * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types
+ * and with given names.
+ *
+ * A row is a variable-length, null-aware composite type for storing multiple values in a
+ * deterministic field order. Every field can be null independent of the field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+ * type information whenever a row is used.
+ *
+ * The schema of rows can have up to `Integer.MAX_VALUE` fields, however, all
+ * row instances must strictly adhere to the schema defined by the type info.
+ *
+ * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`.
+ *
+ * @param fieldNames array of field names
+ * @param types array of field types
+ */
+  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] =
+    // Expand the type array into the varargs expected by the Java `Types.ROW_NAMED` factory.
+    JTypes.ROW_NAMED(fieldNames, types: _*)
+
+  /**
+    * Generates type information for an array consisting of Java primitive elements. The elements
+    * do not support null values.
+    *
+    * @param elementType type of the array elements; e.g. Types.INT
+    * @return the [[PrimitiveArrayTypeInfo]] constant that corresponds to `elementType`
+    * @throws TableException if `elementType` is not one of BOOLEAN, BYTE, SHORT, INT, LONG,
+    *                        FLOAT or DOUBLE as defined in this object
+    */
+  def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+    // The upper-case names below are stable-identifier patterns: each case compares
+    // `elementType` for equality with the constant of that name defined in this object.
+    elementType match {
+      case BOOLEAN => PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+      case BYTE => PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+      case SHORT => PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+      case INT => PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+      case LONG => PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+      case FLOAT => PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+      case DOUBLE => PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+      case _ =>
+        // The trailing space keeps the two sentences of the message from running together.
+        throw new TableException(s"$elementType cannot be an element of a primitive array. " +
+          "Only Java primitive types are supported.")
+    }
+  }
+
+ /**
+ * Generates type information for an array consisting of Java object elements. Null values for
+ * elements are supported.
+ *
+ * @param elementType type of the array elements; e.g. Types.STRING or Types.INT
+ */
+  def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]] =
+    // The array type information is derived from the element type information alone.
+    ObjectArrayTypeInfo.getInfoFor(elementType)
+
+ /**
+ * Generates type information for a Java HashMap. Null values in keys are not supported. An
+ * entry's value can be null.
+ *
+ * @param keyType type of the keys of the map e.g. Types.STRING
+ * @param valueType type of the values of the map e.g. Types.STRING
+ */
+  def MAP[K, V](
+      keyType: TypeInformation[K],
+      valueType: TypeInformation[V]): TypeInformation[util.Map[K, V]] = {
+    // Build the map type information from the key and value type information.
+    val mapTypeInfo: TypeInformation[util.Map[K, V]] = new MapTypeInfo[K, V](keyType, valueType)
+    mapTypeInfo
+  }
+
+ /**
+ * Generates type information for a Multiset. A Multiset is backed by a Java HashMap and maps an
+ * arbitrary key to an integer value. Null values in keys are not supported.
+ *
+ * @param elementType type of the elements of the multiset e.g. Types.STRING
+ */
+  def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[util.Map[E, lang.Integer]] =
+    // Only the element type is supplied; the Integer value type comes from the return type.
+    new MultisetTypeInfo[E](elementType)
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 04df0cfda928f8..136ea4d66f75a7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -17,6 +17,10 @@
*/
package org.apache.flink.table.api.scala
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.asScalaStream
+import org.apache.flink.streaming.api.scala.{createTypeInformation => _, _}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
@@ -127,6 +131,49 @@ class StreamTableEnvironment @deprecated(
registerDataStreamInternal(name, dataStream.javaStream, exprs)
}
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+  def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
+    val resultType = createTypeInformation[T]
+    // Append-only conversion: updates are not encoded as retractions and no change flag
+    // is attached to the emitted records.
+    val javaStream = translateToDataStream[T](
+      table,
+      updatesAsRetraction = false,
+      withChangeFlag = false,
+      resultType)
+    // Wrap the Java DataStream produced by the translation into the Scala API type.
+    asScalaStream(javaStream)
+  }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ * @param table The [[Table]] to convert.
+ * @tparam T The type of the requested data type.
+ * @return The converted [[DataStream]].
+ */
+  def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
+    // The stream elements are (flag, record) pairs, so the type information is created
+    // for the whole tuple rather than for T alone.
+    val flaggedType = createTypeInformation[(Boolean, T)]
+    val javaStream = translateToDataStream[(Boolean, T)](
+      table,
+      updatesAsRetraction = true,
+      withChangeFlag = true,
+      flaggedType)
+    // Wrap the Java DataStream produced by the translation into the Scala API type.
+    asScalaStream(javaStream)
+  }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
new file mode 100644
index 00000000000000..d565c85ee4cea4
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
+import org.apache.flink.table.api.{Table, TableException, TableImpl}
+
+/**
+  * Holds methods to convert a [[Table]] into a [[DataStream]].
+  *
+  * @param table The table to convert.
+  */
+class TableConversions(table: Table) {
+
+  /**
+    * Looks up the `tableEnv` of the wrapped table and returns it if it is a Scala
+    * `StreamTableEnvironment`; any other environment is rejected with a [[TableException]].
+    * The table itself is cast to [[TableImpl]] without a prior check.
+    */
+  private def scalaStreamEnv: ScalaStreamTableEnv = {
+    val owningEnv = table.asInstanceOf[TableImpl].tableEnv
+    owningEnv match {
+      case streamEnv: ScalaStreamTableEnv => streamEnv
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+          "can be converted to Scala DataStreams.")
+    }
+  }
+
+  /**
+    * Converts the wrapped [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * This is an alias of [[toAppendStream]] and simply delegates to it. The [[Table]] must only
+    * have insert (append) changes; if it is also modified by update or delete changes, the
+    * conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * NOTE: Prefer [[toAppendStream]], which makes the append-only restriction explicit.
+    * If add and retract messages are required, use [[toRetractStream]].
+    *
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  @deprecated("This method only supports conversion of append-only tables. In order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
+  def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream[T]
+
+  /**
+    * Converts the wrapped [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation]: DataStream[T] =
+    scalaStreamEnv.toAppendStream[T](table)
+
+  /**
+    * Converts the wrapped [[Table]] to a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * @tparam T The type of the record carried in the second tuple field.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] =
+    scalaStreamEnv.toRetractStream[T](table)
+
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
index 20fbe8ebcd8813..eb2ff27e5c4cae 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -60,6 +60,10 @@ import _root_.scala.language.implicitConversions
*/
package object scala {
+  // Implicitly wraps a Table so that the methods of TableConversions can be called on it.
+  implicit def table2TableConversions(table: Table): TableConversions =
+    new TableConversions(table)
+
implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
new DataStreamConversions[T](set, set.dataType)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 43826c0e580298..81fea7d71bb296 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.calcite
import org.apache.flink.table.`type`._
import org.apache.flink.table.api.{TableException, TableSchema}
import org.apache.flink.table.plan.schema.{GenericRelDataType, _}
+
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`._
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index b27280ed2865a7..4e7003ded66d55 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.`type`._
import org.apache.flink.table.dataformat.DataFormatConverters.IdentityConverter
import org.apache.flink.table.dataformat.{Decimal, _}
import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.types.Row
import java.lang.reflect.Method
import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
@@ -163,6 +164,14 @@ object CodeGenUtils {
case _ => "null"
}
+  /**
+    * Checks whether `clazz` is internally compatible with the type `t`, in which case no
+    * data structure converter is needed.
+    *
+    * `Object` and [[Row]] are never considered internal (Row can only infer GenericType[Row]).
+    * Any other class is internal if it is a [[BaseRow]] (sub)class or if it equals the
+    * type class of `t`.
+    */
+  def isInternalClass(clazz: Class[_], t: TypeInformation[_]): Boolean = {
+    val neverInternal = clazz == classOf[Object] || clazz == classOf[Row]
+    if (neverInternal) {
+      false
+    } else {
+      classOf[BaseRow].isAssignableFrom(clazz) || clazz == t.getTypeClass
+    }
+  }
+
// -------------------------- Method & Enum ---------------------------------------
def qualifyMethod(method: Method): String =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
new file mode 100644
index 00000000000000..7a41f13a38614c
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.table.codegen.CodeGenUtils.newName
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.dataformat.GenericRow
+import org.apache.flink.table.generated.GeneratedInput
+import org.apache.flink.table.`type`.InternalType
+
+/**
+ * A code generator for generating Flink [[GenericInputFormat]]s.
+ */
+object InputFormatCodeGenerator {
+
+ /**
+ * Generates the Java source of a values input format that can be passed to the Java compiler.
+ *
+ * The generated class extends [[GenericInputFormat]] and keeps a `nextIdx` counter:
+ * `reachedEnd` returns true once the counter reaches the number of records, and
+ * `nextRecord` executes the code snippet of the current record, increments the counter
+ * and returns the output record term.
+ *
+ * @param ctx The code generator context
+ * @param name Base class name of the input format. It need not be unique (it is passed
+ * through `newName` to obtain the generated class name) but has to be a
+ * valid Java class identifier.
+ * @param records code snippets for creating the records, one snippet per record; the i-th
+ * snippet becomes the body of `case i` in the generated `nextRecord`
+ * @param returnType type used to register the reusable output record in the context
+ * @param outRecordTerm term of the output record returned by the generated `nextRecord`
+ * @param outRecordWriterTerm term of the output record writer that is registered together
+ * with the reusable output record
+ * @tparam T Record type of the generated [[GenericInputFormat]].
+ * @return a [[GeneratedInput]] holding the generated class name, its code and the
+ * references of the context
+ */
+ def generateValuesInputFormat[T](
+ ctx: CodeGeneratorContext,
+ name: String,
+ records: Seq[String],
+ returnType: InternalType,
+ outRecordTerm: String = CodeGenUtils.DEFAULT_OUT_RECORD_TERM,
+ outRecordWriterTerm: String = CodeGenUtils.DEFAULT_OUT_RECORD_WRITER_TERM)
+ : GeneratedInput[GenericInputFormat[T]] = {
+ val funcName = newName(name)
+
+ ctx.addReusableOutputRecord(returnType, classOf[GenericRow], outRecordTerm,
+ Some(outRecordWriterTerm))
+
+ val funcCode = j"""
+ public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
+
+ private int nextIdx = 0;
+
+ ${ctx.reuseMemberCode()}
+
+ public $funcName(Object[] references) throws Exception {
+ ${ctx.reuseInitCode()}
+ }
+
+ @Override
+ public boolean reachedEnd() throws java.io.IOException {
+ return nextIdx >= ${records.length};
+ }
+
+ @Override
+ public Object nextRecord(Object reuse) {
+ switch (nextIdx) {
+ ${records.zipWithIndex.map { case (r, i) =>
+ s"""
+ |case $i:
+ | $r
+ |break;
+ """.stripMargin
+ }.mkString("\n")}
+ }
+ nextIdx++;
+ return $outRecordTerm;
+ }
+ }
+ """.stripMargin
+
+ new GeneratedInput(funcName, funcCode, ctx.references.toArray)
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala
new file mode 100644
index 00000000000000..f51803f64c747d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink}
+
+object SinkCodeGenerator {
+
+  /**
+    * Determines the type class of the records consumed by the given [[TableSink]].
+    *
+    * @param sink The sink whose record class should be resolved.
+    * @return The resolved class, or `classOf[Object]` if resolving it fails with an
+    *         [[InvalidTypesException]].
+    */
+  private[flink] def extractTableSinkTypeClass(sink: TableSink[_]): Class[_] = {
+    def resolve: Class[_] = sink match {
+      // DataStreamTableSink has no generic class, so its output type provides the type class.
+      case dataStreamSink: DataStreamTableSink[_] =>
+        dataStreamSink.getOutputType.getTypeClass
+      case other =>
+        val typeInfo =
+          TypeExtractor.createTypeInfo(other, classOf[TableSink[_]], other.getClass, 0)
+        typeInfo.getTypeClass.asInstanceOf[Class[_]]
+    }
+
+    try resolve catch {
+      case _: InvalidTypesException => classOf[Object]
+    }
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala
new file mode 100644
index 00000000000000..c207bd685a52f2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
+import org.apache.flink.table.runtime.values.ValuesInputFormat
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.RexLiteral
+
+import com.google.common.collect.ImmutableList
+
+import scala.collection.JavaConversions._
+
+object ValuesCodeGenerator {
+
+  /**
+    * Generates a [[ValuesInputFormat]] that produces the given literal tuples.
+    *
+    * @param tableEnv The [[TableEnvironment]] whose config is used for code generation.
+    * @param rowType The row type of the tuples.
+    * @param tuples The literal values, one inner list per record.
+    * @param description Name handed to the input format code generator.
+    * @return The input format wrapping the generated code and the produced row type info.
+    */
+  def generatorInputFormat(
+      tableEnv: TableEnvironment,
+      rowType: RelDataType,
+      tuples: ImmutableList[ImmutableList[RexLiteral]],
+      description: String): ValuesInputFormat = {
+    val internalRowType = FlinkTypeFactory.toInternalRowType(rowType)
+    val ctx = CodeGeneratorContext(tableEnv.getConfig)
+    val exprGenerator = new ExprCodeGenerator(ctx, false)
+
+    // one result expression per tuple, built from the expressions of its literals
+    val recordExpressions = for (tuple <- tuples) yield {
+      val fieldExpressions = tuple.map(literal => exprGenerator.generateExpression(literal))
+      exprGenerator.generateResultExpression(
+        fieldExpressions, internalRowType, classOf[GenericRow])
+    }
+    val recordCodes = recordExpressions.map(_.code)
+
+    // wrap the record snippets into an input format
+    val generatedInput = InputFormatCodeGenerator.generateValuesInputFormat[BaseRow](
+      ctx, description, recordCodes, internalRowType)
+
+    val producedType =
+      new BaseRowTypeInfo(internalRowType.getFieldTypes, internalRowType.getFieldNames)
+    new ValuesInputFormat(generatedInput, producedType)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
index 059a971184b559..35eab30a6e5f69 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
@@ -18,14 +18,24 @@
package org.apache.flink.table.plan.nodes.physical.batch
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils
+import org.apache.flink.table.codegen.SinkCodeGenerator._
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.calcite.Sink
-import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sinks.{BatchTableSink, DataStreamTableSink, TableSink}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import java.util
+import scala.collection.JavaConversions._
+
/**
* Batch physical RelNode to to write data into an external sink defined by a [[TableSink]].
*/
@@ -36,10 +46,70 @@ class BatchExecSink[T](
sink: TableSink[T],
sinkName: String)
extends Sink(cluster, traitSet, inputRel, sink, sinkName)
- with BatchPhysicalRel {
+ with BatchPhysicalRel
+ with BatchExecNode[Any] {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new BatchExecSink(cluster, traitSet, inputs.get(0), sink, sinkName)
}
+ //~ ExecNode methods -----------------------------------------------------------
+
+ /**
+ * For the sink operator, records will not pass through it, so its DamBehavior is FULL_DAM.
+ *
+ * @return The [[DamBehavior]] of the sink, which is always `FULL_DAM`.
+ */
+ override def getDamBehavior: DamBehavior = DamBehavior.FULL_DAM
+
+  /** Returns a list that contains only the input of this sink, viewed as an [[ExecNode]]. */
+  override def getInputNodes: util.List[ExecNode[BatchTableEnvironment, _]] = {
+    val inputNode = getInput.asInstanceOf[ExecNode[BatchTableEnvironment, _]]
+    List(inputNode)
+  }
+
+  /**
+    * Translates this sink node into a [[StreamTransformation]].
+    *
+    * For a [[BatchTableSink]] the translated input is wrapped in a [[DataStream]] and handed
+    * to `emitBoundedStream`; for a [[DataStreamTableSink]] only the input is translated.
+    * Every other kind of sink is rejected with a [[TableException]].
+    *
+    * @param tableEnv The [[BatchTableEnvironment]] of the translation.
+    * @return The resulting transformation.
+    */
+  override protected def translateToPlanInternal(
+      tableEnv: BatchTableEnvironment): StreamTransformation[Any] = {
+    val translated = sink match {
+      case boundedSink: BatchTableSink[T] =>
+        val input = translateToStreamTransformation(withChangeFlag = false, tableEnv)
+        val boundedStream = new DataStream(tableEnv.streamEnv, input)
+        val emitted = boundedSink.emitBoundedStream(
+          boundedStream, tableEnv.getConfig, tableEnv.streamEnv.getConfig)
+        emitted.getTransformation
+
+      case dataStreamSink: DataStreamTableSink[T] =>
+        // A table converted through BatchTableEnvironment#toBoundedStream gets a
+        // DataStreamTableSink wrapped as a LogicalSink. As there is no real batch table sink,
+        // TableSink#emitBoundedStream is not invoked and no resource is set; translating the
+        // input to a StreamTransformation is sufficient.
+        translateToStreamTransformation(withChangeFlag = dataStreamSink.withChangeFlag, tableEnv)
+
+      case _ =>
+        throw new TableException("Only Support BatchTableSink or DataStreamTableSink!")
+    }
+    translated.asInstanceOf[StreamTransformation[Any]]
+  }
+
+  /**
+    * Translates the input of this sink into a [[StreamTransformation]].
+    *
+    * The output type of the sink is validated first. The translated input is only accepted
+    * if the type class of the sink is an internal class, because sink conversion is not
+    * supported yet.
+    *
+    * @param withChangeFlag Not evaluated by this method.
+    * @param tableEnv The [[BatchTableEnvironment]] used to translate the input node.
+    * @return The transformation of the input node, cast to the type of the sink.
+    */
+  private def translateToStreamTransformation(
+      withChangeFlag: Boolean,
+      tableEnv: BatchTableEnvironment): StreamTransformation[T] = {
+    val outputType = sink.getOutputType
+    TableEnvironment.validateType(outputType)
+
+    getInputNodes.get(0) match {
+      // Sink's input must be BatchExecNode[BaseRow] now.
+      case rowNode: BatchExecNode[BaseRow] =>
+        val inputTransformation = rowNode.translateToPlan(tableEnv)
+        // TODO support SinkConversion after FLINK-11974 is done
+        val sinkClass = extractTableSinkTypeClass(sink)
+        if (!CodeGenUtils.isInternalClass(sinkClass, outputType)) {
+          throw new TableException("Not support SinkConvention now.")
+        }
+        inputTransformation.asInstanceOf[StreamTransformation[T]]
+
+      case _ =>
+        throw new TableException("Cannot generate BoundedStream due to an invalid logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
+    }
+  }
+
+
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
index 00f47827cf7e90..ad312f299a8325 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
@@ -18,13 +18,21 @@
package org.apache.flink.table.plan.nodes.physical.batch
-import com.google.common.collect.ImmutableList
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.codegen.ValuesCodeGenerator
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexLiteral
+import com.google.common.collect.ImmutableList
+
import java.util
import scala.collection.JavaConversions._
@@ -38,7 +46,8 @@ class BatchExecValues(
tuples: ImmutableList[ImmutableList[RexLiteral]],
outputRowType: RelDataType)
extends Values(cluster, outputRowType, tuples, traitSet)
- with BatchPhysicalRel {
+ with BatchPhysicalRel
+ with BatchExecNode[BaseRow] {
override def deriveRowType(): RelDataType = outputRowType
@@ -51,5 +60,23 @@ class BatchExecValues(
.item("values", getRowType.getFieldNames.toList.mkString(", "))
}
+ //~ ExecNode methods -----------------------------------------------------------
+
+ /**
+ * @return The [[DamBehavior]] of this values node, which is always `PIPELINED`.
+ */
+ override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
+
+  /**
+    * Translates this values node into a source transformation that is backed by a
+    * generated values input format.
+    *
+    * @param tableEnv The [[BatchTableEnvironment]] whose stream environment creates the input.
+    * @return The transformation of the created input.
+    */
+  override protected def translateToPlanInternal(
+      tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
+    val valuesFormat =
+      ValuesCodeGenerator.generatorInputFormat(tableEnv, getRowType, tuples, getRelTypeName)
+    val source = tableEnv.streamEnv.createInput(valuesFormat, valuesFormat.getProducedType)
+    source.getTransformation
+  }
+
+  /** A values node has no inputs: returns a new, empty list on every call. */
+  override def getInputNodes: util.List[ExecNode[BatchTableEnvironment, _]] = {
+    val noInputs = new util.ArrayList[ExecNode[BatchTableEnvironment, _]]()
+    noInputs
+  }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
index 7b74b67df3aa80..c9adccc33dcef2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
@@ -18,7 +18,17 @@
package org.apache.flink.table.plan.nodes.physical.stream
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenUtils
+import org.apache.flink.table.codegen.SinkCodeGenerator.extractTableSinkTypeClass
+import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.calcite.Sink
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef}
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.sinks._
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -26,6 +36,8 @@ import org.apache.calcite.rel.RelNode
import java.util
+import scala.collection.JavaConversions._
+
/**
* Stream physical RelNode to to write data into an external sink defined by a [[TableSink]].
*/
@@ -36,12 +48,13 @@ class StreamExecSink[T](
sink: TableSink[T],
sinkName: String)
extends Sink(cluster, traitSet, inputRel, sink, sinkName)
- with StreamPhysicalRel {
+ with StreamPhysicalRel
+ with StreamExecNode[Any] {
override def producesUpdates: Boolean = false
override def needsUpdatesAsRetraction(input: RelNode): Boolean =
- sink.isInstanceOf[BaseRetractStreamTableSink[_]]
+ sink.isInstanceOf[RetractStreamTableSink[_]]
override def consumesRetractions: Boolean = false
@@ -53,5 +66,107 @@ class StreamExecSink[T](
new StreamExecSink(cluster, traitSet, inputs.get(0), sink, sinkName)
}
+ //~ ExecNode methods -----------------------------------------------------------
+
+  /** Returns a list that contains only the input of this sink, viewed as an [[ExecNode]]. */
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
+    val inputNode = getInput.asInstanceOf[ExecNode[StreamTableEnvironment, _]]
+    List(inputNode)
+  }
+
+  /**
+    * Translates this sink node into a [[StreamTransformation]].
+    *
+    * For a [[StreamTableSink]] the input is translated according to the concrete kind of sink
+    * (retract, upsert or append), wrapped in a [[DataStream]] and handed to `emitDataStream`.
+    * For a [[DataStreamTableSink]] only the input is translated. Every other kind of sink is
+    * rejected with a [[TableException]].
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] of the translation.
+    * @return The resulting transformation.
+    */
+  override protected def translateToPlanInternal(
+      tableEnv: StreamTableEnvironment): StreamTransformation[Any] = {
+
+    // Validates the plan against the kind of stream sink and translates the input of the sink.
+    def translateInputFor(streamSink: StreamTableSink[T]): StreamTransformation[T] =
+      streamSink match {
+        case _: RetractStreamTableSink[T] =>
+          translateToStreamTransformation(withChangeFlag = true, tableEnv)
+
+        case upsertSink: UpsertStreamTableSink[T] =>
+          // tell the sink whether the table is append-only before translating
+          val appendOnly = UpdatingPlanChecker.isAppendOnly(this)
+          upsertSink.setIsAppendOnly(appendOnly)
+          translateToStreamTransformation(withChangeFlag = true, tableEnv)
+
+        case _: AppendStreamTableSink[T] =>
+          // an append sink accepts insert-only (append-only) tables only
+          if (!UpdatingPlanChecker.isAppendOnly(this)) {
+            throw new TableException(
+              "AppendStreamTableSink requires that Table has only insert changes.")
+          }
+
+          val accMode = this.getTraitSet.getTrait(AccModeTraitDef.INSTANCE).getAccMode
+          if (accMode == AccMode.AccRetract) {
+            throw new TableException(
+              "AppendStreamTableSink can not be used to output retraction messages.")
+          }
+          translateToStreamTransformation(withChangeFlag = false, tableEnv)
+
+        case _ =>
+          throw new TableException(
+            "Stream Tables can only be emitted by AppendStreamTableSink, " +
+              "RetractStreamTableSink, or UpsertStreamTableSink.")
+      }
+
+    val translated = sink match {
+      case streamSink: StreamTableSink[T] =>
+        val stream = new DataStream(tableEnv.execEnv, translateInputFor(streamSink))
+        streamSink.emitDataStream(stream).getTransformation
+
+      case dataStreamSink: DataStreamTableSink[_] =>
+        // No emit method is invoked for a DataStreamTableSink; translating the input to a
+        // StreamTransformation is sufficient.
+        // NOTE(review): presumably this sink is only inserted (wrapped as a LogicalSink) when
+        // a table is converted to a DataStream, so there is no real table sink - confirm.
+        translateToStreamTransformation(dataStreamSink.withChangeFlag, tableEnv)
+
+      case _ =>
+        throw new TableException("Only Support StreamTableSink or DataStreamTableSink!")
+    }
+    translated.asInstanceOf[StreamTransformation[Any]]
+  }
+
+  /**
+    * Translates the input of this sink into a [[StreamTransformation]].
+    *
+    * @param withChangeFlag Set to true to emit records with change flags. If it is false, the
+    *                       input has to be an insert-only (append-only) table.
+    * @param tableEnv The [[StreamTableEnvironment]] used to translate the input node.
+    * @return The [[StreamTransformation]] of the translated input, cast to the sink type.
+    */
+  private def translateToStreamTransformation(
+      withChangeFlag: Boolean,
+      tableEnv: StreamTableEnvironment): StreamTransformation[T] = {
+    val input = getInput
+
+    // without change flags, only insert-only (append-only) tables can be emitted
+    if (!(withChangeFlag || UpdatingPlanChecker.isAppendOnly(input))) {
+      throw new TableException(
+        "Table is not an append-only table. " +
+        "Use the toRetractStream() in order to handle add and retract messages.")
+    }
+
+    // translate the input into its BaseRow plan
+    val inputTransformation = input match {
+      // Sink's input must be StreamExecNode[BaseRow] now.
+      case rowNode: StreamExecNode[BaseRow] =>
+        rowNode.translateToPlan(tableEnv)
+      case _ =>
+        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
+    }
+
+    // at most one rowtime field may be present in the input
+    val rowtimeFields = input.getRowType.getFieldList
+      .filter(field => FlinkTypeFactory.isRowtimeIndicatorType(field.getType))
+    if (rowtimeFields.size > 1) {
+      val fieldNames = rowtimeFields.map(_.getName).mkString(", ")
+      throw new TableException(
+        s"Found more than one rowtime field: [$fieldNames] in " +
+        s"the table that should be converted to a DataStream.\n" +
+        s"Please select the rowtime field that should be used as event-time timestamp for the " +
+        s"DataStream by casting all other fields to TIMESTAMP.")
+    }
+
+    // TODO support SinkConversion after FLINK-11974 is done
+    val outputType = sink.getOutputType
+    val sinkClass = extractTableSinkTypeClass(sink)
+    if (!CodeGenUtils.isInternalClass(sinkClass, outputType)) {
+      throw new TableException("Not support SinkConvention now.")
+    }
+    inputTransformation.asInstanceOf[StreamTransformation[T]]
+  }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
index b87080a525125b..4b63f189a01ef1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
@@ -18,13 +18,22 @@
package org.apache.flink.table.plan.nodes.physical.stream
-import com.google.common.collect.ImmutableList
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfigOptions, TableException}
+import org.apache.flink.table.codegen.ValuesCodeGenerator
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+
import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Values
import org.apache.calcite.rex.RexLiteral
+import com.google.common.collect.ImmutableList
+
+import java.util
+
/**
* Stream physical RelNode for [[Values]].
*/
@@ -34,7 +43,8 @@ class StreamExecValues(
tuples: ImmutableList[ImmutableList[RexLiteral]],
outputRowType: RelDataType)
extends Values(cluster, outputRowType, tuples, traitSet)
- with StreamPhysicalRel {
+ with StreamPhysicalRel
+ with StreamExecNode[BaseRow] {
override def producesUpdates: Boolean = false
@@ -51,4 +61,28 @@ class StreamExecValues(
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new StreamExecValues(cluster, traitSet, getTuples, outputRowType)
}
+
+ //~ ExecNode methods -----------------------------------------------------------
+
+  /**
+    * Translates this values node into a source transformation that is backed by a
+    * generated values input format.
+    *
+    * The translation is only performed if
+    * [[TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED]] is set in the table config;
+    * otherwise a [[TableException]] is thrown.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] whose execution environment creates the
+    *                 input.
+    * @return The transformation of the created input.
+    */
+  override protected def translateToPlanInternal(
+      tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+    val valuesInputEnabled = tableEnv.getConfig.getConf.getBoolean(
+      TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED)
+
+    if (!valuesInputEnabled) {
+      // enable this feature when runtime support do checkpoint when source finished
+      throw new TableException("Values source input is not supported currently. Probably " +
+        "there is a where condition which always returns false in your query.")
+    }
+
+    val valuesFormat =
+      ValuesCodeGenerator.generatorInputFormat(tableEnv, getRowType, tuples, getRelTypeName)
+    tableEnv.execEnv.createInput(valuesFormat, valuesFormat.getProducedType).getTransformation
+  }
+
+  /** A values node has no inputs: returns a new, empty list on every call. */
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
+    val noInputs = new util.ArrayList[ExecNode[StreamTableEnvironment, _]]()
+    noInputs
+  }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
index e54c9d2c6d7e0d..97ff92b5dff67c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
@@ -31,7 +31,7 @@ trait Optimizer {
*
NOTES: *
1. The reused node in result DAG will be converted to the same RelNode. *
2. If a root node requires retract changes on Stream, the node should be
- * a [[org.apache.flink.table.sinks.BaseRetractStreamTableSink]] or
+ * a [[org.apache.flink.table.sinks.RetractStreamTableSink]] or
* a regular node with [[org.apache.flink.table.plan.trait.UpdateAsRetractionTrait]]
* which `updateAsRetraction` is true.
*
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index e2bd71046c7cfe..ac0592628043ae 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -209,6 +209,7 @@ object FlinkBatchRuleSets {
BatchExecScanTableSourceRule.INSTANCE,
BatchExecValuesRule.INSTANCE,
BatchExecCalcRule.INSTANCE,
- BatchExecUnionRule.INSTANCE
+ BatchExecUnionRule.INSTANCE,
+ BatchExecSinkRule.INSTANCE
)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index a32c926bee127d..e8b96ef064bb3f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -215,7 +215,8 @@ object FlinkStreamRuleSets {
StreamExecTableSourceScanRule.INSTANCE,
StreamExecValuesRule.INSTANCE,
StreamExecCalcRule.INSTANCE,
- StreamExecUnionRule.INSTANCE
+ StreamExecUnionRule.INSTANCE,
+ StreamExecSinkRule.INSTANCE
)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
new file mode 100644
index 00000000000000..93aaeb6ea57480
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.batch
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Rule that converts a [[FlinkLogicalSink]] into a [[BatchExecSink]].
+  */
+class BatchExecSinkRule extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.BATCH_PHYSICAL,
+    "BatchExecSinkRule") {
+
+  /**
+    * Creates the [[BatchExecSink]] for the given [[FlinkLogicalSink]]. Both the trait set of
+    * the sink and its input are moved to the batch physical convention.
+    */
+  def convert(rel: RelNode): RelNode = {
+    val logicalSink = rel.asInstanceOf[FlinkLogicalSink]
+    val physicalTraits = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    // TODO Take PartitionableSink into consideration after FLINK-11993 is done
+    val physicalInput =
+      RelOptRule.convert(logicalSink.getInput, FlinkConventions.BATCH_PHYSICAL)
+
+    new BatchExecSink(
+      rel.getCluster, physicalTraits, physicalInput, logicalSink.sink, logicalSink.sinkName)
+  }
+}
+
+object BatchExecSinkRule {
+
+  /** The rule instance exposed for use in rule sets. */
+  val INSTANCE: RelOptRule = new BatchExecSinkRule
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
new file mode 100644
index 00000000000000..5ae6ead086e40a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.RelNode
+
+/** Converter rule that turns a [[FlinkLogicalSink]] into a [[StreamExecSink]]. */
+class StreamExecSinkRule extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.STREAM_PHYSICAL,
+    "StreamExecSinkRule") {
+
+  /** Rebuilds the logical sink `rel` as a [[StreamExecSink]] over a stream-physical input. */
+  def convert(rel: RelNode): RelNode = {
+    val logicalSink = rel.asInstanceOf[FlinkLogicalSink]
+    val streamConvention = FlinkConventions.STREAM_PHYSICAL
+    val physicalTraits = rel.getTraitSet.replace(streamConvention)
+    // TODO Take PartitionableSink into consideration after FLINK-11993 is done
+    val physicalInput = RelOptRule.convert(logicalSink.getInput, streamConvention)
+
+    new StreamExecSink(
+      rel.getCluster, physicalTraits, physicalInput,
+      logicalSink.sink, logicalSink.sinkName)
+  }
+}
+
+object StreamExecSinkRule {
+
+  /** Shared instance of the rule. */
+  val INSTANCE: RelOptRule = new StreamExecSinkRule()
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
new file mode 100644
index 00000000000000..d0c88072a51330
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table
+
+/**
+ * Defines an external [[TableSink]] to emit a streaming [[Table]] with only insert changes.
+ * This trait adds no members of its own to [[StreamTableSink]].
+ * If the [[Table]] is also modified by update or delete changes, a
+ * [[org.apache.flink.table.api.TableException]] will be thrown.
+ *
+ * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
+ */
+trait AppendStreamTableSink[T] extends StreamTableSink[T]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
similarity index 69%
rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
index 82b95901c72725..8ab44b46131b26 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
@@ -18,17 +18,19 @@
package org.apache.flink.table.sinks
+import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.api._
-/**
- * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
- * changes.
+/** Defines an external [[TableSink]] to emit a batch [[Table]].
*
- * @tparam T Type of records that this [[TableSink]] expects and supports.
+ * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
*/
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
+trait BatchTableSink[T] extends TableSink[T] {
/** Emits the DataStream. */
- def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
+ def emitBoundedStream(
+ boundedStream: DataStream[T],
+ tableConfig: TableConfig,
+ executionConfig: ExecutionConfig): DataStreamSink[_]
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
new file mode 100644
index 00000000000000..27a7d49d629fca
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.table.api._
+import org.apache.flink.types.Row
+
+/**
+  * A [[TableSink]] that emits a bounded stream of T through a [[CollectOutputFormat]].
+  * The output format only exists once [[init]] has been called.
+  */
+class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeInformation[T]))
+  extends TableSinkBase[T] with BatchTableSink[T] {
+
+  private var collectOutputFormat: CollectOutputFormat[T] = _
+
+  /** Creates the output format from the given serializer and accumulator id. */
+  def init(typeSerializer: TypeSerializer[T], id: String): Unit = {
+    collectOutputFormat = new CollectOutputFormat(id, typeSerializer)
+  }
+
+  /** Derives the output type from the field types of this sink. */
+  override def getOutputType: TypeInformation[T] = produceOutputType(getFieldTypes)
+
+  /** Creates a new sink with the same type factory; the output format is not copied. */
+  override protected def copy: TableSinkBase[T] = new CollectTableSink(produceOutputType)
+
+  /** Writes the stream with the collecting output format and names the sink "collect". */
+  override def emitBoundedStream(
+      boundedStream: DataStream[T],
+      tableConfig: TableConfig,
+      executionConfig: ExecutionConfig): DataStreamSink[T] = {
+    boundedStream.writeUsingOutputFormat(collectOutputFormat).name("collect")
+  }
+}
+
+/** Output format that gathers records in an accumulator and registers it under `id` on close. */
+class CollectOutputFormat[T](id: String, typeSerializer: TypeSerializer[T])
+  extends RichOutputFormat[T] {
+
+  private var accumulator: SerializedListAccumulator[T] = _
+
+  override def configure(parameters: Configuration): Unit = () // nothing to configure
+
+  /** Starts from an empty accumulator every time the format is opened. */
+  override def open(taskNumber: Int, numTasks: Int): Unit = {
+    accumulator = new SerializedListAccumulator[T]
+  }
+
+  /** Hands the record to the accumulator together with its serializer. */
+  override def writeRecord(record: T): Unit = accumulator.add(record, typeSerializer)
+
+  override def close(): Unit = {
+    // Important: should only be added in close method to minimize traffic of accumulators
+    getRuntimeContext.addAccumulator(id, accumulator)
+  }
+}
+/** [[CollectTableSink]] for [[Row]]s, typed by a [[RowTypeInfo]] over the field types. */
+class CollectRowTableSink extends CollectTableSink[Row](new RowTypeInfo(_: _*))
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
new file mode 100644
index 00000000000000..c4a308dea88353
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{Table, TableException}
+
+/**
+  * A [[TableSink]] that describes how a [[Table]] is emitted as a DataStream[T].
+  *
+  * Field names and types are taken from the schema of `table`; `configure` is not supported.
+  *
+  * @param table The [[Table]] to emit.
+  * @param outputType The [[TypeInformation]] that specifies the type of the [[DataStream]].
+  * @param updatesAsRetraction Set to true to encode updates as retraction messages.
+  * @param withChangeFlag Set to true to emit records with change flags.
+  * @tparam T The type of the resulting [[DataStream]].
+  */
+@Internal
+class DataStreamTableSink[T](
+    table: Table,
+    outputType: TypeInformation[T],
+    val updatesAsRetraction: Boolean,
+    val withChangeFlag: Boolean) extends TableSink[T] {
+
+  // Read from the table on first access only.
+  private lazy val tableSchema = table.getSchema
+
+  /** Returns the `outputType` this sink was created with. */
+  override def getOutputType: TypeInformation[T] = outputType
+
+  /** Returns the names of the table fields, as reported by the table schema. */
+  override def getFieldNames: Array[String] = tableSchema.getFieldNames
+
+  /** Returns the types of the table fields, copied into a new array. */
+  override def getFieldTypes: Array[TypeInformation[_]] = {
+    val schemaTypes = tableSchema.getFieldTypes
+    Array(schemaTypes: _*)
+  }
+
+  /** Always throws a [[TableException]]; this sink does not support reconfiguration. */
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[T] =
+    throw new TableException("configure is not supported.")
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
new file mode 100644
index 00000000000000..2ee044993ebe9a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.{Table, Types}
+
+import java.lang.{Boolean => JBool}
+
+/**
+ * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
+ * changes. Records reach the sink as [[JTuple2]] pairs of a [[JBool]] flag and the record.
+ *
+ * @tparam T Type of records that this [[TableSink]] expects and supports.
+ */
+trait RetractStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
+
+ /** Returns the requested record type. */
+ def getRecordType: TypeInformation[T]
+ /** Returns the tuple type built from [[Types.BOOLEAN]] and [[getRecordType]]. */
+ override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
new file mode 100644
index 00000000000000..330faacb2961c5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.{Table, Types}
+
+import java.lang.{Boolean => JBool}
+
+
+/**
+ * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
+ * changes. The [[Table]] must have unique key fields (atomic or composite) or be append-only.
+ *
+ * If the [[Table]] does not have a unique key and is not append-only, a
+ * [[org.apache.flink.table.api.TableException]] will be thrown.
+ *
+ * The unique key of the table is configured by the [[UpsertStreamTableSink#setKeyFields()]]
+ * method.
+ *
+ * If the table is append-only, all messages will have a true flag and must be interpreted
+ * as insertions.
+ *
+ * @tparam T Type of records that this [[TableSink]] expects and supports.
+ */
+trait UpsertStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
+
+ /**
+ * Configures the unique key fields of the [[Table]] to write.
+ * The method is called after [[TableSink.configure()]].
+ *
+ * The keys array might be empty, if the table consists of a single (updated) record.
+ * If the table does not have a key and is append-only, the keys attribute is null.
+ *
+ * @param keys the field names of the table's keys, an empty array if the table has a single
+ * row, and null if the table is append-only and has no key.
+ */
+ def setKeyFields(keys: Array[String]): Unit
+
+ /**
+ * Specifies whether the [[Table]] to write is append-only or not.
+ *
+ * @param isAppendOnly true if the table is append-only, false otherwise.
+ */
+ def setIsAppendOnly(isAppendOnly: JBool): Unit
+
+ /** Returns the requested record type. */
+ def getRecordType: TypeInformation[T]
+ /** Returns the tuple type built from [[Types.BOOLEAN]] and [[getRecordType]]. */
+ override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java
new file mode 100644
index 00000000000000..457e13fe1fa648
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.StringUtils;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+/** Test utility that renders a {@link BaseRow} as a delimited string. */
+public class BaseRowUtil {
+
+	/** Same as the four-argument overload with {@code withHeader} set to true. */
+	public static String baseRowToString(BaseRow value, BaseRowTypeInfo rowTypeInfo, TimeZone tz) {
+		return baseRowToString(value, rowTypeInfo, tz, true);
+	}
+
+	/**
+	 * Renders the fields of {@code value} separated by "," and, when {@code withHeader} is set,
+	 * prefixed by the row header and "|". Date, Time and Timestamp fields are not supported yet.
+	 */
+	public static String baseRowToString(BaseRow value, BaseRowTypeInfo rowTypeInfo, TimeZone tz, boolean withHeader) {
+		return genericRowToString(toGenericRow(value, rowTypeInfo), tz, withHeader);
+	}
+
+	/** Formats one field; {@code tz} is currently unused because date/time types are rejected. */
+	private static String fieldToString(Object field, TimeZone tz) {
+		if (field instanceof Date || field instanceof Time || field instanceof Timestamp) {
+			// TODO support after FLINK-11898 is merged
+			throw new UnsupportedOperationException(
+				"Cannot convert field of type " + field.getClass().getName() + " to string yet.");
+		}
+		return StringUtils.arrayAwareToString(field);
+	}
+
+	/** Joins the optional header and all fields of {@code row} into one string. */
+	private static String genericRowToString(GenericRow row, TimeZone tz, boolean withHeader) {
+		StringBuilder sb = new StringBuilder();
+		if (withHeader) {
+			sb.append(row.getHeader()).append("|");
+		}
+		for (int i = 0; i < row.getArity(); i++) {
+			if (i > 0) {
+				sb.append(",");
+			}
+			sb.append(fieldToString(row.getField(i), tz));
+		}
+		return sb.toString();
+	}
+
+	/** Returns {@code baseRow} itself if it is a GenericRow, otherwise a field-by-field copy. */
+	private static GenericRow toGenericRow(BaseRow baseRow, BaseRowTypeInfo baseRowTypeInfo) {
+		if (baseRow instanceof GenericRow) {
+			return (GenericRow) baseRow;
+		}
+		int fieldNum = baseRow.getArity();
+		GenericRow row = new GenericRow(fieldNum);
+		row.setHeader(baseRow.getHeader());
+		InternalType[] internalTypes = baseRowTypeInfo.getInternalTypes();
+		for (int i = 0; i < fieldNum; i++) {
+			Object field = baseRow.isNullAt(i) ? null : TypeGetterSetters.get(baseRow, i, internalTypes[i]);
+			row.setField(i, field);
+		}
+		return row;
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala
new file mode 100644
index 00000000000000..19f3d551d0a4f0
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.table.runtime.utils.BatchTestBase
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Runs a VALUES query through the batch environment and checks the collected rows. */
+class ValuesITCase extends BatchTestBase {
+
+  @Test
+  def testValues(): Unit = {
+    val rows = collectResults(tEnv.sqlQuery("SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)"))
+
+    // Both sides are sorted so the comparison does not depend on result order.
+    assertEquals(List("1,2,3").sorted, rows.sorted)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala
new file mode 100644
index 00000000000000..d081cb54b9f057
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestingAppendBaseRowSink}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.`type`.InternalTypes.INT
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Runs a VALUES query through the streaming environment and checks the appended rows. */
+class ValuesITCase extends StreamingTestBase {
+
+  @Test
+  def testValues(): Unit = {
+    val tableConf = tEnv.getConfig.getConf
+    tableConf.setBoolean(TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED, true)
+
+    val appendStream = tEnv
+      .sqlQuery("SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)")
+      .toAppendStream[BaseRow]
+    val sink = new TestingAppendBaseRowSink(new BaseRowTypeInfo(INT, INT, INT))
+    appendStream.addSink(sink).setParallelism(1)
+    env.execute()
+
+    // The sink renders rows with their header, hence the "0|" prefix.
+    assertEquals(List("0|1,2,3").sorted, sink.getAppendResults.sorted)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
new file mode 100644
index 00000000000000..0ef3c86171050a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.sinks.{CollectTableSink, TableSinkBase}
+import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.util.BaseRowUtil
+import org.apache.flink.util.AbstractID
+
+import _root_.java.util.{TimeZone, ArrayList => JArrayList}
+
+import _root_.scala.collection.JavaConversions._
+
+import org.junit.Before
+
+class BatchTestBase {
+ // Base class for batch ITCases: builds a local environment and table environment per test.
+ var env: StreamExecutionEnvironment = _
+ val conf: TableConfig = new TableConfig // NOTE(review): not read in this class; confirm use
+
+ // scala tableEnv
+ var tEnv: ScalaBatchTableEnv = _
+
+ @Before
+ def before(): Unit = {
+ // java env
+ val javaEnv = new LocalStreamEnvironment()
+ // scala env
+ this.env = new StreamExecutionEnvironment(javaEnv)
+ this.tEnv = ScalaBatchTableEnv.create(env)
+ }
+ /** Writes `table` to a collecting sink, runs the job and returns each row as a string. */
+ def collectResults(table: Table): Seq[String] = {
+ val tableSchema = table.getSchema
+ val sink = new CollectBaseRowTableSink
+ val configuredSink = sink.configure(tableSchema.getFieldNames, tableSchema.getFieldTypes)
+ .asInstanceOf[CollectBaseRowTableSink]
+ val outType = configuredSink.getOutputType.asInstanceOf[BaseRowTypeInfo]
+ val typeSerializer = outType.createSerializer(new ExecutionConfig)
+ val id = new AbstractID().toString
+ configuredSink.init(typeSerializer, id)
+ tEnv.writeToSink(table, configuredSink, "test")
+ val res = env.execute()
+ // The sink was initialized with `id`; read that accumulator back and deserialize the rows.
+ val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
+ val datas: Seq[BaseRow] = SerializedListAccumulator.deserializeList(accResult, typeSerializer)
+ val tz = TimeZone.getTimeZone("UTC")
+ datas.map(BaseRowUtil.baseRowToString(_, outType, tz, false)) // false: no header prefix
+ }
+
+}
+
+/** [[CollectTableSink]] for [[BaseRow]]s whose output type is built from internal field types. */
+class CollectBaseRowTableSink
+  extends CollectTableSink[BaseRow](
+    fieldTypes => new BaseRowTypeInfo(fieldTypes.map(createInternalTypeFromTypeInfo): _*)) {
+
+  /** Returns a new sink of this same class. */
+  override protected def copy: TableSinkBase[BaseRow] = new CollectBaseRowTableSink()
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
new file mode 100644
index 00000000000000..3c5bdda0f84787
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.util.BaseRowUtil
+
+import _root_.java.util.TimeZone
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+import _root_.scala.collection.mutable.ArrayBuffer
+
+/** Registry of per-sink result maps shared by the test sinks of this package. */
+object StreamTestSink {
+  TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+  private[utils] val idCounter: AtomicInteger = new AtomicInteger(0)
+  // Keyed by sink id; mutate these maps only while holding this object's monitor.
+  private[utils] val globalResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, ArrayBuffer[String]]]
+  private[utils] val globalRetractResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, ArrayBuffer[String]]]
+  private[utils] val globalUpsertResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
+
+  /** Reserves the next sink id and registers empty result maps for it. */
+  private[utils] def getNewSinkId: Int = {
+    val idx = idCounter.getAndIncrement()
+    this.synchronized {
+      globalResults.put(idx, mutable.HashMap.empty[Int, ArrayBuffer[String]])
+      globalRetractResults.put(idx, mutable.HashMap.empty[Int, ArrayBuffer[String]])
+      globalUpsertResults.put(idx, mutable.HashMap.empty[Int, mutable.Map[String, String]])
+    }
+    idx
+  }
+  /** Removes every registered result map; takes the same monitor as `getNewSinkId`. */
+  def clear(): Unit = this.synchronized {
+    globalResults.clear()
+    globalRetractResults.clear()
+    globalUpsertResults.clear()
+  }
+}
+
+abstract class AbstractExactlyOnceSink[T] extends RichSinkFunction[T] with CheckpointedFunction {
+ protected var resultsState: ListState[String] = _
+ protected var localResults: ArrayBuffer[String] = _
+ protected val idx: Int = StreamTestSink.getNewSinkId
+ // Filled from StreamTestSink by clearAndStashGlobalResults(); unset (null) until then.
+ protected var globalResults: mutable.Map[Int, ArrayBuffer[String]]= _
+ protected var globalRetractResults: mutable.Map[Int, ArrayBuffer[String]] = _
+ protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _
+ /** Creates the "sink-results" list state, restores buffered results and registers them. */
+ override def initializeState(context: FunctionInitializationContext): Unit = {
+ resultsState = context.getOperatorStateStore
+ .getListState(new ListStateDescriptor[String]("sink-results", Types.STRING))
+
+ localResults = mutable.ArrayBuffer.empty[String]
+ // After a restore, start from the results stored in the state.
+ if (context.isRestored) {
+ for (value <- resultsState.get().asScala) {
+ localResults += value
+ }
+ }
+ // Register this subtask's buffer (the same object, not a copy) under its subtask index.
+ val taskId = getRuntimeContext.getIndexOfThisSubtask
+ StreamTestSink.synchronized(
+ StreamTestSink.globalResults(idx) += (taskId -> localResults)
+ )
+ }
+ /** Replaces the contents of the list state with the current local results. */
+ override def snapshotState(context: FunctionSnapshotContext): Unit = {
+ resultsState.clear()
+ for (value <- localResults) {
+ resultsState.add(value)
+ }
+ }
+ /** Moves this sink's maps out of StreamTestSink once; `.get` throws if they are gone. */
+ protected def clearAndStashGlobalResults(): Unit = {
+ if (globalResults == null) {
+ StreamTestSink.synchronized{
+ globalResults = StreamTestSink.globalResults.remove(idx).get
+ globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
+ globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
+ }
+ }
+ }
+ /** Returns the buffered results of every registered subtask as one list. */
+ protected def getResults: List[String] = {
+ clearAndStashGlobalResults()
+ val result = ArrayBuffer.empty[String]
+ this.globalResults.foreach {
+ case (_, list) => result ++= list
+ }
+ result.toList
+ }
+}
+
+/** Append-only test sink that stores every [[BaseRow]] as its string rendering. */
+final class TestingAppendBaseRowSink(rowTypeInfo: BaseRowTypeInfo, tz: TimeZone)
+  extends AbstractExactlyOnceSink[BaseRow] {
+
+  /** Uses the UTC time zone. */
+  def this(rowTypeInfo: BaseRowTypeInfo) = this(rowTypeInfo, TimeZone.getTimeZone("UTC"))
+
+  def invoke(value: BaseRow): Unit = {
+    val rendered = BaseRowUtil.baseRowToString(value, rowTypeInfo, tz)
+    localResults += rendered
+  }
+
+  def getAppendResults: List[String] = getResults
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
new file mode 100644
index 00000000000000..d3c30b7579ecdb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+import org.junit.{Before, Rule}
+
+/**
+ * Base fixture for streaming tests. Before each test it clears [[StreamTestSink]], creates a
+ * fresh [[StreamExecutionEnvironment]] (parallelism 4, event-time characteristic, optional
+ * object reuse) and a [[StreamTableEnvironment]] on top of it.
+ */
+class StreamingTestBase {
+
+  var env: StreamExecutionEnvironment = _
+  var tEnv: StreamTableEnvironment = _
+  val _tempFolder: TemporaryFolder = new TemporaryFolder
+  // Read by `before()`: when true, object reuse is enabled on the execution config.
+  var enableObjectReuse: Boolean = true
+  // Backs the `thrown` rule; used for accurate exception information checking.
+  val expectedException: ExpectedException = ExpectedException.none()
+
+  /** JUnit rule exposing the shared [[ExpectedException]] instance. */
+  @Rule
+  def thrown: ExpectedException = expectedException
+
+  /** JUnit rule exposing the shared [[TemporaryFolder]] instance. */
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  /** Resets the test sinks and rebuilds `env` and `tEnv` for the upcoming test. */
+  @Before
+  def before(): Unit = {
+    StreamTestSink.clear()
+    val freshEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    env = freshEnv
+    freshEnv.setParallelism(4)
+    freshEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    if (enableObjectReuse) {
+      freshEnv.getConfig.enableObjectReuse()
+    }
+    tEnv = StreamTableEnvironment.create(freshEnv)
+  }
+
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index 0678e71867af7b..5c5909fa13e44d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -27,6 +27,16 @@
*/
public class TableConfigOptions {
+ // ------------------------------------------------------------------------
+ // Source Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption