From 92e2539b4178b3049f97bb1a6137ee91c943870a Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Tue, 22 Mar 2016 18:51:46 +0100 Subject: [PATCH] [FLINK-3587] Bump Calcite version to 1.7.0 - Add DataSetValues and DataStreamValues due to the changed RelNode generation of Calcite 1.7. - Pass TableEnvironment instead of TableConfig for DataSet and DataStream translation. - Add methods to BatchTableEnvironment and StreamTableEnvironment for creating new DataSources. - Remove copied Calcite rule that has been fixed upstream in Calcite 1.7. --- flink-libraries/flink-table/pom.xml | 2 +- .../java/table/BatchTableEnvironment.scala | 17 ++- .../java/table/StreamTableEnvironment.scala | 17 ++- .../scala/table/BatchTableEnvironment.scala | 18 ++- .../scala/table/StreamTableEnvironment.scala | 19 ++- .../api/table/BatchTableEnvironment.scala | 15 ++- .../api/table/StreamTableEnvironment.scala | 14 +- .../api/table/codegen/CodeGenerator.scala | 11 +- .../plan/nodes/dataset/DataSetAggregate.scala | 12 +- .../plan/nodes/dataset/DataSetCalc.scala | 17 ++- .../plan/nodes/dataset/DataSetJoin.scala | 16 +-- .../table/plan/nodes/dataset/DataSetRel.scala | 11 +- .../plan/nodes/dataset/DataSetSource.scala | 13 +- .../plan/nodes/dataset/DataSetUnion.scala | 12 +- .../plan/nodes/dataset/DataSetValues.scala | 118 +++++++++++++++++ .../nodes/datastream/DataStreamCalc.scala | 9 +- .../plan/nodes/datastream/DataStreamRel.scala | 9 +- .../nodes/datastream/DataStreamSource.scala | 6 +- .../nodes/datastream/DataStreamUnion.scala | 9 +- .../nodes/datastream/DataStreamValues.scala | 75 +++++++++++ .../api/table/plan/rules/FlinkRuleSets.scala | 22 ++-- .../rules/dataSet/DataSetAggregateRule.scala | 2 +- .../plan/rules/dataSet/DataSetCalcRule.scala | 2 +- .../plan/rules/dataSet/DataSetJoinRule.scala | 2 +- .../plan/rules/dataSet/DataSetScanRule.scala | 2 +- .../plan/rules/dataSet/DataSetUnionRule.scala | 2 +- .../rules/dataSet/DataSetValuesRule.scala | 50 +++++++ .../FlinkFilterAggregateTransposeRule.scala | 123 ------------------ .../datastream/DataStreamValuesRule.scala | 50 +++++++ 29 files changed, 476 insertions(+), 199 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml index 088df98b06399..bb841a3c0e04d 100644 --- a/flink-libraries/flink-table/pom.xml +++ b/flink-libraries/flink-table/pom.xml @@ -111,7 +111,7 @@ under the License. 
org.apache.calcite calcite-core - 1.5.0 + 1.7.0 diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala index 69bff95436ed2..dd9033ed46080 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala @@ -17,11 +17,12 @@ */ package org.apache.flink.api.java.table +import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{ExecutionEnvironment, DataSet} import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.table.expressions.ExpressionParser -import org.apache.flink.api.table.{TableConfig, Table} +import org.apache.flink.api.table.{Row, TableConfig, Table} /** * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]] @@ -162,4 +163,18 @@ class BatchTableEnvironment( translate[T](table)(typeInfo) } + /** + * Creates a [[Row]] [[DataSet]] from an [[InputFormat]]. + * + * @param inputFormat [[InputFormat]] from which the [[DataSet]] is created. + * @param typeInfo [[TypeInformation]] of the type of the [[DataSet]]. + * @return A [[Row]] [[DataSet]] created from the [[InputFormat]]. + */ + override private[flink] def createDataSetSource( + inputFormat: InputFormat[Row, _], + typeInfo: TypeInformation[Row]): DataSet[Row] = { + + execEnv.createInput(inputFormat, typeInfo) + } + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala index 7479426cd8ded..980e45b2d5abb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala @@ -17,9 +17,10 @@ */ package org.apache.flink.api.java.table +import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.table.{TableConfig, Table} +import org.apache.flink.api.table.{Row, TableConfig, Table} import org.apache.flink.api.table.expressions.ExpressionParser import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -120,4 +121,18 @@ class StreamTableEnvironment( translate[T](table)(typeInfo) } + /** + * Creates a [[Row]] [[DataStream]] from an [[InputFormat]]. + * + * @param inputFormat [[InputFormat]] from which the [[DataStream]] is created. + * @param typeInfo [[TypeInformation]] of the type of the [[DataStream]]. + * @return A [[Row]] [[DataStream]] created from the [[InputFormat]]. 
+ */ + override private[flink] def createDataStreamSource( + inputFormat: InputFormat[Row, _], + typeInfo: TypeInformation[Row]): DataStream[Row] = { + + execEnv.createInput(inputFormat, typeInfo) + } + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala index a18f33863837b..24f23f266860f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala @@ -17,10 +17,12 @@ */ package org.apache.flink.api.scala.table +import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ +import org.apache.flink.api.java.{DataSet => JavaSet} import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.{TableConfig, Table} +import org.apache.flink.api.table.{Row, TableConfig, Table} import scala.reflect.ClassTag @@ -139,4 +141,18 @@ class BatchTableEnvironment( wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]]) } + /** + * Creates a [[Row]] [[JavaSet]] from an [[InputFormat]]. + * + * @param inputFormat [[InputFormat]] from which the [[JavaSet]] is created. + * @param typeInfo [[TypeInformation]] of the type of the [[JavaSet]]. + * @return A [[Row]] [[JavaSet]] created from the [[InputFormat]]. + */ + override private[flink] def createDataSetSource( + inputFormat: InputFormat[Row, _], + typeInfo: TypeInformation[Row]): JavaSet[Row] = { + + execEnv.createInput(inputFormat)(ClassTag.AnyRef.asInstanceOf[ClassTag[Row]], typeInfo).javaSet + } + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala index 15ef55e2ecb55..48de95378ab7a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala @@ -17,9 +17,12 @@ */ package org.apache.flink.api.scala.table +import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.{TableConfig, Table} +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Row, TableConfig, Table} import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream} import org.apache.flink.streaming.api.scala.asScalaStream @@ -99,4 +102,18 @@ class StreamTableEnvironment( asScalaStream(translate(table)) } + /** + * Creates a [[Row]] [[JavaStream]] from an [[InputFormat]]. + * + * @param inputFormat [[InputFormat]] from which the [[JavaStream]] is created. + * @param typeInfo [[TypeInformation]] of the type of the [[JavaStream]]. + * @return A [[Row]] [[JavaStream]] created from the [[InputFormat]]. 
+ */ + override private[flink] def createDataStreamSource( + inputFormat: InputFormat[Row, _], + typeInfo: TypeInformation[Row]): JavaStream[Row] = { + + execEnv.createInput(inputFormat)(typeInfo).javaStream + } + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index b6aa229e8b97e..ade3b4956cef8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs +import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.io.DiscardingOutputFormat @@ -34,6 +35,7 @@ import org.apache.flink.api.table.plan.PlanGenException import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, DataSetConvention} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.plan.schema.DataSetTable +import org.apache.flink.streaming.api.datastream.DataStream /** * The abstract base class for batch TableEnvironments. @@ -226,11 +228,22 @@ abstract class BatchTableEnvironment(config: TableConfig) extends TableEnvironme dataSetPlan match { case node: DataSetRel => node.translateToPlan( - config, + this, Some(tpe.asInstanceOf[TypeInformation[Any]]) ).asInstanceOf[DataSet[A]] case _ => ??? } } + /** + * Creates a [[Row]] [[DataSet]] from an [[InputFormat]]. + * + * @param inputFormat [[InputFormat]] from which the [[DataSet]] is created. + * @param typeInfo [[TypeInformation]] of the type of the [[DataSet]]. + * @return A [[Row]] [[DataSet]] created from the [[InputFormat]]. + */ + private[flink] def createDataSetSource( + inputFormat: InputFormat[Row, _], + typeInfo: TypeInformation[Row]): DataSet[Row] + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 7644e6d1f35d1..8724b5abfd54a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs +import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.PlanGenException @@ -185,7 +186,7 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm dataStreamPlan match { case node: DataStreamRel => node.translateToPlan( - config, + this, Some(tpe.asInstanceOf[TypeInformation[Any]]) ).asInstanceOf[DataStream[A]] case _ => ??? 
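The signature change above is the heart of the refactoring: translateToPlan now receives the TableEnvironment itself (this) rather than only its TableConfig, so relational nodes can ask the environment to create new sources during translation, which is exactly what the new Values nodes below rely on. A minimal sketch of what a translation method gains, assuming the patch as applied; TranslationSketch and sourceFromFormat are hypothetical names, and the object is placed under org.apache.flink only so the private[flink] factory method is visible:

package org.apache.flink.api.table.plan

import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.{BatchTableEnvironment, Row}

// Hypothetical helper, not part of the patch: illustrates what translation
// code can do now that it receives the environment instead of a bare config.
object TranslationSketch {

  def sourceFromFormat(
      tableEnv: BatchTableEnvironment,
      inputFormat: InputFormat[Row, _],
      rowType: TypeInformation[Row]): DataSet[Row] = {

    // the runtime configuration is still reachable, as before
    val nullCheck = tableEnv.getConfig.getNullCheck

    // new capability: create a DataSet source while translating a plan
    tableEnv.createDataSetSource(inputFormat, rowType)
  }
}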
@@ -193,4 +194,15 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm } + /** + * Creates a [[Row]] [[DataStream]] from an [[InputFormat]]. + * + * @param inputFormat [[InputFormat]] from which the [[DataStream]] is created. + * @param typeInfo [[TypeInformation]] of the type of the [[DataStream]]. + * @return A [[Row]] [[DataStream]] created from the [[InputFormat]]. + */ + private[flink] def createDataStreamSource( + inputFormat: InputFormat[Row, _], + typeInfo: TypeInformation[Row]): DataStream[Row] + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index 7b3b02a5af4c8..d41674c6846c0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -580,7 +580,14 @@ class CodeGenerator( case VARCHAR | CHAR => generateNonNullLiteral(resultType, "\"" + value.toString + "\"") case SYMBOL => - val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal() + + val symbolOrdinal = + if (classOf[Enum[_]].isAssignableFrom(value.getClass) ) { + value.asInstanceOf[Enum[_]].ordinal() + } else { + value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal() + } + generateNonNullLiteral(resultType, symbolOrdinal.toString) case _ => ??? // TODO more types } @@ -747,6 +754,8 @@ class CodeGenerator( override def visitOver(over: RexOver): GeneratedExpression = ??? + override def visitSubQuery(subQuery: RexSubQuery): GeneratedExpression = ??? + // ---------------------------------------------------------------------------------------------- // generator helping methods // ---------------------------------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index 6e421304c43d3..12095a268de55 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -31,7 +31,7 @@ import org.apache.flink.api.table.runtime.MapRunner import org.apache.flink.api.table.runtime.aggregate.AggregateUtil import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo} -import org.apache.flink.api.table.{Row, TableConfig} +import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig} import scala.collection.JavaConverters._ @@ -76,26 +76,28 @@ class DataSetAggregate( .item("select", aggregationToString) } - override def computeSelfCost (planner: RelOptPlanner): RelOptCost = { + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val child = this.getInput - val rowCnt = RelMetadataQuery.getRowCount(child) + val rowCnt = metadata.getRowCount(child) val rowSize = this.estimateRowSize(child.getRowType) val aggCnt = this.namedAggregates.size planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize) } override def translateToPlan( - config: TableConfig, + tableEnv: BatchTableEnvironment, expectedType: 
Option[TypeInformation[Any]]): DataSet[Any] = { + val config = tableEnv.getConfig + val groupingKeys = grouping.indices.toArray // add grouping fields, position keys in the input, and input type val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, inputType, rowType, grouping, config) val inputDS = input.asInstanceOf[DataSetRel].translateToPlan( - config, + tableEnv, // tell the input operator that this operator currently only supports Rows as input Some(TypeConverter.DEFAULT_ROW_TYPE)) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala index 67daffccde74e..13bb39d3da0fe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala @@ -29,7 +29,7 @@ import org.apache.flink.api.table.codegen.CodeGenerator import org.apache.flink.api.table.plan.nodes.FlinkCalc import org.apache.flink.api.table.typeutils.TypeConverter import TypeConverter._ -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.BatchTableEnvironment import org.apache.calcite.rex._ /** @@ -69,17 +69,17 @@ class DataSetCalc( calcProgram.getCondition != null) } - override def computeSelfCost (planner: RelOptPlanner): RelOptCost = { + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val child = this.getInput - val rowCnt = RelMetadataQuery.getRowCount(child) + val rowCnt = metadata.getRowCount(child) val exprCnt = calcProgram.getExprCount planner.getCostFactory.makeCost(rowCnt, rowCnt * exprCnt, 0) } - override def getRows: Double = { + override def estimateRowCount(metadata: RelMetadataQuery): Double = { val child = this.getInput - val rowCnt = RelMetadataQuery.getRowCount(child) + val rowCnt = metadata.getRowCount(child) if (calcProgram.getCondition != null) { // we reduce the result card to push filters down @@ -89,10 +89,13 @@ class DataSetCalc( } } - override def translateToPlan(config: TableConfig, + override def translateToPlan( + tableEnv: BatchTableEnvironment, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config) + val config = tableEnv.getConfig + + val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val returnType = determineReturnType( getRowType, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index de5489749ffba..61e8995f11677 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -21,7 +21,6 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.plan._ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinInfo -import org.apache.calcite.rel.logical.LogicalJoin import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} import org.apache.calcite.sql.fun.SqlStdOperatorTable @@ -31,10 +30,9 @@ 
import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.table.codegen.CodeGenerator -import org.apache.flink.api.table.plan.PlanGenException import org.apache.flink.api.table.runtime.FlatJoinRunner import org.apache.flink.api.table.typeutils.TypeConverter -import org.apache.flink.api.table.{TableException, TableConfig} +import org.apache.flink.api.table.{BatchTableEnvironment, TableException} import org.apache.flink.api.common.functions.FlatJoinFunction import TypeConverter.determineReturnType import scala.collection.mutable.ArrayBuffer @@ -92,7 +90,7 @@ class DataSetJoin( .item("join", joinSelectionToString) } - override def computeSelfCost (planner: RelOptPlanner): RelOptCost = { + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { if (!translatable) { // join cannot be translated. Make huge costs @@ -101,7 +99,7 @@ class DataSetJoin( // join can be translated. Compute cost estimate val children = this.getInputs children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) => - val rowCnt = RelMetadataQuery.getRowCount(child) + val rowCnt = metadata.getRowCount(child) val rowSize = this.estimateRowSize(child.getRowType) cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) } @@ -110,9 +108,11 @@ class DataSetJoin( } override def translateToPlan( - config: TableConfig, + tableEnv: BatchTableEnvironment, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + val config = tableEnv.getConfig + val returnType = determineReturnType( getRowType, expectedType, @@ -156,8 +156,8 @@ class DataSetJoin( }) } - val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config) - val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config) + val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val generator = new CodeGenerator(config, leftDataSet.getType, Some(rightDataSet.getType)) val conversion = generator.generateConverterResultExpression( diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala index eaf6b266b3a9b..e8f81fd53cb8f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.{BatchTableEnvironment, TableEnvironment, TableConfig} import org.apache.flink.api.table.plan.nodes.FlinkRel import scala.collection.JavaConversions._ @@ -31,9 +31,9 @@ import scala.collection.JavaConversions._ trait DataSetRel extends RelNode with FlinkRel { /** - * Translates the FlinkRelNode into a Flink operator. + * Translates the [[DataSetRel]] node into a [[DataSet]] operator. * - * @param config runtime configuration + * @param tableEnv [[org.apache.flink.api.table.BatchTableEnvironment]] of the translated Table. 
* @param expectedType specifies the type the Flink operator should return. The type must * have the same arity as the result. For instance, if the * expected type is a RowTypeInfo this method will return a DataSet of @@ -42,9 +42,8 @@ trait DataSetRel extends RelNode with FlinkRel { * @return DataSet of type expectedType or RowTypeInfo */ def translateToPlan( - config: TableConfig, - expectedType: Option[TypeInformation[Any]] = None) - : DataSet[Any] + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any] private[flink] def estimateRowSize(rowType: RelDataType): Double = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala index 6f94251d93fe4..01d71b7d75188 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.PojoTypeInfo -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.BatchTableEnvironment import org.apache.flink.api.table.codegen.CodeGenerator import org.apache.flink.api.table.typeutils.TypeConverter import TypeConverter.determineReturnType @@ -67,16 +67,17 @@ class DataSetSource( s"Source(from: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))" } - override def computeSelfCost (planner: RelOptPlanner): RelOptCost = { + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { - val rowCnt = RelMetadataQuery.getRowCount(this) + val rowCnt = metadata.getRowCount(this) planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) } override def translateToPlan( - config: TableConfig, - expectedType: Option[TypeInformation[Any]]) - : DataSet[Any] = { + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + + val config = tableEnv.getConfig val inputDataSet: DataSet[Any] = dataSetTable.dataSet val inputType = inputDataSet.getType diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala index e4e0a145c132b..b6f6a19ccd5f7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala @@ -24,7 +24,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ @@ -62,22 +62,22 @@ class DataSetUnion( super.explainTerms(pw).item("union", unionSelectionToString) } - override def computeSelfCost (planner: RelOptPlanner): RelOptCost = { + override def computeSelfCost (planner: 
RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val children = this.getInputs val rowCnt = children.foldLeft(0D) { (rows, child) => - rows + RelMetadataQuery.getRowCount(child) + rows + metadata.getRowCount(child) } planner.getCostFactory.makeCost(rowCnt, 0, 0) } override def translateToPlan( - config: TableConfig, + tableEnv: BatchTableEnvironment, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config) - val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config) + val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala new file mode 100644 index 0000000000000..9dfd34603612c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import com.google.common.collect.ImmutableList +import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.rel.{RelWriter, RelNode} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Values +import org.apache.calcite.rex.RexLiteral +import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.scala._ +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig} + +import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ + +/** + * DataSet RelNode for a LogicalValues. 
+ * + */ +class DataSetValues( + cluster: RelOptCluster, + traitSet: RelTraitSet, + rowType: RelDataType, + tuples: ImmutableList[ImmutableList[RexLiteral]]) + extends Values(cluster, rowType, tuples, traitSet) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetValues( + cluster, + traitSet, + rowType, + tuples + ) + } + + override def toString: String = { + s"Values(values: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("values", valuesFieldsToString) + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + + val config = tableEnv.getConfig + + val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo] + + val inputFormat = new ValuesInputFormat(tuples) + + tableEnv.createDataSetSource(inputFormat, returnType).asInstanceOf[DataSet[Any]] + } + + private def valuesFieldsToString: String = { + rowType.getFieldNames.asScala.toList.mkString(", ") + } + +} + +class ValuesInputFormat(val tuples: ImmutableList[ImmutableList[RexLiteral]]) + extends GenericInputFormat[Row] + with NonParallelInput { + + var readIdx = 0 + + override def reachedEnd(): Boolean = readIdx == tuples.size() + + override def nextRecord(reuse: Row): Row = { + + if (readIdx == tuples.size()) { + return null + } + + val t = tuples.get(readIdx) + readIdx += 1 + + var i = 0 + for(f <- t) { + reuse.setField(i, f.getValue) + i += 1 + } + reuse + } +} + + diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala index fb058f34870ad..6dfcd03ad6754 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.calcite.rex.RexProgram import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.StreamTableEnvironment import org.apache.flink.api.table.codegen.CodeGenerator import org.apache.flink.api.table.plan.nodes.FlinkCalc import org.apache.flink.api.table.typeutils.TypeConverter._ @@ -68,10 +68,13 @@ class DataStreamCalc( calcProgram.getCondition != null) } - override def translateToPlan(config: TableConfig, + override def translateToPlan( + tableEnv: StreamTableEnvironment, expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { - val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(config) + val config = tableEnv.getConfig + + val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) val returnType = determineReturnType( getRowType, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala index 0673a35128074..6cf13a54ce14d 100644 
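The ValuesInputFormat above follows a small but useful Flink pattern: GenericInputFormat requires only reachedEnd() and nextRecord(), and mixing in the NonParallelInput marker pins the format to a single input split, so each literal row is emitted exactly once regardless of the environment's parallelism. A simplified, hypothetical variant of the same pattern (SeqInputFormat is illustrative only, not part of the patch):

import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}

// Hypothetical: emits a fixed sequence of elements on a single input split.
class SeqInputFormat[T](elements: IndexedSeq[T])
  extends GenericInputFormat[T]
  with NonParallelInput {

  private var readIdx = 0

  override def reachedEnd(): Boolean = readIdx == elements.size

  override def nextRecord(reuse: T): T = {
    // the reuse object is ignored; the stored element is handed out directly
    val record = elements(readIdx)
    readIdx += 1
    record
  }
}

Passed together with an explicit TypeInformation to the Java environments' createInput, this is essentially the shape of source that the new createDataSetSource and createDataStreamSource methods consume.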
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.table.plan.nodes.datastream import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.{StreamTableEnvironment, TableConfig} import org.apache.flink.api.table.plan.nodes.FlinkRel import org.apache.flink.streaming.api.datastream.DataStream @@ -29,7 +29,7 @@ trait DataStreamRel extends RelNode with FlinkRel { /** * Translates the FlinkRelNode into a Flink operator. * - * @param config runtime configuration + * @param tableEnv The [[StreamTableEnvironment]] of the translated Table. * @param expectedType specifies the type the Flink operator should return. The type must * have the same arity as the result. For instance, if the * expected type is a RowTypeInfo this method will return a DataSet of @@ -38,9 +38,8 @@ trait DataStreamRel extends RelNode with FlinkRel { * @return DataStream of type expectedType or RowTypeInfo */ def translateToPlan( - config: TableConfig, - expectedType: Option[TypeInformation[Any]] = None) - : DataStream[Any] + tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala index 314759cc26731..9fc4d5e928d2b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala @@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.TableScan import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.PojoTypeInfo -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.StreamTableEnvironment import org.apache.flink.api.table.codegen.CodeGenerator import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType import org.apache.flink.api.table.plan.schema.DataStreamTable @@ -60,10 +60,12 @@ class DataStreamSource( } override def translateToPlan( - config: TableConfig, + tableEnv: StreamTableEnvironment, expectedType: Option[TypeInformation[Any]]) : DataStream[Any] = { + val config = tableEnv.getConfig + val inputDataStream: DataStream[Any] = dataStreamTable.dataStream val inputType = inputDataStream.getType diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala index 8c9cca07de93f..e72e9a82cebab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala @@ -22,7 +22,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import 
org.apache.calcite.rel.{RelNode, RelWriter, BiRel} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.StreamTableEnvironment import org.apache.flink.streaming.api.datastream.DataStream import scala.collection.JavaConverters._ @@ -60,11 +60,12 @@ class DataStreamUnion( "Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))" } - override def translateToPlan(config: TableConfig, + override def translateToPlan( + tableEnv: StreamTableEnvironment, expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { - val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(config) - val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(config) + val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) leftDataSet.union(rightDataSet) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala new file mode 100644 index 0000000000000..3e64e649a98a7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import com.google.common.collect.ImmutableList +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Values +import org.apache.calcite.rex.RexLiteral +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.plan.nodes.dataset.ValuesInputFormat +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.streaming.api.datastream.DataStream + + +/** + * DataStream RelNode for LogicalValues. 
+ */ +class DataStreamValues( + cluster: RelOptCluster, + traitSet: RelTraitSet, + rowType: RelDataType, + tuples: ImmutableList[ImmutableList[RexLiteral]]) + extends Values(cluster, rowType, tuples, traitSet) + with DataStreamRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataStreamValues( + cluster, + traitSet, + rowType, + tuples + ) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]) : DataStream[Any] = { + + val config = tableEnv.getConfig + + val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo] + + val inputFormat = new ValuesInputFormat(tuples) + + tableEnv.createDataStreamSource(inputFormat, returnType).asInstanceOf[DataStream[Any]] + + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index 427530b887974..e9bcaa2556e60 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -21,9 +21,7 @@ package org.apache.flink.api.table.plan.rules import org.apache.calcite.rel.rules._ import org.apache.calcite.tools.{RuleSets, RuleSet} import org.apache.flink.api.table.plan.rules.dataSet._ -import org.apache.flink.api.table.plan.rules.datastream.DataStreamCalcRule -import org.apache.flink.api.table.plan.rules.datastream.DataStreamScanRule -import org.apache.flink.api.table.plan.rules.datastream.DataStreamUnionRule +import org.apache.flink.api.table.plan.rules.datastream._ object FlinkRuleSets { @@ -41,7 +39,7 @@ object FlinkRuleSets { // push filter into the children of a join FilterJoinRule.JOIN, // push filter through an aggregation - FlinkFilterAggregateTransposeRule.INSTANCE, + FilterAggregateTransposeRule.INSTANCE, // aggregation and projection rules AggregateProjectMergeRule.INSTANCE, @@ -100,7 +98,8 @@ object FlinkRuleSets { DataSetCalcRule.INSTANCE, DataSetJoinRule.INSTANCE, DataSetScanRule.INSTANCE, - DataSetUnionRule.INSTANCE + DataSetUnionRule.INSTANCE, + DataSetValuesRule.INSTANCE ) /** @@ -108,11 +107,6 @@ object FlinkRuleSets { */ val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList( - // translate to DataStream nodes - DataStreamCalcRule.INSTANCE, - DataStreamScanRule.INSTANCE, - DataStreamUnionRule.INSTANCE, - // calc rules FilterToCalcRule.INSTANCE, ProjectToCalcRule.INSTANCE, @@ -134,7 +128,13 @@ object FlinkRuleSets { ProjectRemoveRule.INSTANCE, // merge and push unions rules - UnionEliminatorRule.INSTANCE + UnionEliminatorRule.INSTANCE, + + // translate to DataStream nodes + DataStreamCalcRule.INSTANCE, + DataStreamScanRule.INSTANCE, + DataStreamUnionRule.INSTANCE, + DataStreamValuesRule.INSTANCE ) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala index e6a699364f592..0449fc33435f7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala @@ -30,7 +30,7 @@ class DataSetAggregateRule classOf[LogicalAggregate], Convention.NONE, DataSetConvention.INSTANCE, - "FlinkAggregateRule") + "DataSetAggregateRule") { def convert(rel: RelNode): RelNode = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala index 3821024bfc04b..88e74a9aca8bc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala @@ -29,7 +29,7 @@ class DataSetCalcRule classOf[LogicalCalc], Convention.NONE, DataSetConvention.INSTANCE, - "FlinkCalcRule") + "DataSetCalcRule") { def convert(rel: RelNode): RelNode = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala index 89d33c9dd7208..55100d20af752 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala @@ -32,7 +32,7 @@ class DataSetJoinRule classOf[LogicalJoin], Convention.NONE, DataSetConvention.INSTANCE, - "FlinkJoinRule") + "DataSetJoinRule") { /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala index 3cdaca32c8492..f95f6ea44fd57 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala @@ -31,7 +31,7 @@ class DataSetScanRule classOf[LogicalTableScan], Convention.NONE, DataSetConvention.INSTANCE, - "FlinkScanRule") + "DataSetScanRule") { /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala index cd1de1e71a83f..7809d6d1ac1ec 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala @@ -29,7 +29,7 @@ class DataSetUnionRule classOf[LogicalUnion], Convention.NONE, DataSetConvention.INSTANCE, - "FlinkUnionRule") + "DataSetUnionRule") { /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala new file mode 100644 index 0000000000000..c28b458980bb5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetValuesRule.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{RelOptRule, RelTraitSet, Convention} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalValues +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetValues, DataSetConvention} + +class DataSetValuesRule + extends ConverterRule( + classOf[LogicalValues], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetValuesRule") +{ + + def convert(rel: RelNode): RelNode = { + + val values: LogicalValues = rel.asInstanceOf[LogicalValues] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + + new DataSetValues( + rel.getCluster, + traitSet, + rel.getRowType, + values.getTuples) + } +} + +object DataSetValuesRule { + val INSTANCE: RelOptRule = new DataSetValuesRule +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala deleted file mode 100644 index c27fc75baff1e..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkFilterAggregateTransposeRule.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.plan.rules.dataSet - -import org.apache.calcite.plan.RelOptRule -import org.apache.calcite.plan.RelOptRuleCall -import org.apache.calcite.plan.RelOptUtil -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.Aggregate -import org.apache.calcite.rel.core.Filter -import org.apache.calcite.rel.core.RelFactories -import org.apache.calcite.rex.RexNode -import org.apache.calcite.tools.RelBuilder -import org.apache.calcite.tools.RelBuilderFactory -import org.apache.calcite.util.ImmutableBitSet -import com.google.common.collect.ImmutableList - -import scala.collection.JavaConversions._ - -/** - * This rule is a (fixed) copy of the FilterAggregateTransposeRule of Apache Calcite. - * - * A fix for this rule is contained Calcite's master branch. - * This custom rule can be removed once Calcite 1.7 is released and our dependency adjusted. - * - * Planner rule that pushes a `org.apache.calcite.rel.core.Filter` - * past a `org.apache.calcite.rel.core.Aggregate`. - * - * @see org.apache.calcite.rel.rules.AggregateFilterTransposeRule - */ -class FlinkFilterAggregateTransposeRule( - filterClass: Class[_ <: Filter], - builderFactory: RelBuilderFactory, - aggregateClass: Class[_ <: Aggregate]) - extends RelOptRule( - RelOptRule.operand(filterClass, RelOptRule.operand(aggregateClass, RelOptRule.any)), - builderFactory, - null) -{ - - def onMatch(call: RelOptRuleCall) { - val filterRel: Filter = call.rel(0) - val aggRel: Aggregate = call.rel(1) - val conditions = RelOptUtil.conjunctions(filterRel.getCondition).toList - val rexBuilder = filterRel.getCluster.getRexBuilder - val origFields = aggRel.getRowType.getFieldList.toList - - // Fixed computation of adjustments - val adjustments = aggRel.getGroupSet.asList().zipWithIndex.map { - case (g, i) => g - i - }.toArray - - var pushedConditions: List[RexNode] = Nil - var remainingConditions: List[RexNode] = Nil - - for (condition <- conditions) { - val rCols: ImmutableBitSet = RelOptUtil.InputFinder.bits(condition) - if (canPush(aggRel, rCols)) { - pushedConditions = condition.accept( - new RelOptUtil.RexInputConverter( - rexBuilder, - origFields, - aggRel.getInput(0).getRowType.getFieldList, - adjustments)) :: pushedConditions - } - else { - remainingConditions = condition :: remainingConditions - } - } - val builder: RelBuilder = call.builder - var rel: RelNode = builder.push(aggRel.getInput).filter(pushedConditions).build - if (rel eq aggRel.getInput(0)) { - return - } - rel = aggRel.copy(aggRel.getTraitSet, ImmutableList.of(rel)) - rel = builder.push(rel).filter(remainingConditions).build - call.transformTo(rel) - } - - private def canPush(aggregate: Aggregate, rCols: ImmutableBitSet): Boolean = { - val groupKeys: ImmutableBitSet = ImmutableBitSet.range(0, aggregate.getGroupSet.cardinality) - if (!groupKeys.contains(rCols)) { - return false - } - if (aggregate.indicator) { - import scala.collection.JavaConversions._ - for (groupingSet <- aggregate.getGroupSets) { - if (!groupingSet.contains(rCols)) { - return false - } - } - } - true - } -} - -object FlinkFilterAggregateTransposeRule { - /** The default instance of `FilterAggregateTransposeRule`. - * - * It matches any kind of agg. 
or filter - */ - val INSTANCE: FlinkFilterAggregateTransposeRule = - new FlinkFilterAggregateTransposeRule( - classOf[Filter], - RelFactories.LOGICAL_BUILDER, - classOf[Aggregate]) -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala new file mode 100644 index 0000000000000..fa2b428e95e2c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamValuesRule.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalValues +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention} + +class DataStreamValuesRule + extends ConverterRule( + classOf[LogicalValues], + Convention.NONE, + DataStreamConvention.INSTANCE, + "DataStreamValuesRule") +{ + + def convert(rel: RelNode): RelNode = { + + val values: LogicalValues = rel.asInstanceOf[LogicalValues] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) + + new DataStreamValues( + rel.getCluster, + traitSet, + rel.getRowType, + values.getTuples) + } +} + +object DataStreamValuesRule { + val INSTANCE: RelOptRule = new DataStreamValuesRule +}
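Beyond the new Values nodes, the mechanical part of this version bump is Calcite 1.7's reworked metadata API, visible in every DataSet node above: computeSelfCost now receives a RelMetadataQuery instance instead of relying on static RelMetadataQuery methods, and getRows is replaced by estimateRowCount(RelMetadataQuery). A compact sketch of the before/after shape on a hypothetical single-input node (CostSketchRel is illustrative only):

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rel.metadata.RelMetadataQuery

// Hypothetical node showing the Calcite 1.7 cost API used throughout this patch.
class CostSketchRel(cluster: RelOptCluster, traits: RelTraitSet, input: RelNode)
  extends SingleRel(cluster, traits, input) {

  // Calcite 1.5: computeSelfCost(planner) with static RelMetadataQuery.getRowCount(child).
  // Calcite 1.7: the planner hands in a RelMetadataQuery instance instead.
  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    val rowCnt: Double = metadata.getRowCount(getInput)
    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
  }

  // Calcite 1.5: override def getRows: Double. Calcite 1.7: estimateRowCount(metadata).
  override def estimateRowCount(metadata: RelMetadataQuery): Double =
    metadata.getRowCount(getInput)
}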