From 31178370c6600273d507a275ab544888028e31a0 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Wed, 7 Dec 2016 10:57:04 +0300 Subject: [PATCH 01/16] [FLINK-5303] Added GROUPING SETS implementation. --- .../api/table/expressions/functions.scala | 42 +++++++++++ .../table/expressions/ExpressionParser.scala | 18 ++++- .../rules/dataSet/DataSetAggregateRule.scala | 62 ++++++++++------ .../DataSetAggregateWithNullValuesRule.scala | 50 ++++++++----- .../datastream/DataStreamAggregateRule.scala | 71 +++++++++++++------ .../aggregate/AggregateMapFunction.scala | 6 +- .../table/validate/FunctionCatalog.scala | 7 ++ .../scala/batch/sql/AggregationsITCase.scala | 19 +++-- 8 files changed, 204 insertions(+), 71 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala new file mode 100644 index 0000000000000..c57c07ed775a6 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala @@ -0,0 +1,42 @@ +package org.apache.flink.api.table.expressions + +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder + +abstract sealed class GroupingFunction extends Expression { + + override def toString = s"Grouping($children)" + + override private[flink] def resultType = + throw new UnsupportedOperationException("A grouping function has no result type.") +} + +case class Grouping(children: List[Expression]) extends GroupingFunction { + + /** + * Convert Expression to its counterpart in Calcite, i.e. RexNode + */ + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { + relBuilder.call(SqlStdOperatorTable.GROUPING, children.map(_.toRexNode): _*) + } +} + +case class GroupingId(children: List[Expression]) extends GroupingFunction { + + /** + * Convert Expression to its counterpart in Calcite, i.e. RexNode + */ + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { + relBuilder.call(SqlStdOperatorTable.GROUPING_ID, children.map(_.toRexNode): _*) + } +} + +case class GroupId(children: List[Expression]) extends GroupingFunction { + + /** + * Convert Expression to its counterpart in Calcite, i.e. 
RexNode + */ + override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { + relBuilder.call(SqlStdOperatorTable.GROUP_ID, children.map(_.toRexNode): _*) + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala index 48dbce6f41598..98bbf38525e6d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala @@ -51,6 +51,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val ARRAY: Keyword = Keyword("Array") lazy val AS: Keyword = Keyword("as") lazy val COUNT: Keyword = Keyword("count") + lazy val GROUP_ID: Keyword = Keyword("group_id") + lazy val GROUPING: Keyword = Keyword("grouping") + lazy val GROUPING_ID: Keyword = Keyword("grouping_id") lazy val AVG: Keyword = Keyword("avg") lazy val MIN: Keyword = Keyword("min") lazy val MAX: Keyword = Keyword("max") @@ -89,7 +92,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val FLATTEN: Keyword = Keyword("flatten") def functionIdent: ExpressionParser.Parser[String] = - not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ + not(GROUP_ID) ~ not(GROUPING) ~ not(GROUPING_ID) ~ + not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~> super.ident @@ -302,6 +306,15 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val prefixArray: PackratParser[Expression] = ARRAY ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { elements => ArrayConstructor(elements) } + lazy val prefixGroupId: PackratParser[Expression] = + GROUP_ID ~ "(" ~> expressionList <~ ")" ^^ (ee => GroupId(ee)) + + lazy val prefixGroupingId: PackratParser[Expression] = + GROUPING_ID ~ "(" ~> expressionList <~ ")" ^^ (ee => GroupingId(ee)) + + lazy val prefixGrouping: PackratParser[Expression] = + GROUPING ~ "(" ~> expressionList <~ ")" ^^ (ee => Grouping(ee)) + lazy val prefixSum: PackratParser[Expression] = SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) } @@ -376,7 +389,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) } lazy val prefixed: PackratParser[Expression] = - prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | + prefixGroupId | prefixGroupingId | prefixGrouping | + prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet | prefixFlattening | diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala index d634a6cad6d48..9b7bfbfcc35ef 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala @@ -21,19 +21,17 @@ package org.apache.flink.table.plan.rules.dataSet import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalAggregate +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalUnion} import org.apache.flink.table.api.TableException -import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention} - +import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention, DataSetUnion} import scala.collection.JavaConversions._ class DataSetAggregateRule extends ConverterRule( - classOf[LogicalAggregate], - Convention.NONE, - DataSetConvention.INSTANCE, - "DataSetAggregateRule") - { + classOf[LogicalAggregate], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetAggregateRule") { override def matches(call: RelOptRuleCall): Boolean = { val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate] @@ -50,13 +48,7 @@ class DataSetAggregateRule throw TableException("DISTINCT aggregates are currently not supported.") } - // check if we have grouping sets - val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet - if (groupSets || agg.indicator) { - throw TableException("GROUPING SETS are currently not supported.") - } - - !distinctAggs && !groupSets && !agg.indicator + !distinctAggs } override def convert(rel: RelNode): RelNode = { @@ -64,16 +56,40 @@ class DataSetAggregateRule val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE) - new DataSetAggregate( - rel.getCluster, - traitSet, - convInput, - agg.getNamedAggCalls, - rel.getRowType, - agg.getInput.getRowType, - agg.getGroupSet.toArray) + if (agg.indicator) { + agg.groupSets.map(set => + new DataSetAggregate( + rel.getCluster, + traitSet, + convInput, + agg.getNamedAggCalls, + rel.getRowType, + agg.getInput.getRowType, + set.toArray + ).asInstanceOf[RelNode] + ).reduce( + (rel1, rel2) => { + new DataSetUnion( + rel.getCluster, + traitSet, + rel1, rel2, + rel.getRowType + ) + } + ) + } else { + new DataSetAggregate( + rel.getCluster, + traitSet, + convInput, + agg.getNamedAggCalls, + rel.getRowType, + agg.getInput.getRowType, + agg.getGroupSet.toArray + ) } } +} object DataSetAggregateRule { val INSTANCE: RelOptRule = new DataSetAggregateRule diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala index b708af4dbf139..7871c00ad6001 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala @@ -26,7 +26,7 @@ import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalUnion, LogicalValues} import org.apache.calcite.rex.RexLiteral import org.apache.flink.table.api.TableException -import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention} +import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention, DataSetUnion} /** * Rule for insert [[org.apache.flink.types.Row]] with null records into a [[DataSetAggregate]] @@ -55,12 +55,7 @@ class DataSetAggregateWithNullValuesRule throw 
TableException("DISTINCT aggregates are currently not supported.") } - // check if we have grouping sets - val groupSets = agg.getGroupSets.size() == 0 || agg.getGroupSets.get(0) != agg.getGroupSet - if (groupSets || agg.indicator) { - throw TableException("GROUPING SETS are currently not supported.") - } - !distinctAggs && !groupSets && !agg.indicator + !distinctAggs } override def convert(rel: RelNode): RelNode = { @@ -80,15 +75,38 @@ class DataSetAggregateWithNullValuesRule val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals) val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true) - new DataSetAggregate( - cluster, - traitSet, - RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE), - agg.getNamedAggCalls, - rel.getRowType, - agg.getInput.getRowType, - agg.getGroupSet.toArray - ) + if (agg.indicator) { + agg.groupSets.map(set => + new DataSetAggregate( + cluster, + traitSet, + RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE), + agg.getNamedAggCalls, + rel.getRowType, + agg.getInput.getRowType, + set.toArray + ).asInstanceOf[RelNode] + ).reduce( + (rel1, rel2) => { + new DataSetUnion( + rel.getCluster, + traitSet, + rel1, rel2, + rel.getRowType + ) + } + ) + } else { + new DataSetAggregate( + cluster, + traitSet, + RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE), + agg.getNamedAggCalls, + rel.getRowType, + agg.getInput.getRowType, + agg.getGroupSet.toArray + ) + } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala index bf8a18ef3a9d6..eb45bd90135fc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala @@ -22,19 +22,17 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTrait import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.flink.table.api.TableException -import org.apache.flink.table.expressions.Alias import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate -import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention, DataStreamUnion} import scala.collection.JavaConversions._ class DataStreamAggregateRule extends ConverterRule( - classOf[LogicalWindowAggregate], - Convention.NONE, - DataStreamConvention.INSTANCE, - "DataStreamAggregateRule") - { + classOf[LogicalWindowAggregate], + Convention.NONE, + DataStreamConvention.INSTANCE, + "DataStreamAggregateRule") { override def matches(call: RelOptRuleCall): Boolean = { val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate] @@ -45,13 +43,14 @@ class DataStreamAggregateRule throw TableException("DISTINCT aggregates are currently not supported.") } - // check if we have grouping sets - val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet - if (groupSets || agg.indicator) { - throw TableException("GROUPING SETS are currently not supported.") - } + // // check if we have grouping sets + // val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != 
agg.getGroupSet + // if (groupSets || agg.indicator) { + // throw TableException("GROUPING SETS are currently not supported.") + // } - !distinctAggs && !groupSets && !agg.indicator + !distinctAggs + // && !groupSets && !agg.indicator } override def convert(rel: RelNode): RelNode = { @@ -59,18 +58,44 @@ class DataStreamAggregateRule val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE) - new DataStreamAggregate( - agg.getWindow, - agg.getNamedProperties, - rel.getCluster, - traitSet, - convInput, - agg.getNamedAggCalls, - rel.getRowType, - agg.getInput.getRowType, - agg.getGroupSet.toArray) + if (agg.indicator) { + agg.groupSets.map(set => + new DataStreamAggregate( + agg.getWindow, + agg.getNamedProperties, + rel.getCluster, + traitSet, + convInput, + agg.getNamedAggCalls, + rel.getRowType, + agg.getInput.getRowType, + set.toArray + ).asInstanceOf[RelNode] + ).reduce( + (rel1, rel2) => { + new DataStreamUnion( + rel.getCluster, + traitSet, + rel1, rel2, + rel.getRowType + ) + } + ) + } else { + new DataStreamAggregate( + agg.getWindow, + agg.getNamedProperties, + rel.getCluster, + traitSet, + convInput, + agg.getNamedAggCalls, + rel.getRowType, + agg.getInput.getRowType, + agg.getGroupSet.toArray + ) } } +} object DataStreamAggregateRule { val INSTANCE: RelOptRule = new DataStreamAggregateRule diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala index 21a96e0923e6b..0033ff73251d6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala @@ -37,7 +37,7 @@ class AggregateMapFunction[IN, OUT]( override def open(config: Configuration) { Preconditions.checkNotNull(aggregates) Preconditions.checkNotNull(aggFields) - Preconditions.checkArgument(aggregates.size == aggFields.size) + Preconditions.checkArgument(aggregates.length == aggFields.length) val partialRowLength = groupingKeys.length + aggregates.map(_.intermediateDataType.length).sum output = new Row(partialRowLength) @@ -46,11 +46,11 @@ class AggregateMapFunction[IN, OUT]( override def map(value: IN): OUT = { val input = value.asInstanceOf[Row] - for (i <- 0 until aggregates.length) { + for (i <- aggregates.indices) { val fieldValue = input.getField(aggFields(i)) aggregates(i).prepare(fieldValue, output) } - for (i <- 0 until groupingKeys.length) { + for (i <- groupingKeys.indices) { output.setField(i, input.getField(groupingKeys(i))) } output.asInstanceOf[OUT] diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index f92b3a18f69ac..ffc6ec839bc39 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -147,6 +147,10 @@ object FunctionCatalog { "isNotTrue" -> classOf[IsNotTrue], "isNotFalse" -> classOf[IsNotFalse], + "groupId" -> classOf[GroupId], + "grouping" -> classOf[Grouping], + "groupingId" -> classOf[GroupingId], + // aggregate functions "avg" -> 
classOf[Avg], "count" -> classOf[Count], @@ -257,6 +261,9 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.NOT, SqlStdOperatorTable.UNARY_MINUS, SqlStdOperatorTable.UNARY_PLUS, + SqlStdOperatorTable.GROUP_ID, + SqlStdOperatorTable.GROUPING, + SqlStdOperatorTable.GROUPING_ID, // AGGREGATE OPERATORS SqlStdOperatorTable.SUM, SqlStdOperatorTable.COUNT, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala index 4f55beea13aed..c1960c4d1f04f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala @@ -245,19 +245,30 @@ class AggregationsITCase( tEnv.sql(sqlQuery).toDataSet[Row] } - @Test(expected = classOf[TableException]) + @Test def testGroupingSetAggregate(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val sqlQuery = "SELECT _2, _3, avg(_1) as a FROM MyTable GROUP BY GROUPING SETS (_2, _3)" + val sqlQuery = "SELECT _2, _3, avg(_1) as a, GROUPING_ID(_2, _3) as g FROM MyTable GROUP BY GROUPING SETS (_2, _3)" val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) - // must fail. grouping sets are not supported - tEnv.sql(sqlQuery).toDataSet[Row] + val result = tEnv.sql(sqlQuery).toDataSet[Row].collect() + + val expected = + "6,null,18,1,\n5,null,13,1,\n4,null,8,1,\n3,null,5,1,\n2,null,2,1,\n1,null,1,1,\n" + + "null,Luke Skywalker,6,2,\nnull,I am fine.,5,2,\nnull,Hi,1,2,\n" + + "null,Hello world, how are you?,4,2,\nnull,Hello world,3,2,\nnull,Hello,2,2,\n" + + "null,Comment#9,15,2,\nnull,Comment#8,14,2,\nnull,Comment#7,13,2,\n" + + "null,Comment#6,12,2,\nnull,Comment#5,11,2,\nnull,Comment#4,10,2,\n" + + "null,Comment#3,9,2,\nnull,Comment#2,8,2,\nnull,Comment#15,21,2,\n" + + "null,Comment#14,20,2,\nnull,Comment#13,19,2,\nnull,Comment#12,18,2,\n" + + "null,Comment#11,17,2,\nnull,Comment#10,16,2,\nnull,Comment#1,7,0" + + TestBaseUtils.compareResultAsText(result.asJava, expected) } @Test From 8d4923bc131ac9407c2ffc7fa9e12db0161ecb37 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Wed, 7 Dec 2016 17:23:35 +0300 Subject: [PATCH 02/16] [FLINK-5303] Fixed grouping sets implementation. 
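
Grouping functions (GROUPING, GROUPING_ID, GROUP_ID) now report a LONG
result type instead of throwing, the DataSet/DataStream aggregate rules
pass the aggregate's `indicator` flag into DataSetAggregate and
DataStreamAggregate, and a new GroupingsMapFunction fills the generated
i$-prefixed indicator fields after aggregation (each indicator is set to
whether the corresponding grouping field is null in the output row).
For example, the query exercised by the updated AggregationsITCase now
runs on the batch Table API:

  SELECT _2, _3, avg(_1) as a, GROUP_ID() as g
  FROM MyTable
  GROUP BY GROUPING SETS (_2, _3)
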
--- .../api/table/expressions/functions.scala | 22 +++++++- .../plan/nodes/dataset/DataSetAggregate.scala | 37 +++++++++++-- .../datastream/DataStreamAggregate.scala | 35 +++++++++++- .../rules/dataSet/DataSetAggregateRule.scala | 6 +- .../DataSetAggregateWithNullValuesRule.scala | 6 +- .../datastream/DataStreamAggregateRule.scala | 6 +- .../aggregate/GroupingsMapFunction.scala | 55 +++++++++++++++++++ .../scala/batch/sql/AggregationsITCase.scala | 20 ++++--- 8 files changed, 163 insertions(+), 24 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupingsMapFunction.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala index c57c07ed775a6..3b3c87898e800 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala @@ -1,14 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.api.table.expressions import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo abstract sealed class GroupingFunction extends Expression { override def toString = s"Grouping($children)" - override private[flink] def resultType = - throw new UnsupportedOperationException("A grouping function has no result type.") + override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO } case class Grouping(children: List[Expression]) extends GroupingFunction { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala index a5c42d92d8ab1..c32061c9005f3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala @@ -23,18 +23,20 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.FlinkAggregate -import org.apache.flink.table.runtime.aggregate.AggregateUtil +import org.apache.flink.table.runtime.aggregate.{AggregateUtil, GroupingsMapFunction} import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.types.Row import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer /** * Flink RelNode which matches along with a LogicalAggregate. 
@@ -46,7 +48,8 @@ class DataSetAggregate( namedAggregates: Seq[CalcitePair[AggregateCall, String]], rowRelDataType: RelDataType, inputType: RelDataType, - grouping: Array[Int]) + grouping: Array[Int], + indicator: Boolean) extends SingleRel(cluster, traitSet, inputNode) with FlinkAggregate with DataSetRel { @@ -61,7 +64,9 @@ class DataSetAggregate( namedAggregates, getRowType, inputType, - grouping) + grouping, + indicator + ) } override def toString: String = { @@ -124,7 +129,7 @@ class DataSetAggregate( val rowTypeInfo = new RowTypeInfo(fieldTypes: _*) - val result = { + val reducedInput = { if (groupingKeys.length > 0) { // grouped aggregation val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " + @@ -148,6 +153,30 @@ class DataSetAggregate( } } + var result = reducedInput + if (indicator) { + val fields = rowRelDataType.getFieldList.asScala.toList + var mapping = ArrayBuffer[(Int, Int)]() + for (i <- fields.indices) { + for (j <- fields.indices) { + if (fields(j).getName.equals("i$" + fields(i).getName)) { + mapping += ((i, j)) + } + } + } + + if (mapping.nonEmpty) { + val mapFunction = new GroupingsMapFunction[Row, Row](mapping.toArray, rowTypeInfo) + + val prepareOpName = s"prepare grouping sets" + result = reducedInput + .asInstanceOf[DataSet[Row]] + .map(mapFunction) + .name(prepareOpName) + .asInstanceOf[DataSet[Any]] + } + } + // if the expected type is not a Row, inject a mapper to convert to the expected type expectedType match { case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala index 9902486d0a8b5..830b3a25f949d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin import org.apache.flink.table.api.StreamTableEnvironment import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer class DataStreamAggregate( window: LogicalWindow, @@ -53,7 +54,8 @@ class DataStreamAggregate( namedAggregates: Seq[CalcitePair[AggregateCall, String]], rowRelDataType: RelDataType, inputType: RelDataType, - grouping: Array[Int]) + grouping: Array[Int], + indicator: Boolean) extends SingleRel(cluster, traitSet, inputNode) with FlinkAggregate with DataStreamRel { @@ -70,7 +72,9 @@ class DataStreamAggregate( namedAggregates, getRowType, inputType, - grouping) + grouping, + indicator + ) } override def toString: String = { @@ -142,7 +146,7 @@ class DataStreamAggregate( val mappedInput = inputDS.map(mapFunction).name(prepareOpName) - val result: DataStream[Any] = { + val intermediateStream: DataStream[Any] = { // check whether all aggregates support partial aggregate if (AggregateUtil.doAllSupportPartialAggregation( namedAggregates.map(_.getKey), @@ -242,6 +246,31 @@ class DataStreamAggregate( } } } + + var result = intermediateStream + if (indicator) { + val fields = rowRelDataType.getFieldList.asScala.toList + var mapping = ArrayBuffer[(Int, Int)]() + for (i <- fields.indices) { + for (j <- fields.indices) { + if (fields(j).getName.equals("i$" + fields(i).getName)) { + mapping += ((i, j)) + } + } + } + + if 
(mapping.nonEmpty) { + val mapFunction = new GroupingsMapFunction[Row, Row](mapping.toArray, rowTypeInfo) + + val prepareOpName = s"prepare grouping sets" + result = intermediateStream + .asInstanceOf[DataStream[Row]] + .map(mapFunction) + .name(prepareOpName) + .asInstanceOf[DataStream[Any]] + } + } + // if the expected type is not a Row, inject a mapper to convert to the expected type expectedType match { case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala index 9b7bfbfcc35ef..f94c45133343e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala @@ -65,7 +65,8 @@ class DataSetAggregateRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - set.toArray + set.toArray, + agg.indicator ).asInstanceOf[RelNode] ).reduce( (rel1, rel2) => { @@ -85,7 +86,8 @@ class DataSetAggregateRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - agg.getGroupSet.toArray + agg.getGroupSet.toArray, + agg.indicator ) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala index 7871c00ad6001..0482fb4461d4f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala @@ -84,7 +84,8 @@ class DataSetAggregateWithNullValuesRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - set.toArray + set.toArray, + agg.indicator ).asInstanceOf[RelNode] ).reduce( (rel1, rel2) => { @@ -104,7 +105,8 @@ class DataSetAggregateWithNullValuesRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - agg.getGroupSet.toArray + agg.getGroupSet.toArray, + agg.indicator ) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala index eb45bd90135fc..16cfcbd64d682 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala @@ -69,7 +69,8 @@ class DataStreamAggregateRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - set.toArray + set.toArray, + agg.indicator ).asInstanceOf[RelNode] ).reduce( (rel1, rel2) => { @@ -91,7 +92,8 @@ class DataStreamAggregateRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - agg.getGroupSet.toArray + agg.getGroupSet.toArray, + agg.indicator ) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupingsMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupingsMapFunction.scala new file 
mode 100644 index 0000000000000..564984d388133 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupingsMapFunction.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions + +class GroupingsMapFunction[IN, OUT](private val mappings: Array[(Int, Int)], + @transient private val returnType: TypeInformation[OUT]) + extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] { + + private var output: Row = _ + + override def open(config: Configuration) { + Preconditions.checkNotNull(mappings) + output = new Row(returnType.getTotalFields) + } + + override def map(value: IN): OUT = { + + val input = value.asInstanceOf[Row] + for (i <- 0 until returnType.getTotalFields) { + output.setField(i, input.getField(i)) + } + mappings.foreach { + case (inputIndex, outputIndex) => + output.setField(outputIndex, input.getField(inputIndex) == null) + } + output.asInstanceOf[OUT] + } + + override def getProducedType: TypeInformation[OUT] = { + returnType + } +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala index c1960c4d1f04f..a748f09d9cf6e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala @@ -251,22 +251,24 @@ class AggregationsITCase( val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val sqlQuery = "SELECT _2, _3, avg(_1) as a, GROUPING_ID(_2, _3) as g FROM MyTable GROUP BY GROUPING SETS (_2, _3)" + val sqlQuery = + "SELECT _2, _3, avg(_1) as a, GROUP_ID() as g FROM MyTable GROUP BY GROUPING SETS (_2, _3)" val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery).toDataSet[Row].collect() + result.foreach(println(_)) val expected = - "6,null,18,1,\n5,null,13,1,\n4,null,8,1,\n3,null,5,1,\n2,null,2,1,\n1,null,1,1,\n" + - "null,Luke Skywalker,6,2,\nnull,I am fine.,5,2,\nnull,Hi,1,2,\n" + - "null,Hello world, how are you?,4,2,\nnull,Hello world,3,2,\nnull,Hello,2,2,\n" + - 
"null,Comment#9,15,2,\nnull,Comment#8,14,2,\nnull,Comment#7,13,2,\n" + - "null,Comment#6,12,2,\nnull,Comment#5,11,2,\nnull,Comment#4,10,2,\n" + - "null,Comment#3,9,2,\nnull,Comment#2,8,2,\nnull,Comment#15,21,2,\n" + - "null,Comment#14,20,2,\nnull,Comment#13,19,2,\nnull,Comment#12,18,2,\n" + - "null,Comment#11,17,2,\nnull,Comment#10,16,2,\nnull,Comment#1,7,0" + "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" + + "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" + + "null,Hello world, how are you?,4,2\nnull,Hello world,3,2\nnull,Hello,2,2\n" + + "null,Comment#9,15,2\nnull,Comment#8,14,2\nnull,Comment#7,13,2\n" + + "null,Comment#6,12,2\nnull,Comment#5,11,2\nnull,Comment#4,10,2\n" + + "null,Comment#3,9,2\nnull,Comment#2,8,2\nnull,Comment#15,21,2\n" + + "null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" + + "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2" TestBaseUtils.compareResultAsText(result.asJava, expected) } From c3b15ce80c8fd89e390e992930aa6eba2b14f31b Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Wed, 7 Dec 2016 17:35:46 +0300 Subject: [PATCH 03/16] [FLINK-5303] Small fixes. --- .../table/plan/rules/dataSet/DataSetAggregateRule.scala | 2 +- .../plan/rules/datastream/DataStreamAggregateRule.scala | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala index f94c45133343e..1b554ff6811a7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.rules.dataSet import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalUnion} +import org.apache.calcite.rel.logical.LogicalAggregate import org.apache.flink.table.api.TableException import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention, DataSetUnion} import scala.collection.JavaConversions._ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala index 16cfcbd64d682..1b39484050ec7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala @@ -43,14 +43,7 @@ class DataStreamAggregateRule throw TableException("DISTINCT aggregates are currently not supported.") } - // // check if we have grouping sets - // val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet - // if (groupSets || agg.indicator) { - // throw TableException("GROUPING SETS are currently not supported.") - // } - !distinctAggs - // && !groupSets && !agg.indicator } override def convert(rel: RelNode): RelNode = { From 73e74f5485bf313f2ca83e999957464b05e6edc3 Mon Sep 17 00:00:00 2001 From: Aleksandr 
Chermenin Date: Thu, 8 Dec 2016 10:46:09 +0300 Subject: [PATCH 04/16] [FLINK-5303] Some improvements. --- .../api/table/expressions/functions.scala | 60 ------------------- .../table/expressions/ExpressionParser.scala | 19 +----- .../plan/nodes/dataset/DataSetAggregate.scala | 32 ++-------- .../datastream/DataStreamAggregate.scala | 35 +++-------- .../AggregateReduceCombineFunction.scala | 2 + .../AggregateReduceGroupFunction.scala | 11 ++++ .../runtime/aggregate/AggregateUtil.scala | 31 +++++++++- .../aggregate/GroupingsMapFunction.scala | 55 ----------------- .../table/validate/FunctionCatalog.scala | 4 -- .../scala/batch/sql/AggregationsITCase.scala | 1 - 10 files changed, 55 insertions(+), 195 deletions(-) delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala delete mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupingsMapFunction.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala deleted file mode 100644 index 3b3c87898e800..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/functions.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.expressions - -import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.BasicTypeInfo - -abstract sealed class GroupingFunction extends Expression { - - override def toString = s"Grouping($children)" - - override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO -} - -case class Grouping(children: List[Expression]) extends GroupingFunction { - - /** - * Convert Expression to its counterpart in Calcite, i.e. RexNode - */ - override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { - relBuilder.call(SqlStdOperatorTable.GROUPING, children.map(_.toRexNode): _*) - } -} - -case class GroupingId(children: List[Expression]) extends GroupingFunction { - - /** - * Convert Expression to its counterpart in Calcite, i.e. RexNode - */ - override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { - relBuilder.call(SqlStdOperatorTable.GROUPING_ID, children.map(_.toRexNode): _*) - } -} - -case class GroupId(children: List[Expression]) extends GroupingFunction { - - /** - * Convert Expression to its counterpart in Calcite, i.e. 
RexNode - */ - override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = { - relBuilder.call(SqlStdOperatorTable.GROUP_ID, children.map(_.toRexNode): _*) - } -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala index 98bbf38525e6d..d212ef1b1d0a2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala @@ -51,9 +51,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val ARRAY: Keyword = Keyword("Array") lazy val AS: Keyword = Keyword("as") lazy val COUNT: Keyword = Keyword("count") - lazy val GROUP_ID: Keyword = Keyword("group_id") - lazy val GROUPING: Keyword = Keyword("grouping") - lazy val GROUPING_ID: Keyword = Keyword("grouping_id") lazy val AVG: Keyword = Keyword("avg") lazy val MIN: Keyword = Keyword("min") lazy val MAX: Keyword = Keyword("max") @@ -92,8 +89,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val FLATTEN: Keyword = Keyword("flatten") def functionIdent: ExpressionParser.Parser[String] = - not(GROUP_ID) ~ not(GROUPING) ~ not(GROUPING_ID) ~ - not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ + not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~> super.ident @@ -306,15 +302,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val prefixArray: PackratParser[Expression] = ARRAY ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { elements => ArrayConstructor(elements) } - lazy val prefixGroupId: PackratParser[Expression] = - GROUP_ID ~ "(" ~> expressionList <~ ")" ^^ (ee => GroupId(ee)) - - lazy val prefixGroupingId: PackratParser[Expression] = - GROUPING_ID ~ "(" ~> expressionList <~ ")" ^^ (ee => GroupingId(ee)) - - lazy val prefixGrouping: PackratParser[Expression] = - GROUPING ~ "(" ~> expressionList <~ ")" ^^ (ee => Grouping(ee)) - lazy val prefixSum: PackratParser[Expression] = SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) } @@ -389,9 +376,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) } lazy val prefixed: PackratParser[Expression] = - prefixGroupId | prefixGroupingId | prefixGrouping | - prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | - prefixStart | prefixEnd | + prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet | prefixFlattening | prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala index c32061c9005f3..7f382fe6f8719 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala @@ -29,7 +29,7 @@ import 
org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.FlinkAggregate -import org.apache.flink.table.runtime.aggregate.{AggregateUtil, GroupingsMapFunction} +import org.apache.flink.table.runtime.aggregate.AggregateUtil import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.api.BatchTableEnvironment @@ -109,7 +109,9 @@ class DataSetAggregate( namedAggregates, inputType, rowRelDataType, - grouping) + grouping, + indicator + ) val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan( tableEnv, @@ -129,7 +131,7 @@ class DataSetAggregate( val rowTypeInfo = new RowTypeInfo(fieldTypes: _*) - val reducedInput = { + val result = { if (groupingKeys.length > 0) { // grouped aggregation val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " + @@ -153,30 +155,6 @@ class DataSetAggregate( } } - var result = reducedInput - if (indicator) { - val fields = rowRelDataType.getFieldList.asScala.toList - var mapping = ArrayBuffer[(Int, Int)]() - for (i <- fields.indices) { - for (j <- fields.indices) { - if (fields(j).getName.equals("i$" + fields(i).getName)) { - mapping += ((i, j)) - } - } - } - - if (mapping.nonEmpty) { - val mapFunction = new GroupingsMapFunction[Row, Row](mapping.toArray, rowTypeInfo) - - val prepareOpName = s"prepare grouping sets" - result = reducedInput - .asInstanceOf[DataSet[Row]] - .map(mapFunction) - .name(prepareOpName) - .asInstanceOf[DataSet[Any]] - } - } - // if the expected type is not a Row, inject a mapper to convert to the expected type expectedType match { case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala index 830b3a25f949d..b0afc8158ed4e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala @@ -43,7 +43,6 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin import org.apache.flink.table.api.StreamTableEnvironment import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer class DataStreamAggregate( window: LogicalWindow, @@ -146,7 +145,7 @@ class DataStreamAggregate( val mappedInput = inputDS.map(mapFunction).name(prepareOpName) - val intermediateStream: DataStream[Any] = { + val result: DataStream[Any] = { // check whether all aggregates support partial aggregate if (AggregateUtil.doAllSupportPartialAggregation( namedAggregates.map(_.getKey), @@ -211,7 +210,9 @@ class DataStreamAggregate( inputType, rowRelDataType, grouping, - namedProperties) + indicator, + namedProperties + ) val keyedStream = mappedInput.keyBy(groupingKeys: _*) val windowedStream = @@ -232,7 +233,9 @@ class DataStreamAggregate( inputType, rowRelDataType, grouping, - namedProperties) + indicator, + namedProperties + ) val windowedStream = createNonKeyedWindowedStream(window, mappedInput) @@ -247,30 +250,6 @@ class DataStreamAggregate( } } - var result = intermediateStream - if (indicator) { - val fields = rowRelDataType.getFieldList.asScala.toList - 
var mapping = ArrayBuffer[(Int, Int)]() - for (i <- fields.indices) { - for (j <- fields.indices) { - if (fields(j).getName.equals("i$" + fields(i).getName)) { - mapping += ((i, j)) - } - } - } - - if (mapping.nonEmpty) { - val mapFunction = new GroupingsMapFunction[Row, Row](mapping.toArray, rowTypeInfo) - - val prepareOpName = s"prepare grouping sets" - result = intermediateStream - .asInstanceOf[DataStream[Row]] - .map(mapFunction) - .name(prepareOpName) - .asInstanceOf[DataStream[Any]] - } - } - // if the expected type is not a Row, inject a mapper to convert to the expected type expectedType match { case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala index 31b85cdaf9f7a..93f32a0a5ea92 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala @@ -41,12 +41,14 @@ class AggregateReduceCombineFunction( private val aggregates: Array[Aggregate[_ <: Any]], private val groupKeysMapping: Array[(Int, Int)], private val aggregateMapping: Array[(Int, Int)], + private val additionalMapping: Array[(Int, Int)], private val intermediateRowArity: Int, private val finalRowArity: Int) extends AggregateReduceGroupFunction( aggregates, groupKeysMapping, aggregateMapping, + additionalMapping, intermediateRowArity, finalRowArity) with CombineFunction[Row, Row] { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala index c1efebb87a893..b295446089863 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala @@ -19,12 +19,14 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable +import org.apache.calcite.rel.`type`.RelDataType import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.util.{Collector, Preconditions} import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer /** * It wraps the aggregate logic inside of @@ -40,6 +42,7 @@ class AggregateReduceGroupFunction( private val aggregates: Array[Aggregate[_ <: Any]], private val groupKeysMapping: Array[(Int, Int)], private val aggregateMapping: Array[(Int, Int)], + private val additionalMapping: Array[(Int, Int)], private val intermediateRowArity: Int, private val finalRowArity: Int) extends RichGroupReduceFunction[Row, Row] { @@ -87,6 +90,14 @@ class AggregateReduceGroupFunction( output.setField(after, aggregates(previous).evaluate(aggregateBuffer)) } + // Evaluate grouping sets additional values + if (additionalMapping != null && additionalMapping.nonEmpty) { + additionalMapping.foreach { + case (inputIndex, outputIndex) => + output.setField(outputIndex, output.productElement(inputIndex) == null) + } + } + out.collect(output) } } 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 282e6c0f59bcc..8acc4d0f47507 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -98,7 +98,9 @@ object AggregateUtil { namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputType: RelDataType, outputType: RelDataType, - groupings: Array[Int]): RichGroupReduceFunction[Row, Row] = { + groupings: Array[Int], + indicator: Boolean + ): RichGroupReduceFunction[Row, Row] = { val aggregates = transformToAggregateFunctions( namedAggregates.map(_.getKey), @@ -112,6 +114,8 @@ object AggregateUtil { outputType, groupings) + val additionalMapping = getAdditionalMapping(outputType) + val allPartialAggregate: Boolean = aggregates.forall(_.supportPartial) val intermediateRowArity = groupings.length + @@ -123,6 +127,7 @@ object AggregateUtil { aggregates, groupingOffsetMapping, aggOffsetMapping, + additionalMapping, intermediateRowArity, outputType.getFieldCount) } @@ -131,6 +136,7 @@ object AggregateUtil { aggregates, groupingOffsetMapping, aggOffsetMapping, + additionalMapping, intermediateRowArity, outputType.getFieldCount) } @@ -175,6 +181,7 @@ object AggregateUtil { inputType: RelDataType, outputType: RelDataType, groupings: Array[Int], + indicator: Boolean, properties: Seq[NamedWindowProperty]) : AllWindowFunction[Row, Row, DataStreamWindow] = { @@ -183,7 +190,9 @@ object AggregateUtil { namedAggregates, inputType, outputType, - groupings) + groupings, + indicator + ) if (isTimeWindow(window)) { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) @@ -204,6 +213,7 @@ object AggregateUtil { inputType: RelDataType, outputType: RelDataType, groupings: Array[Int], + indicator: Boolean, properties: Seq[NamedWindowProperty]) : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { @@ -212,7 +222,9 @@ object AggregateUtil { namedAggregates, inputType, outputType, - groupings) + groupings, + indicator + ) if (isTimeWindow(window)) { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) @@ -349,6 +361,19 @@ object AggregateUtil { (groupingOffsetMapping, aggOffsetMapping) } + private def getAdditionalMapping(outputType: RelDataType): Array[(Int, Int)] = { + val fields = outputType.getFieldList + var mappingsBuffer = ArrayBuffer[(Int, Int)]() + for (i <- fields.indices) { + for (j <- fields.indices) { + if (fields(j).getName.equals("i$" + fields(i).getName)) { + mappingsBuffer += ((i, j)) + } + } + } + mappingsBuffer.toArray + } + private def isTimeWindow(window: LogicalWindow) = { window match { case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupingsMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupingsMapFunction.scala deleted file mode 100644 index 564984d388133..0000000000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupingsMapFunction.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.aggregate - -import org.apache.flink.api.common.functions.RichMapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.configuration.Configuration -import org.apache.flink.types.Row -import org.apache.flink.util.Preconditions - -class GroupingsMapFunction[IN, OUT](private val mappings: Array[(Int, Int)], - @transient private val returnType: TypeInformation[OUT]) - extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] { - - private var output: Row = _ - - override def open(config: Configuration) { - Preconditions.checkNotNull(mappings) - output = new Row(returnType.getTotalFields) - } - - override def map(value: IN): OUT = { - - val input = value.asInstanceOf[Row] - for (i <- 0 until returnType.getTotalFields) { - output.setField(i, input.getField(i)) - } - mappings.foreach { - case (inputIndex, outputIndex) => - output.setField(outputIndex, input.getField(inputIndex) == null) - } - output.asInstanceOf[OUT] - } - - override def getProducedType: TypeInformation[OUT] = { - returnType - } -} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index ffc6ec839bc39..d5cefd2e6e979 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -147,10 +147,6 @@ object FunctionCatalog { "isNotTrue" -> classOf[IsNotTrue], "isNotFalse" -> classOf[IsNotFalse], - "groupId" -> classOf[GroupId], - "grouping" -> classOf[Grouping], - "groupingId" -> classOf[GroupingId], - // aggregate functions "avg" -> classOf[Avg], "count" -> classOf[Count], diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala index a748f09d9cf6e..d6f2b7bb648a5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala @@ -258,7 +258,6 @@ class AggregationsITCase( tEnv.registerDataSet("MyTable", ds) val result = tEnv.sql(sqlQuery).toDataSet[Row].collect() - result.foreach(println(_)) val expected = "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" + From 0ead10be48ba45e82db845625394cdc4ffd0c931 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Thu, 8 Dec 2016 12:32:53 +0300 Subject: [PATCH 05/16] 
[FLINK-5303] Added tests. --- .../api/java/batch/sql/GroupingSetsTest.java | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java new file mode 100644 index 0000000000000..43a85e045781d --- /dev/null +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java @@ -0,0 +1,135 @@ +package org.apache.flink.api.java.batch.sql; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.BatchTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.TableConfig; +import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Comparator; +import java.util.List; + +public class GroupingSetsTest { + + private final static String TABLE_NAME = "MyTable"; + private BatchTableEnvironment tableEnv; + + @Before + public void setup() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig()); + DataSet> dataSet = CollectionDataSets.get3TupleDataSet(env); + tableEnv.registerDataSet(TABLE_NAME, dataSet); + } + + @Test + public void testGroupingSets() throws Exception { + String query = + "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g FROM " + TABLE_NAME + + " GROUP BY GROUPING SETS (f1, f2)"; + + String expected = + "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" + + "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" + + "null,Hello world, how are you?,4,2\nnull,Hello world,3,2\nnull,Hello,2,2\n" + + "null,Comment#9,15,2\nnull,Comment#8,14,2\nnull,Comment#7,13,2\n" + + "null,Comment#6,12,2\nnull,Comment#5,11,2\nnull,Comment#4,10,2\n" + + "null,Comment#3,9,2\nnull,Comment#2,8,2\nnull,Comment#15,21,2\n" + + "null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" + + "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2"; + + checkSql(query, expected); + } + + @Test + public void testCubeAsGroupingSets() throws Exception { + String cubeQuery = + "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " + + " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " + + " GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " + + " GROUPING_ID(f1, f2) as gid " + + " FROM " + TABLE_NAME + " GROUP BY CUBE (f1, f2)"; + + String groupingSetsQuery = + "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " + + " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " + + " GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " + + " GROUPING_ID(f1, f2) as gid " + + " FROM " + TABLE_NAME + + " GROUP BY GROUPING SETS ((f1, f2), (f1), (f2), ())"; + + compareSql(cubeQuery, groupingSetsQuery); + } + + @Test + public void testRollupAsGroupingSets() throws Exception { + String rollupQuery = + "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " + + " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " + + " 
GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " + + " GROUPING_ID(f1, f2) as gid " + + " FROM " + TABLE_NAME + " GROUP BY ROLLUP (f1, f2)"; + + String groupingSetsQuery = + "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " + + " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " + + " GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " + + " GROUPING_ID(f1, f2) as gid " + + " FROM " + TABLE_NAME + + " GROUP BY GROUPING SETS ((f1, f2), (f1), ())"; + + compareSql(rollupQuery, groupingSetsQuery); + } + + /** + * Execute SQL query and check results. + * + * @param query SQL query. + * @param expected Expected result. + */ + private void checkSql(String query, String expected) throws Exception { + Table resultTable = tableEnv.sql(query); + DataSet resultDataSet = tableEnv.toDataSet(resultTable, Row.class); + List results = resultDataSet.collect(); + TestBaseUtils.compareResultAsText(results, expected); + } + + private void compareSql(String query1, String query2) throws Exception { + + // Function to map row to string + MapFunction mapFunction = new MapFunction() { + + @Override + public String map(Row value) throws Exception { + return value == null ? "null" : value.toString(); + } + }; + + // Execute first query and store results + Table resultTable1 = tableEnv.sql(query1); + DataSet resultDataSet1 = tableEnv.toDataSet(resultTable1, Row.class); + List results1 = resultDataSet1.map(mapFunction).collect(); + + // Execute second query and store results + Table resultTable2 = tableEnv.sql(query2); + DataSet resultDataSet2 = tableEnv.toDataSet(resultTable2, Row.class); + List results2 = resultDataSet2.map(mapFunction).collect(); + + // Compare results + TestBaseUtils.compareResultCollections(results1, results2, new Comparator() { + + @Override + public int compare(String o1, String o2) { + return o2 == null ? o1 == null ? 0 : 1 : o1.compareTo(o2); + } + }); + } +} From 0a94ad877dbc8988e0635b043118ff48d0ecc7a4 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Thu, 8 Dec 2016 12:34:35 +0300 Subject: [PATCH 06/16] [FLINK-5303] Test small fix. --- .../api/java/batch/sql/GroupingSetsTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java index 43a85e045781d..e62486c1a504c 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.api.java.batch.sql; import org.apache.flink.api.common.functions.MapFunction; From 93a90d9d6a8508bda0bb3d690c5a9dd609f4bdb8 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Thu, 8 Dec 2016 14:34:19 +0300 Subject: [PATCH 07/16] [FLINK-5303] Grouping sets tests and fixes. --- .../AggregateReduceGroupFunction.scala | 11 +++--- .../api/java/batch/sql/GroupingSetsTest.java | 35 +++++++++++++++++++ 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala index b295446089863..c4f697733bdee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala @@ -19,14 +19,12 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable -import org.apache.calcite.rel.`type`.RelDataType import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.util.{Collector, Preconditions} import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer /** * It wraps the aggregate logic inside of @@ -92,9 +90,12 @@ class AggregateReduceGroupFunction( // Evaluate grouping sets additional values if (additionalMapping != null && additionalMapping.nonEmpty) { - additionalMapping.foreach { - case (inputIndex, outputIndex) => - output.setField(outputIndex, output.productElement(inputIndex) == null) + + val groupingFields = groupKeysMapping.map(_._1) + additionalMapping.map { + case (inputIndex, outputIndex) => (outputIndex, groupingFields.contains(inputIndex)) + }.foreach { + case (index, flag) => output.setField(index, !flag) } } diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java index e62486c1a504c..963f555a74d68 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.Row; @@ -38,14 +39,29 @@ public class GroupingSetsTest { private final static String TABLE_NAME = "MyTable"; + private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls"; private BatchTableEnvironment tableEnv; @Before public void setup() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig()); + DataSet> dataSet = CollectionDataSets.get3TupleDataSet(env); tableEnv.registerDataSet(TABLE_NAME, dataSet); + + MapOperator, Tuple3> dataSetWithNulls = + dataSet.map(new MapFunction, Tuple3>() { + + @Override + public Tuple3 map(Tuple3 value) throws 
Exception { + if (value.f2.toLowerCase().contains("world")) { + value.f2 = null; + } + return value; + } + }); + tableEnv.registerDataSet(TABLE_WITH_NULLS_NAME, dataSetWithNulls); } @Test @@ -67,6 +83,25 @@ public void testGroupingSets() throws Exception { checkSql(query, expected); } + @Test + public void testGroupingSetsWithNulls() throws Exception { + String query = + "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g FROM " + TABLE_WITH_NULLS_NAME + + " GROUP BY GROUPING SETS (f1, f2)"; + + String expected = + "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" + + "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" + + "null,null,3,2\nnull,Hello,2,2\nnull,Comment#9,15,2\nnull,Comment#8,14,2\n" + + "null,Comment#7,13,2\nnull,Comment#6,12,2\nnull,Comment#5,11,2\n" + + "null,Comment#4,10,2\nnull,Comment#3,9,2\nnull,Comment#2,8,2\n" + + "null,Comment#15,21,2\nnull,Comment#14,20,2\nnull,Comment#13,19,2\n" + + "null,Comment#12,18,2\nnull,Comment#11,17,2\nnull,Comment#10,16,2\n" + + "null,Comment#1,7,2"; + + checkSql(query, expected); + } + @Test public void testCubeAsGroupingSets() throws Exception { String cubeQuery = From d258c5c9e3c8ae443cceae8182ea09c79af1f867 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Thu, 8 Dec 2016 14:44:41 +0300 Subject: [PATCH 08/16] [FLINK-5303] Some cleanup. --- .../flink/table/plan/nodes/dataset/DataSetAggregate.scala | 2 -- .../org/apache/flink/table/validate/FunctionCatalog.scala | 3 --- 2 files changed, 5 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala index 7f382fe6f8719..6751b04e78cb7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala @@ -23,7 +23,6 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo @@ -36,7 +35,6 @@ import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.types.Row import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer /** * Flink RelNode which matches along with a LogicalAggregate. 
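The `testGroupingSetsWithNulls` case added above pins down the subtle part of this series: a NULL that is genuinely in the data must not be confused with a NULL that the grouping set injects for a rolled-up column. Below is a minimal, self-contained Scala sketch of that distinction (hypothetical names, not the actual Flink runtime classes): the first variant mirrors the value-based null check that PATCH 07 replaces, the second mirrors the key-membership check now used in `AggregateReduceGroupFunction`.

```scala
object GroupingIndicatorSketch {

  // Buggy variant: a real null in the data looks exactly like a null that
  // the grouping set injected for a rolled-up column.
  def indicatorFromValue(value: Any): Boolean = value == null

  // Fixed variant: a column counts as rolled up iff it is absent from the
  // grouping keys of the set that produced the row.
  def indicatorFromKeys(fieldIndex: Int, groupedFields: Set[Int]): Boolean =
    !groupedFields.contains(fieldIndex)

  def main(args: Array[String]): Unit = {
    // GROUPING SETS (f1): the current set groups on field 0 only, and the
    // incoming value of f1 happens to be a genuine null in the data.
    val groupedFields = Set(0)

    println(indicatorFromValue(null))            // true: wrongly marks f1 as rolled up
    println(indicatorFromKeys(0, groupedFields)) // false: f1 is a grouping key
    println(indicatorFromKeys(1, groupedFields)) // true: f2 really is rolled up
  }
}
```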
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index d5cefd2e6e979..f92b3a18f69ac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -257,9 +257,6 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.NOT, SqlStdOperatorTable.UNARY_MINUS, SqlStdOperatorTable.UNARY_PLUS, - SqlStdOperatorTable.GROUP_ID, - SqlStdOperatorTable.GROUPING, - SqlStdOperatorTable.GROUPING_ID, // AGGREGATE OPERATORS SqlStdOperatorTable.SUM, SqlStdOperatorTable.COUNT, From fcba0e2ca5e48fac6da3478d1257b50cc504bd02 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Thu, 8 Dec 2016 16:14:14 +0300 Subject: [PATCH 09/16] [FLINK-5303] Have supplemented documentation. --- docs/dev/table_api.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index d851c4c05c4b7..528e3f9ea3ef8 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1317,7 +1317,6 @@ Among others, the following SQL features are not supported, yet: - Interval arithmetic is currenly limited - Distinct aggregates (e.g., `COUNT(DISTINCT name)`) - Non-equi joins and Cartesian products -- Grouping sets *Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products.* @@ -1391,7 +1390,7 @@ select: { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] - [ GROUP BY { groupItem [, groupItem ]* } ] + [ GROUP BY groupExpression ] [ HAVING booleanExpression ] selectWithoutFrom: @@ -1420,6 +1419,12 @@ tablePrimary: values: VALUES expression [, expression ]* +groupExpression: + groupItem [, groupItem ]* + | GROUPING SETS (groupItem [, groupItem ]*) + | ROLLUP (groupItem [, groupItem ]*) + | CUBE (groupItem [, groupItem ]*) + groupItem: expression | '(' ')' @@ -1442,7 +1447,7 @@ Although not every SQL feature is implemented yet, some string combinations are {% highlight sql %} -A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, 
CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, 
TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE +A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUP_ID, GROUPING, GROUPING_ID, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, 
NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE {% endhighlight %} From 69e5e09a3ed51ef25c85ad1f58734527d92ca759 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Thu, 8 Dec 2016 17:56:00 +0300 Subject: [PATCH 10/16] [FLINK-5303] Improved documentation. 
--- docs/dev/table_api.md | 56 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 528e3f9ea3ef8..b7c20eb4436b0 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1390,7 +1390,7 @@ select: { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] - [ GROUP BY groupExpression ] + [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] selectWithoutFrom: @@ -1419,17 +1419,13 @@ tablePrimary: values: VALUES expression [, expression ]* -groupExpression: - groupItem [, groupItem ]* - | GROUPING SETS (groupItem [, groupItem ]*) - | ROLLUP (groupItem [, groupItem ]*) - | CUBE (groupItem [, groupItem ]*) - groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' - + | CUBE '(' expression [, expression ]* ')' + | ROLLUP '(' expression [, expression ]* ')' + | GROUPING SETS '(' groupItem [, groupItem ]* ')' ``` For a better definition of SQL queries within a Java String, Flink SQL uses a lexical policy similar to Java: @@ -3746,6 +3742,50 @@ MIN(value) + + + + + + + + + + + + + + + + + + + + + + + + +
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Grouping functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        {% highlight text %}
+GROUP_ID()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns an integer that uniquely identifies the combination of grouping keys.</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        {% highlight text %}
+GROUPING(value)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns 1 if expression is rolled up in the current row's grouping set, 0 otherwise.</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        {% highlight text %}
+GROUPING_ID(value [, value]* )
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a bit vector of the given grouping expressions.</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
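To make the three functions concrete, here is a hedged usage sketch in the style of `GroupingSetsTest` above. The table name and sample data are illustrative, and it targets the API as it appears in this patch series (`tEnv.sql`, pre-rename):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object GroupingFunctionsExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Register a small (Int, Long, String) data set as "MyTable".
    val data = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello"))
    tEnv.registerDataSet("MyTable", data, 'f0, 'f1, 'f2)

    val sql =
      """SELECT f1, f2, AVG(f0) AS a,
        |       GROUP_ID()          AS gid,
        |       GROUPING(f1)        AS g1,
        |       GROUPING_ID(f1, f2) AS bits
        |FROM MyTable
        |GROUP BY CUBE (f1, f2)""".stripMargin

    tEnv.sql(sql).toDataSet[Row].print()
  }
}
```

Each output row carries a `GROUPING_ID(f1, f2)` between 0 (grouped by both columns) and 3 (the grand-total row that CUBE adds).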
+ From bf4c2b1248eaa13125ebad39e33271b3961f4942 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Wed, 21 Dec 2016 12:52:32 +0300 Subject: [PATCH 11/16] [FLINK-5303] Some fixes after rebase. --- .../flink/table/expressions/ExpressionParser.scala | 6 +++--- .../apache/flink/table/validate/FunctionCatalog.scala | 4 ++++ .../flink/api/java/batch/sql/GroupingSetsTest.java | 10 +++++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala index d212ef1b1d0a2..d85540a98e4b9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala @@ -376,9 +376,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) } lazy val prefixed: PackratParser[Expression] = - prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | prefixStart | prefixEnd | - prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract | - prefixFloor | prefixCeil | prefixGet | prefixFlattening | + prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | + prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | + prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet | prefixFlattening | prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end // suffix/prefix composite diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index f92b3a18f69ac..c00f8bbd7aa09 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -257,6 +257,10 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.NOT, SqlStdOperatorTable.UNARY_MINUS, SqlStdOperatorTable.UNARY_PLUS, + // GROUPING FUNCTIONS + SqlStdOperatorTable.GROUP_ID, + SqlStdOperatorTable.GROUPING, + SqlStdOperatorTable.GROUPING_ID, // AGGREGATE OPERATORS SqlStdOperatorTable.SUM, SqlStdOperatorTable.COUNT, diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java index 963f555a74d68..b83ab486349bd 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java @@ -22,14 +22,14 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.MapOperator; -import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.TableConfig; -import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.table.api.Table; +import 
org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.Row; import org.junit.Before; import org.junit.Test; From 34448009ca48b7a1b4c128e0ba1fb3c12f5bddc6 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Fri, 30 Dec 2016 12:04:36 +0300 Subject: [PATCH 12/16] [FLINK-5303] Cleaned up the code. --- .../plan/nodes/dataset/DataSetAggregate.scala | 13 ++++------- .../datastream/DataStreamAggregate.scala | 17 +++++--------- .../rules/dataSet/DataSetAggregateRule.scala | 6 ++--- .../DataSetAggregateWithNullValuesRule.scala | 6 ++--- .../datastream/DataStreamAggregateRule.scala | 6 ++--- .../AggregateReduceCombineFunction.scala | 17 +++++++------- .../AggregateReduceGroupFunction.scala | 22 ++++++++++--------- .../runtime/aggregate/AggregateUtil.scala | 21 ++++++------------ 8 files changed, 43 insertions(+), 65 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala index 6751b04e78cb7..26f2d5b51a168 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala @@ -46,13 +46,12 @@ class DataSetAggregate( namedAggregates: Seq[CalcitePair[AggregateCall, String]], rowRelDataType: RelDataType, inputType: RelDataType, - grouping: Array[Int], - indicator: Boolean) + grouping: Array[Int]) extends SingleRel(cluster, traitSet, inputNode) with FlinkAggregate with DataSetRel { - override def deriveRowType() = rowRelDataType + override def deriveRowType(): RelDataType = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetAggregate( @@ -62,9 +61,7 @@ class DataSetAggregate( namedAggregates, getRowType, inputType, - grouping, - indicator - ) + grouping) } override def toString: String = { @@ -107,9 +104,7 @@ class DataSetAggregate( namedAggregates, inputType, rowRelDataType, - grouping, - indicator - ) + grouping) val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan( tableEnv, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala index b0afc8158ed4e..6a3d4e32c5bbe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala @@ -53,13 +53,12 @@ class DataStreamAggregate( namedAggregates: Seq[CalcitePair[AggregateCall, String]], rowRelDataType: RelDataType, inputType: RelDataType, - grouping: Array[Int], - indicator: Boolean) + grouping: Array[Int]) extends SingleRel(cluster, traitSet, inputNode) with FlinkAggregate with DataStreamRel { - override def deriveRowType() = rowRelDataType + override def deriveRowType(): RelDataType = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamAggregate( @@ -71,9 
+70,7 @@ class DataStreamAggregate( namedAggregates, getRowType, inputType, - grouping, - indicator - ) + grouping) } override def toString: String = { @@ -210,9 +207,7 @@ class DataStreamAggregate( inputType, rowRelDataType, grouping, - indicator, - namedProperties - ) + namedProperties) val keyedStream = mappedInput.keyBy(groupingKeys: _*) val windowedStream = @@ -233,9 +228,7 @@ class DataStreamAggregate( inputType, rowRelDataType, grouping, - indicator, - namedProperties - ) + namedProperties) val windowedStream = createNonKeyedWindowedStream(window, mappedInput) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala index 1b554ff6811a7..0c34176ce5a78 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala @@ -65,8 +65,7 @@ class DataSetAggregateRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - set.toArray, - agg.indicator + set.toArray ).asInstanceOf[RelNode] ).reduce( (rel1, rel2) => { @@ -86,8 +85,7 @@ class DataSetAggregateRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - agg.getGroupSet.toArray, - agg.indicator + agg.getGroupSet.toArray ) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala index 0482fb4461d4f..7871c00ad6001 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala @@ -84,8 +84,7 @@ class DataSetAggregateWithNullValuesRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - set.toArray, - agg.indicator + set.toArray ).asInstanceOf[RelNode] ).reduce( (rel1, rel2) => { @@ -105,8 +104,7 @@ class DataSetAggregateWithNullValuesRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - agg.getGroupSet.toArray, - agg.indicator + agg.getGroupSet.toArray ) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala index 1b39484050ec7..c6d30531d30d3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala @@ -62,8 +62,7 @@ class DataStreamAggregateRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - set.toArray, - agg.indicator + set.toArray ).asInstanceOf[RelNode] ).reduce( (rel1, rel2) => { @@ -85,8 +84,7 @@ class DataStreamAggregateRule agg.getNamedAggCalls, rel.getRowType, agg.getInput.getRowType, - agg.getGroupSet.toArray, - agg.indicator + agg.getGroupSet.toArray ) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala index 93f32a0a5ea92..5237ecfbf47d4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala @@ -25,30 +25,31 @@ import org.apache.flink.types.Row import scala.collection.JavaConversions._ - /** * It wraps the aggregate logic inside of * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and * [[org.apache.flink.api.java.operators.GroupCombineOperator]] * - * @param aggregates The aggregate functions. - * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping The index mapping between aggregate function list and aggregated value - * index in output Row. + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. + * @param aggregateMapping The index mapping between aggregate function list and aggregated value + * index in output Row. + * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate + * Row and output Row. */ class AggregateReduceCombineFunction( private val aggregates: Array[Aggregate[_ <: Any]], private val groupKeysMapping: Array[(Int, Int)], private val aggregateMapping: Array[(Int, Int)], - private val additionalMapping: Array[(Int, Int)], + private val groupingSetsMapping: Array[(Int, Int)], private val intermediateRowArity: Int, private val finalRowArity: Int) extends AggregateReduceGroupFunction( aggregates, groupKeysMapping, aggregateMapping, - additionalMapping, + groupingSetsMapping, intermediateRowArity, finalRowArity) with CombineFunction[Row, Row] { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala index c4f697733bdee..568338381d98e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala @@ -20,27 +20,29 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} import scala.collection.JavaConversions._ /** - * It wraps the aggregate logic inside of + * It wraps the aggregate logic inside of * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. * - * @param aggregates The aggregate functions. - * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping The index mapping between aggregate function list and aggregated value - * index in output Row. + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. 
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value + * index in output Row. + * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate + * Row and output Row. */ class AggregateReduceGroupFunction( private val aggregates: Array[Aggregate[_ <: Any]], private val groupKeysMapping: Array[(Int, Int)], private val aggregateMapping: Array[(Int, Int)], - private val additionalMapping: Array[(Int, Int)], + private val groupingSetsMapping: Array[(Int, Int)], private val intermediateRowArity: Int, private val finalRowArity: Int) extends RichGroupReduceFunction[Row, Row] { @@ -89,10 +91,10 @@ class AggregateReduceGroupFunction( } // Evaluate grouping sets additional values - if (additionalMapping != null && additionalMapping.nonEmpty) { + if (groupingSetsMapping != null && groupingSetsMapping.nonEmpty) { val groupingFields = groupKeysMapping.map(_._1) - additionalMapping.map { + groupingSetsMapping.map { case (inputIndex, outputIndex) => (outputIndex, groupingFields.contains(inputIndex)) }.foreach { case (index, flag) => output.setField(index, !flag) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 8acc4d0f47507..8b70fcd3ea758 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -98,8 +98,7 @@ object AggregateUtil { namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputType: RelDataType, outputType: RelDataType, - groupings: Array[Int], - indicator: Boolean + groupings: Array[Int] ): RichGroupReduceFunction[Row, Row] = { val aggregates = transformToAggregateFunctions( @@ -114,7 +113,7 @@ object AggregateUtil { outputType, groupings) - val additionalMapping = getAdditionalMapping(outputType) + val groupingSetsMapping = getGroupingSetsMapping(outputType) val allPartialAggregate: Boolean = aggregates.forall(_.supportPartial) @@ -127,7 +126,7 @@ object AggregateUtil { aggregates, groupingOffsetMapping, aggOffsetMapping, - additionalMapping, + groupingSetsMapping, intermediateRowArity, outputType.getFieldCount) } @@ -136,7 +135,7 @@ object AggregateUtil { aggregates, groupingOffsetMapping, aggOffsetMapping, - additionalMapping, + groupingSetsMapping, intermediateRowArity, outputType.getFieldCount) } @@ -181,7 +180,6 @@ object AggregateUtil { inputType: RelDataType, outputType: RelDataType, groupings: Array[Int], - indicator: Boolean, properties: Seq[NamedWindowProperty]) : AllWindowFunction[Row, Row, DataStreamWindow] = { @@ -190,9 +188,7 @@ object AggregateUtil { namedAggregates, inputType, outputType, - groupings, - indicator - ) + groupings) if (isTimeWindow(window)) { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) @@ -213,7 +209,6 @@ object AggregateUtil { inputType: RelDataType, outputType: RelDataType, groupings: Array[Int], - indicator: Boolean, properties: Seq[NamedWindowProperty]) : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { @@ -222,9 +217,7 @@ object AggregateUtil { namedAggregates, inputType, outputType, - groupings, - indicator - ) + groupings) if (isTimeWindow(window)) { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) @@ -361,7 +354,7 @@ object AggregateUtil { (groupingOffsetMapping, 
aggOffsetMapping) } - private def getAdditionalMapping(outputType: RelDataType): Array[(Int, Int)] = { + private def getGroupingSetsMapping(outputType: RelDataType): Array[(Int, Int)] = { val fields = outputType.getFieldList var mappingsBuffer = ArrayBuffer[(Int, Int)]() for (i <- fields.indices) { From ae4d4d7981470e2abe5d7b6acee30d2861033afa Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Fri, 30 Dec 2016 13:19:40 +0300 Subject: [PATCH 13/16] [FLINK-5303] Added test to check generated plan. --- .../api/java/batch/sql/GroupingSetsTest.java | 2 +- .../flink/table/GroupingSetsPlansTest.scala | 195 ++++++++++++++++++ 2 files changed, 196 insertions(+), 1 deletion(-) rename flink-libraries/flink-table/src/test/java/org/apache/flink/{ => table}/api/java/batch/sql/GroupingSetsTest.java (99%) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/GroupingSetsPlansTest.scala diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsTest.java similarity index 99% rename from flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java rename to flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsTest.java index b83ab486349bd..29819973b54df 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/GroupingSetsTest.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.api.java.batch.sql; +package org.apache.flink.table.api.java.batch.sql; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/GroupingSetsPlansTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/GroupingSetsPlansTest.scala new file mode 100644 index 0000000000000..594b1231ca4cd --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/GroupingSetsPlansTest.scala @@ -0,0 +1,195 @@ +package org.apache.flink.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.Test + +class GroupingSetsPlansTest extends TableTestBase { + + @Test + def testGroupingSetsPlan(): Unit = { + val util = batchTestUtil() + util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c) + + val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable " + + "GROUP BY GROUPING SETS (b, c)" + + val group1 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "b"), + term("select", "b", + "AVG(a) AS c") + ) + + val group2 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "c"), + term("select", "c AS b", + "AVG(a) AS c") + ) + + val union = binaryNode( + "DataSetUnion", + group1, group2, + term("union", "b", "c", "i$b", "i$c", "a") + ) + + val aggregate = unaryNode( + "DataSetCalc", + union, + term("select", + "CASE(i$b, null, b) AS b", + "CASE(i$c, null, c) AS c", + "a", + "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g") // GROUP_ID() + ) + + util.verifySql(sqlQuery, aggregate) + } + + @Test + def testCubePlan(): Unit = { + val util = batchTestUtil() 
+ util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c) + + val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g, " + + "GROUPING(b) as gb, GROUPING(c) as gc, " + + "GROUPING_ID(b) as gib, GROUPING_ID(c) as gic, " + + "GROUPING_ID(b, c) as gid " + " FROM MyTable " + + "GROUP BY CUBE (b, c)" + + val group1 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "b, c"), + term("select", "b", "c", + "AVG(a) AS i$b") + ) + + val group2 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "b"), + term("select", "b", + "AVG(a) AS c") + ) + + val group3 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "c"), + term("select", "c AS b", + "AVG(a) AS c") + ) + + val group4 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("select", + "AVG(a) AS b") + ) + + val union1 = binaryNode( + "DataSetUnion", + group1, group2, + term("union", "b", "c", "i$b", "i$c", "a") + ) + + val union2 = binaryNode( + "DataSetUnion", + union1, group3, + term("union", "b", "c", "i$b", "i$c", "a") + ) + + val union3 = binaryNode( + "DataSetUnion", + union2, group4, + term("union", "b", "c", "i$b", "i$c", "a") + ) + + val aggregate = unaryNode( + "DataSetCalc", + union3, + term("select", + "CASE(i$b, null, b) AS b", + "CASE(i$c, null, c) AS c", + "a", + "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g", // GROUP_ID() + "CASE(i$b, 1, 0) AS gb", // GROUPING(b) + "CASE(i$c, 1, 0) AS gc", // GROUPING(c) + "CASE(i$b, 1, 0) AS gib", // GROUPING_ID(b) + "CASE(i$c, 1, 0) AS gic", // GROUPING_ID(c) + "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS gid") // GROUPING_ID(b, c) + ) + + util.verifySql(sqlQuery, aggregate) + } + + @Test + def testRollupPlan(): Unit = { + val util = batchTestUtil() + util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c) + + val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g, " + + "GROUPING(b) as gb, GROUPING(c) as gc, " + + "GROUPING_ID(b) as gib, GROUPING_ID(c) as gic, " + + "GROUPING_ID(b, c) as gid " + " FROM MyTable " + + "GROUP BY ROLLUP (b, c)" + + val group1 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "b, c"), + term("select", "b", "c", + "AVG(a) AS i$b") + ) + + val group2 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "b"), + term("select", "b", + "AVG(a) AS c") + ) + + val group3 = unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("select", + "AVG(a) AS b") + ) + + val union1 = binaryNode( + "DataSetUnion", + group1, group2, + term("union", "b", "c", "i$b", "i$c", "a") + ) + + val union2 = binaryNode( + "DataSetUnion", + union1, group3, + term("union", "b", "c", "i$b", "i$c", "a") + ) + + val aggregate = unaryNode( + "DataSetCalc", + union2, + term("select", + "CASE(i$b, null, b) AS b", + "CASE(i$c, null, c) AS c", + "a", + "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g", // GROUP_ID() + "CASE(i$b, 1, 0) AS gb", // GROUPING(b) + "CASE(i$c, 1, 0) AS gc", // GROUPING(c) + "CASE(i$b, 1, 0) AS gib", // GROUPING_ID(b) + "CASE(i$c, 1, 0) AS gic", // GROUPING_ID(c) + "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS gid") // GROUPING_ID(b, c) + ) + + util.verifySql(sqlQuery, aggregate) + } +} From 2e575836fe2735a55744009546f6c92d40b1a83e Mon Sep 17 00:00:00 2001 From: Alexander Chermenin Date: Wed, 4 Jan 2017 02:33:48 +0300 Subject: [PATCH 14/16] [FLINK-5303] Added license for test. 
---
 .../flink/table/GroupingSetsPlansTest.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/GroupingSetsPlansTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/GroupingSetsPlansTest.scala
index 594b1231ca4cd..5d946d37ccb6f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/GroupingSetsPlansTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/GroupingSetsPlansTest.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.table
 
 import org.apache.flink.api.scala._

From 8b66634e5c854eb0d36e63453b92a32739bacd7d Mon Sep 17 00:00:00 2001
From: Alexander Chermenin
Date: Fri, 6 Jan 2017 22:34:35 +0300
Subject: [PATCH 15/16] [FLINK-5303] Some fixes for grouping sets.

---
 docs/dev/table_api.md                         |  6 ++--
 .../rules/dataSet/DataSetAggregateRule.scala  |  3 +-
 .../AggregateReduceGroupFunction.scala        | 10 +++---
 .../runtime/aggregate/AggregateUtil.scala     | 32 +++++++++++++++----
 4 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index b7c20eb4436b0..dce1a9a67f1de 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1443,7 +1443,7 @@ Although not every SQL feature is implemented yet, some string combinations are
 
 {% highlight sql %}
-A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUP_ID, GROUPING, GROUPING_ID, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE
+A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE
 {% endhighlight %}
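The only change to the reserved-word list above is that GROUP_ID and GROUPING_ID are dropped, since this patch series wires them up as callable functions. A minimal sketch of invoking them through the SQL entry point, assuming the 1.2-era API (tEnv.sql, registerDataSet) and illustrative table and field names:

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object GroupingSetsSqlExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Illustrative input registered under an assumed table name.
    val data = env.fromElements((1, "a"), (2, "b"), (3, "a"))
    tEnv.registerDataSet("MyTable", data, 'f0, 'f1)

    // GROUP_ID and GROUPING_ID now parse as function calls, not bare keywords.
    val result = tEnv.sql(
      "SELECT f1, COUNT(f0), GROUP_ID(), GROUPING_ID(f1) " +
        "FROM MyTable GROUP BY GROUPING SETS (f1)")
    result.toDataSet[Row].print()
  }
}
{% endhighlight %}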
@@ -3765,7 +3765,7 @@ GROUP_ID()
 
 {% highlight text %}
-GROUPING(value)
+GROUPING(expression)
 {% endhighlight %}
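For context on the renamed parameter: under the SQL standard semantics implemented here, GROUPING(expression) evaluates to 0 when the argument is a field of the current grouping set and to 1 when it has been rolled up to NULL, which is what the gf1/gf2 columns in the updated ITCase below assert. A standalone sketch of that contract (names are illustrative):

{% highlight scala %}
object GroupingSketch {
  /** 0 if the field is grouped in the current grouping set, 1 if rolled up. */
  def grouping(groupedFields: Set[String], field: String): Int =
    if (groupedFields.contains(field)) 0 else 1

  def main(args: Array[String]): Unit = {
    // GROUP BY GROUPING SETS (f1, f2): in the {f1} set, f2 is rolled up.
    println(grouping(Set("f1"), "f1")) // 0
    println(grouping(Set("f1"), "f2")) // 1
  }
}
{% endhighlight %}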
@@ -3776,7 +3776,7 @@ GROUPING(value)
 
 {% highlight text %}
-GROUPING_ID(value [, value]* )
+GROUPING_ID(expression [, expression]* )
 {% endhighlight %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
index 0c34176ce5a78..5dd41c73d1cad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -72,7 +72,8 @@ class DataSetAggregateRule
       new DataSetUnion(
         rel.getCluster,
         traitSet,
-        rel1, rel2,
+        rel1,
+        rel2,
         rel.getRowType
       )
     }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index 568338381d98e..45803b2b13a10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -91,13 +91,11 @@ class AggregateReduceGroupFunction(
     }
 
     // Evaluate grouping sets additional values
-    if (groupingSetsMapping != null && groupingSetsMapping.nonEmpty) {
-
+    if (!groupingSetsMapping.isEmpty) {
       val groupingFields = groupKeysMapping.map(_._1)
-      groupingSetsMapping.map {
-        case (inputIndex, outputIndex) => (outputIndex, groupingFields.contains(inputIndex))
-      }.foreach {
-        case (index, flag) => output.setField(index, !flag)
+      groupingSetsMapping.foreach {
+        case (inputIndex, outputIndex) =>
+          output.setField(outputIndex, !groupingFields.contains(inputIndex))
       }
     }
 
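The simplified loop in AggregateReduceGroupFunction writes one indicator per (inputIndex, outputIndex) pair: the output field becomes true exactly when the input field is absent from the current group's keys. The same logic over plain arrays, as a standalone sketch (types and names are illustrative, not the Flink runtime API):

{% highlight scala %}
object GroupingSetFlags {
  // Pairs of (inputIndex, outputIndex), as produced by getGroupingSetsMapping.
  def evaluate(
      groupingSetsMapping: Array[(Int, Int)],
      groupingFields: Array[Int],
      output: Array[Any]): Unit = {
    groupingSetsMapping.foreach { case (inputIndex, outputIndex) =>
      // true (rolled up) when the input field is not grouped in this set.
      output(outputIndex) = !groupingFields.contains(inputIndex)
    }
  }

  def main(args: Array[String]): Unit = {
    val out = new Array[Any](4)
    evaluate(Array((0, 2), (1, 3)), Array(0), out)
    println(out.mkString(",")) // null,null,false,true
  }
}
{% endhighlight %}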
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 8b70fcd3ea758..f0683f66b5769 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -113,7 +113,7 @@ object AggregateUtil {
       outputType,
       groupings)
 
-    val groupingSetsMapping = getGroupingSetsMapping(outputType)
+    val groupingSetsMapping = getGroupingSetsMapping(inputType, outputType)
 
     val allPartialAggregate: Boolean = aggregates.forall(_.supportPartial)
 
@@ -354,12 +354,32 @@ object AggregateUtil {
     (groupingOffsetMapping, aggOffsetMapping)
   }
 
-  private def getGroupingSetsMapping(outputType: RelDataType): Array[(Int, Int)] = {
-    val fields = outputType.getFieldList
+  private def getGroupingSetsMapping(
+    inputType: RelDataType,
+    outputType: RelDataType
+  ): Array[(Int, Int)] = {
+
+    val inputFields = inputType.getFieldList.map(_.getName)
+
+    val groupingFields = inputFields
+      .map(inputFieldName => {
+        val base = "i$" + inputFieldName
+        var name = base
+        var i = 0
+        while (inputFields.contains(name)) {
+          name = base + "_" + i
+          i = i + 1
+        }
+        inputFieldName -> name
+      }).toMap
+
+    val outputFields = outputType.getFieldList
     var mappingsBuffer = ArrayBuffer[(Int, Int)]()
-    for (i <- fields.indices) {
-      for (j <- fields.indices) {
-        if (fields(j).getName.equals("i$" + fields(i).getName)) {
+    for (i <- outputFields.indices) {
+      for (j <- outputFields.indices) {
+        if (outputFields(j).getName.equals(
+          groupingFields.getOrElse(outputFields(i).getName, null)
+        )) {
           mappingsBuffer += ((i, j))
         }
       }
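The reworked getGroupingSetsMapping matches output fields against generated indicator names with an "i$" prefix, and its while loop guards against input fields that already carry such a name. The naming scheme in isolation, as an illustrative sketch:

{% highlight scala %}
object IndicatorNames {
  /** Maps each input field to a unique "i$"-prefixed indicator name. */
  def indicatorNames(inputFields: Seq[String]): Map[String, String] =
    inputFields.map { field =>
      val base = "i$" + field
      var name = base
      var i = 0
      // Keep suffixing until the candidate no longer clashes with a real field.
      while (inputFields.contains(name)) {
        name = base + "_" + i
        i += 1
      }
      field -> name
    }.toMap

  def main(args: Array[String]): Unit = {
    // f1 -> i$f1_0 (plain i$f1 clashes with a real field), i$f1 -> i$i$f1
    println(indicatorNames(Seq("f1", "i$f1")))
  }
}
{% endhighlight %}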
"null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" + - "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2"; + "1,null,1,1,0,1,0,1,1\n" + + "6,null,18,1,0,1,0,1,1\n" + + "2,null,2,1,0,1,0,1,1\n" + + "4,null,8,1,0,1,0,1,1\n" + + "5,null,13,1,0,1,0,1,1\n" + + "3,null,5,1,0,1,0,1,1\n" + + "null,Comment#11,17,2,1,0,1,0,2\n" + + "null,Comment#8,14,2,1,0,1,0,2\n" + + "null,Comment#2,8,2,1,0,1,0,2\n" + + "null,Comment#1,7,2,1,0,1,0,2\n" + + "null,Comment#14,20,2,1,0,1,0,2\n" + + "null,Comment#7,13,2,1,0,1,0,2\n" + + "null,Comment#6,12,2,1,0,1,0,2\n" + + "null,Comment#3,9,2,1,0,1,0,2\n" + + "null,Comment#12,18,2,1,0,1,0,2\n" + + "null,Comment#5,11,2,1,0,1,0,2\n" + + "null,Comment#15,21,2,1,0,1,0,2\n" + + "null,Comment#4,10,2,1,0,1,0,2\n" + + "null,Hi,1,2,1,0,1,0,2\n" + + "null,Comment#10,16,2,1,0,1,0,2\n" + + "null,Hello world,3,2,1,0,1,0,2\n" + + "null,I am fine.,5,2,1,0,1,0,2\n" + + "null,Hello world, how are you?,4,2,1,0,1,0,2\n" + + "null,Comment#9,15,2,1,0,1,0,2\n" + + "null,Comment#13,19,2,1,0,1,0,2\n" + + "null,Luke Skywalker,6,2,1,0,1,0,2\n" + + "null,Hello,2,2,1,0,1,0,2"; checkSql(query, expected); }