From 9cb36b957fc4ed2ef6ad965304b759e9c7a53300 Mon Sep 17 00:00:00 2001
From: Xingcan Cui
Date: Thu, 14 Dec 2017 17:37:40 +0800
Subject: [PATCH] [FLINK-8258] [table] Enable query configuration for batch queries

---
 .../table/api/BatchTableEnvironment.scala     | 25 ++++++----
 .../api/java/BatchTableEnvironment.scala      | 48 ++++++++++++++++++-
 .../api/scala/BatchTableEnvironment.scala     | 20 +++++++-
 .../table/api/scala/TableConversions.scala    | 25 +++++++++-
 .../nodes/dataset/BatchTableSourceScan.scala  |  6 ++-
 .../plan/nodes/dataset/DataSetAggregate.scala |  8 ++--
 .../plan/nodes/dataset/DataSetCalc.scala      |  8 ++--
 .../plan/nodes/dataset/DataSetCorrelate.scala | 10 ++--
 .../plan/nodes/dataset/DataSetDistinct.scala  |  8 ++--
 .../plan/nodes/dataset/DataSetIntersect.scala | 10 ++--
 .../plan/nodes/dataset/DataSetJoin.scala      | 10 ++--
 .../plan/nodes/dataset/DataSetMinus.scala     | 10 ++--
 .../table/plan/nodes/dataset/DataSetRel.scala |  5 +-
 .../plan/nodes/dataset/DataSetScan.scala      |  6 ++-
 .../nodes/dataset/DataSetSingleRowJoin.scala  | 14 +++---
 .../plan/nodes/dataset/DataSetSort.scala      | 24 +++++-----
 .../plan/nodes/dataset/DataSetUnion.scala     | 10 ++--
 .../plan/nodes/dataset/DataSetValues.scala    |  6 ++-
 .../dataset/DataSetWindowAggregate.scala      |  8 ++--
 19 files changed, 188 insertions(+), 73 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 8a78ac4d56571..613392ddbdab3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -68,6 +68,8 @@ abstract class BatchTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataSetTable_[0-9]+$".r

+  override def queryConfig: BatchQueryConfig = new BatchQueryConfig
+
   /**
     * Checks if the chosen table name is valid.
     *
@@ -169,9 +171,9 @@ abstract class BatchTableEnvironment(
       sink: TableSink[T],
       queryConfig: QueryConfig): Unit = {

-    // We do not pass the configuration on, because there is nothing to configure for batch queries.
-    queryConfig match {
-      case _: BatchQueryConfig =>
+    // Check that the query configuration is a batch one.
+    val batchQueryConfig = queryConfig match {
+      case batchConfig: BatchQueryConfig => batchConfig
       case _ =>
         throw new TableException("BatchQueryConfig required to configure batch query.")
     }
@@ -180,7 +182,7 @@ abstract class BatchTableEnvironment(
       case batchSink: BatchTableSink[T] =>
         val outputType = sink.getOutputType
         // translate the Table into a DataSet and provide the type that the TableSink expects.
-        val result: DataSet[T] = translate(table)(outputType)
+        val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
         // Give the DataSet to the TableSink to emit it.
         batchSink.emitDataSet(result)
       case _ =>
@@ -230,7 +232,8 @@ abstract class BatchTableEnvironment(
   private[flink] def explain(table: Table, extended: Boolean): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new GenericTypeInfo(classOf[Row]))
+    val dataSet = translate[Row](optimizedPlan, ast.getRowType, queryConfig) (
+      new GenericTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
@@ -372,10 +375,12 @@ abstract class BatchTableEnvironment(
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+  protected def translate[A](
+      table: Table,
+      queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
     val relNode = table.getRelNode
     val dataSetPlan = optimize(relNode)
-    translate(dataSetPlan, relNode.getRowType)
+    translate(dataSetPlan, relNode.getRowType, queryConfig)
   }

   /**
@@ -390,13 +395,13 @@ abstract class BatchTableEnvironment(
     */
   protected def translate[A](
       logicalPlan: RelNode,
-      logicalType: RelDataType)
-      (implicit tpe: TypeInformation[A]): DataSet[A] = {
+      logicalType: RelDataType,
+      queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
     TableEnvironment.validateType(tpe)

     logicalPlan match {
       case node: DataSetRel =>
-        val plan = node.translateToPlan(this)
+        val plan = node.translateToPlan(this, queryConfig)
         val conversion =
           getConversionMapper(
             plan.getType,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
index 44910b6d4f8dc..b79d161bc3279 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
@@ -143,7 +143,8 @@ class BatchTableEnvironment(
    * @return The converted [[DataSet]].
    */
   def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
-    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
+    // Use the default batch query config.
+    translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
   }

   /**
@@ -160,7 +161,50 @@ class BatchTableEnvironment(
    * @return The converted [[DataSet]].
    */
   def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
-    translate[T](table)(typeInfo)
+    // Use the default batch query config.
+    translate[T](table, queryConfig)(typeInfo)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataSet]].
+    * @param queryConfig The configuration for the query to generate.
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T](
+      table: Table,
+      clazz: Class[T],
+      queryConfig: BatchQueryConfig): DataSet[T] = {
+    translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the resulting [[DataSet]].
+    * @param queryConfig The configuration for the query to generate.
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T](
+      table: Table,
+      typeInfo: TypeInformation[T],
+      queryConfig: BatchQueryConfig): DataSet[T] = {
+    translate[T](table, queryConfig)(typeInfo)
   }

   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 5def9f2abbb2b..0692fd2f1b962 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -137,7 +137,25 @@ class BatchTableEnvironment(
    * @return The converted [[DataSet]].
    */
   def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
-    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+    // Use the default batch query config.
+    wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T: TypeInformation](table: Table, queryConfig: BatchQueryConfig): DataSet[T] = {
+    wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
   }

   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index bd431eb81e1aa..152fc2793ed44 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.table.api.{StreamQueryConfig, Table, TableException}
+import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
 import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}

@@ -54,6 +54,29 @@ class TableConversions(table: Table) {
     }
   }

+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T: TypeInformation](queryConfig: BatchQueryConfig): DataSet[T] = {
+
+    table.tableEnv match {
+      case tEnv: ScalaBatchTableEnv =>
+        tEnv.toDataSet(table, queryConfig)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
+    }
+  }
+
   /**
     * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
     *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index d09d69c7f7c81..3897aa065fb6a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException, Types}
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
@@ -78,7 +78,9 @@ class BatchTableSourceScan(
     )
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index bdc3d7a7e0677..c65e301d712eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -86,10 +86,12 @@ class DataSetAggregate(
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

     val input = inputNode.asInstanceOf[DataSetRel]
-    val inputDS = input.translateToPlan(tableEnv)
+    val inputDS = input.translateToPlan(tableEnv, queryConfig)

     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 7417eb2dac56a..dd6c7a9dc9761 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
@@ -82,11 +82,13 @@ class DataSetCalc(
     estimateRowCount(calcProgram, rowCnt)
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

     val config = tableEnv.getConfig

-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)

     val generator = new FunctionCodeGenerator(config, false, inputDS.getType)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 5f945623b2de6..a4a93f7be0822 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -26,9 +26,7 @@ import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.BatchTableEnvironment
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
@@ -94,12 +92,14 @@ class DataSetCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

     val config = tableEnv.getConfig

     // we do not need to specify input type
-    val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)

     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
index 14116f1fc9b7d..3ec98b0a00117 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.runtime.aggregate.DistinctReduce
 import org.apache.flink.types.Row

@@ -76,9 +76,11 @@ class DataSetDistinct(
     rowType.getFieldList.asScala.map(_.getName).mkString(", ")
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
     val groupKeys = (0 until rowRelDataType.getFieldCount).toArray
     // group on all fields
     inputDS
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
index 4497df33d8b4f..b6f3ed64d1f0b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.runtime.IntersectCoGroupFunction
 import org.apache.flink.types.Row

@@ -74,10 +74,12 @@ class DataSetIntersect(
     }
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)

     val coGroupedDs = leftDataSet.coGroup(rightDataSet)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index f039cf902784a..e461c57c20da6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableException, Types}
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableConfig, TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonJoin
@@ -115,7 +115,9 @@ class DataSetJoin(
     planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

     val config = tableEnv.getConfig

@@ -160,8 +162,8 @@ class DataSetJoin(
       })
     }

-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)

     joinType match {
       case JoinRelType.INNER =>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
index 9ba65bfdddb8c..96978a9e2aa8a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.runtime.MinusCoGroupFunction
 import org.apache.flink.types.Row

@@ -85,10 +85,12 @@ class DataSetMinus(
     rowCnt
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)

     val coGroupedDs = leftDataSet.coGroup(rightDataSet)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
index aea61f7e4ee57..e243850d0b10b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.nodes.dataset

 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.plan.nodes.FlinkRelNode
 import org.apache.flink.types.Row

@@ -29,8 +29,9 @@ trait DataSetRel extends FlinkRelNode {
     * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
     *
     * @param tableEnv The [[BatchTableEnvironment]] of the translated Table.
+    * @param queryConfig The configuration for the query to generate.
     * @return DataSet of type [[Row]]
     */
-  def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row]
+  def translateToPlan(tableEnv: BatchTableEnvironment, queryConfig: BatchQueryConfig): DataSet[Row]

 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
index 07ba7e14ae00c..d297d3b238aa7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema}
 import org.apache.flink.types.Row

@@ -59,7 +59,9 @@ class DataSetScan(
     )
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {
     val schema = new RowSchema(rowRelDataType)
     val inputDataSet: DataSet[Any] = dataSetTable.dataSet
     val fieldIdxs = dataSetTable.fieldIndexes
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index 3c1c58b340440..70a783615f525 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableConfig}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
@@ -91,10 +91,12 @@ class DataSetSingleRowJoin(
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
     val broadcastSetName = "joinSet"
     val mapSideJoin = generateMapFunction(
       tableEnv.getConfig,
@@ -127,8 +129,8 @@ class DataSetSingleRowJoin(
     val isOuterJoin = joinType match {
       case JoinRelType.LEFT | JoinRelType.RIGHT => true
       case _ => false
-    }
-
+    }
+
     val codeGenerator = new FunctionCodeGenerator(
       config,
       isOuterJoin,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index aa6620df4afd5..a518e30805293 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -21,14 +21,12 @@ package org.apache.flink.table.plan.nodes.dataset
 import java.util

 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexLiteral, RexNode}
-import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableException}
 import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
 import org.apache.flink.types.Row
 import org.apache.flink.table.plan.nodes.CommonSort
@@ -80,7 +78,9 @@ class DataSetSort(
     }
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

     if (fieldCollations.isEmpty) {
       throw TableException("Limiting the result without sorting is not allowed " +
@@ -89,7 +89,7 @@ class DataSetSort(

     val config = tableEnv.getConfig

-    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
     val currentParallelism = inputDs.getExecutionEnvironment.getParallelism

     var partitionedDs = if (currentParallelism == 1) {
@@ -132,17 +132,17 @@ class DataSetSort(

   private val fieldCollations = collations.getFieldCollations.asScala
     .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
-
+
   override def toString: String = {
     sortToString(getRowType, collations, offset, fetch)
   }
-
-  override def explainTerms(pw: RelWriter) : RelWriter = {
-    sortExplainTerms(
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    sortExplainTerms(
       super.explainTerms(pw),
-      getRowType,
-      collations,
-      offset,
+      getRowType,
+      collations,
+      offset,
       fetch)
   }
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
index a87c6e3a95e72..1cdd198d627f3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.types.Row

 import scala.collection.JavaConversions._
@@ -77,10 +77,12 @@ class DataSetUnion(
     getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)

     leftDataSet.union(rightDataSet)
   }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index 3a4ba47e18446..03e1caa058879 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.InputFormatCodeGenerator
 import org.apache.flink.table.runtime.io.ValuesInputFormat
@@ -66,7 +66,9 @@ class DataSetValues(
     super.explainTerms(pw).item("values", valuesFieldsToString)
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

     val config = tableEnv.getConfig

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index 66dcc56d48d0f..745c4ed708a41 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.AggregationCodeGenerator
@@ -105,9 +105,11 @@ class DataSetWindowAggregate(
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }

-  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      queryConfig: BatchQueryConfig): DataSet[Row] = {

-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)

     val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
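
Usage note: the patch threads a BatchQueryConfig through every DataSetRel#translateToPlan call and adds toDataSet overloads that accept one. The following is a minimal sketch of how a caller could pass the config explicitly once the patch is applied; the environment setup, table, and field names are illustrative only and not part of the patch.

import org.apache.flink.api.scala._
import org.apache.flink.table.api.{BatchQueryConfig, TableEnvironment}
import org.apache.flink.table.api.scala._

object BatchQueryConfigUsage {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // An illustrative input table.
    val input: DataSet[(Int, String)] = env.fromElements((1, "a"), (2, "b"))
    val table = input.toTable(tEnv, 'id, 'name)

    // The default config now comes from the overridden queryConfig, which
    // returns a BatchQueryConfig for batch environments.
    val conf: BatchQueryConfig = tEnv.queryConfig

    // New overload added by this patch: pass the config explicitly.
    val result: DataSet[(Int, String)] = tEnv.toDataSet[(Int, String)](table, conf)
    result.print()
  }
}

Since BatchQueryConfig carries no batch-specific options yet (the comment removed in the first hunk notes there is nothing to configure for batch queries), passing it explicitly mainly keeps batch call sites symmetric with the streaming API, where StreamQueryConfig already matters.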