From db363d78011fd82171b423fdb188227f1829436e Mon Sep 17 00:00:00 2001 From: twalthr Date: Wed, 18 Jan 2017 16:56:02 +0100 Subject: [PATCH] [FLINK-5047] [table] Add sliding group-windows for batch tables --- .../table/functions/AggregateFunction.scala | 8 +- .../dataset/DataSetWindowAggregate.scala | 118 ++++++++- .../runtime/aggregate/AggregateUtil.scala | 250 +++++++++++++++--- ...SessionWindowAggReduceGroupFunction.scala} | 2 +- ...SetSlideTimeWindowAggFlatMapFunction.scala | 77 ++++++ ...deTimeWindowAggReduceCombineFunction.scala | 110 ++++++++ ...lideTimeWindowAggReduceGroupFunction.scala | 147 ++++++++++ ...tSlideWindowAggReduceCombineFunction.scala | 117 ++++++++ ...SetSlideWindowAggReduceGroupFunction.scala | 141 ++++++++++ ...bleCountWindowAggReduceGroupFunction.scala | 3 - ...mbleTimeWindowAggReduceGroupFunction.scala | 4 +- ...cala => DataSetWindowAggMapFunction.scala} | 5 +- ...ncrementalAggregateAllWindowFunction.scala | 7 +- .../stream/table/AggregationsITCase.scala | 43 +-- .../DataSetWindowAggregateITCase.scala | 163 +++++++++++- .../DataStreamAggregateITCase.scala | 235 ++++++++++++++++ 16 files changed, 1341 insertions(+), 89 deletions(-) rename flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/{DataSetSessionWindowAggregateReduceGroupFunction.scala => DataSetSessionWindowAggReduceGroupFunction.scala} (99%) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceCombineFunction.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala rename flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/{DataSetWindowAggregateMapFunction.scala => DataSetWindowAggMapFunction.scala} (96%) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala index 967d2ea1c2e2a..773c71fcfff32 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala @@ -61,7 +61,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction { def getValue(accumulator: Accumulator): T /** - * Process the input values and update the provided accumulator instance. + * Processes the input values and update the provided accumulator instance. * * @param accumulator the accumulator which contains the current * aggregated results @@ -70,9 +70,9 @@ abstract class AggregateFunction[T] extends UserDefinedFunction { def accumulate(accumulator: Accumulator, input: Any): Unit /** - * Merge a list of accumulator instances into one accumulator instance. 
+ * Merges a list of accumulator instances into one accumulator instance. * - * IMPORTANT: You may only return a new accumulator instance or the the first accumulator of the + * IMPORTANT: You may only return a new accumulator instance or the first accumulator of the * input list. If you return another instance, the result of the aggregation function might be * incorrect. * @@ -88,7 +88,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction { * * @return The type information for the accumulator. */ - def getAccumulatorType(): TypeInformation[_] = null + def getAccumulatorType: TypeInformation[_] = null } /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala index fb5ff3b0f6cf4..3693cb31ba18a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala @@ -30,6 +30,7 @@ import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.runtime.aggregate.AggregateUtil import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval import org.apache.flink.types.Row @@ -111,17 +112,25 @@ class DataSetWindowAggregate( // whether identifiers are matched case-sensitively val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive() + window match { case EventTimeTumblingGroupWindow(_, _, size) => createEventTimeTumblingWindowDataSet( inputDS, isTimeInterval(size.resultType), caseSensitive) + case EventTimeSessionGroupWindow(_, _, gap) => createEventTimeSessionWindowDataSet(inputDS, caseSensitive) - case EventTimeSlidingGroupWindow(_, _, _, _) => - throw new UnsupportedOperationException( - "Event-time sliding windows in a batch environment are currently not supported") + + case EventTimeSlidingGroupWindow(_, _, size, slide) => + createEventTimeSlidingWindowDataSet( + inputDS, + isTimeInterval(size.resultType), + asLong(size), + asLong(slide), + caseSensitive) + case _: ProcessingTimeGroupWindow => throw new UnsupportedOperationException( "Processing-time tumbling windows are not supported in a batch environment, " + @@ -130,7 +139,6 @@ class DataSetWindowAggregate( } } - private def createEventTimeTumblingWindowDataSet( inputDS: DataSet[Row], isTimeWindow: Boolean, @@ -312,6 +320,108 @@ class DataSetWindowAggregate( } } + private def createEventTimeSlidingWindowDataSet( + inputDS: DataSet[Row], + isTimeWindow: Boolean, + size: Long, + slide: Long, + isParserCaseSensitive: Boolean) + : DataSet[Row] = { + + // create MapFunction for initializing the aggregations + // it aligns the rowtime for pre-tumbling in case of a time-window for partial aggregates + val mapFunction = createDataSetWindowPrepareMapFunction( + window, + namedAggregates, + grouping, + inputType, + isParserCaseSensitive) + + val mappedDataSet = inputDS + .map(mapFunction) + .name(prepareOperatorName) + + val mapReturnType = mappedDataSet.getType + + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val groupingKeys = 
grouping.indices.toArray + + // do partial aggregation if possible + val isPartial = doAllSupportPartialMerge( + namedAggregates.map(_.getKey), + inputType, + grouping.length) + + // only pre-tumble if it is worth it + val littleTumblingSize = determineLargestTumblingSize(size, slide) <= 1 + + val preparedDataSet = if (isTimeWindow) { + // time window + + if (isPartial && !littleTumblingSize) { + // partial aggregates + + val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1 + + // create GroupReduceFunction + // for pre-tumbling and replicating/omitting the content for each pane + val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction( + window, + namedAggregates, + grouping, + inputType, + isParserCaseSensitive) + + mappedDataSet.asInstanceOf[DataSet[Row]] + .groupBy(groupingKeysAndAlignedRowtime: _*) + .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits + .name(prepareOperatorName) + } else { + // non-partial aggregates + + // create FlatMapFunction + // for replicating/omitting the content for each pane + val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction( + window, + namedAggregates, + grouping, + inputType, + isParserCaseSensitive) + + mappedDataSet + .flatMap(prepareFlatMapFunction) // replicates/omits + } + } else { + // count window + + throw new UnsupportedOperationException( + "Count sliding group windows on event-time are currently not supported.") + } + + val prepareReduceReturnType = preparedDataSet.getType + + // create GroupReduceFunction for final aggregation and conversion to output row + val aggregateReduceFunction = createDataSetWindowAggregationGroupReduceFunction( + window, + namedAggregates, + inputType, + rowRelDataType, + grouping, + namedProperties, + isInputCombined = false) + + // gets the window-start position in the intermediate result. 
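+    // (both the partial and the non-partial prepare phase lay out their output rows as
+    // [grouping keys | aggregate buffers | aligned window start], so the window start
+    // is always the last field)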
+    val windowStartPos = prepareReduceReturnType.getArity - 1
+
+    val groupingKeysAndWindowStart = groupingKeys :+ windowStartPos
+
+    preparedDataSet
+      .groupBy(groupingKeysAndWindowStart: _*)
+      .reduceGroup(aggregateReduceFunction)
+      .returns(rowTypeInfo)
+      .name(aggregateOperatorName)
+  }
+
   private def prepareOperatorName: String = {
     val aggString = aggregationToString(
       inputType,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index acb6cd0ad708b..0e45444c37da3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -21,26 +21,26 @@ import java.util
 
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
+import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
+import org.apache.flink.api.common.functions.{FlatMapFunction, GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.{TableException, Types}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions._
-import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 import org.apache.flink.types.Row
 
@@ -160,23 +160,37 @@
       groupings,
       aggregates,
       inputType,
-      Some(Array(Types.LONG)))
+      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
 
     val (timeFieldPos, tumbleTimeWindowSize) = window match {
+
+      case EventTimeTumblingGroupWindow(_, time, size) if isTimeInterval(size.resultType) =>
+        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+        (timeFieldPos, Some(asLong(size)))
+
       case EventTimeTumblingGroupWindow(_, time, size) =>
         val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        size match {
-          case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-            (timeFieldPos, Some(value))
-          case _ => (timeFieldPos, None)
-        }
+        (timeFieldPos, None)
+
       case EventTimeSessionGroupWindow(_, time, _) =>
-        (getTimeFieldPosition(time, inputType, isParserCaseSensitive), None)
+        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+        (timeFieldPos, None)
+
+      case EventTimeSlidingGroupWindow(_, time, size, slide)
+          if isTimeInterval(time.resultType) && doAllSupportPartialMerge(aggregates) =>
+        // pre-tumble incremental aggregates on time-windows
+        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+        val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
+        (timeFieldPos, Some(preTumblingSize))
+
+      case EventTimeSlidingGroupWindow(_, time, _, _) =>
+        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+        (timeFieldPos, None)
+
       case _ =>
         throw new UnsupportedOperationException(s"$window is currently not supported on batch")
     }
 
-    new DataSetWindowAggregateMapFunction(
+    new DataSetWindowAggMapFunction(
       aggregates,
       aggFieldIndexes,
       groupings,
@@ -185,6 +199,130 @@
       mapReturnType)
   }
 
+  /**
+    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
+    * partial aggregates of sliding windows (time and count-windows).
+    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
+    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions, and the aligned window start. The window start need not be a
+    * timestamp; for count-windows it is a count value.
+    *
+    * The output is stored in a Row in the following format:
+    *
+    * {{{
+    *          avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
+    *              |                             |
+    *              v                             v
+    *   +---------+---------+--------+--------+--------+--------+-------------+
+    *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *   +---------+---------+--------+--------+--------+--------+-------------+
+    *                                    ^                            ^
+    *                                    |                            |
+    *                sum(y) aggOffsetInRow = 4     window start for pane mapping
+    * }}}
+    *
+    * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
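+    *
+    * For example (illustrative values): a sliding window of size 10 minutes with a slide of
+    * 4 minutes is pre-tumbled into panes of determineLargestTumblingSize(10 min, 4 min) =
+    * gcd(10, 4) = 2 minutes; each pane result is afterwards replicated to every sliding
+    * window that it overlaps.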
+    */
+  def createDataSetSlideWindowPrepareGroupReduceFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType,
+      isParserCaseSensitive: Boolean)
+    : RichGroupReduceFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      needRetraction = false)._2
+
+    val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+      groupings,
+      aggregates,
+      inputType,
+      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+    window match {
+      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+        // sliding time-window
+        // for partial aggregations
+        new DataSetSlideTimeWindowAggReduceCombineFunction(
+          aggregates,
+          groupings.length,
+          returnType.getArity - 1,
+          asLong(size),
+          asLong(slide),
+          returnType)
+
+      case _ =>
+        throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
+    }
+  }
+
+  /**
+    * Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
+    * non-incremental aggregates of sliding windows (time-windows).
+    *
+    * It requires a prepared input (with intermediate aggregate fields), aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions, and the aligned window start.
+    *
+    * The output is stored in a Row in the following format:
+    *
+    * {{{
+    *          avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
+    *              |                             |
+    *              v                             v
+    *   +---------+---------+--------+--------+--------+--------+-------------+
+    *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *   +---------+---------+--------+--------+--------+--------+-------------+
+    *                                    ^                            ^
+    *                                    |                            |
+    *                sum(y) aggOffsetInRow = 4     window start for pane mapping
+    * }}}
+    *
+    * NOTE: this function is only used for time-based sliding windows on batch tables.
+    */
+  def createDataSetSlideWindowPrepareFlatMapFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType,
+      isParserCaseSensitive: Boolean)
+    : FlatMapFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      needRetraction = false)._2
+
+    val mapReturnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+      groupings,
+      aggregates,
+      inputType,
+      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+    window match {
+      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+        new DataSetSlideTimeWindowAggFlatMapFunction(
+          aggregates.length,
+          groupings.length,
+          mapReturnType.getArity - 1,
+          asLong(size),
+          asLong(slide),
+          mapReturnType)
+
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"$window is currently not supported in a batch environment.")
+    }
+  }
+
   /**
     * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] to compute window
     * aggregates on batch tables. 
If all aggregates support partial aggregation and is a time @@ -203,10 +341,10 @@ object AggregateUtil { isInputCombined: Boolean = false) : RichGroupReduceFunction[Row, Row] = { - val aggregates = transformToAggregateFunctions( + val (aggFieldIndexes, aggregates) = transformToAggregateFunctions( namedAggregates.map(_.getKey), inputType, - needRetraction = false)._2 + needRetraction = false) // the mapping relation between field index of intermediate aggregate Row and output Row. val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) @@ -259,7 +397,7 @@ object AggregateUtil { case EventTimeSessionGroupWindow(_, _, gap) => val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new DataSetSessionWindowAggregateReduceGroupFunction( + new DataSetSessionWindowAggReduceGroupFunction( aggregates, groupingOffsetMapping, aggOffsetMapping, @@ -268,6 +406,42 @@ object AggregateUtil { endPos, asLong(gap), isInputCombined) + + case EventTimeSlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) => + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + if (doAllSupportPartialMerge(aggregates)) { + // for partial aggregations + new DataSetSlideWindowAggReduceCombineFunction( + aggregates, + groupingOffsetMapping, + aggOffsetMapping, + outputType.getFieldCount, + startPos, + endPos, + asLong(size)) + } + else { + // for non-partial aggregations + new DataSetSlideWindowAggReduceGroupFunction( + aggregates, + groupingOffsetMapping, + aggOffsetMapping, + outputType.getFieldCount, + startPos, + endPos, + asLong(size)) + } + + case EventTimeSlidingGroupWindow(_, _, size, _) => + new DataSetSlideWindowAggReduceGroupFunction( + aggregates, + groupingOffsetMapping, + aggOffsetMapping, + outputType.getFieldCount, + None, + None, + asLong(size)) + case _ => throw new UnsupportedOperationException(s"$window is currently not supported on batch") } @@ -355,6 +529,7 @@ object AggregateUtil { needRetraction = false)._2 window match { + case EventTimeSessionGroupWindow(_, _, gap) => val combineReturnType: RowTypeInfo = createDataSetAggregateBufferDataType( @@ -368,6 +543,7 @@ object AggregateUtil { groupings, asLong(gap), combineReturnType) + case _ => throw new UnsupportedOperationException( s" [ ${window.getClass.getCanonicalName.split("\\.").last} ] is currently not " + @@ -662,7 +838,8 @@ object AggregateUtil { } val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName aggregateCall.getAggregation match { - case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => { + + case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => if (needRetraction) { aggregates(index) = sqlTypeName match { case TINYINT => @@ -702,8 +879,8 @@ object AggregateUtil { throw new TableException("Sum aggregate does no support type:" + sqlType) } } - } - case _: SqlAvgAggFunction => { + + case _: SqlAvgAggFunction => aggregates(index) = sqlTypeName match { case TINYINT => new ByteAvgAggFunction @@ -722,8 +899,8 @@ object AggregateUtil { case sqlType: SqlTypeName => throw new TableException("Avg aggregate does no support type:" + sqlType) } - } - case sqlMinMaxFunction: SqlMinMaxAggFunction => { + + case sqlMinMaxFunction: SqlMinMaxAggFunction => aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) { if (needRetraction) { sqlTypeName match { @@ -815,9 +992,10 @@ object AggregateUtil { } } } - } + case _: SqlCountAggFunction => aggregates(index) = new CountAggFunction + case unSupported: SqlAggFunction => throw 
new TableException("unsupported Function: " + unSupported.getName) } @@ -833,7 +1011,7 @@ object AggregateUtil { val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg => - val accType = agg.getAccumulatorType() + val accType = agg.getAccumulatorType if (accType != null) { accType } else { @@ -969,10 +1147,22 @@ object AggregateUtil { } } - private def asLong(expr: Expression): Long = expr match { + private[flink] def asLong(expr: Expression): Long = expr match { case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => value case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value case _ => throw new IllegalArgumentException() } + + private[flink] def determineLargestTumblingSize(size: Long, slide: Long): Long = { + if (slide > size) { + gcd(slide, size) + } else { + gcd(size, slide) + } + } + + private def gcd(a: Long, b: Long): Long = { + if (b == 0) a else gcd(b, a % b) + } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala similarity index 99% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala index ebef211efb362..1f1968719897c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala @@ -50,7 +50,7 @@ import org.apache.flink.util.{Collector, Preconditions} * @param finalRowWindowEndPos The relative window-end field position. * @param gap Session time window gap. */ -class DataSetSessionWindowAggregateReduceGroupFunction( +class DataSetSessionWindowAggReduceGroupFunction( aggregates: Array[AggregateFunction[_ <: Any]], groupKeysMapping: Array[(Int, Int)], aggregateMapping: Array[(Int, Int)], diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala new file mode 100644 index 0000000000000..07e0d9f76a54c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} + + +/** + * It is used for sliding windows on batch for time-windows. It takes a prepared input row, + * aligns the window start, and replicates or omits records for different panes of a sliding + * window. It is used for non-partial aggregations. + * + * @param aggregatesLength number of aggregate functions + * @param groupingKeysLength number of grouping keys + * @param windowSize window size of the sliding window + * @param windowSlide window slide of the sliding window + * @param returnType return type of this function + */ +class DataSetSlideTimeWindowAggFlatMapFunction( + private val aggregatesLength: Int, + private val groupingKeysLength: Int, + private val timeFieldPos: Int, + private val windowSize: Long, + private val windowSlide: Long, + @transient private val returnType: TypeInformation[Row]) + extends RichFlatMapFunction[Row, Row] + with ResultTypeQueryable[Row] { + + Preconditions.checkNotNull(aggregatesLength) + + private var intermediateRow: Row = _ + // add one field to store window start + private val intermediateRowArity: Int = groupingKeysLength + aggregatesLength + 1 + + override def open(config: Configuration) { + intermediateRow = new Row(intermediateRowArity) + } + + override def flatMap(record: Row, out: Collector[Row]): Unit = { + val windowStart = record.getField(timeFieldPos).asInstanceOf[Long] + + // adopted from SlidingEventTimeWindows.assignWindows + var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide) + + // adopted from SlidingEventTimeWindows.assignWindows + while (start > windowStart - windowSize) { + record.setField(timeFieldPos, start) + out.collect(record) + start -= windowSlide + } + } + + override def getProducedType: TypeInformation[Row] = { + returnType + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceCombineFunction.scala new file mode 100644 index 0000000000000..65f0b9bcd9986 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceCombineFunction.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.CombineFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.types.Row + +/** + * Wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and + * [[org.apache.flink.api.java.operators.GroupCombineOperator]]. + * + * It is used for sliding on batch for time-windows. + * + * @param aggregates aggregate functions + * @param groupingKeysLength number of grouping keys + * @param timeFieldPos position of aligned time field + * @param windowSize window size of the sliding window + * @param windowSlide window slide of the sliding window + * @param returnType return type of this function + */ +class DataSetSlideTimeWindowAggReduceCombineFunction( + aggregates: Array[AggregateFunction[_ <: Any]], + groupingKeysLength: Int, + timeFieldPos: Int, + windowSize: Long, + windowSlide: Long, + returnType: TypeInformation[Row]) + extends DataSetSlideTimeWindowAggReduceGroupFunction( + aggregates, + groupingKeysLength, + timeFieldPos, + windowSize, + windowSlide, + returnType) + with CombineFunction[Row, Row] { + + override def combine(records: Iterable[Row]): Row = { + + // reset first accumulator + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + i += 1 + } + + val iterator = records.iterator() + while (iterator.hasNext) { + val record = iterator.next() + + i = 0 + while (i < aggregates.length) { + // insert received accumulator into acc list + val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, retAcc) + i += 1 + } + + // check if this record is the last record + if (!iterator.hasNext) { + + // set group keys + i = 0 + while (i < groupingKeysLength) { + intermediateRow.setField(i, record.getField(i)) + i += 1 + } + + // set accumulators + i = 0 + while (i < aggregates.length) { + intermediateRow.setField(groupingKeysLength + i, accumulatorList(i).get(0)) + i += 1 + } + + intermediateRow.setField(timeFieldPos, record.getField(timeFieldPos)) + + return intermediateRow + } + } + + // this code path should never be reached as we return before the loop finishes + // we need this to prevent a compiler error + throw new IllegalArgumentException("Group is empty. This should never happen.") + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala new file mode 100644 index 0000000000000..33fd6ee5eee8e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable +import java.util.{ArrayList => JArrayList} + +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} + +/** + * It is used for sliding windows on batch for time-windows. It takes a prepared input row (with + * aligned rowtime for pre-tumbling), pre-aggregates (pre-tumbles) rows, aligns the window start, + * and replicates or omits records for different panes of a sliding window. + * + * This function is similar to [[DataSetTumbleCountWindowAggReduceGroupFunction]], however, + * it does no final aggregate evaluation. It also includes the logic of + * [[DataSetSlideTimeWindowAggFlatMapFunction]]. + * + * @param aggregates aggregate functions + * @param groupingKeysLength number of grouping keys + * @param timeFieldPos position of aligned time field + * @param windowSize window size of the sliding window + * @param windowSlide window slide of the sliding window + * @param returnType return type of this function + */ +class DataSetSlideTimeWindowAggReduceGroupFunction( + private val aggregates: Array[AggregateFunction[_ <: Any]], + private val groupingKeysLength: Int, + private val timeFieldPos: Int, + private val windowSize: Long, + private val windowSlide: Long, + @transient private val returnType: TypeInformation[Row]) + extends RichGroupReduceFunction[Row, Row] + with ResultTypeQueryable[Row] { + + Preconditions.checkNotNull(aggregates) + + protected var intermediateRow: Row = _ + // add one field to store window start + protected val intermediateRowArity: Int = groupingKeysLength + aggregates.length + 1 + protected val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { + new JArrayList[Accumulator](2) + } + private val intermediateWindowStartPos: Int = intermediateRowArity - 1 + + override def open(config: Configuration) { + intermediateRow = new Row(intermediateRowArity) + + // init lists with two empty accumulators + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).add(accumulator) + accumulatorList(i).add(accumulator) + i += 1 + } + } + + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + + // reset first accumulator + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + i += 1 + } + + val iterator = records.iterator() + + while (iterator.hasNext) { + val record = iterator.next() + + // accumulate 
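+      // (each accumulatorList(i) has two slots: slot 0 carries the merged result
+      // so far, slot 1 is re-pointed to the current record's accumulator before
+      // every call to merge())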
+ i = 0 + while (i < aggregates.length) { + // insert received accumulator into acc list + val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, retAcc) + i += 1 + } + + // trigger tumbling evaluation + if (!iterator.hasNext) { + val windowStart = record.getField(timeFieldPos).asInstanceOf[Long] + + // adopted from SlidingEventTimeWindows.assignWindows + var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide) + + // skip preparing output if it is not necessary + if (start > windowStart - windowSize) { + + // set group keys + i = 0 + while (i < groupingKeysLength) { + intermediateRow.setField(i, record.getField(i)) + i += 1 + } + + // set accumulators + i = 0 + while (i < aggregates.length) { + intermediateRow.setField(groupingKeysLength + i, accumulatorList(i).get(0)) + i += 1 + } + + // adopted from SlidingEventTimeWindows.assignWindows + while (start > windowStart - windowSize) { + intermediateRow.setField(intermediateWindowStartPos, start) + out.collect(intermediateRow) + start -= windowSlide + } + } + } + } + } + + override def getProducedType: TypeInformation[Row] = { + returnType + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala new file mode 100644 index 0000000000000..c11e86b53b878 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.CombineFunction +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.types.Row + +/** + * Wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and + * [[org.apache.flink.api.java.operators.GroupCombineOperator]]. + * + * It is used for sliding on batch for both time and count-windows. + * + * @param aggregates aggregate functions. + * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row + * and output Row. + * @param aggregateMapping index mapping between aggregate function list and aggregated value + * index in output Row. 
+ * @param finalRowArity output row field count + * @param finalRowWindowStartPos relative window-start position to last field of output row + * @param finalRowWindowEndPos relative window-end position to last field of output row + * @param windowSize size of the window, used to determine window-end for output row + */ +class DataSetSlideWindowAggReduceCombineFunction( + aggregates: Array[AggregateFunction[_ <: Any]], + groupKeysMapping: Array[(Int, Int)], + aggregateMapping: Array[(Int, Int)], + finalRowArity: Int, + finalRowWindowStartPos: Option[Int], + finalRowWindowEndPos: Option[Int], + windowSize: Long) + extends DataSetSlideWindowAggReduceGroupFunction( + aggregates, + groupKeysMapping, + aggregateMapping, + finalRowArity, + finalRowWindowStartPos, + finalRowWindowEndPos, + windowSize) + with CombineFunction[Row, Row] { + + private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1 + private val intermediateRow: Row = new Row(intermediateRowArity) + + override def combine(records: Iterable[Row]): Row = { + + // reset first accumulator + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + i += 1 + } + + val iterator = records.iterator() + while (iterator.hasNext) { + val record = iterator.next() + + // accumulate + i = 0 + while (i < aggregates.length) { + // insert received accumulator into acc list + val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, retAcc) + i += 1 + } + + // check if this record is the last record + if (!iterator.hasNext) { + // set group keys + i = 0 + while (i < groupKeysMapping.length) { + intermediateRow.setField(i, record.getField(i)) + i += 1 + } + + // set the partial accumulated result + i = 0 + while (i < aggregates.length) { + intermediateRow.setField(groupKeysMapping.length + i, accumulatorList(i).get(0)) + i += 1 + } + + intermediateRow.setField(windowStartPos, record.getField(windowStartPos)) + + return intermediateRow + } + } + + // this code path should never be reached as we return before the loop finishes + // we need this to prevent a compiler error + throw new IllegalArgumentException("Group is empty. This should never happen.") + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala new file mode 100644 index 0000000000000..e67fac0f7e38e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable +import java.util.{ArrayList => JArrayList} + +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. + * + * It is used for sliding on batch for both time and count-windows. + * + * @param aggregates aggregate functions. + * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row + * and output Row. + * @param aggregateMapping index mapping between aggregate function list and aggregated value + * index in output Row. + * @param finalRowArity output row field count + * @param finalRowWindowStartPos relative window-start position to last field of output row + * @param finalRowWindowEndPos relative window-end position to last field of output row + * @param windowSize size of the window, used to determine window-end for output row + */ +class DataSetSlideWindowAggReduceGroupFunction( + aggregates: Array[AggregateFunction[_ <: Any]], + groupKeysMapping: Array[(Int, Int)], + aggregateMapping: Array[(Int, Int)], + finalRowArity: Int, + finalRowWindowStartPos: Option[Int], + finalRowWindowEndPos: Option[Int], + windowSize: Long) + extends RichGroupReduceFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(groupKeysMapping) + + private var collector: TimeWindowPropertyCollector = _ + private var output: Row = _ + private val accumulatorStartPos: Int = groupKeysMapping.length + protected val windowStartPos: Int = accumulatorStartPos + aggregates.length + + val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { + new JArrayList[Accumulator](2) + } + + override def open(config: Configuration) { + output = new Row(finalRowArity) + collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) + + // init lists with two empty accumulators + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).add(accumulator) + accumulatorList(i).add(accumulator) + i += 1 + } + } + + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + + // reset first accumulator + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + i += 1 + } + + val iterator = records.iterator() + while (iterator.hasNext) { + val record = iterator.next() + + // accumulate + i = 0 + while (i < aggregates.length) { + // insert received accumulator into acc list + val newAcc = record.getField(accumulatorStartPos + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, 
retAcc) + i += 1 + } + + // check if this record is the last record + if (!iterator.hasNext) { + // set group keys value to final output + i = 0 + while (i < groupKeysMapping.length) { + val mapping = groupKeysMapping(i) + output.setField(mapping._1, record.getField(mapping._2)) + i += 1 + } + + // get final aggregate value and set to output. + i = 0 + while (i < aggregateMapping.length) { + val mapping = aggregateMapping(i) + val agg = aggregates(i) + val result = agg.getValue(accumulatorList(mapping._2).get(0)) + output.setField(mapping._1, result) + i += 1 + } + + // adds TimeWindow properties to output then emit output + if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) { + collector.wrappedCollector = out + collector.windowStart = record.getField(windowStartPos).asInstanceOf[Long] + collector.windowEnd = collector.windowStart + windowSize + + collector.collect(output) + } else { + out.collect(output) + } + } + } + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala index 85df1d8865de6..ecc945c38faab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala @@ -47,10 +47,8 @@ class DataSetTumbleCountWindowAggReduceGroupFunction( private val finalRowArity: Int) extends RichGroupReduceFunction[Row, Row] { - private var aggregateBuffer: Row = _ private var output: Row = _ private val accumStartPos: Int = groupKeysMapping.length - private val intermediateRowArity: Int = accumStartPos + aggregates.length + 1 val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { new JArrayList[Accumulator](2) @@ -59,7 +57,6 @@ class DataSetTumbleCountWindowAggReduceGroupFunction( override def open(config: Configuration) { Preconditions.checkNotNull(aggregates) Preconditions.checkNotNull(groupKeysMapping) - aggregateBuffer = new Row(intermediateRowArity) output = new Row(finalRowArity) // init lists with two empty accumulators diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala index 7ce0bf149e703..01d384f85a7c2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala @@ -112,12 +112,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( } // get final aggregate value and set to output. 
-    aggregateMapping.foreach {
-      case (after, previous) => {
+    aggregateMapping.foreach { case (after, previous) =>
         val agg = aggregates(previous)
         val result = agg.getValue(accumulatorList(previous).get(0))
         output.setField(after, result)
-      }
     }
 
     // get window start timestamp
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
similarity index 96%
rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
index 68088fc298fdb..4a64c47bb240d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.sql.Timestamp
 
+import org.apache.calcite.runtime.SqlFunctions
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
@@ -34,7 +35,7 @@ import org.apache.flink.util.Preconditions
  * and [[org.apache.flink.table.runtime.aggregate.AggregateMapFunction]] is this function
  * append an (aligned) rowtime field to the end of the output row.
  */
-class DataSetWindowAggregateMapFunction(
+class DataSetWindowAggMapFunction(
     private val aggregates: Array[AggregateFunction[_]],
     private val aggFields: Array[Int],
     private val groupingKeys: Array[Int],
@@ -97,7 +98,7 @@ class DataSetWindowAggregateMapFunction(
       case f: Float => f.toLong
       case d: Double => d.toLong
       case s: String => s.toLong
-      case t: Timestamp => t.getTime
+      case t: Timestamp => SqlFunctions.toLong(t)
       case _ =>
         throw new RuntimeException(
           s"Window time field doesn't support ${timeField.getClass} type currently")
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index 00aba1fcf552c..d5a028342ddd7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -53,7 +53,12 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
 
     if (iterator.hasNext) {
       val record = iterator.next()
-      out.collect(record)
+      var i = 0
+      while (i < record.getArity) {
+        output.setField(i, record.getField(i))
+        i += 1
+      }
+      out.collect(output)
     }
   }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
index 818cd0ea92eb5..3e7b66b072d71 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala @@ -19,16 +19,16 @@ package org.apache.flink.table.api.scala.stream.table import org.apache.flink.api.scala._ -import org.apache.flink.types.Row -import org.apache.flink.table.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark -import org.apache.flink.table.api.scala.stream.utils.StreamITCase -import org.apache.flink.table.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampWithEqualWatermark +import org.apache.flink.table.api.scala.stream.utils.StreamITCase +import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test @@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } - - @Test - def testEventTimeSlidingWindow(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - - val stream = env - .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'long, 'int, 'string) - - val windowedTable = table - .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) - .groupBy('w, 'string) - .select('string, 'int.count, 'w.start, 'w.end, 'w.start) - - val results = windowedTable.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = Seq( - "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0", - "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005", - "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01", - "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015", - "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995", - "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0", - "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995", - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } } -object GroupWindowITCase { +object AggregationsITCase { class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] { override def checkAndGetNextWatermark( diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala index d57f4f7533f8d..77ea66e9ad25e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala @@ -21,17 +21,16 @@ package org.apache.flink.table.runtime.dataset import java.math.BigDecimal import org.apache.flink.api.scala._ -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{TableEnvironment, ValidationException} import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.apache.flink.table.api.ValidationException -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import scala.collection.JavaConverters._ @@ -197,4 +196,162 @@ class DataSetWindowAggregateITCase( .toDataSet[Row] } + // ---------------------------------------------------------------------------------------------- + // Sliding windows + // ---------------------------------------------------------------------------------------------- + + @Test(expected = classOf[UnsupportedOperationException]) + def testAllEventTimeSlidingGroupWindowOverCount(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) + + // Count sliding group window on event-time are currently not supported + table + .window(Slide over 2.rows every 2.rows on 'long as 'w) + .groupBy('w) + .select('int.count) + .toDataSet[Row] + } + + @Test + def testAllEventTimeSlidingGroupWindowOverTime(): Unit = { + // please keep this test in sync with the DataStream variant + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) + + val windowedTable = table + .window(Slide over 5.milli every 2.milli on 'long as 'w) + .groupBy('w) + .select('int.count, 'w.start, 'w.end) + + val expected = + "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" + + "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" + + "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" + + "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" + + "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" + + "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" + + "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007\n" + + "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" + + "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005" + + val results = windowedTable.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = { + // please keep this test in sync with the DataStream variant + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) + + val windowedTable = table + .window(Slide over 10.milli every 5.milli on 'long as 'w) + .groupBy('string, 'w) + .select('string, 'int.count, 'w.start, 'w.end) 
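+
+    // size = 10ms and slide = 5ms: each record is replicated to
+    // size / slide = 2 overlapping windows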
+
+    val expected =
+      "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02\n" +
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025\n" +
+      "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+      "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataStream variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 4.milli on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected =
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+      "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
+      "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
+      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+      "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataStream variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 10.milli on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected =
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataStream variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 3.milli every 10.milli on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected =
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003"
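+    // slide (10ms) is larger than size (3ms): rows with timestamps 3-9 and
+    // 13-19 fall into the gaps between windows and are not counted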
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
new file mode 100644
index 0000000000000..85a237331e2e2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.runtime.datastream.DataStreamAggregateITCase.TimestampWithEqualWatermark
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+  // ----------------------------------------------------------------------------------------------
+  // Sliding windows
+  // ----------------------------------------------------------------------------------------------
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
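+      // 'rowtime refers to the event-time timestamps and watermarks assigned by
+      // TimestampWithEqualWatermark; the DataSet variant windows on the 'long field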
+      .window(Slide over 5.milli every 2.milli on 'rowtime as 'w)
+      .groupBy('w)
+      .select('int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019",
+      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003",
+      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
+      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
+      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
+      "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 4.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+      "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object DataStreamAggregateITCase {
+  /** Assigns the first tuple field as the event-time timestamp and emits a
+    * watermark for every element. */
+  class TimestampWithEqualWatermark
+    extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, Double, Float, BigDecimal, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, Double, Float, BigDecimal, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+}